Coverage for changes_metadata_manager / zenodo_upload.py: 76%

568 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-29 18:29 +0000

1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelomas@gmail.com> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import csv 

7import json 

8import os 

9import re 

10import signal 

11import tempfile 

12import time 

13import unicodedata 

14import zipfile 

15from collections import defaultdict 

16from collections.abc import Mapping 

17from concurrent.futures import ProcessPoolExecutor, as_completed 

18from contextlib import contextmanager 

19from datetime import date 

20from pathlib import Path 

21 

22import requests 

23import yaml 

24from rdflib import Graph, URIRef 

25from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn, TimeElapsedColumn, TimeRemainingColumn 

26 

27from piccione.upload.on_zenodo import get_headers, main as piccione_upload, publish_draft as piccione_publish_draft 

28 

29from changes_metadata_manager.folder_metadata_builder import ( 

30 BASE_URI, 

31 FOLDER_TO_ID, 

32 KG_PATH, 

33 SKIP_FOLDERS, 

34 STAGE_STEPS, 

35 extract_id_from_folder_name, 

36 load_kg, 

37 scan_folder_structure, 

38) 

39 

40 

class LiteralBlockDumper(yaml.SafeDumper):
    """``yaml.SafeDumper`` subclass used when this module serialises configs.

    The class adds no behaviour of its own; it exists so that a custom string
    representer can be registered on it via ``add_representer``.
    """

43 

44 

def _literal_str_representer(dumper: yaml.SafeDumper, data):
    """Represent ``data`` as a YAML string scalar.

    Strings containing a newline are emitted with the literal block style
    (``|``); every other string uses the dumper's default scalar style.
    """
    style_kwargs = {"style": "|"} if "\n" in data else {}
    return dumper.represent_scalar("tag:yaml.org,2002:str", data, **style_kwargs)

49 

50 

51LiteralBlockDumper.add_representer(str, _literal_str_representer) 

52 

53CREATORS_LOOKUP_PATH = Path(__file__).parent.parent / "data" / "creators_lookup.yaml" 

54 

55 

def slugify(text: str) -> str:
    """Turn ``text`` into a lowercase, ASCII-only, hyphen-separated slug.

    The text is NFKD-normalised, non-ASCII code points are dropped, anything
    that is not a word character, whitespace or hyphen is removed, and runs of
    whitespace/hyphens collapse into a single ``-`` (trimmed at both ends).
    """
    decomposed = unicodedata.normalize("NFKD", text)
    ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")
    cleaned = re.sub(r"[^\w\s-]", "", ascii_only.lower())
    hyphenated = re.sub(r"[-\s]+", "-", cleaned)
    return hyphenated.strip("-")

61 

62STEP_TO_STAGE = { 

63 "00": "raw", 

64 "01": "rawp", 

65 "02": "dcho", 

66 "03": "dchoo", 

67 "04": "dchoo", 

68 "05": "dchoo", 

69 "06": "dchoo", 

70} 

71 

72STAGE_LICENSE_STEP: dict[str, str] = { 

73 "raw": "00", 

74 "rawp": "01", 

75 "dcho": "02", 

76 "dchoo": "03", 

77} 

78 

79CRM = "http://www.cidoc-crm.org/cidoc-crm/" 

80CRMDIG = "http://www.cidoc-crm.org/extensions/crmdig/" 

81AAT = "http://vocab.getty.edu/aat/" 

82 

83P70I = URIRef(f"{CRM}P70i_is_documented_in") 

84P3_HAS_NOTE = URIRef(f"{CRM}P3_has_note") 

85P14_CARRIED_OUT_BY = URIRef(f"{CRM}P14_carried_out_by") 

86P1_IS_IDENTIFIED_BY = URIRef(f"{CRM}P1_is_identified_by") 

87P190_HAS_SYMBOLIC_CONTENT = URIRef(f"{CRM}P190_has_symbolic_content") 

88P74_HAS_RESIDENCE = URIRef(f"{CRM}P74_has_current_or_former_residence") 

89P32_USED_GENERAL_TECHNIQUE = URIRef(f"{CRM}P32_used_general_technique") 

90P16_USED_SPECIFIC_OBJECT = URIRef(f"{CRM}P16_used_specific_object") 

91E21_PERSON = URIRef(f"{CRM}E21_Person") 

92RDF_TYPE = URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type") 

93L23_USED_SOFTWARE = URIRef(f"{CRMDIG}L23_used_software_or_firmware") 

94 

95AAT_TECHNIQUE_LABELS: dict[str, str] = { 

96 f"{AAT}300266792": "digital photography", 

97 f"{AAT}300429747": "optical scanning", 

98} 

99 

100SLUG_LABELS: dict[str, str] = { 

101 "3df_zephyr": "3DF Zephyr", 

102 "adobe_photoshop_2023": "Adobe Photoshop 2023", 

103 "agisoft_metashape": "Agisoft Metashape", 

104 "artec_eva": "Artec Eva", 

105 "artec_leo": "Artec Leo", 

106 "artec_spider": "Artec Spider", 

107 "artec_studio_14": "Artec Studio 14", 

108 "artec_studio_15": "Artec Studio 15", 

109 "artec_studio_16": "Artec Studio 16", 

110 "artec_studio_19": "Artec Studio 19", 

111 "aton": "ATON", 

112 "blender": "Blender", 

113 "canon_eos_6d": "Canon EOS 6D", 

114 "chad-ap": "CHAD-AP", 

115 "cloudcompare": "CloudCompare", 

116 "gestaltor": "Gestaltor", 

117 "gimp": "GIMP", 

118 "heritrace": "HERITRACE", 

119 "instalod": "InstaLOD", 

120 "instant_meshes": "Instant Meshes", 

121 "lente_24-70_f2_8_l": "Canon EF 24-70mm f/2.8L", 

122 "meshlab": "MeshLab", 

123 "metashape": "Agisoft Metashape", 

124 "microscopio_digitale_bresciani": "Microscopio Digitale Bresciani", 

125 "modo": "Modo", 

126 "morph-kgc": "Morph-KGC", 

127 "nextcloud": "Nextcloud", 

128 "nikkor_35mm": "Nikkor 35mm", 

129 "nikkor_50mm": "Nikkor 50mm", 

130 "nikon_d3300": "Nikon D3300", 

131 "nikon_d5200": "Nikon D5200", 

132 "nikon_d5600": "Nikon D5600", 

133 "nikon_d7200": "Nikon D7200", 

134 "nikon_d750": "Nikon D750", 

135 "panasonic_dmc-lx100": "Panasonic DMC-LX100", 

136 "sony_alpha_6100": "Sony Alpha 6100", 

137 "sony_alpha_7_i": "Sony Alpha 7 I", 

138 "substance_3d_painter": "Substance 3D Painter", 

139 "substance_painter": "Substance Painter", 

140 "zbrush": "ZBrush", 

141} 

142 

143 

def _format_slug(slug: str) -> str:
    """Return the display label registered for ``slug`` in ``SLUG_LABELS``.

    Raises:
        KeyError: if ``slug`` has no entry in ``SLUG_LABELS``.
    """
    label = SLUG_LABELS[slug]
    return label

146 

147 

def load_creators_lookup(path: Path) -> dict[str, dict]:
    """Load the creators YAML file and index its entries by RDF name.

    Args:
        path: Location of a YAML file with a top-level ``creators`` list whose
            items carry ``name_in_rdf``, ``family_name``, ``given_name``,
            ``affiliation`` and ``orcid``.

    Returns:
        A mapping from each creator's ``name_in_rdf`` to a dict holding the
        other four fields.

    Raises:
        KeyError: if ``creators`` or any of the per-creator fields is missing.
    """
    # Read as UTF-8 explicitly: without it, open() falls back to the locale
    # encoding, which can garble non-ASCII names on some platforms.
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    copied_fields = ("family_name", "given_name", "affiliation", "orcid")
    return {
        creator["name_in_rdf"]: {field: creator[field] for field in copied_fields}
        for creator in data["creators"]
    }

