# Source listing metadata: Python, 200 lines, 6.9 KiB
from biocypher import BioCypher
|
|
import networkx as nx
|
|
import json
|
|
import os
|
|
import uuid
|
|
#from networkx_based import create_graph
|
|
from graphCreation import create_graph
|
|
#from networkx_based.process_references import process_references
|
|
from graphCreation.process_references import process_references
|
|
#from networkx_based.property_convolution import property_convolution
|
|
from graphCreation.property_convolution import property_convolution
|
|
|
|
from schema_config_generation import write_automated_schema
|
|
#from networkx_based.node_typing import set_ressource_type
|
|
from graphCreation.node_typing import set_resource_type
|
|
|
|
|
|
|
|
def load_multiple_fhir_bundles(directory_path):
    """Build one directed graph from all FHIR bundles in a directory.

    Every entry of *directory_path* whose name ends in ``.json`` is parsed,
    its strings are passed through ``replace_single_quotes`` and the result
    is handed to ``create_graph``: the first bundle via
    ``json_to_networkx``, every later one via ``add_json_to_networkx``.

    Args:
        directory_path: Directory that contains the bundle files.

    Returns:
        The ``networkx.DiGraph`` instance that was passed to ``create_graph``
        for every bundle (presumably filled in place by those calls --
        verify against ``graphCreation.create_graph``).
    """
    graph = nx.DiGraph()
    is_first_bundle = True
    # limit = 2  # disabled debugging aid, see the end of the loop body

    # os.listdir() returns entries in arbitrary order. Sorting keeps the
    # choice of the "first" bundle (which takes a different code path)
    # reproducible between runs and machines.
    for filename in sorted(os.listdir(directory_path)):
        if not filename.endswith('.json'):  # Assuming FHIR bundles are in JSON format
            continue

        file_path = os.path.join(directory_path, filename)
        # JSON is UTF-8; do not depend on the platform's default encoding.
        with open(file_path, 'r', encoding='utf-8') as f:
            bundle_json = json.load(f)

        # Double all single quotes so the strings survive the Neo4j import.
        fixed_quotes = replace_single_quotes(bundle_json)
        if is_first_bundle:
            # print(bundle_json, filename, graph)
            create_graph.json_to_networkx(fixed_quotes, filename, graph)
            is_first_bundle = False
        else:
            create_graph.add_json_to_networkx(fixed_quotes, filename, graph)
        print("Imported: ", filename)

        # if limit == 0:
        #     return graph
        # limit = limit - 1

    return graph
|
|
|
|
def replace_single_quotes(obj):
    """Return *obj* with every single quote inside its strings doubled.

    Dictionaries and lists are walked recursively and rebuilt as new
    containers; dictionary keys are kept exactly as they are. A value of
    any other type is handed back unchanged.
    """
    if isinstance(obj, dict):
        escaped = {}
        for key, value in obj.items():
            escaped[key] = replace_single_quotes(value)
        return escaped

    if isinstance(obj, list):
        return list(map(replace_single_quotes, obj))

    if isinstance(obj, str):
        return obj.replace("'", "''")

    # Numbers, booleans, None, ... carry no quotes to escape.
    return obj
