# medax_pipeline/import_nx_diGraph.py
# 2025-04-16 22:12:19 +02:00
#
# 200 lines
# 6.9 KiB
# Python

from biocypher import BioCypher
import networkx as nx
import json
import os
import uuid
#from networkx_based import create_graph
from graphCreation import create_graph
#from networkx_based.process_references import process_references
from graphCreation.process_references import process_references
#from networkx_based.property_convolution import property_convolution
from graphCreation.property_convolution import property_convolution
from schema_config_generation import write_automated_schema
#from networkx_based.node_typing import set_ressource_type
from graphCreation.node_typing import set_resource_type
def load_multiple_fhir_bundles(directory_path):
    """Build one directed graph from every JSON bundle in a directory.

    Each direct entry of ``directory_path`` whose name ends in ``.json`` is
    parsed, passed through ``replace_single_quotes`` and merged into a single
    shared ``networkx.DiGraph``. The first bundle is handed to
    ``create_graph.json_to_networkx``; every later one to
    ``create_graph.add_json_to_networkx``.

    Args:
        directory_path: Directory holding the bundle files. Sub-directories
            are not walked.

    Returns:
        The graph that received all bundles. It is empty when the directory
        contains no ``.json`` entry.
    """
    graph = nx.DiGraph()
    is_first_bundle = True

    # os.listdir() yields entries in arbitrary order. Sorting makes the import
    # order -- and therefore which bundle initialises the graph -- reproducible.
    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith('.json'):  # Assuming FHIR bundles are in JSON format
            continue

        file_path = os.path.join(directory_path, filename)
        # Decode explicitly as UTF-8 instead of relying on the platform's
        # locale default, which differs between operating systems.
        with open(file_path, 'r', encoding='utf-8') as f:
            bundle_json = json.load(f)

        # Double every single quote in string values so they can be imported
        # into Neo4j.
        fixed_quotes = replace_single_quotes(bundle_json)

        if is_first_bundle:
            create_graph.json_to_networkx(fixed_quotes, filename, graph)
            is_first_bundle = False
        else:
            create_graph.add_json_to_networkx(fixed_quotes, filename, graph)
        print("Imported: ", filename)

    return graph
def replace_single_quotes(obj):
    """Return ``obj`` with every single quote in its string values doubled.

    Dictionaries and lists are rebuilt recursively as plain ``dict`` / ``list``
    objects. Dictionary keys are kept as they are, and a value of any other
    type is returned unchanged.
    """
    if isinstance(obj, dict):
        escaped = {}
        for name, member in obj.items():
            escaped[name] = replace_single_quotes(member)
        return escaped

    if isinstance(obj, list):
        return list(map(replace_single_quotes, obj))

    if isinstance(obj, str):
        return obj.replace("'", "''")

    # Numbers, booleans, None, tuples, ... pass through untouched.
    return obj
def _resolved_label(attrs):
    """Return the label under which a node is exported.

    A node whose ``label`` attribute is ``"resource"`` is exported under its
    ``resourceType`` attribute; any other node keeps its ``label``. The
    result is ``None`` when the attribute that is consulted is missing.
    """
    label = attrs.get('label')
    if label == 'resource':
        return attrs.get('resourceType')
    return label


def main(data_dir='./testData/'):
    """Build the graph from the bundle files and export it through BioCypher.

    Steps, in order: load all bundles from ``data_dir`` into a networkx graph,
    run ``process_references`` and ``property_convolution`` on it, write the
    automated schema to ``config/automated_schema.yaml``, then write nodes,
    edges and the import call with a ``BioCypher`` instance configured from
    ``config/biocypher_config.yaml``. The graph is printed after each
    graph-level step.

    Args:
        data_dir: Directory containing the bundle files. Defaults to
            ``'./testData/'`` (Synthea files); ``'mockData'`` holds the unit
            test data.
    """
    ## create networkX and run improvement scripts
    print("Creating the graph...", flush=True)
    nxGraph = load_multiple_fhir_bundles(data_dir)
    print(nxGraph)

    print("Reducing references...", flush=True)
    process_references(nxGraph)
    print(nxGraph)

    print("Convolute references...", flush=True)
    property_convolution(nxGraph)
    print(nxGraph)

    # Setting the type of all resource nodes via set_resource_type(nxGraph) is
    # currently disabled; resource nodes are relabelled on the fly instead
    # (see _resolved_label).

    print("Generate auto schema...")
    write_automated_schema(nxGraph, 'config/automated_schema.yaml')

    # create BioCypher driver
    bc = BioCypher(
        biocypher_config_path="config/biocypher_config.yaml",
        #schema_config_path="/config/manual_schema_config.yaml"
    )
    bc.show_ontology_structure()

    # BioCypher preparation
    def node_generator():
        """Yield one ``(id, label, properties)`` tuple per graph node."""
        # NOTE: earlier experiments derived finer-grained labels for
        # 'identifier', 'telecom' and 'address' nodes from their 'system' /
        # 'extension_url' attributes; they are not active.
        for node, attrs in nxGraph.nodes(data=True):
            yield (
                # The node's own 'id' attribute if present, otherwise the key
                # networkx uses for the node.
                attrs.get('id', node),
                _resolved_label(attrs),
                # Hand over a copy: edge_generator still reads 'id', 'label'
                # and 'resourceType' from the graph afterwards, so any
                # in-place change the consumer makes to the properties must
                # not reach the graph's own attribute dicts.
                dict(attrs),
            )

    def edge_generator():
        """Yield one ``(id, source_id, target_id, label, properties)`` tuple per edge."""
        for source, target, attributes in nxGraph.edges(data=True):
            source_attrs = nxGraph.nodes[source]
            target_attrs = nxGraph.nodes[target]

            source_label = _resolved_label(source_attrs)
            target_label = _resolved_label(target_attrs)
            label = source_label + '_to_' + target_label

            # Use the edge's own 'id' attribute if present; otherwise create a
            # fresh random UUID for it.
            if 'id' in attributes:
                edge_id = attributes['id']
            else:
                edge_id = str(uuid.uuid4())

            yield (
                edge_id,
                # Same id rule as in node_generator so endpoints match nodes.
                source_attrs.get('id', source),
                target_attrs.get('id', target),
                label,
                dict(attributes),  # copy of all edge attributes
            )

    # import nodes and edges
    bc.write_nodes(node_generator())
    bc.write_edges(edge_generator())

    # write the import script
    bc.write_import_call()
if __name__ == "__main__":
    # Run the import pipeline only when this file is executed as a script,
    # not when it is imported as a module.
    main()