160 

161 

def _format_creator(creator_data: dict, role: str) -> dict:
    """Build one creator entry from a lookup record.

    Args:
        creator_data: Dict with ``family_name``, ``given_name``, ``orcid`` and
            ``affiliation`` keys.
        role: Identifier placed under ``role.id``.

    Returns:
        A dict with ``person_or_org``, ``role`` and ``affiliations`` keys.
    """
    person = {
        "type": "personal",
        "family_name": creator_data["family_name"],
        "given_name": creator_data["given_name"],
        "identifiers": [{"scheme": "orcid", "identifier": creator_data["orcid"]}],
    }
    entry: dict = {"person_or_org": person}
    entry["role"] = {"id": role}
    entry["affiliations"] = [{"name": creator_data["affiliation"]}]
    return entry

173 

174 

175METADATA_STEP = "05" 

176 

177 

def _extract_actor_names(graph: Graph, act_uri: URIRef) -> set[str]:
    """Collect the names of the actors that carried out ``act_uri``.

    For each ``P14_carried_out_by`` object of the activity, follows
    ``P1_is_identified_by`` and then ``P190_has_symbolic_content`` and gathers
    the resulting values as strings.

    Raises:
        AssertionError: if an actor is not typed as ``E21_Person`` in ``graph``.
    """
    names: set[str] = set()
    for actor_triple in graph.triples((act_uri, P14_CARRIED_OUT_BY, None)):
        actor_uri = actor_triple[2]
        assert (actor_uri, RDF_TYPE, E21_PERSON) in graph
        for appellation_triple in graph.triples((actor_uri, P1_IS_IDENTIFIED_BY, None)):
            appellation_uri = appellation_triple[2]
            names.update(
                str(content)
                for _, _, content in graph.triples(
                    (appellation_uri, P190_HAS_SYMBOLIC_CONTENT, None)
                )
            )
    return names

186 

187 

def extract_authors_for_entity_stage(graph: Graph, entity_ids: list[str], stage: str) -> set[str]:
    """Return the actor names attached to the activities of ``stage``.

    Looks at every step listed in ``STAGE_STEPS[stage]`` except
    ``METADATA_STEP``, for every id in ``entity_ids``, using the activity URI
    ``{BASE_URI}/act/<id>/<step>/1``.
    """
    activity_steps = [step for step in STAGE_STEPS[stage] if step != METADATA_STEP]
    authors: set[str] = set()
    for entity_id in entity_ids:
        for step in activity_steps:
            act_uri = URIRef(f"{BASE_URI}/act/{entity_id}/{step}/1")
            authors.update(_extract_actor_names(graph, act_uri))
    return authors

195 

196 

def extract_metadata_authors(graph: Graph, entity_ids: list[str]) -> set[str]:
    """Return the actor names attached to the metadata step of each entity.

    The activity inspected for every id is
    ``{BASE_URI}/act/<id>/<METADATA_STEP>/1``.
    """
    authors: set[str] = set()
    for eid in entity_ids:
        # Use the shared METADATA_STEP constant instead of a literal "05" so
        # this stays in sync with the stage-author extraction, which excludes
        # that same step.
        act_uri = URIRef(f"{BASE_URI}/act/{eid}/{METADATA_STEP}/1")
        authors |= _extract_actor_names(graph, act_uri)
    return authors

202 

203 

def build_creators_for_entity_stage(
    graph: Graph, entity_ids: list[str], stage: str, creators_lookup: dict[str, dict]
) -> list[dict]:
    """Build "researcher" creator entries for the authors of ``stage``.

    Names are processed in sorted order; names that are absent from
    ``creators_lookup`` are skipped.
    """
    creators: list[dict] = []
    for name in sorted(extract_authors_for_entity_stage(graph, entity_ids, stage)):
        if name not in creators_lookup:
            continue
        creators.append(_format_creator(creators_lookup[name], "researcher"))
    return creators

213 

214 

def build_metadata_creators(
    graph: Graph, entity_ids: list[str], creators_lookup: dict[str, dict]
) -> list[dict]:
    """Build "datacurator" creator entries for the metadata authors.

    Names are processed in sorted order; names that are absent from
    ``creators_lookup`` are skipped.
    """
    creators: list[dict] = []
    for name in sorted(extract_metadata_authors(graph, entity_ids)):
        if name not in creators_lookup:
            continue
        creators.append(_format_creator(creators_lookup[name], "datacurator"))
    return creators

224 

225 

def merge_creators(digitization_creators: list[dict], metadata_creators: list[dict]) -> list[dict]:
    """Concatenate both creator lists, skipping repeated metadata creators.

    Every digitization creator is kept as-is. A metadata creator is appended
    only when its first identifier has not been seen yet, either among the
    digitization creators or among earlier metadata creators.
    """

    def _orcid_of(creator: dict) -> str:
        return creator["person_or_org"]["identifiers"][0]["identifier"]

    merged: list[dict] = list(digitization_creators)
    known_orcids: set[str] = {_orcid_of(creator) for creator in digitization_creators}
    for candidate in metadata_creators:
        orcid = _orcid_of(candidate)
        if orcid in known_orcids:
            continue
        known_orcids.add(orcid)
        merged.append(candidate)
    return merged

239 

240 

def extract_licensed_entity_stages(graph: Graph) -> set[tuple[str, str]]:
    """Return the ``(entity_id, stage)`` pairs that have a license node.

    A subject of a ``P70i_is_documented_in`` triple counts when its URI has the
    form ``{BASE_URI}/lic/<entity_id>/<two-digit step>/1`` and the step maps to
    a stage in ``STEP_TO_STAGE``.
    """
    license_uri = re.compile(rf"^{re.escape(BASE_URI)}/lic/([^/]+)/(\d{{2}})/1$")
    licensed: set[tuple[str, str]] = set()
    for subject, _, _ in graph.triples((None, P70I, None)):
        found = license_uri.match(str(subject))
        if found is None:
            continue
        entity_id, step = found.groups()
        stage = STEP_TO_STAGE.get(step)
        if not stage:
            continue
        licensed.add((entity_id, stage))
    return licensed

252 

253 

def group_folders_by_entity(structure: dict) -> dict[str, list[tuple[str, str, dict]]]:
    """Group scanned folders by base entity id.

    Args:
        structure: Dict whose ``"structure"`` key maps each sala name to a
            mapping of folder name -> subfolders.

    Returns:
        ``base_id -> [(sala_name, folder_name, subfolders), ...]``. Folders in
        ``SKIP_FOLDERS`` are ignored. Ids that appear among the values of
        ``FOLDER_TO_ID`` are used unchanged; any other id has its trailing
        lowercase ASCII letters stripped.
    """
    grouped: dict[str, list[tuple[str, str, dict]]] = {}
    for sala_name, sala_items in structure["structure"].items():
        for folder_name, subfolders in sala_items.items():
            if folder_name in SKIP_FOLDERS:
                continue
            entity_id = extract_id_from_folder_name(folder_name)
            is_mapped_id = entity_id in FOLDER_TO_ID.values()
            base_id = entity_id if is_mapped_id else entity_id.rstrip("abcdefghijklmnopqrstuvwxyz")
            grouped.setdefault(base_id, []).append((sala_name, folder_name, subfolders))
    return grouped

267 

268 

