Coverage for changes_metadata_manager / zenodo_upload.py: 76%
568 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-29 18:29 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-29 18:29 +0000
1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelomas@gmail.com>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import csv
7import json
8import os
9import re
10import signal
11import tempfile
12import time
13import unicodedata
14import zipfile
15from collections import defaultdict
16from collections.abc import Mapping
17from concurrent.futures import ProcessPoolExecutor, as_completed
18from contextlib import contextmanager
19from datetime import date
20from pathlib import Path
22import requests
23import yaml
24from rdflib import Graph, URIRef
25from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn, TimeElapsedColumn, TimeRemainingColumn
27from piccione.upload.on_zenodo import get_headers, main as piccione_upload, publish_draft as piccione_publish_draft
29from changes_metadata_manager.folder_metadata_builder import (
30 BASE_URI,
31 FOLDER_TO_ID,
32 KG_PATH,
33 SKIP_FOLDERS,
34 STAGE_STEPS,
35 extract_id_from_folder_name,
36 load_kg,
37 scan_folder_structure,
38)
class LiteralBlockDumper(yaml.SafeDumper):
    """SafeDumper subclass used as the Dumper when writing the generated YAML configs."""
    pass
def _literal_str_representer(dumper: yaml.SafeDumper, data):
    """Represent *data* as a YAML string scalar.

    Strings that contain a newline are emitted in literal block style (``|``);
    all other strings use the dumper's default scalar style.
    """
    extra = {"style": "|"} if "\n" in data else {}
    return dumper.represent_scalar("tag:yaml.org,2002:str", data, **extra)
# Route every str dumped through LiteralBlockDumper via the literal-block representer.
LiteralBlockDumper.add_representer(str, _literal_str_representer)
# data/creators_lookup.yaml, located two directory levels above this module file.
CREATORS_LOOKUP_PATH = Path(__file__).parent.parent / "data" / "creators_lookup.yaml"
def slugify(text: str) -> str:
    """Turn *text* into a lowercase, hyphen-separated ASCII slug.

    The text is NFKD-normalised, non-ASCII characters are dropped, everything
    that is not a word character, whitespace or hyphen is removed, runs of
    whitespace/hyphens collapse to a single hyphen, and leading/trailing
    hyphens are stripped.
    """
    decomposed = unicodedata.normalize("NFKD", text)
    ascii_only = decomposed.encode("ascii", "ignore").decode("ascii")
    cleaned = re.sub(r"[^\w\s-]", "", ascii_only.lower())
    hyphenated = re.sub(r"[-\s]+", "-", cleaned)
    return hyphenated.strip("-")
# Two-digit step code -> stage name; steps "03" through "06" all map to "dchoo".
STEP_TO_STAGE = {
    "00": "raw",
    "01": "rawp",
    "02": "dcho",
    "03": "dchoo",
    "04": "dchoo",
    "05": "dchoo",
    "06": "dchoo",
}

# Stage name -> step code used when building or matching that stage's /lic/ URI.
STAGE_LICENSE_STEP: dict[str, str] = {
    "raw": "00",
    "rawp": "01",
    "dcho": "02",
    "dchoo": "03",
}
# Namespace prefixes used to build the predicate/class URIs below.
CRM = "http://www.cidoc-crm.org/cidoc-crm/"
CRMDIG = "http://www.cidoc-crm.org/extensions/crmdig/"
AAT = "http://vocab.getty.edu/aat/"

# Predicates and classes queried in the knowledge graph by the extract_* helpers.
P70I = URIRef(f"{CRM}P70i_is_documented_in")
P3_HAS_NOTE = URIRef(f"{CRM}P3_has_note")
P14_CARRIED_OUT_BY = URIRef(f"{CRM}P14_carried_out_by")
P1_IS_IDENTIFIED_BY = URIRef(f"{CRM}P1_is_identified_by")
P190_HAS_SYMBOLIC_CONTENT = URIRef(f"{CRM}P190_has_symbolic_content")
P74_HAS_RESIDENCE = URIRef(f"{CRM}P74_has_current_or_former_residence")
P32_USED_GENERAL_TECHNIQUE = URIRef(f"{CRM}P32_used_general_technique")
P16_USED_SPECIFIC_OBJECT = URIRef(f"{CRM}P16_used_specific_object")
E21_PERSON = URIRef(f"{CRM}E21_Person")
RDF_TYPE = URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")
L23_USED_SOFTWARE = URIRef(f"{CRMDIG}L23_used_software_or_firmware")

# Technique URI -> human-readable label; looked up by extract_acquisition_technique.
AAT_TECHNIQUE_LABELS: dict[str, str] = {
    f"{AAT}300266792": "digital photography",
    f"{AAT}300429747": "optical scanning",
}
# Slug taken from a /dev/ or /sfw/ URI -> display label (see _format_slug).
# Note that "metashape" and "agisoft_metashape" share the same label.
SLUG_LABELS: dict[str, str] = {
    "3df_zephyr": "3DF Zephyr",
    "adobe_photoshop_2023": "Adobe Photoshop 2023",
    "agisoft_metashape": "Agisoft Metashape",
    "artec_eva": "Artec Eva",
    "artec_leo": "Artec Leo",
    "artec_spider": "Artec Spider",
    "artec_studio_14": "Artec Studio 14",
    "artec_studio_15": "Artec Studio 15",
    "artec_studio_16": "Artec Studio 16",
    "artec_studio_19": "Artec Studio 19",
    "aton": "ATON",
    "blender": "Blender",
    "canon_eos_6d": "Canon EOS 6D",
    "chad-ap": "CHAD-AP",
    "cloudcompare": "CloudCompare",
    "gestaltor": "Gestaltor",
    "gimp": "GIMP",
    "heritrace": "HERITRACE",
    "instalod": "InstaLOD",
    "instant_meshes": "Instant Meshes",
    "lente_24-70_f2_8_l": "Canon EF 24-70mm f/2.8L",
    "meshlab": "MeshLab",
    "metashape": "Agisoft Metashape",
    "microscopio_digitale_bresciani": "Microscopio Digitale Bresciani",
    "modo": "Modo",
    "morph-kgc": "Morph-KGC",
    "nextcloud": "Nextcloud",
    "nikkor_35mm": "Nikkor 35mm",
    "nikkor_50mm": "Nikkor 50mm",
    "nikon_d3300": "Nikon D3300",
    "nikon_d5200": "Nikon D5200",
    "nikon_d5600": "Nikon D5600",
    "nikon_d7200": "Nikon D7200",
    "nikon_d750": "Nikon D750",
    "panasonic_dmc-lx100": "Panasonic DMC-LX100",
    "sony_alpha_6100": "Sony Alpha 6100",
    "sony_alpha_7_i": "Sony Alpha 7 I",
    "substance_3d_painter": "Substance 3D Painter",
    "substance_painter": "Substance Painter",
    "zbrush": "ZBrush",
}
def _format_slug(slug: str) -> str:
    """Return the display label registered for *slug* in ``SLUG_LABELS``.

    Raises:
        KeyError: if *slug* has no entry in ``SLUG_LABELS``.
    """
    label = SLUG_LABELS[slug]
    return label
