Coverage for changes_metadata_manager / generate_provenance.py: 100%

48 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-11 13:06 +0000

1""" 

2Module to generate provenance snapshots from RDF data. 

3Loads RDF data in various formats from all files in a directory, 

4extracts all subjects, and creates provenance snapshots as named graphs 

5with type prov:Entity. 

6""" 

7 

8import os 

9import datetime 

10from rdflib import Dataset, URIRef, Namespace, Literal 

11from rdflib.namespace import RDF, XSD, DCTERMS 

12 

def generate_provenance_snapshots(input_directory, output_file, input_format=None, output_format='nquads', agent_orcid=None, primary_source=None):
    """
    Generate provenance snapshots from RDF data.

    Loads RDF data from every recognized file in *input_directory*,
    collects all URIRef subjects, and creates one provenance snapshot
    per subject as a named graph typed prov:Entity.

    Args:
        input_directory: Path to directory containing RDF files.
        output_file: Path to output file with provenance snapshots.
        input_format: Optional format to use for all input files
            (overrides extension-based auto-detection).
        output_format: Format to use for output file (default: nquads).
        agent_orcid: Optional ORCID URI of the responsible agent; the
            prov:wasAttributedTo triple is omitted when None.
        primary_source: Optional URI of the primary source of the data;
            the prov:hadPrimarySource triple is omitted when None.
    """
    input_graph = Dataset()
    default_graph = input_graph.graph()

    file_count = 0

    # Map of recognized file extensions to rdflib parser format names.
    rdf_extensions = {
        '.ttl': 'turtle',
        '.nt': 'nt',
        '.n3': 'n3',
        '.xml': 'xml',
        '.rdf': 'xml',
        '.jsonld': 'json-ld',
        '.nq': 'nquads',
        '.trig': 'trig'
    }

    for filename in os.listdir(input_directory):
        file_path = os.path.join(input_directory, filename)

        # Skip subdirectories and other non-regular entries so they are
        # never handed to the parser.
        if not os.path.isfile(file_path):
            continue

        if input_format:
            format_name = input_format
        else:
            _, ext = os.path.splitext(filename.lower())
            # BUGFIX: an unrecognized extension previously raised KeyError
            # and aborted the whole run; skip such files instead.
            format_name = rdf_extensions.get(ext)
            if format_name is None:
                print(f"Skipping {file_path}: unrecognized extension {ext!r}")
                continue

        print(f"Processing {file_path} as {format_name}...")
        default_graph.parse(file_path, format=format_name)
        file_count += 1

    if file_count == 0:
        print(f"No valid RDF files found in {input_directory}")
        return

    print(f"Processed {file_count} RDF files")

    dataset = Dataset()

    PROV = Namespace('http://www.w3.org/ns/prov#')
    dataset.namespace_manager.bind('prov', PROV)
    dataset.namespace_manager.bind('dcterms', DCTERMS)

    # Carry over prefix bindings discovered while parsing the input.
    for prefix, namespace in input_graph.namespace_manager.namespaces():
        dataset.namespace_manager.bind(prefix, namespace)

    # Collect every distinct URIRef subject (blank-node subjects excluded).
    subjects = {s for s in default_graph.subjects() if isinstance(s, URIRef)}

    print(f"Found {len(subjects)} subjects in the input files")

    # Single UTC timestamp (second precision) shared by all snapshots.
    generation_time = datetime.datetime.now(datetime.timezone.utc).replace(microsecond=0).isoformat()

    # BUGFIX: the documented defaults (None) were previously passed to
    # URIRef unconditionally, which fails; only mint URIRefs when a value
    # was actually supplied, and omit the corresponding triples otherwise.
    responsible_agent = URIRef(agent_orcid) if agent_orcid else None
    primary_source_uri = URIRef(primary_source) if primary_source else None

    for subject in subjects:
        prov_graph_uri = URIRef(f"{subject}/prov/")

        snapshot_uri = URIRef(f"{subject}/prov/se/1")

        prov_graph = dataset.graph(identifier=prov_graph_uri)

        prov_graph.add((snapshot_uri, RDF.type, PROV.Entity))

        prov_graph.add((snapshot_uri, PROV.generatedAtTime, Literal(generation_time, datatype=XSD.dateTime)))

        if responsible_agent is not None:
            prov_graph.add((snapshot_uri, PROV.wasAttributedTo, responsible_agent))

        if primary_source_uri is not None:
            prov_graph.add((snapshot_uri, PROV.hadPrimarySource, primary_source_uri))

        description = f"Entity <{str(subject)}> was created"
        prov_graph.add((snapshot_uri, DCTERMS.description, Literal(description, lang="en")))

    dataset.serialize(destination=output_file, format=output_format)
    print(f"Provenance snapshots saved to {output_file} in {output_format} format")