269STAGES = ("raw", "rawp", "dcho", "dchoo") 

270 

271 

def _extract_license_from_meta(stage_dir: Path, stage: str) -> str | None:
    """Read the license id for ``stage`` from ``stage_dir / "meta.ttl"``.

    Scans ``P70i_is_documented_in`` triples whose subject contains ``/lic/``
    and ends in ``/<two-digit step>/<number>``. The first subject whose step
    equals ``STAGE_LICENSE_STEP[stage]`` decides the result: its object is
    looked up in ``LICENSE_URI_TO_ZENODO`` (``None`` when it is not listed).

    Returns:
        The mapped license id, or ``None`` when no subject matches.
    """
    meta_graph = Graph()
    meta_graph.parse(stage_dir / "meta.ttl", format="turtle")
    expected_step = STAGE_LICENSE_STEP[stage]
    for subject, _, license_uri in meta_graph.triples((None, P70I, None)):
        subject_str = str(subject)
        if "/lic/" not in subject_str:
            continue
        step_match = re.search(r"/(\d{2})/\d+$", subject_str)
        if step_match is None or step_match.group(1) != expected_step:
            continue
        return LICENSE_URI_TO_ZENODO.get(str(license_uri))
    return None

283 

284 

def create_stage_zip(
    entity_id: str,
    stage: str,
    folders: list[tuple[str, str, dict]],
    root: Path,
    output_dir: Path,
    title: str,
) -> tuple[Path, str | None] | None:
    """Zip the ``stage`` directories of one entity into ``output_dir``.

    For each ``(sala_name, folder_name, stages_dict)`` the first key of
    ``stages_dict`` that equals ``stage`` case-insensitively selects the
    directory ``root / sala_name / folder_name / <key>``.

    Returns:
        ``None`` when no folder has the stage; otherwise ``(zip_path,
        license_id)``. ``license_id`` is the last truthy value returned by
        ``_extract_license_from_meta`` across the selected directories. When it
        is ``None`` only ``meta.ttl`` and ``prov.trig`` files are archived;
        otherwise every file is archived.
    """
    selected: list[tuple[str, str, Path]] = []
    license_id: str | None = None
    for sala_name, folder_name, stages_dict in folders:
        stage_name_in_folder = next(
            (name for name in stages_dict if name.lower() == stage), None
        )
        if stage_name_in_folder is None:
            continue
        stage_dir = root / sala_name / folder_name / stage_name_in_folder
        selected.append((folder_name, stage_name_in_folder, stage_dir))
        # A later folder's license replaces an earlier one; a missing license
        # leaves the current value untouched.
        license_id = _extract_license_from_meta(stage_dir, stage) or license_id

    if not selected:
        return None

    zip_name = f"{slugify(folders[0][0])}-{slugify(title)}-{entity_id}-{stage}.zip"
    zip_path = output_dir / zip_name
    always_included = ("meta.ttl", "prov.trig")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for folder_name, stage_name_in_folder, stage_dir in selected:
            for file_path in stage_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                if not license_id and file_path.name not in always_included:
                    continue
                rel = file_path.relative_to(stage_dir)
                archive.write(file_path, f"{folder_name}/{stage_name_in_folder}/{rel}")
    return zip_path, license_id

323 

324 

def _get_label(graph: Graph, uri: URIRef) -> str | None:
    """Return the first symbolic content found for ``uri``, if any.

    Follows ``P1_is_identified_by`` from ``uri`` and then
    ``P190_has_symbolic_content``; the first value encountered is returned as a
    string, ``None`` when there is none.
    """
    for appellation in graph.triples((uri, P1_IS_IDENTIFIED_BY, None)):
        for content in graph.triples((appellation[2], P190_HAS_SYMBOLIC_CONTENT, None)):
            return str(content[2])
    return None

330 

331 

def extract_keeper_info(graph: Graph, entity_ids: list[str]) -> tuple[str | None, str | None]:
    """Return ``(keeper_name, location_name)`` for the first keeper found.

    For each id the activity ``{BASE_URI}/act/<id>/ob08/1`` is inspected; the
    first ``P14_carried_out_by`` object encountered decides the result. Its
    label becomes the keeper name, and the label of its last
    ``P74_has_current_or_former_residence`` object becomes the location.

    Returns:
        ``(None, None)`` when no id has such an actor.
    """
    for entity_id in entity_ids:
        custody_uri = URIRef(f"{BASE_URI}/act/{entity_id}/ob08/1")
        for keeper_triple in graph.triples((custody_uri, P14_CARRIED_OUT_BY, None)):
            keeper_uri = keeper_triple[2]
            assert isinstance(keeper_uri, URIRef)
            keeper_name = _get_label(graph, keeper_uri)
            location_name = None
            for residence_triple in graph.triples((keeper_uri, P74_HAS_RESIDENCE, None)):
                place_uri = residence_triple[2]
                assert isinstance(place_uri, URIRef)
                location_name = _get_label(graph, place_uri)
            return keeper_name, location_name
    return None, None

344 

345 

def extract_entity_title(graph: Graph, entity_ids: list[str]) -> str:
    """Return a title for the entity, taken from its first available note.

    The first ``P3_has_note`` value found on ``{BASE_URI}/itm/<id>/ob00/1``
    (ids tried in order) is split on a real newline or a literal ``\\n`` and
    its first segment, stripped, is the title. Without any note the fallback
    is ``"Entity <first id>"``.
    """
    for entity_id in entity_ids:
        item_uri = URIRef(f"{BASE_URI}/itm/{entity_id}/ob00/1")
        for _, _, note in graph.triples((item_uri, P3_HAS_NOTE, None)):
            first_segment = re.split(r"\n|\\n", str(note))[0]
            return first_segment.strip()
    return f"Entity {entity_ids[0]}"

353 

354 

def extract_acquisition_technique(graph: Graph, entity_ids: list[str]) -> str | None:
    """Return the label of the first general technique used in step ``00``.

    Checks ``{BASE_URI}/act/<id>/00/1`` for each id in order and maps the first
    ``P32_used_general_technique`` object through ``AAT_TECHNIQUE_LABELS``.

    Returns:
        The label, or ``None`` when no activity declares a technique.

    Raises:
        KeyError: if the technique URI is not in ``AAT_TECHNIQUE_LABELS``.
    """
    for entity_id in entity_ids:
        acquisition_uri = URIRef(f"{BASE_URI}/act/{entity_id}/00/1")
        for technique_triple in graph.triples(
            (acquisition_uri, P32_USED_GENERAL_TECHNIQUE, None)
        ):
            return AAT_TECHNIQUE_LABELS[str(technique_triple[2])]
    return None

361 

362 

def extract_devices(graph: Graph, entity_ids: list[str]) -> list[str]:
    """Return the sorted labels of devices used in step ``00`` of the entities.

    Only ``P16_used_specific_object`` objects whose URI contains ``/dev/`` are
    considered; the path segment right after ``/dev/`` is the slug passed to
    ``_format_slug``.
    """
    labels: set[str] = set()
    for entity_id in entity_ids:
        acquisition_uri = URIRef(f"{BASE_URI}/act/{entity_id}/00/1")
        for _, _, used_object in graph.triples(
            (acquisition_uri, P16_USED_SPECIFIC_OBJECT, None)
        ):
            _, marker, remainder = str(used_object).partition("/dev/")
            if not marker:
                continue
            labels.add(_format_slug(remainder.split("/")[0]))
    return sorted(labels)

373 

374 