def load_creators_lookup(path: Path) -> dict[str, dict]:
    """Load the creators lookup YAML and index it by the name used in the RDF.

    Args:
        path: YAML file with a top-level ``creators`` list; each item must
            provide ``name_in_rdf``, ``family_name``, ``given_name``,
            ``affiliation`` and ``orcid``.

    Returns:
        Mapping ``name_in_rdf`` -> dict with the four remaining fields.

    Raises:
        KeyError: if ``creators`` or any required field is missing.
    """
    # Decode explicitly as UTF-8: the platform default encoding would garble
    # accented names on systems whose locale is not UTF-8.
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    fields = ("family_name", "given_name", "affiliation", "orcid")
    return {
        creator["name_in_rdf"]: {field: creator[field] for field in fields}
        for creator in data["creators"]
    }
def _format_creator(creator_data: dict, role: str) -> dict:
    """Build one creator entry (person, role, affiliation) from a lookup record.

    Args:
        creator_data: dict with ``family_name``, ``given_name``, ``orcid``
            and ``affiliation``.
        role: role id stored under ``role.id``.
    """
    orcid_identifier = {"scheme": "orcid", "identifier": creator_data["orcid"]}
    person = {
        "type": "personal",
        "family_name": creator_data["family_name"],
        "given_name": creator_data["given_name"],
        "identifiers": [orcid_identifier],
    }
    affiliation = {"name": creator_data["affiliation"]}
    return {
        "person_or_org": person,
        "role": {"id": role},
        "affiliations": [affiliation],
    }
# Step code excluded from the per-stage author and software extraction.
METADATA_STEP = "05"
def _extract_actor_names(graph: Graph, act_uri: URIRef) -> set[str]:
    """Collect the names of every actor that carried out *act_uri*.

    For each ``P14_carried_out_by`` object the function follows
    ``P1_is_identified_by`` and then ``P190_has_symbolic_content`` and gathers
    the resulting values as strings. Each actor is asserted to be typed as
    ``E21_Person`` in *graph*.
    """
    collected: set[str] = set()
    for _, _, actor in graph.triples((act_uri, P14_CARRIED_OUT_BY, None)):
        assert (actor, RDF_TYPE, E21_PERSON) in graph
        for _, _, appellation in graph.triples((actor, P1_IS_IDENTIFIED_BY, None)):
            collected.update(
                str(content)
                for _, _, content in graph.triples((appellation, P190_HAS_SYMBOLIC_CONTENT, None))
            )
    return collected
def extract_authors_for_entity_stage(graph: Graph, entity_ids: list[str], stage: str) -> set[str]:
    """Return the actor names of all non-metadata steps of *stage* for the given entities.

    The steps come from ``STAGE_STEPS[stage]`` with ``METADATA_STEP`` removed;
    each activity URI has the form ``{BASE_URI}/act/{eid}/{step}/1``.
    """
    digitization_steps = [step for step in STAGE_STEPS[stage] if step != METADATA_STEP]
    found: set[str] = set()
    for eid in entity_ids:
        for step in digitization_steps:
            activity = URIRef(f"{BASE_URI}/act/{eid}/{step}/1")
            found.update(_extract_actor_names(graph, activity))
    return found
def extract_metadata_authors(graph: Graph, entity_ids: list[str]) -> set[str]:
    """Return the actor names of the metadata step for the given entities.

    The activity URI is ``{BASE_URI}/act/{eid}/{METADATA_STEP}/1``. The step
    code comes from ``METADATA_STEP`` rather than a repeated literal, so this
    function cannot drift from the functions that exclude that same step.
    """
    authors: set[str] = set()
    for eid in entity_ids:
        activity = URIRef(f"{BASE_URI}/act/{eid}/{METADATA_STEP}/1")
        authors |= _extract_actor_names(graph, activity)
    return authors
def build_creators_for_entity_stage(
    graph: Graph, entity_ids: list[str], stage: str, creators_lookup: dict[str, dict]
) -> list[dict]:
    """Build "researcher" creator entries for the authors of *stage*.

    Author names are processed in sorted order; names without an entry in
    *creators_lookup* are left out.
    """
    known_names = sorted(
        name
        for name in extract_authors_for_entity_stage(graph, entity_ids, stage)
        if name in creators_lookup
    )
    return [_format_creator(creators_lookup[name], "researcher") for name in known_names]
def build_metadata_creators(
    graph: Graph, entity_ids: list[str], creators_lookup: dict[str, dict]
) -> list[dict]:
    """Build "datacurator" creator entries for the metadata authors.

    Author names are processed in sorted order; names without an entry in
    *creators_lookup* are left out.
    """
    known_names = sorted(
        name
        for name in extract_metadata_authors(graph, entity_ids)
        if name in creators_lookup
    )
    return [_format_creator(creators_lookup[name], "datacurator") for name in known_names]
def merge_creators(digitization_creators: list[dict], metadata_creators: list[dict]) -> list[dict]:
    """Concatenate the two creator lists, skipping metadata creators already present.

    Every digitization creator is kept as-is. A metadata creator is appended
    only if its first ORCID identifier has not been seen yet, either among the
    digitization creators or among earlier metadata creators.
    """

    def orcid_of(creator: dict) -> str:
        return creator["person_or_org"]["identifiers"][0]["identifier"]

    merged: list[dict] = list(digitization_creators)
    known_orcids: set[str] = {orcid_of(creator) for creator in digitization_creators}
    for creator in metadata_creators:
        orcid = orcid_of(creator)
        if orcid in known_orcids:
            continue
        known_orcids.add(orcid)
        merged.append(creator)
    return merged
def extract_licensed_entity_stages(graph: Graph) -> set[tuple[str, str]]:
    """Return the ``(entity_id, stage)`` pairs that have a licence node in *graph*.

    A licence node is any subject of a ``P70i_is_documented_in`` triple whose
    URI matches ``{BASE_URI}/lic/<entity_id>/<two-digit step>/1``; the step is
    translated to a stage through ``STEP_TO_STAGE`` and unknown steps are ignored.
    """
    lic_uri_re = re.compile(rf"^{re.escape(BASE_URI)}/lic/([^/]+)/(\d{{2}})/1$")
    found = set()
    for subject, _, _ in graph.triples((None, P70I, None)):
        hit = lic_uri_re.match(str(subject))
        if not hit:
            continue
        entity_id, step = hit.groups()
        stage = STEP_TO_STAGE.get(step)
        if stage:
            found.add((entity_id, stage))
    return found
def group_folders_by_entity(structure: dict) -> dict[str, list[tuple[str, str, dict]]]:
    """Group the scanned folders by base entity id.

    Folders listed in ``SKIP_FOLDERS`` are ignored. The id extracted from the
    folder name is used unchanged when it is one of the ``FOLDER_TO_ID``
    values; otherwise trailing lowercase ASCII letters are stripped so that
    lettered variants fall into the same group.

    Returns:
        Mapping base id -> list of ``(sala_name, folder_name, subfolders)``.
    """
    grouped: dict[str, list[tuple[str, str, dict]]] = {}
    for sala_name, sala_items in structure["structure"].items():
        for folder_name, subfolders in sala_items.items():
            if folder_name in SKIP_FOLDERS:
                continue
            entity_id = extract_id_from_folder_name(folder_name)
            is_mapped_id = entity_id in FOLDER_TO_ID.values()
            base_id = entity_id if is_mapped_id else entity_id.rstrip("abcdefghijklmnopqrstuvwxyz")
            grouped.setdefault(base_id, []).append((sala_name, folder_name, subfolders))
    return grouped
