Coverage for changes_metadata_manager / zenodo_upload.py: 82%
348 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-21 12:19 +0000
1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelomas@gmail.com>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import csv
7import re
8import unicodedata
9import zipfile
10from collections import defaultdict
11from datetime import date
12from pathlib import Path
14import yaml
15from rdflib import Graph, URIRef
16from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn
18from piccione.upload.on_zenodo import main as piccione_upload
20from changes_metadata_manager.folder_metadata_builder import (
21 BASE_URI,
22 FOLDER_TO_ID,
23 KG_PATH,
24 SKIP_FOLDERS,
25 STAGE_STEPS,
26 extract_id_from_folder_name,
27 load_kg,
28 scan_folder_structure,
29)
class LiteralBlockDumper(yaml.SafeDumper):
    """SafeDumper subclass whose string representer emits multi-line strings
    as literal blocks (``|``); see ``_literal_str_representer``."""
def _literal_str_representer(dumper: yaml.SafeDumper, data):
    """Represent *data* as a YAML literal block (``|``) when it spans lines."""
    style = "|" if "\n" in data else None
    return dumper.represent_scalar("tag:yaml.org,2002:str", data, style=style)
# Register the literal-block representer so every string dumped through
# LiteralBlockDumper uses "|" style when it contains newlines.
LiteralBlockDumper.add_representer(str, _literal_str_representer)

# Packaged YAML lookup mapping RDF author names to Zenodo creator metadata.
CREATORS_LOOKUP_PATH = Path(__file__).parent.parent / "data" / "creators_lookup.yaml"
def slugify(text: str) -> str:
    """Convert *text* to a lowercase ASCII slug with hyphen separators."""
    # Decompose accented characters, then drop anything outside ASCII.
    ascii_text = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode("ascii")
    cleaned = re.sub(r"[^\w\s-]", "", ascii_text.lower())
    # Collapse runs of whitespace/hyphens into single hyphens.
    return re.sub(r"[-\s]+", "-", cleaned).strip("-")
# Maps a two-digit workflow step to the dataset stage its output belongs to.
# Steps "03"-"06" all contribute to the optimised object, hence shared "dchoo".
STEP_TO_STAGE = {
    "00": "raw",
    "01": "rawp",
    "02": "dcho",
    "03": "dchoo",
    "04": "dchoo",
    "05": "dchoo",
    "06": "dchoo",
}

# Ontology namespaces: CIDOC-CRM, its CRMdig extension, and the Getty AAT.
CRM = "http://www.cidoc-crm.org/cidoc-crm/"
CRMDIG = "http://www.cidoc-crm.org/extensions/crmdig/"
AAT = "http://vocab.getty.edu/aat/"

# Frequently queried CIDOC-CRM / CRMdig properties and classes.
P70I = URIRef(f"{CRM}P70i_is_documented_in")
P3_HAS_NOTE = URIRef(f"{CRM}P3_has_note")
P14_CARRIED_OUT_BY = URIRef(f"{CRM}P14_carried_out_by")
P1_IS_IDENTIFIED_BY = URIRef(f"{CRM}P1_is_identified_by")
P190_HAS_SYMBOLIC_CONTENT = URIRef(f"{CRM}P190_has_symbolic_content")
P74_HAS_RESIDENCE = URIRef(f"{CRM}P74_has_current_or_former_residence")
P32_USED_GENERAL_TECHNIQUE = URIRef(f"{CRM}P32_used_general_technique")
P16_USED_SPECIFIC_OBJECT = URIRef(f"{CRM}P16_used_specific_object")
E21_PERSON = URIRef(f"{CRM}E21_Person")
RDF_TYPE = URIRef("http://www.w3.org/1999/02/22-rdf-syntax-ns#type")
L23_USED_SOFTWARE = URIRef(f"{CRMDIG}L23_used_software_or_firmware")