|
|
|
|
def main(data_directory='./testData/'):
    """Convert a directory of FHIR bundles into BioCypher import files.

    Steps, in order: build the networkx graph from the bundles, run
    ``process_references`` and ``property_convolution`` on it, write the
    automated schema to ``config/automated_schema.yaml``, then pass all
    nodes and edges to a ``BioCypher`` instance and write the import call.
    The graph is printed after each of the first three steps.

    Args:
        data_directory: Directory holding the FHIR bundle JSON files.
            Defaults to ``'./testData/'`` (Synthea files); point it at the
            ``mockData`` directory for the unit test data.
    """
    # Create the networkx graph and run the improvement scripts.
    print("Creating the graph...", flush=True)
    graph = load_multiple_fhir_bundles(data_directory)
    print(graph)

    print("Reducing references...", flush=True)
    process_references(graph)
    print(graph)

    print("Convolute references...", flush=True)
    property_convolution(graph)
    print(graph)

    # Set types of all resource nodes to resource_type
    # set_resource_type(graph)

    # Disabled debugging aid: print all node labels and all
    # "source to target" label pairs, then stop.
    #
    # all_nLabels = set()
    # all_eLabels = set()
    #
    # for node, attrs in graph.nodes(data=True):
    #     for attr_name, attr_value in attrs.items():
    #         if attr_name == "label":
    #             all_nLabels.add(attr_value)
    #
    # for nt in all_nLabels:
    #     print(nt)
    #
    # print("-" * 50)
    #
    # for u, v, attrs in graph.edges(data=True):
    #     u_label = graph.nodes[u]['label']
    #     if u_label == "resource":
    #         u_label = graph.nodes[u]['resourceType']
    #     v_label = graph.nodes[v]['label']
    #     if v_label == "resource":
    #         v_label = graph.nodes[v]['resourceType']
    #     all_eLabels.add(u_label + " to " + v_label)
    #
    # for et in all_eLabels:
    #     print(et)
    #
    # print("-" * 50)
    #
    # print("...end")
    # return

    print("Generate auto schema...")
    write_automated_schema(graph, 'config/automated_schema.yaml')

    # Create the BioCypher driver.
    bc = BioCypher(
        biocypher_config_path="config/biocypher_config.yaml",
        # schema_config_path="/config/manual_schema_config.yaml"
    )

    bc.show_ontology_structure()

    # BioCypher preparation

    def resolve_label(properties):
        """Return the label for a node's property dict.

        Nodes labelled ``'resource'`` are reported under their
        ``resourceType`` instead. The result is ``None`` when the needed
        key is missing.
        """
        label = properties.get('label')
        if label == 'resource':
            label = properties.get('resourceType')
        # Disabled experiment: finer-grained labels for some node kinds.
        #
        # elif label == 'identifier':
        #     label = properties.get('system')
        #     print('/' in label)
        #     if '/' in label:
        #         lastSlash = label.rfind('/') + 1
        #         label = label[lastSlash:] + '-ID'
        # elif label == 'telecom':
        #     label = properties.get('system')
        #     print('/' in label)
        #     if '/' in label:
        #         lastSlash = label.rfind('/') + 1
        #         label = 'telecom-' + label[lastSlash:]
        # elif label == 'address':
        #     extension = properties.get('extension_url')
        #     print("EX!: ", extension)
        #     if extension:
        #         lastSlash = extension.rfind('/') + 1
        #         label = label + '-' + extension[lastSlash:]
        return label

    def node_generator():
        """Yield ``(id, label, properties)`` for every node of the graph."""
        for node in graph.nodes():
            properties = graph.nodes[node]

            # Disabled: per-node quote escaping of 'display' and 'name'.
            # Single quotes break the neo4j import, e.g. 'CHILDREN'S Hospital'.
            #
            # checkDisplay = properties.get('display')
            # if checkDisplay:
            #     checkDisplay = checkDisplay.replace("'", "''")
            #     properties['display'] = checkDisplay
            #     # print("------->", properties.get('display'))
            #
            # checkName = properties.get('name')
            # if checkName:
            #     checkName = checkName.replace("'", "''")
            #     properties['name'] = checkName
            #     # print("------->", properties.get('name'))

            yield (
                # Use the 'id' attribute when the node has one; otherwise
                # fall back to the key networkx uses for the node.
                properties.get('id', node),
                resolve_label(properties),
                properties,
            )

    def edge_generator():
        """Yield ``(id, source id, target id, label, attributes)`` per edge."""
        for source, target, attributes in graph.edges(data=True):
            source_properties = graph.nodes[source]
            target_properties = graph.nodes[target]

            # NOTE(review): raises TypeError when either endpoint resolves
            # to no label (None); unchanged from the previous behaviour.
            label = (
                resolve_label(source_properties)
                + '_to_'
                + resolve_label(target_properties)
            )

            yield (
                # Edge id if the edge has one, otherwise a fresh random UUID.
                attributes.get('id', str(uuid.uuid4())),
                source_properties.get('id', source),
                target_properties.get('id', target),
                label,
                attributes,  # all edge attributes
            )

    # Import nodes and edges.
    bc.write_nodes(node_generator())
    bc.write_edges(edge_generator())

    # Write the import script.
    bc.write_import_call()
|
|
|
|
# Run the conversion only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    # print("Called import script. Should run its main function now...")
    main()
|
|
|