# Stage names, in the order _process_entity packages them.
STAGES = ("raw", "rawp", "dcho", "dchoo")
def _extract_license_from_meta(stage_dir: Path, stage: str) -> str | None:
    """Read ``meta.ttl`` in *stage_dir* and return the licence id for *stage*.

    The first ``P70i_is_documented_in`` triple whose subject contains ``/lic/``
    and whose URI ends in ``/<expected step>/<number>`` decides the result: its
    object is translated through ``LICENSE_URI_TO_ZENODO`` (``None`` when the
    licence URI is not in that table). Returns ``None`` when no such triple exists.
    """
    meta = Graph()
    meta.parse(stage_dir / "meta.ttl", format="turtle")
    wanted_step = STAGE_LICENSE_STEP[stage]
    for subject, _, license_uri in meta.triples((None, P70I, None)):
        subject_str = str(subject)
        if "/lic/" not in subject_str:
            continue
        step_match = re.search(r"/(\d{2})/\d+$", subject_str)
        if step_match and step_match.group(1) == wanted_step:
            return LICENSE_URI_TO_ZENODO.get(str(license_uri))
    return None
def create_stage_zip(
    entity_id: str,
    stage: str,
    folders: list[tuple[str, str, dict]],
    root: Path,
    output_dir: Path,
    title: str,
) -> tuple[Path, str | None] | None:
    """Zip the *stage* directories of an entity into *output_dir*.

    Args:
        entity_id: id used in the archive file name.
        stage: lowercase stage name; sub-folder names are matched case-insensitively.
        folders: ``(sala_name, folder_name, stages_dict)`` tuples of the entity.
        root: directory that contains the sala folders.
        output_dir: directory where the archive is written.
        title: entity title, slugified into the archive file name.

    Returns:
        ``(zip_path, license_id)``, or ``None`` when no folder has the stage.
        ``license_id`` is the last licence found across the stage directories.
        When no licence is found, only ``meta.ttl`` and ``prov.trig`` files are
        written into the archive.
    """
    collected: list[tuple[str, str, Path]] = []
    license_id: str | None = None
    for sala_name, folder_name, stages_dict in folders:
        stage_name_in_folder = next((name for name in stages_dict if name.lower() == stage), None)
        if stage_name_in_folder is None:
            continue
        stage_dir = root / sala_name / folder_name / stage_name_in_folder
        collected.append((folder_name, stage_name_in_folder, stage_dir))
        folder_license = _extract_license_from_meta(stage_dir, stage)
        if folder_license:
            license_id = folder_license
    if not collected:
        return None

    # The archive name uses the sala of the first folder in the group.
    sala_slug = slugify(folders[0][0])
    title_slug = slugify(title)
    zip_path = output_dir / f"{sala_slug}-{title_slug}-{entity_id}-{stage}.zip"
    metadata_only_names = ("meta.ttl", "prov.trig")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for folder_name, stage_name_in_folder, stage_dir in collected:
            for file_path in stage_dir.rglob("*"):
                if not file_path.is_file():
                    continue
                if not (license_id or file_path.name in metadata_only_names):
                    continue
                rel = file_path.relative_to(stage_dir)
                archive.write(file_path, f"{folder_name}/{stage_name_in_folder}/{rel}")
    return zip_path, license_id
def _get_label(graph: Graph, uri: URIRef) -> str | None:
    """Return the first symbolic content of any identifier of *uri*, or ``None``.

    Follows ``P1_is_identified_by`` and then ``P190_has_symbolic_content``.
    """
    labels = (
        str(content)
        for _, _, appellation in graph.triples((uri, P1_IS_IDENTIFIED_BY, None))
        for _, _, content in graph.triples((appellation, P190_HAS_SYMBOLIC_CONTENT, None))
    )
    return next(labels, None)
def extract_keeper_info(graph: Graph, entity_ids: list[str]) -> tuple[str | None, str | None]:
    """Return ``(keeper label, residence label)`` for the first keeper found.

    For each entity the ``{BASE_URI}/act/{eid}/ob08/1`` activity is inspected;
    the first ``P14_carried_out_by`` actor encountered decides the result.
    When the keeper has several residences the label of the last one is used.
    Returns ``(None, None)`` when no entity has such an actor.
    """
    for eid in entity_ids:
        activity = URIRef(f"{BASE_URI}/act/{eid}/ob08/1")
        for _, _, keeper_uri in graph.triples((activity, P14_CARRIED_OUT_BY, None)):
            assert isinstance(keeper_uri, URIRef)
            keeper_label = _get_label(graph, keeper_uri)
            residence_label = None
            for _, _, place_uri in graph.triples((keeper_uri, P74_HAS_RESIDENCE, None)):
                assert isinstance(place_uri, URIRef)
                residence_label = _get_label(graph, place_uri)
            return keeper_label, residence_label
    return None, None
def extract_entity_title(graph: Graph, entity_ids: list[str]) -> str:
    """Return the first line of the first ``P3_has_note`` found on an entity item.

    The item URI is ``{BASE_URI}/itm/{eid}/ob00/1``. The note is cut at the
    first real newline or literal backslash-n sequence and stripped. Falls back
    to ``"Entity <first id>"`` when no entity has a note.
    """
    for eid in entity_ids:
        item_uri = URIRef(f"{BASE_URI}/itm/{eid}/ob00/1")
        first_note = next(
            (str(obj) for _, _, obj in graph.triples((item_uri, P3_HAS_NOTE, None))),
            None,
        )
        if first_note is not None:
            return re.split(r"\n|\\n", first_note)[0].strip()
    return f"Entity {entity_ids[0]}"
def extract_acquisition_technique(graph: Graph, entity_ids: list[str]) -> str | None:
    """Return the label of the first technique used by a step-``00`` activity.

    Raises:
        KeyError: if the technique URI is not in ``AAT_TECHNIQUE_LABELS``.
    """
    for eid in entity_ids:
        activity = URIRef(f"{BASE_URI}/act/{eid}/00/1")
        technique_uri = next(
            (obj for _, _, obj in graph.triples((activity, P32_USED_GENERAL_TECHNIQUE, None))),
            None,
        )
        if technique_uri is not None:
            return AAT_TECHNIQUE_LABELS[str(technique_uri)]
    return None
def extract_devices(graph: Graph, entity_ids: list[str]) -> list[str]:
    """Return the sorted, de-duplicated labels of devices used by step-``00`` activities.

    Only ``P16_used_specific_object`` objects whose URI contains ``/dev/`` are
    considered; the path segment right after ``/dev/`` is the slug.

    Raises:
        KeyError: if a device slug is not in ``SLUG_LABELS``.
    """
    labels: set[str] = set()
    for eid in entity_ids:
        activity = URIRef(f"{BASE_URI}/act/{eid}/00/1")
        for _, _, used_object in graph.triples((activity, P16_USED_SPECIFIC_OBJECT, None)):
            object_uri = str(used_object)
            if "/dev/" not in object_uri:
                continue
            slug = object_uri.split("/dev/")[1].split("/")[0]
            labels.add(_format_slug(slug))
    return sorted(labels)
def extract_software_for_stage(graph: Graph, entity_ids: list[str], stage: str) -> list[str]:
    """Return the sorted, de-duplicated labels of software used in *stage*.

    All steps of ``STAGE_STEPS[stage]`` except ``METADATA_STEP`` are inspected
    through ``L23_used_software_or_firmware``; the path segment right after
    ``/sfw/`` in each object URI is the slug.

    Raises:
        IndexError: if a software URI does not contain ``/sfw/``.
        KeyError: if a software slug is not in ``SLUG_LABELS``.
    """
    relevant_steps = [step for step in STAGE_STEPS[stage] if step != METADATA_STEP]
    labels: set[str] = set()
    for eid in entity_ids:
        for step in relevant_steps:
            activity = URIRef(f"{BASE_URI}/act/{eid}/{step}/1")
            for _, _, software_uri in graph.triples((activity, L23_USED_SOFTWARE, None)):
                after_marker = str(software_uri).split("/sfw/")[1]
                labels.add(_format_slug(after_marker.split("/")[0]))
    return sorted(labels)