# Human-readable labels for the AAT technique terms that appear in the KG.
AAT_TECHNIQUE_LABELS: dict[str, str] = {
    f"{AAT}300266792": "digital photography",
    f"{AAT}300429747": "optical scanning",
}
# Display labels for device/software URI slugs found in the KG
# (under /dev/ and /sfw/ path segments); see _format_slug.
SLUG_LABELS: dict[str, str] = {
    "3df_zephyr": "3DF Zephyr",
    "adobe_photoshop_2023": "Adobe Photoshop 2023",
    "agisoft_metashape": "Agisoft Metashape",
    "artec_eva": "Artec Eva",
    "artec_leo": "Artec Leo",
    "artec_spider": "Artec Spider",
    "artec_studio_14": "Artec Studio 14",
    "artec_studio_15": "Artec Studio 15",
    "artec_studio_16": "Artec Studio 16",
    "artec_studio_19": "Artec Studio 19",
    "aton": "ATON",
    "blender": "Blender",
    "canon_eos_6d": "Canon EOS 6D",
    "chad-ap": "CHAD-AP",
    "cloudcompare": "CloudCompare",
    "gestaltor": "Gestaltor",
    "gimp": "GIMP",
    "heritrace": "HERITRACE",
    "instalod": "InstaLOD",
    "instant_meshes": "Instant Meshes",
    "lente_24-70_f2_8_l": "Canon EF 24-70mm f/2.8L",
    "meshlab": "MeshLab",
    "metashape": "Agisoft Metashape",
    "microscopio_digitale_bresciani": "Microscopio Digitale Bresciani",
    "modo": "Modo",
    "morph-kgc": "Morph-KGC",
    "nextcloud": "Nextcloud",
    "nikkor_35mm": "Nikkor 35mm",
    "nikkor_50mm": "Nikkor 50mm",
    "nikon_d3300": "Nikon D3300",
    "nikon_d5200": "Nikon D5200",
    "nikon_d5600": "Nikon D5600",
    "nikon_d7200": "Nikon D7200",
    "nikon_d750": "Nikon D750",
    "panasonic_dmc-lx100": "Panasonic DMC-LX100",
    "sony_alpha_6100": "Sony Alpha 6100",
    "sony_alpha_7_i": "Sony Alpha 7 I",
    "substance_3d_painter": "Substance 3D Painter",
    "substance_painter": "Substance Painter",
    "zbrush": "ZBrush",
}
def _format_slug(slug: str) -> str:
    """Return the display label for *slug*.

    Raises KeyError for slugs missing from SLUG_LABELS, surfacing unknown
    devices/software in the KG instead of silently mislabelling them.
    """
    return SLUG_LABELS[slug]
def load_creators_lookup(path: Path) -> dict[str, dict]:
    """Load the creators lookup YAML and index it by RDF author name.

    *path* must point to a YAML file with a top-level ``creators`` list whose
    entries carry ``name_in_rdf``, ``family_name``, ``given_name``,
    ``affiliation`` and ``orcid``.

    Returns a mapping from ``name_in_rdf`` to the remaining creator fields.
    """
    # Explicit encoding: the lookup contains non-ASCII names/affiliations,
    # and the platform default encoding is not guaranteed to be UTF-8.
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return {
        creator["name_in_rdf"]: {
            "family_name": creator["family_name"],
            "given_name": creator["given_name"],
            "affiliation": creator["affiliation"],
            "orcid": creator["orcid"],
        }
        for creator in data["creators"]
    }
146def _format_creator(creator_data: dict, role: str) -> dict:
147 return {
148 "person_or_org": {
149 "type": "personal",
150 "family_name": creator_data["family_name"],
151 "given_name": creator_data["given_name"],
152 "identifiers": [{"scheme": "orcid", "identifier": creator_data["orcid"]}],
153 },
154 "role": {"id": role},
155 "affiliations": [{"name": creator_data["affiliation"]}],
156 }
159METADATA_STEP = "05"
def _extract_actor_names(graph: Graph, act_uri: URIRef) -> set[str]:
    """Collect the appellation strings of every person carrying out *act_uri*."""
    names: set[str] = set()
    for _, _, actor_uri in graph.triples((act_uri, P14_CARRIED_OUT_BY, None)):
        # Actors attached via P14 are expected to be typed as E21 Person.
        assert (actor_uri, RDF_TYPE, E21_PERSON) in graph
        for _, _, apl_uri in graph.triples((actor_uri, P1_IS_IDENTIFIED_BY, None)):
            names.update(
                str(content)
                for _, _, content in graph.triples((apl_uri, P190_HAS_SYMBOLIC_CONTENT, None))
            )
    return names
def extract_authors_for_entity_stage(graph: Graph, entity_id: str, stage: str) -> set[str]:
    """Union of actor names over all non-metadata steps of *stage*."""
    authors: set[str] = set()
    for step in STAGE_STEPS[stage]:
        if step == METADATA_STEP:
            continue  # metadata curation is credited separately
        authors |= _extract_actor_names(graph, URIRef(f"{BASE_URI}/act/{entity_id}/{step}/1"))
    return authors
def extract_metadata_authors(graph: Graph, entity_id: str) -> set[str]:
    """Actor names for the metadata-curation activity of *entity_id*."""
    # Use METADATA_STEP rather than a hard-coded "05" so the step number
    # is defined in exactly one place (it is already excluded by name in
    # extract_authors_for_entity_stage).
    return _extract_actor_names(graph, URIRef(f"{BASE_URI}/act/{entity_id}/{METADATA_STEP}/1"))
