Coverage for changes_metadata_manager / patch / license_metadata.py: 94%

158 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-29 18:29 +0000

1# SPDX-FileCopyrightText: 2025-2026 Arcangelo Massari <arcangelomas@gmail.com> 

2# 

3# SPDX-License-Identifier: ISC 

4 

5import argparse 

6import json 

7import re 

8import time 

9from pathlib import Path 

10 

11import requests 

12import yaml 

13from rich.console import Console 

14from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, MofNCompleteColumn 

15 

16from piccione.upload.on_zenodo import ( 

17 build_inveniordm_payload, 

18 get_headers, 

19 publish_draft, 

20 update_draft_metadata, 

21) 

22 

23from changes_metadata_manager.folder_metadata_builder import load_kg 

24from changes_metadata_manager.zenodo_upload import ( 

25 CC0_DISCLAIMER, 

26 LiteralBlockDumper, 

27 build_rights, 

28 extract_license_for_entity_stage, 

29) 

30 

# Shared Rich console used for the status output printed by this module.
console = Console()

# _request_with_retry issues at most MAX_RETRIES requests in total (the initial
# request plus MAX_RETRIES - 1 retries) and sleeps BASE_BACKOFF * 2**attempt
# seconds before each retry.
MAX_RETRIES = 5
BASE_BACKOFF = 10

# Captures the stage (raw, rawp, dcho or dchoo) from a config file name that
# ends in "-<stage>.yaml".
STAGE_PATTERN = re.compile(r"-(raw|rawp|dcho|dchoo)\.yaml$")
# Captures the path segment following "/itm/" in an identifier that ends in
# "/ob<digits>/<digits>".
ENTITY_URI_PATTERN = re.compile(r"/itm/([^/]+)/ob\d+/\d+$")

38 

39 

def _request_with_retry(method: str, url: str, **kwargs) -> requests.Response:
    """Send an HTTP request, backing off and retrying while the server answers 429.

    Args:
        method: HTTP method name handed to ``requests.request``.
        url: Target URL.
        **kwargs: Forwarded unchanged to ``requests.request`` on every attempt.

    Returns:
        The first response whose status is not 429, or the response of the
        last attempt once ``MAX_RETRIES`` requests have been made (which may
        still be a 429; callers decide how to handle it).
    """
    attempt = 1
    while True:
        response = requests.request(method, url, **kwargs)
        out_of_attempts = attempt >= MAX_RETRIES
        if out_of_attempts or response.status_code != 429:
            return response
        # Exponential backoff: the delay doubles with each rate-limited attempt.
        wait = BASE_BACKOFF * (2 ** attempt)
        console.print(f" [yellow]Rate limited, retrying in {wait}s...[/yellow]")
        time.sleep(wait)
        attempt += 1

50 

51 

def _create_edit_draft(zenodo_url: str, record_id: str, access_token: str, user_agent: str) -> None:
    """POST to the record's ``/draft`` endpoint to open an editable draft.

    A 403 response whose body contains the word "already" (case-insensitive)
    is treated as success -- presumably the draft exists already; confirm
    against the Zenodo API behaviour.

    Args:
        zenodo_url: Base URL of the Zenodo API.
        record_id: Identifier of the record to edit.
        access_token: Token passed to ``get_headers``.
        user_agent: User-Agent string passed to ``get_headers``.

    Raises:
        requests.HTTPError: For any other error status, via ``raise_for_status``.
    """
    draft_endpoint = f"{zenodo_url}/records/{record_id}/draft"
    request_headers = get_headers(access_token, user_agent)
    response = _request_with_retry("POST", draft_endpoint, headers=request_headers, timeout=30)

    tolerated_conflict = response.status_code == 403 and "already" in response.text.lower()
    if not tolerated_conflict:
        response.raise_for_status()

62 

63 

def _extract_stage_from_config_path(config_file: str) -> str:
    """Return the stage encoded in the suffix of a config file name.

    Args:
        config_file: Path or file name expected to end in ``-<stage>.yaml``,
            where ``<stage>`` is one of the alternatives in ``STAGE_PATTERN``.

    Returns:
        The stage string captured by ``STAGE_PATTERN``.

    Raises:
        AssertionError: If ``config_file`` does not end in a recognised
            ``-<stage>.yaml`` suffix.
    """
    m = STAGE_PATTERN.search(config_file)
    if m is None:
        # Raised explicitly instead of via ``assert`` so the check is not
        # stripped under ``python -O``; the exception type is kept so existing
        # callers observe the same failure.
        raise AssertionError(f"Cannot extract stage from config path: {config_file}")
    return m.group(1)

68 

69 

def _extract_entity_id_from_config(config: dict) -> str:
    """Return the entity id captured from the first matching identifier.

    Args:
        config: Mapping with an ``"identifiers"`` list whose items each carry
            an ``"identifier"`` string.

    Returns:
        Group 1 of ``ENTITY_URI_PATTERN`` for the first identifier it matches.

    Raises:
        ValueError: If no identifier matches ``ENTITY_URI_PATTERN``.
    """
    # Lazy generator: identifiers after the first hit are never inspected.
    candidates = (
        ENTITY_URI_PATTERN.search(item["identifier"])
        for item in config["identifiers"]
    )
    first_hit = next((hit for hit in candidates if hit), None)
    if first_hit is None:
        raise ValueError(f"No entity URI found in identifiers: {config['identifiers']}")
    return first_hit.group(1)