# Licence URI as found in the RDF -> short licence id (a key of LICENSE_INFO).
LICENSE_URI_TO_ZENODO = {
    "https://creativecommons.org/publicdomain/zero/1.0/": "cc0-1.0",
    "https://creativecommons.org/licenses/by/4.0/": "cc-by-4.0",
    "https://creativecommons.org/licenses/by-nc/4.0/": "cc-by-nc-4.0",
    "https://creativecommons.org/licenses/by-sa/4.0/": "cc-by-sa-4.0",
    "https://creativecommons.org/licenses/by-nc-sa/4.0/": "cc-by-nc-sa-4.0",
}

# Stage name -> wording used inside the record title.
STAGE_TITLE_NAMES = {
    "raw": "Raw",
    "rawp": "Processed raw model",
    "dcho": "Digital Cultural Heritage Object",
    "dchoo": "Optimized Digital Cultural Heritage Object",
}

# Stage name -> wording that opens the record description.
STAGE_DESCRIPTION_NAMES = {
    "raw": "Raw acquisition data",
    "rawp": "Processed raw model",
    "dcho": "Digital Cultural Heritage Object",
    "dchoo": "Optimized Digital Cultural Heritage Object",
}

# Stage name -> sentence describing what the dataset contains.
STAGE_DESCRIPTIONS = {
    "raw": "This dataset contains the raw material generated during the acquisition phase.",
    "rawp": "This dataset contains the preliminary output from the photogrammetry or scanner software after initial data processing but without any interpolation or geometry corrections.",
    "dcho": "This dataset contains the version that includes interpolation, gap filling, and resolution of geometric issues, resulting in a refined and improved model.",
    "dchoo": "This dataset contains the version optimised for real-time online interaction.",
}

# Base-config keys copied verbatim into each generated config when the
# generated config does not already define them (see generate_zenodo_config).
PROPAGATED_FIELDS = (
    "zenodo_url", "access_token", "user_agent",
    "subjects", "publication_date",
    "version", "community",
    "contributors", "funding",
    "references", "dates",
)
def extract_license_for_entity_stage(graph: Graph, entity_id: str, stage: str) -> str | None:
    """Return the licence id documented for *entity_id* at *stage*, if any.

    Looks at the first ``P70i_is_documented_in`` object of
    ``{BASE_URI}/lic/{entity_id}/{step}/1`` and translates it through
    ``LICENSE_URI_TO_ZENODO``. Returns ``None`` when the node has no such
    triple or the licence URI is not in the table.
    """
    step = STAGE_LICENSE_STEP[stage]
    licence_node = URIRef(f"{BASE_URI}/lic/{entity_id}/{step}/1")
    license_url = next(
        (obj for _, _, obj in graph.triples((licence_node, P70I, None))),
        None,
    )
    if license_url is None:
        return None
    return LICENSE_URI_TO_ZENODO.get(str(license_url))
# Extra "notes" description attached to records whose content licence is cc0-1.0.
CC0_DISCLAIMER = (
    "No copyright or related rights are claimed in these digital reproductions. "
    "The files are released under CC0 1.0 Universal (Public Domain Dedication).\n"
    "\n"
    "Please note that the original works may qualify as cultural heritage assets "
    "under Italian law (D. Lgs. 42/2004). Consequently, although the digital "
    "reproductions are released under CC0, certain uses — and in particular "
    "commercial uses — may be subject to specific authorisations, restrictions, "
    "or fees pursuant to the applicable provisions governing the reproduction "
    "and publication of cultural heritage assets. Users are therefore responsible "
    "for ensuring compliance with Italian cultural heritage regulations before "
    "undertaking any commercial exploitation of the images."
)

# Link target for the CHAD-AP references embedded in the descriptions.
CHAD_AP_URL = "https://w3id.org/dharc/ontology/chad-ap"

# Extra "notes" description attached to records generated with has_license=False.
RESTRICTED_NOTICE = (
    "The digital object files are not included in this dataset "
    "because the holding institution did not grant permission for their publication. "
    "Only metadata and provenance files are provided."
)
def build_enhanced_description(
    stage: str,
    title: str,
    keeper_name: str | None = None,
    keeper_location: str | None = None,
) -> str:
    """Compose the main record description as one space-joined paragraph.

    The text contains, in order: the stage/title sentence, an optional keeper
    sentence (with the location in parentheses when known), the stage
    description, and the sentence about the metadata/provenance files. The
    result ends with a newline.
    """
    sentences = [
        f'{STAGE_DESCRIPTION_NAMES[stage]} of "{title}" from the Aldrovandi Digital Twin.',
    ]
    if keeper_name:
        location_suffix = f" ({keeper_location})" if keeper_location else ""
        sentences.append(f"The original object is held by {keeper_name}{location_suffix}.")
    sentences.append(STAGE_DESCRIPTIONS[stage])
    sentences.append(
        f'Includes metadata (meta.ttl) and provenance (prov.trig) files following the <a href="{CHAD_AP_URL}">CHAD-AP</a> ontology.'
    )
    return " ".join(sentences) + "\n"
# DOI link of the workflow publication cited at the start of the methods description.
WORKFLOW_DOI_URL = "https://doi.org/10.46298/transformations.14773"
def build_methods_description(
    graph: Graph,
    entity_ids: list[str],
    stage: str,
) -> str:
    """Compose the "methods" description as paragraphs separated by blank lines.

    Paragraphs, in order: the workflow reference; the acquisition technique
    (with devices in parentheses) when a technique is found; the processing
    software of *stage* when any; the CHAD-AP statement. The result ends with
    a newline.
    """
    paragraphs = [
        f'Acquisition and digitization followed the reproducible workflow documented in '
        f'<a href="{WORKFLOW_DOI_URL}">doi:10.46298/transformations.14773</a>.',
    ]
    technique = extract_acquisition_technique(graph, entity_ids)
    devices = extract_devices(graph, entity_ids)
    if technique:
        device_suffix = f" ({', '.join(devices)})" if devices else ""
        paragraphs.append(f"Data was acquired using {technique}{device_suffix}.")
    software = extract_software_for_stage(graph, entity_ids, stage)
    if software:
        paragraphs.append(f"Processing software: {', '.join(software)}.")
    paragraphs.append(
        f'Metadata follows the <a href="{CHAD_AP_URL}">Cultural Heritage Acquisition and '
        f"Digitisation Application Profile (CHAD-AP)</a> based on CIDOC-CRM.",
    )
    return "\n\n".join(paragraphs) + "\n"
def build_entity_uri(entity_ids: list[str]) -> str:
    """Return the item URI (``.../itm/<id>/ob00/1``) of the first id in *entity_ids*."""
    primary_id = entity_ids[0]
    return f"{BASE_URI}/itm/{primary_id}/ob00/1"