def build_creators_for_entity_stage(
    graph: Graph, entity_id: str, stage: str, creators_lookup: dict[str, dict]
) -> list[dict]:
    """Zenodo "researcher" creators for one entity/stage, sorted by RDF name.

    Names absent from *creators_lookup* are silently skipped.
    """
    creators: list[dict] = []
    for name in sorted(extract_authors_for_entity_stage(graph, entity_id, stage)):
        if name in creators_lookup:
            creators.append(_format_creator(creators_lookup[name], "researcher"))
    return creators
def build_metadata_creators(
    graph: Graph, entity_id: str, creators_lookup: dict[str, dict]
) -> list[dict]:
    """Zenodo "datacurator" creators for *entity_id*, sorted by RDF name.

    Names absent from *creators_lookup* are silently skipped.
    """
    creators: list[dict] = []
    for name in sorted(extract_metadata_authors(graph, entity_id)):
        if name in creators_lookup:
            creators.append(_format_creator(creators_lookup[name], "datacurator"))
    return creators
def merge_creators(digitization_creators: list[dict], metadata_creators: list[dict]) -> list[dict]:
    """Concatenate creator lists, skipping metadata creators whose ORCID
    already appeared earlier (digitization credit takes precedence)."""
    merged = list(digitization_creators)
    seen_orcids = {
        c["person_or_org"]["identifiers"][0]["identifier"] for c in digitization_creators
    }
    for creator in metadata_creators:
        orcid = creator["person_or_org"]["identifiers"][0]["identifier"]
        if orcid in seen_orcids:
            continue
        seen_orcids.add(orcid)
        merged.append(creator)
    return merged
def extract_licensed_entity_stages(graph: Graph) -> set[tuple[str, str]]:
    """Find every (entity_id, stage) pair whose license activity is documented.

    License subjects have the form ``{BASE_URI}/lic/<entity>/<step>/1``; the
    two-digit step is mapped to its stage via STEP_TO_STAGE.
    """
    lic_pattern = re.compile(rf"^{re.escape(BASE_URI)}/lic/([^/]+)/(\d{{2}})/1$")
    licensed: set[tuple[str, str]] = set()
    for subject, _, _ in graph.triples((None, P70I, None)):
        m = lic_pattern.match(str(subject))
        if m is None:
            continue
        entity_id, step = m.groups()
        stage = STEP_TO_STAGE.get(step)
        if stage is not None:
            licensed.add((entity_id, stage))
    return licensed
def group_folders_by_entity(structure: dict) -> dict[str, list[tuple[str, str, dict]]]:
    """Group (sala, folder, subfolders) triples by their base entity id.

    Ids with a trailing lowercase letter (multi-part objects such as "12a",
    "12b") collapse onto the same base id, unless the id is an explicit
    FOLDER_TO_ID value. Folders listed in SKIP_FOLDERS are ignored.
    """
    groups: defaultdict[str, list[tuple[str, str, dict]]] = defaultdict(list)
    for sala_name, sala_items in structure["structure"].items():
        for folder_name, subfolders in sala_items.items():
            if folder_name in SKIP_FOLDERS:
                continue
            entity_id = extract_id_from_folder_name(folder_name)
            if entity_id not in FOLDER_TO_ID.values():
                # Strip part-letter suffixes so "12a"/"12b" group as "12".
                entity_id = entity_id.rstrip("abcdefghijklmnopqrstuvwxyz")
            groups[entity_id].append((sala_name, folder_name, subfolders))
    return dict(groups)
249STAGES = ("raw", "rawp", "dcho", "dchoo")
def create_stage_zip(
    entity_id: str,
    stage: str,
    folders: list[tuple[str, str, dict]],
    root: Path,
    licensed_stages: set[tuple[str, str]],
    output_dir: Path,
    title: str,
) -> tuple[Path, bool] | None:
    """Package every folder's files for one stage into a single zip archive.

    When the (entity, stage) pair has no publication license only meta.ttl and
    prov.trig are included; otherwise all regular files are. Returns the zip
    path plus the license flag, or None (removing the empty archive) when no
    file matched.
    """
    sala_slug = slugify(folders[0][0])
    title_slug = slugify(title)
    zip_path = output_dir / f"{sala_slug}-{title_slug}-{stage}.zip"
    # Loop-invariant (was recomputed per folder): whether content files
    # beyond the metadata pair may be shipped.
    has_license = (entity_id, stage) in licensed_stages
    has_files = False
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        # Renamed loop variable: the original shadowed the sala_name used
        # for the slug above.
        for folder_sala, folder_name, stages_dict in folders:
            # Stage directory names are matched case-insensitively.
            stage_name_in_folder = next(
                (name for name in stages_dict if name.lower() == stage), None
            )
            if stage_name_in_folder is None:
                continue
            stage_dir = root / folder_sala / folder_name / stage_name_in_folder
            for file_path in stage_dir.iterdir():
                if not file_path.is_file():
                    continue
                if has_license or file_path.name in ("meta.ttl", "prov.trig"):
                    arc_name = f"{folder_name}/{stage_name_in_folder}/{file_path.name}"
                    zf.write(file_path, arc_name)
                    has_files = True
    if not has_files:
        # Nothing matched: drop the empty archive instead of uploading it.
        zip_path.unlink()
        return None
    return zip_path, has_license