76 

77 

def _fetch_record_metadata(zenodo_url: str, record_id: str, access_token: str, user_agent: str) -> dict:
    """Fetch the ``metadata`` object of a record, preferring its draft.

    The draft endpoint is queried first; when it answers 404 the record
    endpoint itself is queried instead.

    Args:
        zenodo_url: Base URL of the Zenodo API.
        record_id: Identifier of the record.
        access_token: Bearer token sent in the ``Authorization`` header.
        user_agent: Value of the ``User-Agent`` header.

    Returns:
        The ``"metadata"`` member of the JSON response.

    Raises:
        requests.HTTPError: If the final response has an error status.
    """
    record_endpoint = f"{zenodo_url}/records/{record_id}"
    request_headers = {
        "Authorization": f"Bearer {access_token}",
        "User-Agent": user_agent,
        "Accept": "application/vnd.inveniordm.v1+json",
    }

    response = _request_with_retry("GET", f"{record_endpoint}/draft", headers=request_headers, timeout=30)
    draft_missing = response.status_code == 404
    if draft_missing:
        response = _request_with_retry("GET", record_endpoint, headers=request_headers, timeout=30)

    response.raise_for_status()
    body = response.json()
    return body["metadata"]

89 

90 

91def _current_content_license(metadata: dict) -> str | None: 

92 for right in metadata.get("rights", []): 

93 title = right.get("title", {}).get("en", "") 

94 if "(Content license)" in title: 

95 link = right.get("link", "") 

96 if "zero" in link: 

97 return "cc0-1.0" 

98 if "by-nc-sa" in link: 

99 return "cc-by-nc-sa-4.0" 

100 if "by-nc" in link: 

101 return "cc-by-nc-4.0" 

102 if "by-sa" in link: 

103 return "cc-by-sa-4.0" 

104 if "by" in link: 

105 return "cc-by-4.0" 

106 return None 

107 

108 

109def _has_cc0_disclaimer(metadata: dict) -> bool: 

110 for desc in metadata.get("additional_descriptions", []): 

111 if "D. Lgs. 42/2004" in desc.get("description", ""): 

112 return True 

113 return False 

114 

115 

116def _rebuild_additional_descriptions( 

117 current: list[dict], correct_license: str | None 

118) -> list[dict]: 

119 rebuilt = [d for d in current if "D. Lgs. 42/2004" not in d.get("description", "")] 

120 if correct_license == "cc0-1.0": 

121 rebuilt.append({ 

122 "description": CC0_DISCLAIMER, 

123 "type": {"id": "notes"}, 

124 }) 

125 return rebuilt 

126 

127 

