Coverage for changes_metadata_manager / patch / license_metadata.py: 94%
158 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-29 18:29 +0000
1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelomas@gmail.com>
2#
3# SPDX-License-Identifier: ISC
5import argparse
6import json
7import re
8import time
9from pathlib import Path
11import requests
12import yaml
13from rich.console import Console
14from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn
16from piccione.upload.on_zenodo import (
17 build_inveniordm_payload,
18 get_headers,
19 publish_draft,
20 update_draft_metadata,
21)
23from changes_metadata_manager.folder_metadata_builder import load_kg
24from changes_metadata_manager.zenodo_upload import (
25 CC0_DISCLAIMER,
26 LiteralBlockDumper,
27 build_rights,
28 extract_license_for_entity_stage,
29)
# Shared rich console used for every status message printed by this module.
console = Console()

# Upper bound of the retry loop in _request_with_retry.
MAX_RETRIES = 5
# Base delay in seconds; the retry loop scales it by 2 ** attempt.
BASE_BACKOFF = 10

# Captures the stage token from a config file name ending in "-<stage>.yaml".
STAGE_PATTERN = re.compile(r"-(raw|rawp|dcho|dchoo)\.yaml$")
# Captures the path segment following "/itm/" in an identifier ending in
# "/ob<digits>/<digits>".
ENTITY_URI_PATTERN = re.compile(r"/itm/([^/]+)/ob\d+/\d+$")
def _request_with_retry(method: str, url: str, **kwargs) -> requests.Response:
    """Send an HTTP request, retrying with exponential backoff on HTTP 429.

    Args:
        method: HTTP verb forwarded to ``requests.request``.
        url: Target URL.
        **kwargs: Extra keyword arguments forwarded unchanged to
            ``requests.request`` on every attempt.

    Returns:
        The first response whose status is not 429, or the last response
        received once the retry budget is used up (which may still be a 429).
    """
    attempt = 1
    response = requests.request(method, url, **kwargs)
    # One initial request plus at most MAX_RETRIES - 1 retries.
    while attempt < MAX_RETRIES and response.status_code == 429:
        wait = BASE_BACKOFF * (2 ** attempt)
        console.print(f" [yellow]Rate limited, retrying in {wait}s...[/yellow]")
        time.sleep(wait)
        response = requests.request(method, url, **kwargs)
        attempt += 1
    return response
def _create_edit_draft(zenodo_url: str, record_id: str, access_token: str, user_agent: str) -> None:
    """POST to the record's ``/draft`` endpoint to open an editable draft.

    A 403 response whose body contains the word "already" (case-insensitive)
    is treated as success -- presumably the draft already exists; confirm
    against the Zenodo API. Any other error status raises.

    Raises:
        requests.HTTPError: Via ``raise_for_status`` for any other 4xx/5xx.
    """
    draft_endpoint = f"{zenodo_url}/records/{record_id}/draft"
    request_headers = get_headers(access_token, user_agent)
    reply = _request_with_retry("POST", draft_endpoint, headers=request_headers, timeout=30)

    tolerated = reply.status_code == 403 and "already" in reply.text.lower()
    if not tolerated:
        reply.raise_for_status()
def _extract_stage_from_config_path(config_file: str) -> str:
    """Return the stage token encoded at the end of a config file name.

    Args:
        config_file: Path (as a string) expected to end with
            ``-<stage>.yaml``, where ``<stage>`` is one of the alternatives
            accepted by ``STAGE_PATTERN``.

    Returns:
        The captured stage token.

    Raises:
        AssertionError: If ``config_file`` does not match ``STAGE_PATTERN``.
    """
    m = STAGE_PATTERN.search(config_file)
    if m is None:
        # Raised explicitly instead of via `assert`: assert statements are
        # removed under `python -O`, which would turn a bad path into an
        # opaque AttributeError on `None.group`. The exception type is kept
        # so existing callers see the same error.
        raise AssertionError(f"Cannot extract stage from config path: {config_file}")
    return m.group(1)
def _extract_entity_id_from_config(config: dict) -> str:
    """Return the entity id captured from the first matching identifier.

    Args:
        config: Mapping with an ``"identifiers"`` list whose items each carry
            an ``"identifier"`` string.

    Returns:
        Group 1 of ``ENTITY_URI_PATTERN`` for the first identifier it matches.

    Raises:
        ValueError: If no identifier matches ``ENTITY_URI_PATTERN``.
    """
    # Lazy scan: stops at the first identifier that matches.
    candidates = (
        ENTITY_URI_PATTERN.search(item["identifier"])
        for item in config["identifiers"]
    )
    hit = next((found for found in candidates if found), None)
    if hit is None:
        raise ValueError(f"No entity URI found in identifiers: {config['identifiers']}")
    return hit.group(1)