def extract_software_for_stage(graph: Graph, entity_ids: list[str], stage: str) -> list[str]:
    """Return the sorted labels of software used in the steps of ``stage``.

    Every step of ``STAGE_STEPS[stage]`` except ``METADATA_STEP`` is inspected
    for ``L23_used_software_or_firmware`` objects; the path segment following
    ``/sfw/`` in each object URI is the slug passed to ``_format_slug``.

    Raises:
        IndexError: if a software URI does not contain ``/sfw/``.
    """
    activity_steps = [step for step in STAGE_STEPS[stage] if step != METADATA_STEP]
    labels: set[str] = set()
    for entity_id in entity_ids:
        for step in activity_steps:
            act_uri = URIRef(f"{BASE_URI}/act/{entity_id}/{step}/1")
            for software_triple in graph.triples((act_uri, L23_USED_SOFTWARE, None)):
                after_marker = str(software_triple[2]).split("/sfw/")[1]
                labels.add(_format_slug(after_marker.split("/")[0]))
    return sorted(labels)

386 

387 

388LICENSE_URI_TO_ZENODO = { 

389 "https://creativecommons.org/publicdomain/zero/1.0/": "cc0-1.0", 

390 "https://creativecommons.org/licenses/by/4.0/": "cc-by-4.0", 

391 "https://creativecommons.org/licenses/by-nc/4.0/": "cc-by-nc-4.0", 

392 "https://creativecommons.org/licenses/by-sa/4.0/": "cc-by-sa-4.0", 

393 "https://creativecommons.org/licenses/by-nc-sa/4.0/": "cc-by-nc-sa-4.0", 

394} 

395 

396STAGE_TITLE_NAMES = { 

397 "raw": "Raw", 

398 "rawp": "Processed raw model", 

399 "dcho": "Digital Cultural Heritage Object", 

400 "dchoo": "Optimized Digital Cultural Heritage Object", 

401} 

402 

403STAGE_DESCRIPTION_NAMES = { 

404 "raw": "Raw acquisition data", 

405 "rawp": "Processed raw model", 

406 "dcho": "Digital Cultural Heritage Object", 

407 "dchoo": "Optimized Digital Cultural Heritage Object", 

408} 

409 

410STAGE_DESCRIPTIONS = { 

411 "raw": "This dataset contains the raw material generated during the acquisition phase.", 

412 "rawp": "This dataset contains the preliminary output from the photogrammetry or scanner software after initial data processing but without any interpolation or geometry corrections.", 

413 "dcho": "This dataset contains the version that includes interpolation, gap filling, and resolution of geometric issues, resulting in a refined and improved model.", 

414 "dchoo": "This dataset contains the version optimised for real-time online interaction.", 

415} 

416 

417PROPAGATED_FIELDS = ( 

418 "zenodo_url", "access_token", "user_agent", 

419 "subjects", "publication_date", 

420 "version", "community", 

421 "contributors", "funding", 

422 "references", "dates", 

423) 

424 

425 

def extract_license_for_entity_stage(graph: Graph, entity_id: str, stage: str) -> str | None:
    """Return the license id recorded in ``graph`` for an entity stage.

    The node ``{BASE_URI}/lic/<entity_id>/<STAGE_LICENSE_STEP[stage]>/1`` is
    queried for ``P70i_is_documented_in``; the first object found is mapped
    through ``LICENSE_URI_TO_ZENODO``.

    Returns:
        The mapped id, or ``None`` when there is no such triple or the URL is
        not in the mapping.
    """
    license_node = URIRef(f"{BASE_URI}/lic/{entity_id}/{STAGE_LICENSE_STEP[stage]}/1")
    for license_triple in graph.triples((license_node, P70I, None)):
        return LICENSE_URI_TO_ZENODO.get(str(license_triple[2]))
    return None

432 

433 

434CC0_DISCLAIMER = ( 

435 "No copyright or related rights are claimed in these digital reproductions. " 

436 "The files are released under CC0 1.0 Universal (Public Domain Dedication).\n" 

437 "\n" 

438 "Please note that the original works may qualify as cultural heritage assets " 

439 "under Italian law (D. Lgs. 42/2004). Consequently, although the digital " 

440 "reproductions are released under CC0, certain uses — and in particular " 

441 "commercial uses — may be subject to specific authorisations, restrictions, " 

442 "or fees pursuant to the applicable provisions governing the reproduction " 

443 "and publication of cultural heritage assets. Users are therefore responsible " 

444 "for ensuring compliance with Italian cultural heritage regulations before " 

445 "undertaking any commercial exploitation of the images." 

446) 

447 

448 

449CHAD_AP_URL = "https://w3id.org/dharc/ontology/chad-ap" 

450 

451RESTRICTED_NOTICE = ( 

452 "The digital object files are not included in this dataset " 

453 "because the holding institution did not grant permission for their publication. " 

454 "Only metadata and provenance files are provided." 

455) 

456 

457 

def build_enhanced_description(
    stage: str,
    title: str,
    keeper_name: str | None = None,
    keeper_location: str | None = None,
) -> str:
    """Compose the main description text for a stage record.

    The result joins, with single spaces: a headline naming the stage and
    title, an optional sentence about the keeper (with the location in
    parentheses when given), the stage blurb from ``STAGE_DESCRIPTIONS`` and a
    sentence about the bundled metadata files. A trailing newline is appended.
    """
    sentences = [
        f'{STAGE_DESCRIPTION_NAMES[stage]} of "{title}" from the Aldrovandi Digital Twin.',
    ]
    if keeper_name:
        location_suffix = f" ({keeper_location})" if keeper_location else ""
        sentences.append(f"The original object is held by {keeper_name}{location_suffix}.")
    sentences.append(STAGE_DESCRIPTIONS[stage])
    sentences.append(
        f'Includes metadata (meta.ttl) and provenance (prov.trig) files following the <a href="{CHAD_AP_URL}">CHAD-AP</a> ontology.'
    )
    return " ".join(sentences) + "\n"

478 

479 

480WORKFLOW_DOI_URL = "https://doi.org/10.46298/transformations.14773" 

481 

482 

def build_methods_description(
    graph: Graph,
    entity_ids: list[str],
    stage: str,
) -> str:
    """Compose the "methods" text for a stage record.

    Paragraphs, separated by blank lines: a pointer to the workflow DOI, an
    optional acquisition sentence (technique, with devices in parentheses), an
    optional list of processing software for ``stage`` and a closing sentence
    about CHAD-AP. A trailing newline is appended.
    """
    paragraphs = [
        "Acquisition and digitization followed the reproducible workflow documented in "
        f'<a href="{WORKFLOW_DOI_URL}">doi:10.46298/transformations.14773</a>.',
    ]

    # Both lookups run unconditionally, even though devices are only reported
    # alongside a technique.
    technique = extract_acquisition_technique(graph, entity_ids)
    devices = extract_devices(graph, entity_ids)
    if technique:
        device_suffix = f" ({', '.join(devices)})" if devices else ""
        paragraphs.append(f"Data was acquired using {technique}{device_suffix}.")

    software = extract_software_for_stage(graph, entity_ids, stage)
    if software:
        paragraphs.append(f"Processing software: {', '.join(software)}.")

    paragraphs.append(
        f'Metadata follows the <a href="{CHAD_AP_URL}">Cultural Heritage Acquisition and '
        "Digitisation Application Profile (CHAD-AP)</a> based on CIDOC-CRM."
    )
    return "\n\n".join(paragraphs) + "\n"

508 

509 

def build_entity_uri(entity_ids: list[str]) -> str:
    """Return the item URI ``{BASE_URI}/itm/<id>/ob00/1`` for the first id.

    Raises:
        IndexError: if ``entity_ids`` is empty.
    """
    primary_id = entity_ids[0]
    return f"{BASE_URI}/itm/{primary_id}/ob00/1"

512 

513 