# Short licence id -> full title and canonical link, used by build_rights.
LICENSE_INFO = {
    "cc0-1.0": {
        "title": "Creative Commons Zero v1.0 Universal",
        "link": "https://creativecommons.org/publicdomain/zero/1.0/",
    },
    "cc-by-4.0": {
        "title": "Creative Commons Attribution 4.0 International",
        "link": "https://creativecommons.org/licenses/by/4.0/",
    },
    "cc-by-nc-4.0": {
        "title": "Creative Commons Attribution Non Commercial 4.0 International",
        "link": "https://creativecommons.org/licenses/by-nc/4.0/",
    },
    "cc-by-sa-4.0": {
        "title": "Creative Commons Attribution Share Alike 4.0 International",
        "link": "https://creativecommons.org/licenses/by-sa/4.0/",
    },
    "cc-by-nc-sa-4.0": {
        "title": "Creative Commons Attribution Non Commercial Share Alike 4.0 International",
        "link": "https://creativecommons.org/licenses/by-nc-sa/4.0/",
    },
}
def build_rights(content_license: str | None) -> list[dict]:
    """Build the ``rights`` list of a record.

    The first entry is always the CC0 metadata licence. A second entry for the
    content licence is added when *content_license* is a known key of
    ``LICENSE_INFO``.
    """

    def entry(info: dict, label: str, description: str) -> dict:
        return {
            "title": {"en": f"{info['title']} ({label})"},
            "description": {"en": description},
            "link": info["link"],
        }

    rights = [
        entry(
            LICENSE_INFO["cc0-1.0"],
            "Metadata license",
            "Applies to metadata files: meta.ttl, prov.trig",
        )
    ]
    if content_license and content_license in LICENSE_INFO:
        rights.append(
            entry(
                LICENSE_INFO[content_license],
                "Content license",
                "Applies to all data files except meta.ttl and prov.trig",
            )
        )
    return rights
def generate_zenodo_config(
    stage: str,
    zip_path: Path,
    title: str,
    base_config: dict,
    creators: list[dict],
    methods_description: str,
    license: str | None = None,
    entity_uri: str | None = None,
    keeper_name: str | None = None,
    keeper_location: str | None = None,
    has_license: bool = True,
) -> dict:
    """Assemble the upload configuration for one stage archive.

    Args:
        stage: stage name, a key of the ``STAGE_*`` tables.
        zip_path: archive to upload; stored as an absolute path string.
        title: entity title.
        base_config: shared configuration; must contain ``notes`` and
            ``locations`` and may contain ``related_identifiers`` and any of
            ``PROPAGATED_FIELDS``.
        creators: creator entries for the record.
        methods_description: text stored as the "methods" description.
        license: short content-licence id, if any.
        entity_uri: when given, stored as a ``url`` identifier.
        keeper_name: keeper label for the description.
        keeper_location: keeper location for the description.
        has_license: when false, ``RESTRICTED_NOTICE`` is added as a note.

    Returns:
        The configuration dict; key order is the insertion order used below.
    """
    main_description = build_enhanced_description(stage, title, keeper_name, keeper_location)

    config: dict = {
        "title": f"{title} - {STAGE_TITLE_NAMES[stage]} - Aldrovandi Digital Twin",
        "description": main_description,
        "resource_type": {"id": "dataset"},
        "publisher": "Zenodo",
        "access": {"record": "public", "files": "public"},
        "files": [str(zip_path.absolute())],
        "creators": creators,
        "publication_date": date.today().isoformat(),
        "rights": build_rights(license),
    }

    # One "methods" entry followed by the "notes" entries, in a fixed order.
    notes = [base_config["notes"]]
    if not has_license:
        notes.append(RESTRICTED_NOTICE)
    if license == "cc0-1.0":
        notes.append(CC0_DISCLAIMER)
    config["additional_descriptions"] = [
        {"description": methods_description, "type": {"id": "methods"}},
        *({"description": note, "type": {"id": "notes"}} for note in notes),
    ]

    features = []
    for loc in base_config["locations"]:
        features.append({
            "geometry": {
                "type": "Point",
                "coordinates": [loc["lon"], loc["lat"]],
            },
            "place": loc["place"],
            "description": loc["description"],
        })
    config["locations"] = {"features": features}

    # Keys already set above are never overwritten by the base config.
    for field in PROPAGATED_FIELDS:
        if field in base_config:
            config.setdefault(field, base_config[field])

    if "related_identifiers" in base_config:
        config["related_identifiers"] = [
            {
                "identifier": ri["identifier"],
                "relation_type": {"id": ri["relation"]},
                **({"resource_type": {"id": ri["resource_type"]}} if "resource_type" in ri else {}),
                **({"scheme": ri["scheme"]} if "scheme" in ri else {}),
            }
            for ri in base_config["related_identifiers"]
        ]

    if entity_uri:
        config["identifiers"] = [{"identifier": entity_uri, "scheme": "url"}]

    return config
def _get_sub_entity_ids(folders: list[tuple[str, str, dict]]) -> list[str]:
    """Return the ids extracted from the folder names, de-duplicated in first-seen order."""
    ordered_unique = dict.fromkeys(
        extract_id_from_folder_name(folder_name) for _, folder_name, _ in folders
    )
    return list(ordered_unique)
# Module-level state assigned by _init_worker and read by _process_entity.
# Declared without a value: the names are unbound until _init_worker has run.
_worker_kg: Graph
_worker_base_config: dict
_worker_creators_lookup: dict
def _init_worker(kg_path: Path, base_config: dict, creators_lookup: dict) -> None:
    """Pool initializer: load the knowledge graph and stash the shared inputs in module globals."""
    global _worker_kg, _worker_base_config, _worker_creators_lookup
    _worker_kg = load_kg(kg_path)
    _worker_base_config, _worker_creators_lookup = base_config, creators_lookup
def _process_entity(
    entity_id: str,
    folders: list[tuple[str, str, dict]],
    root: Path,
    zips_dir: Path,
    configs_dir: Path,
) -> None:
    """Create the archive and the YAML config of every available stage of one entity.

    Reads the knowledge graph, base config and creators lookup from the module
    globals set by ``_init_worker``. Stages for which ``create_stage_zip``
    returns ``None`` are skipped. Each config is written to
    ``<configs_dir>/<sala>-<title>-<entity_id>-<stage>.yaml``.
    """
    kg, base_config, creators_lookup = _worker_kg, _worker_base_config, _worker_creators_lookup
    sub_ids = _get_sub_entity_ids(folders)
    title = extract_entity_title(kg, sub_ids)
    keeper_name, keeper_location = extract_keeper_info(kg, sub_ids)
    file_prefix = f"{slugify(folders[0][0])}-{slugify(title)}-{entity_id}"
    metadata_creators = build_metadata_creators(kg, sub_ids, creators_lookup)

    for stage in STAGES:
        packaged = create_stage_zip(entity_id, stage, folders, root, zips_dir, title)
        if packaged is None:
            continue
        zip_path, content_license = packaged
        stage_creators = build_creators_for_entity_stage(kg, sub_ids, stage, creators_lookup)
        creators = merge_creators(stage_creators, metadata_creators)
        entity_uri = build_entity_uri(sub_ids)
        methods_description = build_methods_description(kg, sub_ids, stage)
        config = generate_zenodo_config(
            stage,
            zip_path,
            title,
            base_config,
            creators,
            methods_description,
            license=content_license,
            entity_uri=entity_uri,
            keeper_name=keeper_name,
            keeper_location=keeper_location,
            has_license=content_license is not None,
        )
        config_path = configs_dir / f"{file_prefix}-{stage}.yaml"
        with open(config_path, "w") as f:
            yaml.dump(config, f, Dumper=LiteralBlockDumper, default_flow_style=False, allow_unicode=True, sort_keys=False)