def patch_drafts(
    drafts_path: Path,
    kg_path: Path,
    *,
    dry_run: bool = False,
) -> None:
    """Bring the license metadata of Zenodo records in line with the knowledge graph.

    For every entry of the drafts file that is not marked ``"failed"`` the
    function fetches the record metadata, looks up the license the knowledge
    graph assigns to the entity and stage, and -- when the rights or the CC0
    disclaimer differ -- rewrites ``rights`` and ``additional_descriptions``
    on Zenodo and in the local YAML config. Published records get an edit
    draft first and are re-published after the update.

    A log of all patched, dry-run and failed records is written to
    ``patch_license_log.json`` next to the drafts file, and a summary is
    printed.

    Args:
        drafts_path: JSON file holding a list of entries. Each entry is read
            for ``config_file``, ``draft_id``, ``zenodo_url`` and
            ``access_token``, plus the optional ``status`` and ``user_agent``.
        kg_path: Knowledge graph location handed to ``load_kg``.
        dry_run: When true, report the needed changes without contacting the
            write endpoints or touching the config files.

    Raises:
        AssertionError: If a non-failed entry's ``config_file`` carries no
            recognisable stage suffix (raised before any record is processed).
        KeyError: If an entry lacks one of the required keys listed above.

    Errors raised while processing a single record are caught, counted and
    logged so the remaining records are still handled.
    """
    console.print(f"Loading KG from {kg_path}...")
    kg = load_kg(kg_path)

    # Explicit UTF-8 everywhere: the files must not depend on the platform's
    # locale encoding.
    with open(drafts_path, encoding="utf-8") as f:
        drafts = json.load(f)

    stats = {"patched": 0, "skipped_correct": 0, "skipped_failed": 0, "skipped_no_kg_license": 0, "errors": 0}
    patch_log: list[dict] = []

    entries_to_check = []
    for entry in drafts:
        if entry.get("status") == "failed":
            stats["skipped_failed"] += 1
            continue
        stage = _extract_stage_from_config_path(entry["config_file"])
        entries_to_check.append((entry, stage))

    console.print(f"Checking {len(entries_to_check)} drafts...")

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        MofNCompleteColumn(),
    ) as progress:
        task = progress.add_task("Patching", total=len(entries_to_check))

        for entry, stage in entries_to_check:
            config_path = Path(entry["config_file"])
            record_id = entry["draft_id"]
            zenodo_url = entry["zenodo_url"]
            access_token = entry["access_token"]
            user_agent = entry.get("user_agent", "changes-metadata-manager/1.0.0")
            is_published = entry.get("status") == "published"
            progress.update(task, description=f"Record {record_id}")

            # Per-record boundary: any failure is logged and the run continues.
            try:
                zenodo_metadata = _fetch_record_metadata(zenodo_url, str(record_id), access_token, user_agent)

                entity_id = _extract_entity_id_from_config(zenodo_metadata)

                correct_license = extract_license_for_entity_stage(kg, entity_id, stage)
                if correct_license is None:
                    stats["skipped_no_kg_license"] += 1
                    progress.advance(task)
                    continue

                current_license = _current_content_license(zenodo_metadata)

                needs_rights_fix = correct_license != current_license
                # The disclaimer must be present exactly when the license is CC0.
                needs_disclaimer_fix = (correct_license == "cc0-1.0") != _has_cc0_disclaimer(zenodo_metadata)

                if not needs_rights_fix and not needs_disclaimer_fix:
                    stats["skipped_correct"] += 1
                    progress.advance(task)
                    continue

                new_rights = build_rights(correct_license)
                new_additional = _rebuild_additional_descriptions(
                    zenodo_metadata.get("additional_descriptions", []), correct_license
                )

                log_entry = {
                    "record_id": record_id,
                    "config_file": entry["config_file"],
                    "entity_id": entity_id,
                    "stage": stage,
                    "old_license": current_license,
                    "new_license": correct_license,
                    "rights_changed": needs_rights_fix,
                    "disclaimer_changed": needs_disclaimer_fix,
                }

                if dry_run:
                    # Separator added so the old and new license ids no longer
                    # run together in the output.
                    console.print(f" [cyan]DRY RUN[/cyan] {record_id}: {current_license} -> {correct_license}")
                    log_entry["status"] = "dry_run"
                    patch_log.append(log_entry)
                    stats["patched"] += 1
                    progress.advance(task)
                    continue

                if is_published:
                    _create_edit_draft(zenodo_url, str(record_id), access_token, user_agent)

                with open(config_path, encoding="utf-8") as f:
                    config = yaml.safe_load(f)

                config["rights"] = new_rights
                config["additional_descriptions"] = new_additional

                access = config["access"]
                payload = build_inveniordm_payload(config, access)
                update_draft_metadata(zenodo_url, access_token, str(record_id), payload, user_agent)

                if is_published:
                    publish_draft(zenodo_url, access_token, str(record_id), user_agent)

                # The local config is rewritten only after Zenodo accepted the
                # change. UTF-8 is required because allow_unicode=True emits
                # non-ASCII characters unescaped.
                with open(config_path, "w", encoding="utf-8") as f:
                    yaml.dump(config, f, Dumper=LiteralBlockDumper, default_flow_style=False, allow_unicode=True, sort_keys=False)

                log_entry["status"] = "patched"
                stats["patched"] += 1
                patch_log.append(log_entry)
            except Exception as exc:
                stats["errors"] += 1
                patch_log.append({
                    "record_id": record_id,
                    "config_file": entry["config_file"],
                    "stage": stage,
                    "status": "error",
                    "error": str(exc),
                })
                console.print(f"\n[red][FAILED][/red] Record {record_id}: {exc}")

            progress.advance(task)
            # Pause between records that were written to or that failed;
            # skipped and dry-run records continue without this delay.
            time.sleep(2)

    log_path = drafts_path.parent / "patch_license_log.json"
    with open(log_path, "w", encoding="utf-8") as f:
        json.dump(patch_log, f, indent=2)

    console.print()
    console.print("[bold]Results:[/bold]")
    console.print(f" Patched: {stats['patched']}")
    console.print(f" Already correct: {stats['skipped_correct']}")
    console.print(f" Skipped (failed): {stats['skipped_failed']}")
    console.print(f" Skipped (no KG license): {stats['skipped_no_kg_license']}")
    console.print(f" Errors: {stats['errors']}")
    console.print(f" Log: {log_path}")

263 

264 

if __name__ == "__main__":  # pragma: no cover
    # Command-line entry point: parse the two positional paths and the
    # optional --dry-run switch, then hand them to patch_drafts.
    cli = argparse.ArgumentParser(description="Patch license metadata on Zenodo records")
    cli.add_argument("drafts_json", type=Path, help="Path to drafts.json")
    cli.add_argument("kg_path", type=Path, help="Path to knowledge graph (kg.ttl)")
    cli.add_argument("--dry-run", action="store_true", help="Show changes without applying")
    options = cli.parse_args()
    patch_drafts(options.drafts_json, options.kg_path, dry_run=options.dry_run)