514LICENSE_INFO = { 

515 "cc0-1.0": { 

516 "title": "Creative Commons Zero v1.0 Universal", 

517 "link": "https://creativecommons.org/publicdomain/zero/1.0/", 

518 }, 

519 "cc-by-4.0": { 

520 "title": "Creative Commons Attribution 4.0 International", 

521 "link": "https://creativecommons.org/licenses/by/4.0/", 

522 }, 

523 "cc-by-nc-4.0": { 

524 "title": "Creative Commons Attribution Non Commercial 4.0 International", 

525 "link": "https://creativecommons.org/licenses/by-nc/4.0/", 

526 }, 

527 "cc-by-sa-4.0": { 

528 "title": "Creative Commons Attribution Share Alike 4.0 International", 

529 "link": "https://creativecommons.org/licenses/by-sa/4.0/", 

530 }, 

531 "cc-by-nc-sa-4.0": { 

532 "title": "Creative Commons Attribution Non Commercial Share Alike 4.0 International", 

533 "link": "https://creativecommons.org/licenses/by-nc-sa/4.0/", 

534 }, 

535} 

536 

537 

def build_rights(content_license: str | None) -> list[dict]:
    """Build the ``rights`` list for a record.

    The first entry is always CC0 for the metadata files. A second entry for
    the content is added only when ``content_license`` is a key of
    ``LICENSE_INFO``.
    """

    def _entry(license_id: str, title_suffix: str, applies_to: str) -> dict:
        info = LICENSE_INFO[license_id]
        return {
            "title": {"en": f"{info['title']}{title_suffix}"},
            "description": {"en": applies_to},
            "link": info["link"],
        }

    rights = [
        _entry("cc0-1.0", " (Metadata license)", "Applies to metadata files: meta.ttl, prov.trig"),
    ]
    if not content_license or content_license not in LICENSE_INFO:
        return rights
    rights.append(
        _entry(
            content_license,
            " (Content license)",
            "Applies to all data files except meta.ttl and prov.trig",
        )
    )
    return rights

553 

554 

def generate_zenodo_config(
    stage: str,
    zip_path: Path,
    title: str,
    base_config: dict,
    creators: list[dict],
    methods_description: str,
    license: str | None = None,
    entity_uri: str | None = None,
    keeper_name: str | None = None,
    keeper_location: str | None = None,
    has_license: bool = True,
) -> dict:
    """Assemble the per-record config dict for one stage archive.

    Args:
        stage: Key into the stage title/description tables.
        zip_path: Archive to upload; stored as an absolute path string.
        title: Entity title used in the record title and description.
        base_config: Shared settings. ``notes`` and ``locations`` are required;
            ``related_identifiers`` and the ``PROPAGATED_FIELDS`` are optional.
        creators: Creator entries stored under ``creators``.
        methods_description: Text stored as the "methods" description.
        license: Content license id; ``"cc0-1.0"`` adds the CC0 disclaimer.
        entity_uri: When truthy, stored as a ``url`` identifier.
        keeper_name: Forwarded to ``build_enhanced_description``.
        keeper_location: Forwarded to ``build_enhanced_description``.
        has_license: When false, the restricted-files notice is added.

    Returns:
        The config dict. Keys are inserted in a fixed order.
    """
    description = build_enhanced_description(stage, title, keeper_name, keeper_location)

    config: dict = {
        "title": f"{title} - {STAGE_TITLE_NAMES[stage]} - Aldrovandi Digital Twin",
        "description": description,
        "resource_type": {"id": "dataset"},
        "publisher": "Zenodo",
        "access": {"record": "public", "files": "public"},
        "files": [str(zip_path.absolute())],
        "creators": creators,
        "publication_date": date.today().isoformat(),
        "rights": build_rights(license),
    }

    notes = [base_config["notes"]]
    if not has_license:
        notes.append(RESTRICTED_NOTICE)
    if license == "cc0-1.0":
        notes.append(CC0_DISCLAIMER)
    config["additional_descriptions"] = [
        {"description": methods_description, "type": {"id": "methods"}},
        *({"description": note, "type": {"id": "notes"}} for note in notes),
    ]

    features = []
    for loc in base_config["locations"]:
        features.append(
            {
                "geometry": {"type": "Point", "coordinates": [loc["lon"], loc["lat"]]},
                "place": loc["place"],
                "description": loc["description"],
            }
        )
    config["locations"] = {"features": features}

    # Shared settings never override a key set above.
    # NOTE(review): "publication_date" is always set above, so its entry in
    # PROPAGATED_FIELDS can never take effect here; confirm which one is meant.
    config.update(
        {
            field: base_config[field]
            for field in PROPAGATED_FIELDS
            if field in base_config and field not in config
        }
    )

    if "related_identifiers" in base_config:
        converted = []
        for related in base_config["related_identifiers"]:
            entry: dict = {
                "identifier": related["identifier"],
                "relation_type": {"id": related["relation"]},
            }
            if "resource_type" in related:
                entry["resource_type"] = {"id": related["resource_type"]}
            if "scheme" in related:
                entry["scheme"] = related["scheme"]
            converted.append(entry)
        config["related_identifiers"] = converted

    if entity_uri:
        config["identifiers"] = [{"identifier": entity_uri, "scheme": "url"}]

    return config

640 

641 

def _get_sub_entity_ids(folders: list[tuple[str, str, dict]]) -> list[str]:
    """Return the distinct ids extracted from the folder names, in first-seen order."""
    # dict.fromkeys keeps the first occurrence of each id and preserves order.
    unique_ids = dict.fromkeys(
        extract_id_from_folder_name(folder_name) for _, folder_name, _ in folders
    )
    return list(unique_ids)

651 

652 

653_worker_kg: Graph 

654_worker_base_config: dict 

655_worker_creators_lookup: dict 

656 

657 

def _init_worker(kg_path: Path, base_config: dict, creators_lookup: dict) -> None:
    """Populate the module-level ``_worker_*`` globals.

    Loads the graph from ``kg_path`` via ``load_kg`` and stores it together
    with ``base_config`` and ``creators_lookup`` so that ``_process_entity``
    can read them without receiving them as arguments.
    """
    global _worker_kg, _worker_base_config, _worker_creators_lookup
    _worker_kg = load_kg(kg_path)
    _worker_base_config, _worker_creators_lookup = base_config, creators_lookup

663 

664 

def _process_entity(
    entity_id: str,
    folders: list[tuple[str, str, dict]],
    root: Path,
    zips_dir: Path,
    configs_dir: Path,
) -> None:
    """Create the stage archives and YAML configs for one entity.

    Reads the graph, base config and creators lookup from the ``_worker_*``
    globals. For every stage in ``STAGES`` that yields an archive, a config is
    written to ``configs_dir`` using the same
    ``<sala>-<title>-<entity_id>-<stage>`` stem as the archive.
    """
    kg, base_config, creators_lookup = _worker_kg, _worker_base_config, _worker_creators_lookup

    sub_ids = _get_sub_entity_ids(folders)
    title = extract_entity_title(kg, sub_ids)
    keeper_name, keeper_location = extract_keeper_info(kg, sub_ids)
    stem_prefix = f"{slugify(folders[0][0])}-{slugify(title)}-{entity_id}"
    metadata_creators = build_metadata_creators(kg, sub_ids, creators_lookup)

    for stage in STAGES:
        packaged = create_stage_zip(entity_id, stage, folders, root, zips_dir, title)
        if packaged is None:
            continue
        zip_path, content_license = packaged
        creators = merge_creators(
            build_creators_for_entity_stage(kg, sub_ids, stage, creators_lookup),
            metadata_creators,
        )
        entity_uri = build_entity_uri(sub_ids)
        methods_description = build_methods_description(kg, sub_ids, stage)
        config = generate_zenodo_config(
            stage,
            zip_path,
            title,
            base_config,
            creators,
            methods_description,
            license=content_license,
            entity_uri=entity_uri,
            keeper_name=keeper_name,
            keeper_location=keeper_location,
            has_license=content_license is not None,
        )
        config_path = configs_dir / f"{stem_prefix}-{stage}.yaml"
        # NOTE(review): the file is opened with the platform default encoding
        # while allow_unicode=True emits raw non-ASCII text; confirm readers
        # of these configs expect the same encoding.
        with open(config_path, "w") as f:
            yaml.dump(
                config,
                f,
                Dumper=LiteralBlockDumper,
                default_flow_style=False,
                allow_unicode=True,
                sort_keys=False,
            )