def prepare_all(
    root: Path,
    zenodo_base_config_path: Path,
    output_dir: Path,
    kg_path: Path = KG_PATH,
) -> None:
    """Create the stage archives and upload configs for every entity under *root*.

    Args:
        root: directory scanned with ``scan_folder_structure``.
        zenodo_base_config_path: YAML file with the shared base configuration.
        output_dir: destination; archives go to ``zips/`` and configs to
            ``configs/`` inside it (both are created if missing).
        kg_path: knowledge graph loaded once per worker process.

    Raises:
        Any exception raised while processing an entity is re-raised here
        through ``future.result()``.
    """
    structure = scan_folder_structure(root)
    entity_groups = group_folders_by_entity(structure)

    # Decode explicitly as UTF-8: the platform default encoding would garble
    # non-ASCII text of the base config on systems whose locale is not UTF-8.
    with open(zenodo_base_config_path, encoding="utf-8") as f:
        base_config = yaml.safe_load(f)

    creators_lookup = load_creators_lookup(CREATORS_LOOKUP_PATH)

    zips_dir = output_dir / "zips"
    configs_dir = output_dir / "configs"
    zips_dir.mkdir(parents=True, exist_ok=True)
    configs_dir.mkdir(parents=True, exist_ok=True)

    # Each worker loads the knowledge graph once in its initializer.
    with ProcessPoolExecutor(
        initializer=_init_worker,
        initargs=(kg_path, base_config, creators_lookup),
    ) as executor:
        futures = {
            executor.submit(_process_entity, entity_id, folders, root, zips_dir, configs_dir): entity_id
            for entity_id, folders in entity_groups.items()
        }
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            BarColumn(),
            MofNCompleteColumn(),
            TimeElapsedColumn(),
            TimeRemainingColumn(),
        ) as progress:
            task = progress.add_task("Creating stage packages", total=len(futures))
            for future in as_completed(futures):
                entity_id = futures[future]
                # Propagate worker failures instead of silently dropping them.
                future.result()
                progress.update(task, description=f"Completed entity {entity_id}")
                progress.advance(task)
def _extract_doi(record: Mapping[str, object]) -> str:
    """Return ``record["pids"]["doi"]["identifier"]``, or ``""`` when any level is missing or not a dict."""
    pids = record.get("pids", {})
    if not isinstance(pids, dict):
        return ""
    doi_info = pids.get("doi", {})
    if not isinstance(doi_info, dict):
        return ""
    return doi_info.get("identifier", "")
def _extract_record_url(record: Mapping[str, object]) -> str:
    """Return the ``self_html`` link of *record*.

    Raises:
        KeyError: if ``links`` or ``self_html`` is missing.
        AssertionError: if ``links`` is not a dict.
    """
    links = record["links"]
    assert isinstance(links, dict)
    html_url = links["self_html"]
    return html_url
# Reverse index of LICENSE_INFO: full licence title -> short licence id.
LICENSE_TITLE_TO_SHORT: dict[str, str] = {
    info["title"]: short_name for short_name, info in LICENSE_INFO.items()
}
def _format_creators_for_table(config: dict) -> str:
    """Render the creators of *config* as ``Family, Given [orcid:ID]`` joined by ``"; "``."""

    def render(creator: dict) -> str:
        person = creator["person_or_org"]
        orcid = person["identifiers"][0]["identifier"]
        return f"{person['family_name']}, {person['given_name']} [orcid:{orcid}]"

    return "; ".join(render(creator) for creator in config["creators"])
def _format_licenses_for_table(config: dict) -> str:
    """Render the rights of *config* as ``short-id (context)`` joined by ``"; "``.

    For each right, the first ``LICENSE_TITLE_TO_SHORT`` title that prefixes
    the English title is used; what follows the prefix, with surrounding
    spaces and parentheses stripped, becomes the context. Rights that match no
    known title are omitted.
    """
    labels: list[str] = []
    for right in config["rights"]:
        title_en = right["title"]["en"]
        matched = next(
            (
                (full_name, short_name)
                for full_name, short_name in LICENSE_TITLE_TO_SHORT.items()
                if title_en.startswith(full_name)
            ),
            None,
        )
        if matched is None:
            continue
        full_name, short_name = matched
        context = title_en.removeprefix(full_name).strip(" ()")
        labels.append(f"{short_name} ({context})")
    return "; ".join(labels)
# Column headers of doi_table.csv, in output order (Italian labels are the
# literal CSV header text and must match the row keys in _write_doi_table).
DOI_TABLE_FIELDNAMES = [
    "Numero su DMP",
    "Caso di studio",
    "Autore/i",
    "Tipo",
    "Titolo",
    "Data pubblicazione",
    "DOI",
    "URL",
    "Repository",
    "Licenza",
    "Note",
]
def _atomic_write_json(path: Path, data: list) -> None:
    """Write *data* as indented JSON to *path* through a temp file and rename.

    The JSON is first written to a temporary ``*.tmp`` file in the same
    directory and then moved over *path* with ``os.replace``, so *path* is
    never left half-written. If serialisation or the rename fails, the
    temporary file is removed and the exception is re-raised; an existing
    *path* is left untouched.
    """
    fd, tmp_path = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # Also covers KeyboardInterrupt: never leave stray temp files behind.
        try:
            os.unlink(tmp_path)
        except FileNotFoundError:
            pass
        raise
@contextmanager
def _graceful_shutdown():
    """Context manager that turns the first Ctrl-C into a stop request.

    Yields a one-element list whose item becomes ``True`` on the first SIGINT.
    A second SIGINT restores the previous handler and raises
    ``KeyboardInterrupt``. The previous handler is always restored on exit.
    """
    stop_requested = [False]
    previous_handler = signal.getsignal(signal.SIGINT)

    def on_sigint(signum, frame):
        if not stop_requested[0]:
            stop_requested[0] = True
            print("Finishing current record, then stopping...")
            return
        signal.signal(signal.SIGINT, previous_handler)
        raise KeyboardInterrupt

    signal.signal(signal.SIGINT, on_sigint)
    try:
        yield stop_requested
    finally:
        signal.signal(signal.SIGINT, previous_handler)
def _write_doi_table(drafts: list[dict], output_dir: Path) -> Path:
    """Write ``doi_table.csv`` in *output_dir* with one row per non-failed draft.

    Title, publication date, creators and licences are read from the YAML
    config referenced by each draft; DOI and URL come from the draft entry.

    Returns:
        Path of the written CSV file.
    """

    def row_for(draft: dict) -> dict[str, str]:
        with open(draft["config_file"]) as f:
            config = yaml.safe_load(f)
        return {
            "Numero su DMP": "",
            "Caso di studio": "Aldrovandi",
            "Autore/i": _format_creators_for_table(config),
            "Tipo": "Dataset",
            "Titolo": config["title"],
            "Data pubblicazione": config["publication_date"],
            "DOI": draft["doi"],
            "URL": draft["record_url"],
            "Repository": "Zenodo",
            "Licenza": _format_licenses_for_table(config),
            "Note": "",
        }

    rows: list[dict[str, str]] = [
        row_for(draft) for draft in drafts if draft["status"] != "failed"
    ]
    csv_path = output_dir / "doi_table.csv"
    with open(csv_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=DOI_TABLE_FIELDNAMES)
        writer.writeheader()
        writer.writerows(rows)
    return csv_path
