Coverage for changes_metadata_manager / generate_provenance.py: 98%

49 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-03-04 14:41 +0000

1""" 

2Module to generate provenance snapshots from RDF data. 

3Loads RDF data in various formats from all files in a directory, 

4extracts all subjects, and creates provenance snapshots as named graphs 

5with type prov:Entity. 

6""" 

7 

8import os 

9import datetime 

10from rdflib import Dataset, URIRef, Namespace, Literal 

11from rdflib.namespace import RDF, XSD, DCTERMS 

12 

13CC0 = URIRef("https://creativecommons.org/publicdomain/zero/1.0/") 

14 

def generate_provenance_snapshots(input_directory: str, output_file: str, input_format: str | None = None, output_format: str = 'json-ld', agent_orcid: str = '', primary_source: str = ''):
    """
    Generate provenance snapshots from RDF data.

    Parses every recognised RDF file in ``input_directory`` into a single
    graph, collects all ``URIRef`` subjects (excluding subjects that carry a
    ``dcterms:license`` statement), and emits one provenance named graph per
    subject containing a ``prov:Entity`` snapshot.

    Args:
        input_directory: Path to directory containing RDF files.
        output_file: Path to the output file with provenance snapshots.
        input_format: Optional format to use for all input files
            (overrides extension-based auto-detection).
        output_format: Serialisation format for the output file
            (default: ``'json-ld'``).
        agent_orcid: ORCID URI of the responsible agent.
        primary_source: URI of the primary source for the data.
    """

    input_graph = Dataset()
    default_graph = input_graph.graph()

    file_count = 0

    # Map file extensions to rdflib parser format names.
    rdf_extensions = {
        '.ttl': 'turtle',
        '.nt': 'nt',
        '.n3': 'n3',
        '.xml': 'xml',
        '.rdf': 'xml',
        '.jsonld': 'json-ld',
        '.nq': 'nquads',
        '.trig': 'trig'
    }

    for filename in os.listdir(input_directory):
        file_path = os.path.join(input_directory, filename)

        # os.listdir also returns sub-directories; skip anything that is
        # not a regular file so parse() is never handed a directory path.
        if not os.path.isfile(file_path):
            continue

        if input_format:
            format_name = input_format
        else:
            _, ext = os.path.splitext(filename.lower())
            if ext not in rdf_extensions:
                continue
            format_name = rdf_extensions[ext]

        default_graph.parse(file_path, format=format_name)
        file_count += 1

    # Nothing was parsed: produce no output file at all.
    if file_count == 0:
        return

    dataset = Dataset()

    PROV = Namespace('http://www.w3.org/ns/prov#')
    dataset.namespace_manager.bind('prov', PROV)
    dataset.namespace_manager.bind('dcterms', DCTERMS)

    # License for the output dataset itself; the empty relative URIRef
    # resolves to the serialised document.
    dataset.default_graph.add((URIRef(""), DCTERMS.license, CC0))

    # Carry over every namespace prefix from the input data.
    for prefix, namespace in input_graph.namespace_manager.namespaces():
        dataset.namespace_manager.bind(prefix, namespace)

    # Subjects that carry a dcterms:license statement (e.g. a dataset-level
    # license triple in the input) are excluded from snapshot generation.
    license_subjects = {s for s, p, _ in default_graph if p == DCTERMS.license}
    subjects = set()
    for s, _, _ in default_graph:
        if isinstance(s, URIRef) and s not in license_subjects:
            subjects.add(s)

    # One shared UTC timestamp (second precision) for all snapshots.
    generation_time = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat()

    responsible_agent = URIRef(agent_orcid)
    primary_source_uri = URIRef(primary_source)

    for subject in subjects:
        # Named graph holding this subject's provenance, and the URI of its
        # first (and only) snapshot entity.
        prov_graph_uri = URIRef(f"{subject}/prov/")
        snapshot_uri = URIRef(f"{subject}/prov/se/1")

        prov_graph = dataset.graph(identifier=prov_graph_uri)

        prov_graph.add((snapshot_uri, RDF.type, PROV.Entity))
        prov_graph.add((snapshot_uri, PROV.specializationOf, subject))

        prov_graph.add((snapshot_uri, PROV.generatedAtTime, Literal(generation_time, datatype=XSD.dateTime)))

        prov_graph.add((snapshot_uri, PROV.wasAttributedTo, responsible_agent))

        prov_graph.add((snapshot_uri, PROV.hadPrimarySource, primary_source_uri))

        description = f"Entity <{str(subject)}> was created"
        prov_graph.add((snapshot_uri, DCTERMS.description, Literal(description, lang="en")))

    dataset.serialize(destination=output_file, format=output_format)