695 

696 

def prepare_all(
    root: Path,
    zenodo_base_config_path: Path,
    output_dir: Path,
    kg_path: Path = KG_PATH,
) -> None:
    """Build archives and configs for every entity found under ``root``.

    Scans and groups the folders, loads the base config and creators lookup,
    creates ``output_dir/zips`` and ``output_dir/configs``, then runs
    ``_process_entity`` for each entity in a process pool while showing a
    progress bar. An exception raised by a worker propagates to the caller.
    """
    entity_groups = group_folders_by_entity(scan_folder_structure(root))

    with open(zenodo_base_config_path) as f:
        base_config = yaml.safe_load(f)
    creators_lookup = load_creators_lookup(CREATORS_LOOKUP_PATH)

    zips_dir = output_dir / "zips"
    configs_dir = output_dir / "configs"
    for directory in (zips_dir, configs_dir):
        directory.mkdir(parents=True, exist_ok=True)

    with ProcessPoolExecutor(
        initializer=_init_worker,
        initargs=(kg_path, base_config, creators_lookup),
    ) as executor:
        futures = {}
        for entity_id, folders in entity_groups.items():
            submitted = executor.submit(
                _process_entity, entity_id, folders, root, zips_dir, configs_dir
            )
            futures[submitted] = entity_id

        progress_columns = (
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
        )
        with Progress(*progress_columns) as progress:
            task = progress.add_task("Creating stage packages", total=len(futures))
            for finished in as_completed(futures):
                finished_entity = futures[finished]
                finished.result()  # re-raises any worker exception
                progress.update(task, description=f"Completed entity {finished_entity}")
                progress.advance(task)

738 

739 

def _extract_doi(record: Mapping[str, object]) -> str:
    """Return ``record["pids"]["doi"]["identifier"]`` or ``""`` when unavailable.

    A missing key, or a ``pids``/``doi`` value that is not a dict, yields the
    empty string.
    """
    pids = record.get("pids", {})
    if not isinstance(pids, dict):
        return ""
    doi_info = pids.get("doi", {})
    if not isinstance(doi_info, dict):
        return ""
    return doi_info.get("identifier", "")

744 

745 

def _extract_record_url(record: Mapping[str, object]) -> str:
    """Return ``record["links"]["self_html"]``.

    Raises:
        KeyError: if ``links`` or ``self_html`` is missing.
        AssertionError: if ``links`` is not a dict.
    """
    record_links = record["links"]
    assert isinstance(record_links, dict)
    html_url = record_links["self_html"]
    return html_url

750 

751 

752LICENSE_TITLE_TO_SHORT: dict[str, str] = { 

753 info["title"]: short_name for short_name, info in LICENSE_INFO.items() 

754} 

755 

756 

def _format_creators_for_table(config: dict) -> str:
    """Render ``config["creators"]`` as one table cell.

    Each creator becomes ``"<family>, <given> [orcid:<id>]"`` using its first
    identifier; entries are joined with ``"; "``.
    """

    def _describe(creator: dict) -> str:
        person = creator["person_or_org"]
        orcid = person["identifiers"][0]["identifier"]
        return f"{person['family_name']}, {person['given_name']} [orcid:{orcid}]"

    return "; ".join(_describe(creator) for creator in config["creators"])

765 

766 

def _format_licenses_for_table(config: dict) -> str:
    """Render ``config["rights"]`` as one table cell.

    For each right, the first full license title in ``LICENSE_TITLE_TO_SHORT``
    that prefixes the English title is replaced by its short id, and the rest
    of the title (trimmed of spaces and parentheses) follows in parentheses.
    Rights with no known prefix are left out. Entries are joined with ``"; "``.
    """
    summaries: list[str] = []
    for right in config["rights"]:
        title_en = right["title"]["en"]
        matched = next(
            (
                (full_name, short_name)
                for full_name, short_name in LICENSE_TITLE_TO_SHORT.items()
                if title_en.startswith(full_name)
            ),
            None,
        )
        if matched is None:
            continue
        full_name, short_name = matched
        context = title_en.removeprefix(full_name).strip(" ()")
        summaries.append(f"{short_name} ({context})")
    return "; ".join(summaries)

777 

778 

779DOI_TABLE_FIELDNAMES = [ 

780 "Numero su DMP", 

781 "Caso di studio", 

782 "Autore/i", 

783 "Tipo", 

784 "Titolo", 

785 "Data pubblicazione", 

786 "DOI", 

787 "URL", 

788 "Repository", 

789 "Licenza", 

790 "Note", 

791] 

792 

793 

def _atomic_write_json(path: Path, data: list) -> None:
    """Write ``data`` as indented JSON to ``path`` via a temporary file.

    The JSON is first written to a temporary ``.tmp`` file created in
    ``path.parent`` and then moved over ``path`` with ``os.replace``.

    Raises:
        TypeError: if ``data`` is not JSON-serialisable.
        OSError: if the temporary file cannot be created, written or moved.
    """
    fd, tmp_path = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # mkstemp leaves deletion to the caller: without this cleanup, every
        # failed write would leave a stray *.tmp file next to ``path``.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise

799 

800 

@contextmanager
def _graceful_shutdown():
    """Temporarily replace the SIGINT handler with a two-step stop request.

    Yields a one-element list. The first SIGINT sets its element to ``True``
    and prints a notice; a second SIGINT reinstates the previous handler and
    raises ``KeyboardInterrupt``. The previous handler is always reinstated on
    exit.
    """
    stop_flag = [False]
    previous_handler = signal.getsignal(signal.SIGINT)

    def _on_sigint(signum, frame):
        if not stop_flag[0]:
            stop_flag[0] = True
            print("Finishing current record, then stopping...")
            return
        signal.signal(signal.SIGINT, previous_handler)
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, _on_sigint)
    try:
        yield stop_flag
    finally:
        signal.signal(signal.SIGINT, previous_handler)

816 

817 

def _write_doi_table(drafts: list[dict], output_dir: Path) -> Path:
    """Write ``output_dir/doi_table.csv`` with one row per non-failed draft.

    Each row combines fields from the draft entry (``doi``, ``record_url``)
    with fields read back from the draft's YAML config file.

    Returns:
        The path of the CSV file.
    """
    rows: list[dict[str, str]] = []
    for draft in drafts:
        if draft["status"] == "failed":
            continue
        with open(draft["config_file"]) as f:
            config = yaml.safe_load(f)
        row = dict.fromkeys(DOI_TABLE_FIELDNAMES, "")
        row.update(
            {
                "Caso di studio": "Aldrovandi",
                "Autore/i": _format_creators_for_table(config),
                "Tipo": "Dataset",
                "Titolo": config["title"],
                "Data pubblicazione": config["publication_date"],
                "DOI": draft["doi"],
                "URL": draft["record_url"],
                "Repository": "Zenodo",
                "Licenza": _format_licenses_for_table(config),
            }
        )
        rows.append(row)

    csv_path = output_dir / "doi_table.csv"
    with open(csv_path, "w", newline="") as f:
        table = csv.DictWriter(f, fieldnames=DOI_TABLE_FIELDNAMES)
        table.writeheader()
        table.writerows(rows)
    return csv_path

