"""Build a NetworkX graph from FHIR bundle files and export it via BioCypher.

Pipeline (see ``main``):

1. Load every ``*.json`` bundle of a directory into one ``networkx.DiGraph``.
2. Post-process the graph (``process_references``, ``property_convolution``).
3. Generate a schema file from the graph (``write_automated_schema``).
4. Stream nodes and edges to BioCypher and write the import call.
"""
from biocypher import BioCypher
import networkx as nx
import json
import os
import uuid

# NOTE: earlier, now-disabled imports took the same names from a
# `networkx_based` package instead of `graphCreation`.
from graphCreation import create_graph
from graphCreation.process_references import process_references
from graphCreation.property_convolution import property_convolution
from schema_config_generation import write_automated_schema
from graphCreation.node_typing import set_resource_type


def load_multiple_fhir_bundles(directory_path: str) -> nx.DiGraph:
    """Load all ``*.json`` files in *directory_path* into a single graph.

    The first bundle is handed to ``create_graph.json_to_networkx``; every
    further bundle is merged with ``create_graph.add_json_to_networkx``.
    Strings in each bundle have their single quotes doubled beforehand
    (see ``replace_single_quotes``).

    Args:
        directory_path: Directory that contains the bundle files. Files whose
            name does not end in ``.json`` are ignored.

    Returns:
        The populated directed graph (empty if no ``.json`` file was found).
    """
    graph = nx.DiGraph()
    is_first_bundle = True

    # os.listdir() returns entries in arbitrary order. Because the first file
    # takes a different code path than the rest, sort the names so that
    # repeated runs over the same directory behave identically.
    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith('.json'):  # assume FHIR bundles are JSON
            continue

        file_path = os.path.join(directory_path, filename)
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open(file_path, 'r', encoding='utf-8') as f:
            bundle_json = json.load(f)

        # Double all single quotes so that values such as
        # "CHILDREN'S Hospital" do not break the Neo4j import.
        fixed_quotes = replace_single_quotes(bundle_json)

        if is_first_bundle:
            create_graph.json_to_networkx(fixed_quotes, filename, graph)
            is_first_bundle = False
        else:
            create_graph.add_json_to_networkx(fixed_quotes, filename, graph)

        print("Imported: ", filename)

    return graph


def replace_single_quotes(obj):
    """Return a copy of *obj* in which every ``'`` in a string is doubled.

    Dictionaries and lists are traversed recursively; dictionary keys are
    left untouched, only values are processed. Any other data type (numbers,
    booleans, ``None``, ...) is returned unchanged.

    Args:
        obj: A decoded JSON value (str, dict, list or scalar).

    Returns:
        The value with single quotes doubled in all contained strings.
    """
    if isinstance(obj, str):
        return obj.replace("'", "''")
    if isinstance(obj, dict):
        return {key: replace_single_quotes(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [replace_single_quotes(item) for item in obj]
    return obj  # leave other data types unchanged


def _resolve_label(attributes):
    """Return the label of a node given its attribute dictionary.

    Nodes labelled ``"resource"`` are labelled by their ``resourceType``
    attribute instead. The result is ``None`` when the relevant attribute
    is missing.
    """
    label = attributes.get('label')
    if label == 'resource':
        label = attributes.get('resourceType')
    return label


def _iter_node_records(graph):
    """Yield one ``(id, label, properties)`` tuple per node of *graph*.

    The id is the node's ``id`` attribute when present, otherwise the key
    NetworkX uses for the node. ``properties`` is the node's attribute
    dictionary itself (not a copy).
    """
    for node, attributes in graph.nodes(data=True):
        yield (
            attributes.get('id', node),
            _resolve_label(attributes),
            attributes,
        )


def _iter_edge_records(graph):
    """Yield one ``(id, source_id, target_id, label, properties)`` per edge.

    The label has the form ``<source label>_to_<target label>``. Source and
    target ids follow the same rule as in ``_iter_node_records``.

    Raises:
        TypeError: If the label of either end node cannot be resolved
            (``_resolve_label`` returned ``None``).
    """
    nodes = graph.nodes
    for source, target, attributes in graph.edges(data=True):
        source_attrs = nodes[source]
        target_attrs = nodes[target]
        label = _resolve_label(source_attrs) + '_to_' + _resolve_label(target_attrs)

        # Use the edge's own id if it has one; only otherwise create a
        # random UUID (avoids generating a UUID that is thrown away).
        if 'id' in attributes:
            edge_id = attributes['id']
        else:
            edge_id = str(uuid.uuid4())

        yield (
            edge_id,
            source_attrs.get('id', source),
            target_attrs.get('id', target),
            label,
            attributes,  # all edge attributes
        )


def main(bundle_directory: str = './testData/') -> None:
    """Run the complete import pipeline.

    Args:
        bundle_directory: Directory holding the FHIR bundles. The original
            author's note: 'mockData' for unit test data, 'testData' for
            Synthea files.
    """
    # Create the NetworkX graph and run the improvement scripts.
    print("Creating the graph...", flush=True)
    nxGraph = load_multiple_fhir_bundles(bundle_directory)
    print(nxGraph)

    print("Reducing references...", flush=True)
    process_references(nxGraph)
    print(nxGraph)

    print("Convolute references...", flush=True)
    property_convolution(nxGraph)
    print(nxGraph)

    # NOTE: set_resource_type(nxGraph) is imported but its call is currently
    # disabled; the generators below resolve resource labels themselves.

    print("Generate auto schema...")
    write_automated_schema(nxGraph, 'config/automated_schema.yaml')

    # Create the BioCypher driver. Only the BioCypher config path is passed;
    # a manual schema config path was previously present but disabled.
    bc = BioCypher(
        biocypher_config_path="config/biocypher_config.yaml",
    )
    bc.show_ontology_structure()

    # Stream nodes first, then edges.
    bc.write_nodes(_iter_node_records(nxGraph))
    bc.write_edges(_iter_edge_records(nxGraph))

    # Write the import script.
    bc.write_import_call()


if __name__ == "__main__":
    main()