def _get_label(graph: Graph, uri: URIRef) -> str | None:
    """Return the first appellation string attached to *uri*, or None."""
    for _, _, apl_uri in graph.triples((uri, P1_IS_IDENTIFIED_BY, None)):
        for _, _, content in graph.triples((apl_uri, P190_HAS_SYMBOLIC_CONTENT, None)):
            return str(content)
    return None
def extract_keeper_info(graph: Graph, entity_id: str) -> tuple[str | None, str | None]:
    """Return (keeper name, keeper location) from the custody activity ob08.

    Both elements are None when no keeper is recorded; the location alone is
    None when the keeper has no residence triple. Only the first keeper found
    is considered.
    """
    custody_uri = URIRef(f"{BASE_URI}/act/{entity_id}/ob08/1")
    for _, _, keeper_uri in graph.triples((custody_uri, P14_CARRIED_OUT_BY, None)):
        assert isinstance(keeper_uri, URIRef)
        location_name = None
        for _, _, place_uri in graph.triples((keeper_uri, P74_HAS_RESIDENCE, None)):
            assert isinstance(place_uri, URIRef)
            location_name = _get_label(graph, place_uri)
        return _get_label(graph, keeper_uri), location_name
    return None, None
def extract_entity_title(graph: Graph, entity_id: str) -> str:
    """First line of the ob00 item note, or a generic fallback title."""
    item_uri = URIRef(f"{BASE_URI}/itm/{entity_id}/ob00/1")
    for _, _, note in graph.triples((item_uri, P3_HAS_NOTE, None)):
        return str(note).split("\n")[0].strip()
    return f"Entity {entity_id}"
def extract_acquisition_technique(graph: Graph, entity_id: str) -> str | None:
    """Human-readable acquisition technique for step 00, or None if absent.

    Raises KeyError when the technique URI is not a known AAT term.
    """
    act_uri = URIRef(f"{BASE_URI}/act/{entity_id}/00/1")
    for _, _, tech_uri in graph.triples((act_uri, P32_USED_GENERAL_TECHNIQUE, None)):
        return AAT_TECHNIQUE_LABELS[str(tech_uri)]
    return None
def extract_devices(graph: Graph, entity_id: str) -> list[str]:
    """Sorted labels of the devices (/dev/ URIs) used in the acquisition step."""
    act_uri = URIRef(f"{BASE_URI}/act/{entity_id}/00/1")
    labels = []
    for _, _, obj_uri in graph.triples((act_uri, P16_USED_SPECIFIC_OBJECT, None)):
        uri_str = str(obj_uri)
        if "/dev/" not in uri_str:
            continue  # the activity may also reference non-device objects
        slug = uri_str.split("/dev/")[1].split("/")[0]
        labels.append(_format_slug(slug))
    return sorted(labels)
def extract_software_for_stage(graph: Graph, entity_id: str, stage: str) -> list[str]:
    """Sorted labels of software (/sfw/ URIs) used across *stage*'s steps.

    The metadata-curation step is excluded, mirroring author attribution.
    """
    labels: set[str] = set()
    for step in STAGE_STEPS[stage]:
        if step == METADATA_STEP:
            continue
        act_uri = URIRef(f"{BASE_URI}/act/{entity_id}/{step}/1")
        for _, _, sfw_uri in graph.triples((act_uri, L23_USED_SOFTWARE, None)):
            slug = str(sfw_uri).split("/sfw/")[1].split("/")[0]
            labels.add(_format_slug(slug))
    return sorted(labels)
# Maps Creative Commons license URLs (as stored in the KG) to Zenodo
# license identifiers.
LICENSE_URI_TO_ZENODO = {
    "https://creativecommons.org/publicdomain/zero/1.0/": "cc0-1.0",
    "https://creativecommons.org/licenses/by/4.0/": "cc-by-4.0",
    "https://creativecommons.org/licenses/by-nc/4.0/": "cc-by-nc-4.0",
    "https://creativecommons.org/licenses/by-sa/4.0/": "cc-by-sa-4.0",
    "https://creativecommons.org/licenses/by-nc-sa/4.0/": "cc-by-nc-sa-4.0",
}