def upload_all(configs_dir: Path, publish: bool = False) -> Path:
    """Upload every YAML config in *configs_dir* and track the outcome in ``drafts.json``.

    ``drafts.json`` lives next to *configs_dir*. Configs whose stem already
    has an "uploaded" or "published" entry are skipped, which makes the
    function resumable. After each attempt the bookkeeping file is rewritten
    atomically. A first Ctrl-C stops the loop after the current record.

    Args:
        configs_dir: directory containing the ``*.yaml`` configs.
        publish: forwarded to the uploader; also decides whether a successful
            entry is recorded as "published" or "uploaded".

    Returns:
        Path of the regenerated DOI table.
    """
    config_files = sorted(configs_dir.glob("*.yaml"))
    drafts_path = configs_dir.parent / "drafts.json"

    drafts: list[dict] = []
    if drafts_path.exists():
        with open(drafts_path) as f:
            drafts = json.load(f)

    completed_stems = {
        Path(d["config_file"]).stem for d in drafts
        if d["status"] in ("uploaded", "published")
    }

    skipped = 0
    failed = 0
    uploaded = 0

    with _graceful_shutdown() as shutdown, Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    ) as progress:
        task = progress.add_task("Uploading to Zenodo", total=len(config_files))
        for config_file in config_files:
            if shutdown[0]:
                print("\nGraceful shutdown: stopping after last completed record.")
                break
            if config_file.stem in completed_stems:
                skipped += 1
                progress.update(task, description=f"Skipped {config_file.stem}")
                progress.advance(task)
                continue

            progress.update(task, description=f"Uploading {config_file.stem}")
            try:
                # Read everything needed for the bookkeeping entry *before*
                # uploading: an unreadable or incomplete config must fail here
                # rather than after a remote draft has already been created.
                with open(config_file) as f:
                    config = yaml.safe_load(f)
                config_fields = {
                    "config_file": str(config_file),
                    "title": config["title"],
                    "zenodo_url": config["zenodo_url"],
                    "access_token": config["access_token"],
                    "user_agent": config["user_agent"],
                }
                record = piccione_upload(str(config_file), publish=publish)
                entry = {
                    "draft_id": record["id"],
                    **config_fields,
                    "status": "published" if publish else "uploaded",
                    "doi": _extract_doi(record),
                    "record_url": _extract_record_url(record),
                }
                uploaded += 1
            except Exception as exc:
                entry = {
                    "draft_id": "",
                    "config_file": str(config_file),
                    "title": config_file.stem,
                    "zenodo_url": "",
                    "access_token": "",
                    "user_agent": "",
                    "status": "failed",
                    "doi": "",
                    "record_url": "",
                    "error": str(exc),
                }
                failed += 1
                print(f"\n[FAILED] {config_file.stem}: {exc}")

            # Replace any earlier "failed" entry of this config, whatever the
            # new outcome, so repeated failures do not pile up duplicates.
            drafts = [
                d for d in drafts
                if not (Path(d["config_file"]).stem == config_file.stem and d["status"] == "failed")
            ]
            drafts.append(entry)

            _atomic_write_json(drafts_path, drafts)
            time.sleep(2)
            progress.advance(task)

    csv_path = _write_doi_table(drafts, configs_dir.parent)
    print(f"DOI table written to {csv_path}")
    print(f"Drafts saved to {drafts_path}")
    print(f"Summary: {uploaded} uploaded, {skipped} skipped, {failed} failed (of {len(config_files)} total)")
    return csv_path
def publish_all_drafts(drafts_path: Path) -> Path:
    """Publish every draft recorded as "uploaded" or "publish_failed".

    Each attempt updates the entry in place ("published" with DOI and URL, or
    "publish_failed" with the error text) and the drafts file is rewritten
    atomically right after. A first Ctrl-C stops the loop after the current
    record.

    Returns:
        Path of the regenerated DOI table.
    """
    with open(drafts_path) as f:
        drafts: list[dict] = json.load(f)

    publishable = [d for d in drafts if d["status"] in ("uploaded", "publish_failed")]
    published = failed = 0

    progress_columns = (
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    )
    with _graceful_shutdown() as shutdown, Progress(*progress_columns) as progress:
        task = progress.add_task("Publishing drafts", total=len(publishable))
        for draft in publishable:
            if shutdown[0]:
                print("\nGraceful shutdown: stopping after last completed record.")
                break
            progress.update(task, description=f"Publishing {draft['title']}")
            try:
                base_url = draft["zenodo_url"].rstrip("/")
                record = piccione_publish_draft(
                    base_url, draft["access_token"], draft["draft_id"], draft["user_agent"],
                )
                draft["status"] = "published"
                draft["doi"] = _extract_doi(record)
                draft["record_url"] = _extract_record_url(record)
                # Drop the error left by an earlier failed publish attempt.
                draft.pop("error", None)
                published += 1
            except Exception as exc:
                draft["status"] = "publish_failed"
                draft["error"] = str(exc)
                failed += 1
                print(f"\n[FAILED] {draft['title']}: {exc}")

            _atomic_write_json(drafts_path, drafts)
            time.sleep(2)
            progress.advance(task)

    skipped = len(drafts) - len(publishable)
    csv_path = _write_doi_table(drafts, drafts_path.parent)
    print(f"DOI table written to {csv_path}")
    print(f"Summary: {published} published, {skipped} skipped, {failed} failed")
    return csv_path
def sync_status(drafts_path: Path) -> Path:
    """Refresh each draft entry's status, DOI and record URL from Zenodo.

    For every entry that has a ``draft_id`` the record endpoint
    (``/records/<id>``) is queried; on a 404 the draft endpoint
    (``/records/<id>/draft``) is queried instead. ``status``, ``doi`` and
    ``record_url`` of the entry are overwritten when the response differs.
    The drafts file is rewritten once, after all entries were processed,
    and the DOI table is regenerated.

    Entries without a ``draft_id`` are left untouched: failed uploads are
    recorded with an empty id, URL and token, so there is nothing to query,
    and building a request from them would raise before anything is saved.

    Args:
        drafts_path: JSON file holding the list of draft entries.

    Returns:
        The path returned by ``_write_doi_table``.

    Raises:
        requests.HTTPError: If Zenodo answers with an error status other
            than the 404 handled by the draft fallback.
        requests.Timeout: If Zenodo does not answer within the timeout.
    """
    # Seconds. requests waits forever by default, which would hang the sync.
    request_timeout = 60

    with open(drafts_path) as f:
        drafts: list[dict] = json.load(f)

    updated = 0
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    ) as progress:
        task = progress.add_task("Syncing status from Zenodo", total=len(drafts))
        for draft in drafts:
            if not draft.get("draft_id"):
                # Never reached Zenodo (failed upload): keep the entry as is,
                # but still advance so the bar reaches its total.
                progress.advance(task)
                continue

            progress.update(task, description=f"Querying {draft['draft_id']}")
            headers = get_headers(draft["access_token"], draft["user_agent"])
            base_url = draft["zenodo_url"].rstrip("/")
            resp = requests.get(
                f"{base_url}/records/{draft['draft_id']}",
                headers=headers,
                timeout=request_timeout,
            )
            if resp.status_code == 404:
                # Not found as a record: fall back to the draft endpoint.
                resp = requests.get(
                    f"{base_url}/records/{draft['draft_id']}/draft",
                    headers=headers,
                    timeout=request_timeout,
                )
            resp.raise_for_status()
            record = resp.json()

            new_status = record["status"]
            new_doi = record.get("doi", "")
            new_url = record["links"]["self_html"]
            if (
                draft["status"] != new_status
                or draft["doi"] != new_doi
                or draft["record_url"] != new_url
            ):
                draft["status"] = new_status
                draft["doi"] = new_doi
                draft["record_url"] = new_url
                updated += 1
            time.sleep(0.5)
            progress.advance(task)

    _atomic_write_json(drafts_path, drafts)
    csv_path = _write_doi_table(drafts, drafts_path.parent)
    print(f"Updated {updated} of {len(drafts)} entries")
    print(f"DOI table written to {csv_path}")
    return csv_path
