Coverage for changes_metadata_manager / generate_provenance.py: 100%
48 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-11 13:06 +0000
1"""
2Module to generate provenance snapshots from RDF data.
3Loads RDF data in various formats from all files in a directory,
4extracts all subjects, and creates provenance snapshots as named graphs
5with type prov:Entity.
6"""
8import os
9import datetime
10from rdflib import Dataset, URIRef, Namespace, Literal
11from rdflib.namespace import RDF, XSD, DCTERMS
def generate_provenance_snapshots(input_directory, output_file, input_format=None, output_format='nquads', agent_orcid=None, primary_source=None):
    """
    Generate provenance snapshots from RDF data.

    Loads every recognised RDF file in *input_directory* into a single
    default graph, extracts all distinct URIRef subjects, and writes one
    named provenance graph per subject (typed as prov:Entity) to
    *output_file*.

    Args:
        input_directory: Path to directory containing RDF files
        output_file: Path to output file with provenance snapshots (N-Quads format)
        input_format: Optional format to use for all input files (overrides auto-detection)
        output_format: Format to use for output file (default: nquads)
        agent_orcid: Optional ORCID URI of the responsible agent; when omitted,
            no prov:wasAttributedTo triple is emitted
        primary_source: Optional URI of the primary source for the data; when
            omitted, no prov:hadPrimarySource triple is emitted
    """
    input_graph = Dataset()
    default_graph = input_graph.graph()

    file_count = 0

    # Map of file extensions to rdflib parser format names.
    rdf_extensions = {
        '.ttl': 'turtle',
        '.nt': 'nt',
        '.n3': 'n3',
        '.xml': 'xml',
        '.rdf': 'xml',
        '.jsonld': 'json-ld',
        '.nq': 'nquads',
        '.trig': 'trig'
    }

    for filename in os.listdir(input_directory):
        file_path = os.path.join(input_directory, filename)

        # Skip subdirectories and other non-regular entries: parse() would
        # fail on them.
        if not os.path.isfile(file_path):
            continue

        if input_format:
            format_name = input_format
        else:
            _, ext = os.path.splitext(filename.lower())
            format_name = rdf_extensions.get(ext)
            if format_name is None:
                # Unrecognized extension: skip instead of crashing with a
                # KeyError (the original behaviour on e.g. a stray README).
                print(f"Skipping {file_path}: unrecognized extension")
                continue

        print(f"Processing {file_path} as {format_name}...")
        default_graph.parse(file_path, format=format_name)
        file_count += 1

    if file_count == 0:
        print(f"No valid RDF files found in {input_directory}")
        return

    print(f"Processed {file_count} RDF files")

    dataset = Dataset()

    PROV = Namespace('http://www.w3.org/ns/prov#')
    dataset.namespace_manager.bind('prov', PROV)
    dataset.namespace_manager.bind('dcterms', DCTERMS)

    # Carry over any namespace prefixes declared in the input files so the
    # serialized output keeps the same abbreviations.
    for prefix, namespace in input_graph.namespace_manager.namespaces():
        dataset.namespace_manager.bind(prefix, namespace)

    # Collect every distinct URIRef subject (blank-node subjects are ignored,
    # as they cannot anchor a stable /prov/ URI).
    subjects = {s for s, p, o in default_graph if isinstance(s, URIRef)}

    print(f"Found {len(subjects)} subjects in the input files")

    # One shared timestamp (UTC, second precision) for the whole run.
    generation_time = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat()

    # Only coerce to URIRef when a value was actually supplied: the original
    # called URIRef(None) unconditionally, which breaks with the default
    # arguments.
    responsible_agent = URIRef(agent_orcid) if agent_orcid else None
    primary_source_uri = URIRef(primary_source) if primary_source else None

    for subject in subjects:
        # Named graph holding all provenance for this subject.
        prov_graph_uri = URIRef(f"{subject}/prov/")
        # First snapshot ("se/1") of the entity.
        snapshot_uri = URIRef(f"{subject}/prov/se/1")

        prov_graph = dataset.graph(identifier=prov_graph_uri)

        prov_graph.add((snapshot_uri, RDF.type, PROV.Entity))
        prov_graph.add((snapshot_uri, PROV.generatedAtTime, Literal(generation_time, datatype=XSD.dateTime)))

        if responsible_agent is not None:
            prov_graph.add((snapshot_uri, PROV.wasAttributedTo, responsible_agent))
        if primary_source_uri is not None:
            prov_graph.add((snapshot_uri, PROV.hadPrimarySource, primary_source_uri))

        description = f"Entity <{str(subject)}> was created"
        prov_graph.add((snapshot_uri, DCTERMS.description, Literal(description, lang="en")))

    dataset.serialize(destination=output_file, format=output_format)
    print(f"Provenance snapshots saved to {output_file} in {output_format} format")