# Short stage names used in record titles.
STAGE_TITLE_NAMES = {
    "raw": "Raw",
    "rawp": "Processed raw model",
    "dcho": "Digital Cultural Heritage Object",
    "dchoo": "Optimized Digital Cultural Heritage Object",
}

# Stage names as used at the start of the record description.
STAGE_DESCRIPTION_NAMES = {
    "raw": "Raw acquisition data",
    "rawp": "Processed raw model",
    "dcho": "Digital Cultural Heritage Object",
    "dchoo": "Optimized Digital Cultural Heritage Object",
}

# One-sentence explanation per stage, appended to the record description.
STAGE_DESCRIPTIONS = {
    "raw": "This dataset contains the raw material generated during the acquisition phase.",
    "rawp": "This dataset contains the preliminary output from the photogrammetry or scanner software after initial data processing but without any interpolation or geometry corrections.",
    "dcho": "This dataset contains the version that includes interpolation, gap filling, and resolution of geometric issues, resulting in a refined and improved model.",
    "dchoo": "This dataset contains the version optimised for real-time online interaction.",
}

# Base-config keys copied verbatim into each generated record config
# (only when the generated config has not already set them).
PROPAGATED_FIELDS = (
    "zenodo_url", "access_token", "user_agent",
    "subjects", "publication_date",
    "version", "community",
    "contributors", "funding",
    "references", "dates",
)
def extract_license_for_entity_stage(graph: Graph, entity_id: str, stage: str) -> str | None:
    """Zenodo license id for the first recognised license URL among *stage*'s steps."""
    for step in STAGE_STEPS[stage]:
        lic_uri = URIRef(f"{BASE_URI}/lic/{entity_id}/{step}/1")
        for _, _, license_url in graph.triples((lic_uri, P70I, None)):
            short_id = LICENSE_URI_TO_ZENODO.get(str(license_url))
            if short_id is not None:
                return short_id
    return None
# Legal note attached when content is released under CC0: Italian cultural
# heritage law (D. Lgs. 42/2004) may still restrict commercial reuse of
# reproductions even when no copyright is claimed.
CC0_DISCLAIMER = (
    "No copyright or related rights are claimed in these digital reproductions. "
    "The files are released under CC0 1.0 Universal (Public Domain Dedication).\n"
    "\n"
    "Please note that the original works may qualify as cultural heritage assets "
    "under Italian law (D. Lgs. 42/2004). Consequently, although the digital "
    "reproductions are released under CC0, certain uses — and in particular "
    "commercial uses — may be subject to specific authorisations, restrictions, "
    "or fees pursuant to the applicable provisions governing the reproduction "
    "and publication of cultural heritage assets. Users are therefore responsible "
    "for ensuring compliance with Italian cultural heritage regulations before "
    "undertaking any commercial exploitation of the images."
)

# CHAD-AP ontology landing page, linked from record descriptions.
CHAD_AP_URL = "https://w3id.org/dharc/ontology/chad-ap"

# Note attached when the holding institution did not license publication:
# the dataset then ships metadata/provenance files only.
RESTRICTED_NOTICE = (
    "The digital object files are not included in this dataset "
    "because the holding institution did not grant permission for their publication. "
    "Only metadata and provenance files are provided."
)
def build_enhanced_description(
    stage: str,
    title: str,
    keeper_name: str | None = None,
    keeper_location: str | None = None,
) -> str:
    """Compose the main Zenodo description for one entity/stage record.

    The keeper sentence is included only when *keeper_name* is truthy; the
    location is appended in parentheses when available.
    """
    sentences = [
        f'{STAGE_DESCRIPTION_NAMES[stage]} of "{title}" from the Aldrovandi Digital Twin.',
    ]
    if keeper_name:
        location_suffix = f" ({keeper_location})" if keeper_location else ""
        sentences.append(f"The original object is held by {keeper_name}{location_suffix}.")
    sentences.append(STAGE_DESCRIPTIONS[stage])
    sentences.append(
        f"Includes metadata (meta.ttl) and provenance (prov.trig) files following the <a href=\"{CHAD_AP_URL}\">CHAD-AP</a> ontology.",
    )
    return " ".join(sentences) + "\n"