844 

845 

def _without_failed_attempts(drafts: list[dict], stem: str) -> list[dict]:
    """Return ``drafts`` minus the "failed" entries recorded for config ``stem``."""
    return [
        d for d in drafts
        if not (Path(d["config_file"]).stem == stem and d["status"] == "failed")
    ]


def upload_all(configs_dir: Path, publish: bool = False) -> Path:
    """Upload every ``*.yaml`` config in ``configs_dir`` and record the outcome.

    Progress is persisted to ``configs_dir.parent / "drafts.json"`` after each
    record, so the function can be re-run: configs whose stem is already
    recorded as "uploaded" or "published" are skipped. A failure of one record
    is logged, stored with status "failed" and does not stop the run. A first
    SIGINT stops the loop before the next record starts.

    Args:
        configs_dir: Directory containing the YAML configs.
        publish: Forwarded to the uploader; also decides whether successful
            entries are recorded as "published" or "uploaded".

    Returns:
        The path of the DOI table CSV written next to ``drafts.json``.
    """
    config_files = sorted(configs_dir.glob("*.yaml"))
    drafts_path = configs_dir.parent / "drafts.json"

    drafts: list[dict] = []
    if drafts_path.exists():
        with open(drafts_path) as f:
            drafts = json.load(f)

    completed_stems = {
        Path(d["config_file"]).stem for d in drafts
        if d["status"] in ("uploaded", "published")
    }

    skipped = 0
    failed = 0
    uploaded = 0

    with _graceful_shutdown() as shutdown, Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    ) as progress:
        task = progress.add_task("Uploading to Zenodo", total=len(config_files))
        for config_file in config_files:
            stem = config_file.stem
            if shutdown[0]:
                print("\nGraceful shutdown: stopping after last completed record.")
                break
            if stem in completed_stems:
                skipped += 1
                progress.update(task, description=f"Skipped {stem}")
                progress.advance(task)
                continue

            progress.update(task, description=f"Uploading {stem}")
            try:
                record = piccione_upload(str(config_file), publish=publish)
                with open(config_file) as f:
                    config = yaml.safe_load(f)
                # NOTE(security): the access token is persisted in plain text
                # in drafts.json; keep that file out of version control.
                entry = {
                    "draft_id": record["id"],
                    "config_file": str(config_file),
                    "title": config["title"],
                    "zenodo_url": config["zenodo_url"],
                    "access_token": config["access_token"],
                    "user_agent": config["user_agent"],
                    "status": "published" if publish else "uploaded",
                    "doi": _extract_doi(record),
                    "record_url": _extract_record_url(record),
                }
            except Exception as exc:
                # Per-record boundary: log and persist the failure, then
                # continue with the remaining configs.
                entry = {
                    "draft_id": "",
                    "config_file": str(config_file),
                    "title": stem,
                    "zenodo_url": "",
                    "access_token": "",
                    "user_agent": "",
                    "status": "failed",
                    "doi": "",
                    "record_url": "",
                    "error": str(exc),
                }
                failed += 1
                print(f"\n[FAILED] {stem}: {exc}")
            else:
                uploaded += 1

            # Replace earlier failed attempts for this config whatever the new
            # outcome is; previously this happened only on success, so repeated
            # failures piled up duplicate "failed" entries.
            drafts = _without_failed_attempts(drafts, stem)
            drafts.append(entry)

            _atomic_write_json(drafts_path, drafts)
            time.sleep(2)
            progress.advance(task)

    csv_path = _write_doi_table(drafts, configs_dir.parent)
    print(f"DOI table written to {csv_path}")
    print(f"Drafts saved to {drafts_path}")
    print(f"Summary: {uploaded} uploaded, {skipped} skipped, {failed} failed (of {len(config_files)} total)")
    return csv_path

924 

925 

def publish_all_drafts(drafts_path: Path) -> Path:
    """Publish every entry of ``drafts_path`` that is still waiting to go live.

    Entries with status ``"uploaded"`` or ``"publish_failed"`` are attempted;
    all others are counted as skipped. The drafts file is rewritten after each
    attempt, and the DOI table is regenerated once the loop ends.

    Args:
        drafts_path: Path to the ``drafts.json`` file to read and update.

    Returns:
        Path of the DOI table CSV written next to ``drafts_path``.
    """
    with open(drafts_path) as handle:
        drafts: list[dict] = json.load(handle)

    retryable = ("uploaded", "publish_failed")
    publishable = [entry for entry in drafts if entry["status"] in retryable]
    skipped = len(drafts) - len(publishable)
    published = failed = 0

    with _graceful_shutdown() as shutdown, Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    ) as progress:
        task = progress.add_task("Publishing drafts", total=len(publishable))
        for entry in publishable:
            if shutdown[0]:
                print("\nGraceful shutdown: stopping after last completed record.")
                break

            label = entry["title"]
            progress.update(task, description=f"Publishing {label}")
            try:
                record = piccione_publish_draft(
                    entry["zenodo_url"].rstrip("/"),
                    entry["access_token"],
                    entry["draft_id"],
                    entry["user_agent"],
                )
                # Entries in `publishable` are the same dict objects held by
                # `drafts`, so these in-place updates are what gets persisted.
                entry["status"] = "published"
                entry["doi"] = _extract_doi(record)
                entry["record_url"] = _extract_record_url(record)
                entry.pop("error", None)
                published += 1
            except Exception as exc:
                entry["status"] = "publish_failed"
                entry["error"] = str(exc)
                failed += 1
                print(f"\n[FAILED] {label}: {exc}")

            _atomic_write_json(drafts_path, drafts)
            time.sleep(2)
            progress.advance(task)

    csv_path = _write_doi_table(drafts, drafts_path.parent)
    print(f"DOI table written to {csv_path}")
    print(f"Summary: {published} published, {skipped} skipped, {failed} failed")
    return csv_path

972 

973 

def sync_status(drafts_path: Path) -> Path:
    """Refresh status, DOI and URL of each entry in ``drafts_path`` from Zenodo.

    For every entry that has a ``draft_id``, the record is fetched from
    ``/records/<id>``; on a 404 the ``/records/<id>/draft`` endpoint is tried
    instead. Entries without a ``draft_id`` (failed uploads are stored with
    blank id, URL and credentials) have nothing to query and are left
    untouched instead of aborting the run on an invalid request URL.

    NOTE(review): the stored status becomes whatever Zenodo reports in
    ``record["status"]``, while the upload and publish steps filter on
    ``"uploaded"`` / ``"published"`` / ``"publish_failed"`` — confirm the
    two vocabularies are meant to mix.

    Args:
        drafts_path: Path to the ``drafts.json`` file to read and update.

    Returns:
        Path of the DOI table CSV written next to ``drafts_path``.

    Raises:
        requests.HTTPError: If Zenodo answers with an error status; the
            drafts file is not rewritten in that case.
    """
    with open(drafts_path) as f:
        drafts: list[dict] = json.load(f)

    updated = 0
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    ) as progress:
        task = progress.add_task("Syncing status from Zenodo", total=len(drafts))
        for draft in drafts:
            if not draft["draft_id"]:
                # No remote record exists for this entry; skip the query.
                progress.update(task, description=f"Skipped {draft['title']}")
                progress.advance(task)
                continue

            progress.update(task, description=f"Querying {draft['draft_id']}")
            headers = get_headers(draft["access_token"], draft["user_agent"])
            base_url = draft["zenodo_url"].rstrip("/")
            resp = requests.get(f"{base_url}/records/{draft['draft_id']}", headers=headers)
            if resp.status_code == 404:
                # Not found as a record: fall back to the draft endpoint.
                resp = requests.get(f"{base_url}/records/{draft['draft_id']}/draft", headers=headers)
            resp.raise_for_status()
            record = resp.json()
            new_status = record["status"]
            new_doi = record.get("doi", "")
            new_url = record["links"]["self_html"]
            if draft["status"] != new_status or draft["doi"] != new_doi or draft["record_url"] != new_url:
                draft["status"] = new_status
                draft["doi"] = new_doi
                draft["record_url"] = new_url
                updated += 1
            time.sleep(0.5)
            progress.advance(task)

    _atomic_write_json(drafts_path, drafts)
    csv_path = _write_doi_table(drafts, drafts_path.parent)
    print(f"Updated {updated} of {len(drafts)} entries")
    print(f"DOI table written to {csv_path}")
    return csv_path

