Coverage for changes_metadata_manager / generate_provenance.py: 98%
49 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-04 14:41 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2026-03-04 14:41 +0000
1"""
2Module to generate provenance snapshots from RDF data.
3Loads RDF data in various formats from all files in a directory,
4extracts all subjects, and creates provenance snapshots as named graphs
5with type prov:Entity.
6"""
8import os
9import datetime
10from rdflib import Dataset, URIRef, Namespace, Literal
11from rdflib.namespace import RDF, XSD, DCTERMS
# IRI of the Creative Commons "zero 1.0" public-domain dedication; it is the
# object of the dcterms:license triple added to the output dataset.
CC0 = URIRef("https://creativecommons.org/publicdomain/zero/1.0/")
15def generate_provenance_snapshots(input_directory: str, output_file: str, input_format: str | None = None, output_format: str = 'json-ld', agent_orcid: str = '', primary_source: str = ''):
16 """
17 Generate provenance snapshots from RDF data.
19 Args:
20 input_directory: Path to directory containing RDF files
21 output_file: Path to output file with provenance snapshots (N-Quads format)
22 input_format: Optional format to use for all input files (overrides auto-detection)
23 output_format: Format to use for output file (default: nquads)
24 agent_orcid: ORCID of the responsible agent
25 primary_source: URI of the primary source for the data
26 """
28 input_graph = Dataset()
29 default_graph = input_graph.graph()
31 file_count = 0
33 rdf_extensions = {
34 '.ttl': 'turtle',
35 '.nt': 'nt',
36 '.n3': 'n3',
37 '.xml': 'xml',
38 '.rdf': 'xml',
39 '.jsonld': 'json-ld',
40 '.nq': 'nquads',
41 '.trig': 'trig'
42 }
44 for filename in os.listdir(input_directory):
45 file_path = os.path.join(input_directory, filename)
47 if input_format:
48 format_name = input_format
49 else:
50 _, ext = os.path.splitext(filename.lower())
51 if ext not in rdf_extensions:
52 continue
53 format_name = rdf_extensions[ext]
55 default_graph.parse(file_path, format=format_name)
56 file_count += 1
58 if file_count == 0:
59 return
61 dataset = Dataset()
63 PROV = Namespace('http://www.w3.org/ns/prov#')
64 dataset.namespace_manager.bind('prov', PROV)
65 dataset.namespace_manager.bind('dcterms', DCTERMS)
67 dataset.default_graph.add((URIRef(""), DCTERMS.license, CC0))
69 for prefix, namespace in input_graph.namespace_manager.namespaces():
70 dataset.namespace_manager.bind(prefix, namespace)
72 license_subjects = {s for s, p, _ in default_graph if p == DCTERMS.license}
73 subjects = set()
74 for s, _, _ in default_graph:
75 if isinstance(s, URIRef) and s not in license_subjects:
76 subjects.add(s)
78 generation_time = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat()
80 responsible_agent = URIRef(agent_orcid)
81 primary_source_uri = URIRef(primary_source)
83 for subject in subjects:
84 prov_graph_uri = URIRef(f"{subject}/prov/")
86 snapshot_uri = URIRef(f"{subject}/prov/se/1")
88 prov_graph = dataset.graph(identifier=prov_graph_uri)
90 prov_graph.add((snapshot_uri, RDF.type, PROV.Entity))
91 prov_graph.add((snapshot_uri, PROV.specializationOf, subject))
93 prov_graph.add((snapshot_uri, PROV.generatedAtTime, Literal(generation_time, datatype=XSD.dateTime)))
95 prov_graph.add((snapshot_uri, PROV.wasAttributedTo, responsible_agent))
97 prov_graph.add((snapshot_uri, PROV.hadPrimarySource, primary_source_uri))
99 description = f"Entity <{str(subject)}> was created"
100 prov_graph.add((snapshot_uri, DCTERMS.description, Literal(description, lang="en")))
102 dataset.serialize(destination=output_file, format=output_format)