443WORKFLOW_DOI_URL = "https://doi.org/10.46298/transformations.14773"
def build_methods_description(
    graph: Graph,
    entity_id: str,
    stage: str,
) -> str:
    """Compose the "methods" additional description for one entity/stage.

    Paragraphs (workflow reference, acquisition technique with devices,
    processing software, CHAD-AP note) are joined by blank lines; technique
    and software paragraphs are omitted when the KG lacks the data.
    """
    paragraphs = [
        f'Acquisition and digitization followed the reproducible workflow documented in '
        f'<a href="{WORKFLOW_DOI_URL}">doi:10.46298/transformations.14773</a>.',
    ]
    technique = extract_acquisition_technique(graph, entity_id)
    # Devices are extracted unconditionally, matching the original behaviour
    # (an unknown device slug fails even when no technique is recorded).
    devices = extract_devices(graph, entity_id)
    if technique:
        device_suffix = f" ({', '.join(devices)})" if devices else ""
        paragraphs.append(f"Data was acquired using {technique}{device_suffix}.")
    software = extract_software_for_stage(graph, entity_id, stage)
    if software:
        paragraphs.append(f"Processing software: {', '.join(software)}.")
    paragraphs.append(
        f'Metadata follows the <a href="{CHAD_AP_URL}">Cultural Heritage Acquisition and '
        f"Digitisation Application Profile (CHAD-AP)</a> based on CIDOC-CRM.",
    )
    return "\n\n".join(paragraphs) + "\n"
def build_entity_uri(entity_id: str) -> str:
    """Canonical item URI (version 1 of object ob00) for *entity_id*."""
    return "/".join((BASE_URI, "itm", entity_id, "ob00", "1"))
# Full title and canonical link for each supported Zenodo license id;
# used to build the "rights" section and the DOI-table summary.
LICENSE_INFO = {
    "cc0-1.0": {
        "title": "Creative Commons Zero v1.0 Universal",
        "link": "https://creativecommons.org/publicdomain/zero/1.0/",
    },
    "cc-by-4.0": {
        "title": "Creative Commons Attribution 4.0 International",
        "link": "https://creativecommons.org/licenses/by/4.0/",
    },
    "cc-by-nc-4.0": {
        "title": "Creative Commons Attribution Non Commercial 4.0 International",
        "link": "https://creativecommons.org/licenses/by-nc/4.0/",
    },
    "cc-by-sa-4.0": {
        "title": "Creative Commons Attribution Share Alike 4.0 International",
        "link": "https://creativecommons.org/licenses/by-sa/4.0/",
    },
    "cc-by-nc-sa-4.0": {
        "title": "Creative Commons Attribution Non Commercial Share Alike 4.0 International",
        "link": "https://creativecommons.org/licenses/by-nc-sa/4.0/",
    },
}
def build_rights(content_license: str | None) -> list[dict]:
    """Build the Zenodo "rights" list: CC0 metadata entry plus content license.

    The CC0 metadata entry is always present; a content entry is appended
    only when *content_license* is a known LICENSE_INFO key.
    """
    meta_info = LICENSE_INFO["cc0-1.0"]
    rights: list[dict] = [
        {
            "title": {"en": f"{meta_info['title']} (Metadata license)"},
            "description": {"en": "Applies to metadata files: meta.ttl, prov.trig"},
            "link": meta_info["link"],
        }
    ]
    if content_license in LICENSE_INFO:
        content_info = LICENSE_INFO[content_license]
        rights.append(
            {
                "title": {"en": f"{content_info['title']} (Content license)"},
                "description": {"en": "Applies to all data files except meta.ttl and prov.trig"},
                "link": content_info["link"],
            }
        )
    return rights
def generate_zenodo_config(
    stage: str,
    zip_path: Path,
    title: str,
    base_config: dict,
    creators: list[dict],
    methods_description: str,
    license: str | None = None,
    entity_uri: str | None = None,
    keeper_name: str | None = None,
    keeper_location: str | None = None,
    has_license: bool = True,
) -> dict:
    """Assemble the full Zenodo record config for one entity/stage upload.

    Generated fields (title, description, rights, creators, files) are
    combined with values propagated from *base_config*. ``license`` is the
    content license id (metadata is always CC0); ``has_license`` is False
    when the zip holds only metadata/provenance files, which triggers the
    restricted-access notice.
    """
    description = build_enhanced_description(stage, title, keeper_name, keeper_location)

    config: dict = {
        "title": f"{title} - {STAGE_TITLE_NAMES[stage]} - Aldrovandi Digital Twin",
        "description": description,
        "resource_type": {"id": "dataset"},
        "publisher": "Zenodo",
        "access": {"record": "public", "files": "public"},
        "files": [str(zip_path.absolute())],
        "creators": creators,
        "publication_date": date.today().isoformat(),
        "rights": build_rights(license),
    }

    # Methods and the base-config notes always accompany the description.
    additional_descriptions: list[dict] = [
        {
            "description": methods_description,
            "type": {"id": "methods"},
        },
        {
            "description": base_config["notes"],
            "type": {"id": "notes"},
        },
    ]
    if not has_license:
        additional_descriptions.append({
            "description": RESTRICTED_NOTICE,
            "type": {"id": "notes"},
        })
    if license == "cc0-1.0":
        # CC0 content still needs the Italian cultural-heritage disclaimer.
        additional_descriptions.append({
            "description": CC0_DISCLAIMER,
            "type": {"id": "notes"},
        })
    config["additional_descriptions"] = additional_descriptions

    # Re-shape base-config locations into the GeoJSON-like structure Zenodo expects.
    config["locations"] = {
        "features": [
            {
                "geometry": {
                    "type": "Point",
                    "coordinates": [loc["lon"], loc["lat"]],
                },
                "place": loc["place"],
                "description": loc["description"],
            }
            for loc in base_config["locations"]
        ]
    }

    # Propagate whitelisted base-config fields without overriding generated ones.
    for field in PROPAGATED_FIELDS:
        if field in base_config and field not in config:
            config[field] = base_config[field]

    # Convert flat related-identifier entries into Zenodo's nested {"id": ...} form.
    if "related_identifiers" in base_config:
        converted = []
        for ri in base_config["related_identifiers"]:
            entry: dict = {
                "identifier": ri["identifier"],
                "relation_type": {"id": ri["relation"]},
            }
            if "resource_type" in ri:
                entry["resource_type"] = {"id": ri["resource_type"]}
            if "scheme" in ri:
                entry["scheme"] = ri["scheme"]
            converted.append(entry)
        config["related_identifiers"] = converted

    if entity_uri:
        config["identifiers"] = [{"identifier": entity_uri, "scheme": "url"}]

    return config