1011 

1012 

1013def _normalize_quotes(text: str) -> str: 

1014 return text.replace("‘", "'").replace("’", "'").replace("“", '"').replace("”", '"') 

1015 

1016 

def cleanup_duplicates(drafts_path: Path, dry_run: bool = False) -> None:
    """Find Zenodo records that duplicate tracked entries and delete the draft ones.

    A remote record counts as a duplicate when its id is not listed in
    ``drafts_path`` but its title (after quote normalization) matches a
    tracked title. Records are listed from ``/user/records`` in pages of 100
    until an empty page is returned. Published duplicates are only reported;
    unpublished ones are deleted through ``/records/<id>/draft`` unless
    ``dry_run`` is set.

    Credentials are taken from the first entry that actually has them, since
    failed uploads are stored with a blank URL and token. If no entry has
    credentials (including an empty file) the function reports it and returns.

    Args:
        drafts_path: Path to the ``drafts.json`` file describing known records.
        dry_run: When true, print what would be deleted without deleting.

    Raises:
        requests.HTTPError: If listing the user's records fails.
    """
    with open(drafts_path) as f:
        drafts: list[dict] = json.load(f)

    credentials = next(
        (d for d in drafts if d.get("zenodo_url") and d.get("access_token")),
        None,
    )
    if credentials is None:
        print("No entry with Zenodo credentials found in drafts file; nothing to check.")
        return

    def _title_of(hit: dict) -> str:
        # The title may sit at the top level or under "metadata".
        return hit.get("title", hit.get("metadata", {}).get("title", ""))

    # Compare ids as strings: an int/str mismatch between the stored ids and
    # the listing would otherwise flag the tracked records themselves as
    # duplicates and delete them.
    known_ids = {str(d["draft_id"]) for d in drafts}
    known_titles = {_normalize_quotes(d["title"]) for d in drafts}
    base_url = credentials["zenodo_url"].rstrip("/")
    headers = get_headers(credentials["access_token"], credentials["user_agent"])

    duplicates: list[dict] = []
    page = 1
    while True:
        resp = requests.get(f"{base_url}/user/records", params={
            "size": 100,
            "page": page,
        }, headers=headers)
        resp.raise_for_status()
        hits = resp.json()["hits"]["hits"]
        if not hits:
            break
        duplicates.extend(
            hit for hit in hits
            if str(hit["id"]) not in known_ids
            and _normalize_quotes(_title_of(hit)) in known_titles
        )
        page += 1

    if not duplicates:
        print("No duplicates found.")
        return

    draft_dups = [d for d in duplicates if d.get("status") != "published"]
    published_dups = [d for d in duplicates if d.get("status") == "published"]

    print(f"Found {len(duplicates)} duplicate(s): {len(draft_dups)} draft(s), {len(published_dups)} published")

    for dup in published_dups:
        print(f"  [PUBLISHED - cannot delete] id={dup['id']}, doi={dup.get('doi', '')}, title={_title_of(dup)}")

    deleted = 0
    for dup in draft_dups:
        title = _title_of(dup)
        if dry_run:
            print(f"  [DRY RUN] Would delete draft id={dup['id']}, title={title}")
        else:
            resp = requests.delete(f"{base_url}/records/{dup['id']}/draft", headers=headers)
            if resp.status_code == 204:
                deleted += 1
                print(f"  [DELETED] id={dup['id']}, title={title}")
            else:
                print(f"  [FAILED] id={dup['id']}, status={resp.status_code}, body={resp.text[:200]}")
            time.sleep(1)

    if dry_run:
        print(f"Dry run complete. {len(draft_dups)} draft(s) would be deleted.")
    else:
        print(f"Deleted {deleted} of {len(draft_dups)} draft duplicate(s).")

1075 

1076 

def parse_arguments():  # pragma: no cover
    """Build the command-line interface and parse ``sys.argv``.

    Exactly one sub-command is required: ``prepare``, ``upload``,
    ``publish-drafts``, ``sync-status`` or ``cleanup-duplicates``. The chosen
    name is stored in the ``command`` attribute of the returned namespace.
    """
    cli = argparse.ArgumentParser(description="Prepare and upload Zenodo packages")
    commands = cli.add_subparsers(dest="command", required=True)

    prepare = commands.add_parser("prepare", help="Create zips and YAML configs")
    prepare.add_argument("root", type=Path, help="Root directory with Sala/Folder/Stage structure")
    prepare.add_argument("zenodo_config", type=Path, help="Base Zenodo configuration YAML")
    prepare.add_argument("--output", "-o", type=Path, default=Path("zenodo_output"), help="Output directory")

    upload = commands.add_parser("upload", help="Upload to Zenodo")
    upload.add_argument("configs_dir", type=Path, help="Directory containing YAML configs")
    upload.add_argument("--publish", action="store_true", help="Publish after upload")

    # The remaining commands all take a single positional drafts file:
    # (command name, command help, help for the drafts_file argument).
    drafts_commands = (
        ("publish-drafts", "Publish previously uploaded drafts", "Path to drafts.json from a previous upload"),
        ("sync-status", "Sync drafts.json with actual Zenodo record status", "Path to drafts.json"),
        ("cleanup-duplicates", "Find and delete duplicate records not in drafts.json", "Path to drafts.json"),
    )
    by_name = {}
    for name, summary, file_help in drafts_commands:
        sub = commands.add_parser(name, help=summary)
        sub.add_argument("drafts_file", type=Path, help=file_help)
        by_name[name] = sub
    by_name["cleanup-duplicates"].add_argument(
        "--dry-run", action="store_true", help="Only report duplicates, don't delete",
    )

    return cli.parse_args()

1097 

1098 

def main():  # pragma: no cover
    """Parse the command line and run the handler of the selected sub-command."""
    args = parse_arguments()

    # Handlers are wrapped in lambdas so that only the attributes belonging
    # to the selected sub-command are read from `args`.
    handlers = {
        "prepare": lambda: prepare_all(
            root=args.root,
            zenodo_base_config_path=args.zenodo_config,
            output_dir=args.output,
        ),
        "upload": lambda: upload_all(configs_dir=args.configs_dir, publish=args.publish),
        "publish-drafts": lambda: publish_all_drafts(drafts_path=args.drafts_file),
        "sync-status": lambda: sync_status(drafts_path=args.drafts_file),
        "cleanup-duplicates": lambda: cleanup_duplicates(drafts_path=args.drafts_file, dry_run=args.dry_run),
    }

    run = handlers.get(args.command)
    if run is not None:
        run()

1115 

1116 

# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":  # pragma: no cover
    main()