def _fetch_record_metadata(zenodo_url: str, record_id: str, access_token: str, user_agent: str) -> dict:
    """Fetch the ``metadata`` object of a record, preferring its draft.

    The draft endpoint is queried first; if it answers 404 the record
    endpoint itself is queried instead.

    Returns:
        The ``"metadata"`` member of the JSON response.

    Raises:
        requests.HTTPError: Via ``raise_for_status`` when the final response
            has an error status.
    """
    record_url = f"{zenodo_url}/records/{record_id}"
    request_options = {
        "headers": {
            "Authorization": f"Bearer {access_token}",
            "User-Agent": user_agent,
            "Accept": "application/vnd.inveniordm.v1+json",
        },
        "timeout": 30,
    }

    reply = _request_with_retry("GET", f"{record_url}/draft", **request_options)
    if reply.status_code == 404:
        # No draft available: fall back to the record endpoint.
        reply = _request_with_retry("GET", record_url, **request_options)
    reply.raise_for_status()
    return reply.json()["metadata"]
91def _current_content_license(metadata: dict) -> str | None:
92 for right in metadata.get("rights", []):
93 title = right.get("title", {}).get("en", "")
94 if "(Content license)" in title:
95 link = right.get("link", "")
96 if "zero" in link:
97 return "cc0-1.0"
98 if "by-nc-sa" in link:
99 return "cc-by-nc-sa-4.0"
100 if "by-nc" in link:
101 return "cc-by-nc-4.0"
102 if "by-sa" in link:
103 return "cc-by-sa-4.0"
104 if "by" in link:
105 return "cc-by-4.0"
106 return None
109def _has_cc0_disclaimer(metadata: dict) -> bool:
110 for desc in metadata.get("additional_descriptions", []):
111 if "D. Lgs. 42/2004" in desc.get("description", ""):
112 return True
113 return False
116def _rebuild_additional_descriptions(
117 current: list[dict], correct_license: str | None
118) -> list[dict]:
119 rebuilt = [d for d in current if "D. Lgs. 42/2004" not in d.get("description", "")]
120 if correct_license == "cc0-1.0":
121 rebuilt.append({
122 "description": CC0_DISCLAIMER,
123 "type": {"id": "notes"},
124 })
125 return rebuilt
def patch_drafts(
    drafts_path: Path,
    kg_path: Path,
    *,
    dry_run: bool = False,
) -> None:
    """Align the license metadata of Zenodo records with the knowledge graph.

    For every entry in the drafts JSON file whose status is not ``"failed"``,
    the record's current metadata is fetched, the license that should apply
    is looked up in the knowledge graph, and -- if the rights or the CC0
    disclaimer differ -- the record and its local YAML config are updated.
    Published records are reopened as an edit draft before the update and
    republished afterwards. Per-record failures are caught, logged and
    counted, and do not stop the run.

    A JSON log named ``patch_license_log.json`` is written next to
    ``drafts_path`` and a summary is printed to the console.

    Args:
        drafts_path: JSON file holding a list of draft entries. Each entry is
            read for ``config_file``, ``draft_id``, ``zenodo_url`` and
            ``access_token``, plus optional ``status`` and ``user_agent``.
        kg_path: Knowledge graph file passed to ``load_kg``.
        dry_run: When true, only report and log what would change; nothing is
            sent to Zenodo and no config file is rewritten.

    Raises:
        AssertionError: If a non-failed entry's ``config_file`` carries no
            recognisable stage suffix (raised before any record is touched).
        KeyError: If an entry lacks one of the required keys.
    """
    console.print(f"Loading KG from {kg_path}...")
    kg = load_kg(kg_path)

    # Explicit UTF-8: do not depend on the platform's locale encoding.
    with open(drafts_path, encoding="utf-8") as f:
        drafts = json.load(f)

    stats = {"patched": 0, "skipped_correct": 0, "skipped_failed": 0, "skipped_no_kg_license": 0, "errors": 0}
    patch_log: list[dict] = []

    # First pass: drop failed uploads and resolve the stage of every
    # remaining entry up front.
    entries_to_check = []
    for entry in drafts:
        if entry.get("status") == "failed":
            stats["skipped_failed"] += 1
            continue
        stage = _extract_stage_from_config_path(entry["config_file"])
        entries_to_check.append((entry, stage))

    console.print(f"Checking {len(entries_to_check)} drafts...")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    ) as progress:
        task = progress.add_task("Patching", total=len(entries_to_check))

        for entry, stage in entries_to_check:
            config_path = Path(entry["config_file"])
            record_id = entry["draft_id"]
            zenodo_url = entry["zenodo_url"]
            access_token = entry["access_token"]
            user_agent = entry.get("user_agent", "changes-metadata-manager/1.0.0")
            is_published = entry.get("status") == "published"
            progress.update(task, description=f"Record {record_id}")

            try:
                zenodo_metadata = _fetch_record_metadata(zenodo_url, str(record_id), access_token, user_agent)

                entity_id = _extract_entity_id_from_config(zenodo_metadata)

                correct_license = extract_license_for_entity_stage(kg, entity_id, stage)
                if correct_license is None:
                    stats["skipped_no_kg_license"] += 1
                    progress.advance(task)
                    continue

                current_license = _current_content_license(zenodo_metadata)

                needs_rights_fix = correct_license != current_license
                # The disclaimer must be present exactly when the license is CC0.
                needs_disclaimer_fix = (correct_license == "cc0-1.0") != _has_cc0_disclaimer(zenodo_metadata)

                if not needs_rights_fix and not needs_disclaimer_fix:
                    stats["skipped_correct"] += 1
                    progress.advance(task)
                    continue

                new_rights = build_rights(correct_license)
                new_additional = _rebuild_additional_descriptions(
                    zenodo_metadata.get("additional_descriptions", []), correct_license
                )

                log_entry = {
                    "record_id": record_id,
                    "config_file": entry["config_file"],
                    "entity_id": entity_id,
                    "stage": stage,
                    "old_license": current_license,
                    "new_license": correct_license,
                    "rights_changed": needs_rights_fix,
                    "disclaimer_changed": needs_disclaimer_fix,
                }

                if dry_run:
                    console.print(f" [cyan]DRY RUN[/cyan] {record_id}: {current_license} → {correct_license}")
                    log_entry["status"] = "dry_run"
                    patch_log.append(log_entry)
                    # Dry-run hits are counted under "patched" in the summary.
                    stats["patched"] += 1
                    progress.advance(task)
                    continue

                if is_published:
                    # Published records must be reopened as a draft to be edited.
                    _create_edit_draft(zenodo_url, str(record_id), access_token, user_agent)

                with open(config_path, encoding="utf-8") as f:
                    config = yaml.safe_load(f)

                config["rights"] = new_rights
                config["additional_descriptions"] = new_additional

                access = config["access"]
                payload = build_inveniordm_payload(config, access)
                update_draft_metadata(zenodo_url, access_token, str(record_id), payload, user_agent)

                if is_published:
                    publish_draft(zenodo_url, access_token, str(record_id), user_agent)

                # The local config is rewritten only after the remote update
                # succeeded. allow_unicode=True emits raw non-ASCII text, so
                # the file must be opened as UTF-8 explicitly.
                with open(config_path, "w", encoding="utf-8") as f:
                    yaml.dump(config, f, Dumper=LiteralBlockDumper, default_flow_style=False, allow_unicode=True, sort_keys=False)

                log_entry["status"] = "patched"
                stats["patched"] += 1
                patch_log.append(log_entry)
            except Exception as exc:
                # Per-record boundary: record the failure and move on to the
                # next record instead of aborting the whole run.
                stats["errors"] += 1
                patch_log.append({
                    "record_id": record_id,
                    "config_file": entry["config_file"],
                    "stage": stage,
                    "status": "error",
                    "error": str(exc),
                })
                console.print(f"\n[red][FAILED][/red] Record {record_id}: {exc}")

            progress.advance(task)
            # Pause between records that were actually patched or failed.
            time.sleep(2)

    log_path = drafts_path.parent / "patch_license_log.json"
    with open(log_path, "w", encoding="utf-8") as f:
        json.dump(patch_log, f, indent=2)

    console.print()
    console.print("[bold]Results:[/bold]")
    console.print(f" Patched: {stats['patched']}")
    console.print(f" Already correct: {stats['skipped_correct']}")
    console.print(f" Skipped (failed): {stats['skipped_failed']}")
    console.print(f" Skipped (no KG license): {stats['skipped_no_kg_license']}")
    console.print(f" Errors: {stats['errors']}")
    console.print(f" Log: {log_path}")
if __name__ == "__main__":  # pragma: no cover
    # Command-line entry point: two positional paths plus an optional
    # --dry-run switch, all forwarded to patch_drafts.
    cli = argparse.ArgumentParser(description="Patch license metadata on Zenodo records")
    for arg_name, arg_help in (
        ("drafts_json", "Path to drafts.json"),
        ("kg_path", "Path to knowledge graph (kg.ttl)"),
    ):
        cli.add_argument(arg_name, type=Path, help=arg_help)
    cli.add_argument("--dry-run", action="store_true", help="Show changes without applying")

    options = cli.parse_args()
    patch_drafts(options.drafts_json, options.kg_path, dry_run=options.dry_run)