def prepare_all(
    root: Path,
    zenodo_base_config_path: Path,
    output_dir: Path,
    kg_path: Path = KG_PATH,
) -> None:
    """Create one zip plus one Zenodo YAML config per entity and stage.

    Scans *root* (Sala/Folder/Stage layout), loads the knowledge graph from
    *kg_path*, and writes archives to ``<output_dir>/zips`` and record
    configs to ``<output_dir>/configs``.
    """
    structure = scan_folder_structure(root)

    kg = load_kg(kg_path)
    licensed_stages = extract_licensed_entity_stages(kg)
    entity_groups = group_folders_by_entity(structure)

    # Explicit encoding: configs may contain non-ASCII titles/descriptions
    # and the platform default encoding is not guaranteed to be UTF-8.
    with open(zenodo_base_config_path, encoding="utf-8") as f:
        base_config = yaml.safe_load(f)

    creators_lookup = load_creators_lookup(CREATORS_LOOKUP_PATH)

    zips_dir = output_dir / "zips"
    configs_dir = output_dir / "configs"
    zips_dir.mkdir(parents=True, exist_ok=True)
    configs_dir.mkdir(parents=True, exist_ok=True)

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    ) as progress:
        task = progress.add_task("Creating stage packages", total=len(entity_groups) * len(STAGES))

        for entity_id, folders in entity_groups.items():
            title = extract_entity_title(kg, entity_id)
            keeper_name, keeper_location = extract_keeper_info(kg, entity_id)
            sala_slug = slugify(folders[0][0])
            title_slug = slugify(title)
            # Metadata curators are stage-independent; compute them once per entity.
            metadata_creators = build_metadata_creators(kg, entity_id, creators_lookup)
            for stage in STAGES:
                progress.update(task, description=f"Entity {entity_id} - {stage}")
                result = create_stage_zip(entity_id, stage, folders, root, licensed_stages, zips_dir, title)
                if result is None:
                    # No files for this stage: nothing to upload.
                    progress.advance(task)
                    continue
                zip_path, has_license = result
                digitization_creators = build_creators_for_entity_stage(kg, entity_id, stage, creators_lookup)
                creators = merge_creators(digitization_creators, metadata_creators)
                license = extract_license_for_entity_stage(kg, entity_id, stage)
                entity_uri = build_entity_uri(entity_id)
                methods_description = build_methods_description(kg, entity_id, stage)
                config = generate_zenodo_config(stage, zip_path, title, base_config, creators, methods_description, license, entity_uri, keeper_name, keeper_location, has_license)
                config_path = configs_dir / f"{sala_slug}-{title_slug}-{stage}.yaml"
                with open(config_path, "w", encoding="utf-8") as f:
                    yaml.dump(config, f, Dumper=LiteralBlockDumper, default_flow_style=False, allow_unicode=True, sort_keys=False)
                progress.advance(task)