1013def _normalize_quotes(text: str) -> str:
1014 return text.replace("‘", "'").replace("’", "'").replace("“", '"').replace("”", '"')
def cleanup_duplicates(drafts_path: Path, dry_run: bool = False) -> None:
    """Find Zenodo records that duplicate tracked drafts and delete the drafts.

    All records returned by ``/user/records`` are paged through. A record is
    a duplicate when its id is not listed in the drafts file but its title
    (after quote normalisation) matches a tracked title. Duplicates whose
    status is ``"published"`` are only reported; the others are deleted
    through ``/records/<id>/draft`` unless *dry_run* is set.

    Credentials come from the first entry that has both an access token and
    a Zenodo URL. Failed uploads are recorded with empty credentials, so the
    first entry of the file is not guaranteed to be usable. When no entry
    has credentials (including an empty file) the function reports it and
    returns without contacting Zenodo.

    Args:
        drafts_path: JSON file holding the list of draft entries.
        dry_run: When true, only print what would be deleted.

    Raises:
        requests.HTTPError: If listing the user's records fails.
        requests.Timeout: If Zenodo does not answer within the timeout.
    """
    # Seconds. requests waits forever by default, which would hang the cleanup.
    request_timeout = 60

    def _title_of(hit: dict) -> str:
        # The title is read from the top level first, then from "metadata".
        return hit.get("title", hit.get("metadata", {}).get("title", ""))

    with open(drafts_path) as f:
        drafts: list[dict] = json.load(f)

    credentials = next(
        (d for d in drafts if d.get("access_token") and d.get("zenodo_url")),
        None,
    )
    if credentials is None:
        print("No entry with Zenodo credentials in drafts file; cannot look for duplicates.")
        return

    known_ids = {d["draft_id"] for d in drafts}
    known_titles = {_normalize_quotes(d["title"]) for d in drafts}
    base_url = credentials["zenodo_url"].rstrip("/")
    headers = get_headers(credentials["access_token"], credentials["user_agent"])

    duplicates: list[dict] = []
    page = 1
    while True:
        resp = requests.get(
            f"{base_url}/user/records",
            params={"size": 100, "page": page},
            headers=headers,
            timeout=request_timeout,
        )
        resp.raise_for_status()
        hits = resp.json()["hits"]["hits"]
        if not hits:
            # An empty page marks the end of the listing.
            break
        duplicates.extend(
            hit
            for hit in hits
            if hit["id"] not in known_ids
            and _normalize_quotes(_title_of(hit)) in known_titles
        )
        page += 1

    if not duplicates:
        print("No duplicates found.")
        return

    draft_dups = [d for d in duplicates if d.get("status") != "published"]
    published_dups = [d for d in duplicates if d.get("status") == "published"]

    print(f"Found {len(duplicates)} duplicate(s): {len(draft_dups)} draft(s), {len(published_dups)} published")

    for dup in published_dups:
        # Same title lookup as everywhere else, so the title is not printed
        # empty when it lives under "metadata".
        print(f"  [PUBLISHED - cannot delete] id={dup['id']}, doi={dup.get('doi', '')}, title={_title_of(dup)}")

    deleted = 0
    for dup in draft_dups:
        title = _title_of(dup)
        if dry_run:
            print(f"  [DRY RUN] Would delete draft id={dup['id']}, title={title}")
            continue
        resp = requests.delete(
            f"{base_url}/records/{dup['id']}/draft",
            headers=headers,
            timeout=request_timeout,
        )
        if resp.status_code == 204:
            deleted += 1
            print(f"  [DELETED] id={dup['id']}, title={title}")
        else:
            print(f"  [FAILED] id={dup['id']}, status={resp.status_code}, body={resp.text[:200]}")
        time.sleep(1)

    if dry_run:
        print(f"Dry run complete. {len(draft_dups)} draft(s) would be deleted.")
    else:
        print(f"Deleted {deleted} of {len(draft_dups)} draft duplicate(s).")
def parse_arguments():  # pragma: no cover
    """Build the command-line parser and parse the process arguments.

    The parser requires one of five subcommands, stored in ``command``:
    ``prepare``, ``upload``, ``publish-drafts``, ``sync-status`` and
    ``cleanup-duplicates``.

    Returns:
        The ``argparse.Namespace`` produced by ``parse_args()``.
    """
    cli = argparse.ArgumentParser(description="Prepare and upload Zenodo packages")
    commands = cli.add_subparsers(dest="command", required=True)

    # prepare: build the packages and their configuration files.
    prepare = commands.add_parser("prepare", help="Create zips and YAML configs")
    prepare.add_argument("root", type=Path, help="Root directory with Sala/Folder/Stage structure")
    prepare.add_argument("zenodo_config", type=Path, help="Base Zenodo configuration YAML")
    prepare.add_argument("--output", "-o", type=Path, default=Path("zenodo_output"), help="Output directory")

    # upload: push the prepared packages.
    upload = commands.add_parser("upload", help="Upload to Zenodo")
    upload.add_argument("configs_dir", type=Path, help="Directory containing YAML configs")
    upload.add_argument("--publish", action="store_true", help="Publish after upload")

    # The remaining commands all take a single drafts file as positional.
    drafts_commands = (
        ("publish-drafts", "Publish previously uploaded drafts", "Path to drafts.json from a previous upload"),
        ("sync-status", "Sync drafts.json with actual Zenodo record status", "Path to drafts.json"),
        ("cleanup-duplicates", "Find and delete duplicate records not in drafts.json", "Path to drafts.json"),
    )
    registered = {}
    for name, summary, file_help in drafts_commands:
        sub = commands.add_parser(name, help=summary)
        sub.add_argument("drafts_file", type=Path, help=file_help)
        registered[name] = sub
    registered["cleanup-duplicates"].add_argument(
        "--dry-run", action="store_true", help="Only report duplicates, don't delete",
    )

    return cli.parse_args()
def main():  # pragma: no cover
    """Parse the command line and run the function bound to the subcommand."""
    args = parse_arguments()

    # Each subcommand maps to a thunk so only the selected one reads its
    # own arguments from the namespace.
    actions = {
        "prepare": lambda: prepare_all(
            root=args.root,
            zenodo_base_config_path=args.zenodo_config,
            output_dir=args.output,
        ),
        "upload": lambda: upload_all(configs_dir=args.configs_dir, publish=args.publish),
        "publish-drafts": lambda: publish_all_drafts(drafts_path=args.drafts_file),
        "sync-status": lambda: sync_status(drafts_path=args.drafts_file),
        "cleanup-duplicates": lambda: cleanup_duplicates(
            drafts_path=args.drafts_file, dry_run=args.dry_run,
        ),
    }
    action = actions.get(args.command)
    if action is not None:
        action()


if __name__ == "__main__":  # pragma: no cover
    main()