660def _extract_doi(record: dict) -> str:
661 pids = record.get("pids", {})
662 doi_info = pids.get("doi", {})
663 return doi_info.get("identifier", "")
666def _extract_record_url(record: dict) -> str:
667 return record["links"]["self_html"]
# Reverse lookup: full license title -> short Zenodo id (e.g. "cc0-1.0"),
# used when summarising rights in the DOI table.
LICENSE_TITLE_TO_SHORT: dict[str, str] = {
    info["title"]: short_name for short_name, info in LICENSE_INFO.items()
}
675def _format_creators_for_table(config: dict) -> str:
676 creators = config["creators"]
677 parts: list[str] = []
678 for c in creators:
679 org = c["person_or_org"]
680 orcid = org["identifiers"][0]["identifier"]
681 parts.append(f"{org['family_name']}, {org['given_name']} [orcid:{orcid}]")
682 return "; ".join(parts)
def _format_licenses_for_table(config: dict) -> str:
    """Summarise rights entries as "short-id (context)" pairs joined by "; ".

    Each right's English title is matched against known full license titles;
    the leftover text (e.g. "Metadata license") becomes the context.
    Unrecognised titles are skipped.
    """
    parts: list[str] = []
    for right in config["rights"]:
        title_en = right["title"]["en"]
        match = next(
            (
                (full, short)
                for full, short in LICENSE_TITLE_TO_SHORT.items()
                if title_en.startswith(full)
            ),
            None,
        )
        if match is None:
            continue
        full_name, short_name = match
        context = title_en.removeprefix(full_name).strip(" ()")
        parts.append(f"{short_name} ({context})")
    return "; ".join(parts)
# Column headers (in Italian) of the DOI summary table written by upload_all.
DOI_TABLE_FIELDNAMES = [
    "Numero su DMP",
    "Caso di studio",
    "Autore/i",
    "Tipo",
    "Titolo",
    "Data pubblicazione",
    "DOI",
    "URL",
    "Repository",
    "Licenza",
    "Note",
]
def upload_all(configs_dir: Path, publish: bool = False) -> Path:
    """Upload every YAML config in *configs_dir* and write a DOI summary CSV.

    Each record is created (and optionally published, per *publish*) via
    piccione; one summary row per upload is collected and written to
    ``doi_table.csv`` next to *configs_dir*. Returns the CSV path.
    """
    config_files = sorted(configs_dir.glob("*.yaml"))
    doi_table: list[dict[str, str]] = []

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    ) as progress:
        task = progress.add_task("Uploading to Zenodo", total=len(config_files))
        for config_file in config_files:
            progress.update(task, description=f"Uploading {config_file.stem}")
            record = piccione_upload(str(config_file), publish=publish)
            # Re-read the config to build the summary row from its metadata.
            # Explicit utf-8: configs contain non-ASCII titles/names.
            with open(config_file, encoding="utf-8") as f:
                config = yaml.safe_load(f)
            row: dict[str, str] = {
                "Numero su DMP": "",
                "Caso di studio": "Aldrovandi",
                "Autore/i": _format_creators_for_table(config),
                "Tipo": "Dataset",
                "Titolo": config["title"],
                "Data pubblicazione": config["publication_date"],
                "DOI": _extract_doi(record),
                "URL": _extract_record_url(record),
                "Repository": "Zenodo",
                "Licenza": _format_licenses_for_table(config),
                "Note": "",
            }
            doi_table.append(row)
            progress.advance(task)

    csv_path = configs_dir.parent / "doi_table.csv"
    # newline="" per the csv module docs; utf-8 because titles and author
    # names may contain non-ASCII characters.
    with open(csv_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=DOI_TABLE_FIELDNAMES)
        writer.writeheader()
        writer.writerows(doi_table)
    print(f"DOI table written to {csv_path}")
    return csv_path
def parse_arguments():  # pragma: no cover
    """Define and parse the CLI: a ``prepare`` and an ``upload`` subcommand."""
    parser = argparse.ArgumentParser(description="Prepare and upload Zenodo packages")
    subparsers = parser.add_subparsers(dest="command", required=True)

    prepare = subparsers.add_parser("prepare", help="Create zips and YAML configs")
    prepare.add_argument("root", type=Path, help="Root directory with Sala/Folder/Stage structure")
    prepare.add_argument("zenodo_config", type=Path, help="Base Zenodo configuration YAML")
    prepare.add_argument("--output", "-o", type=Path, default=Path("zenodo_output"), help="Output directory")

    upload = subparsers.add_parser("upload", help="Upload to Zenodo")
    upload.add_argument("configs_dir", type=Path, help="Directory containing YAML configs")
    upload.add_argument("--publish", action="store_true", help="Publish after upload")

    return parser.parse_args()
def main():  # pragma: no cover
    """CLI entry point: dispatch the parsed subcommand to its handler."""
    args = parse_arguments()
    command = args.command
    if command == "upload":
        upload_all(configs_dir=args.configs_dir, publish=args.publish)
    elif command == "prepare":
        prepare_all(
            root=args.root,
            zenodo_base_config_path=args.zenodo_config,
            output_dir=args.output,
        )


if __name__ == "__main__":  # pragma: no cover
    main()