added batch processing

This commit is contained in:
2026-03-31 14:26:49 +02:00
parent 47abe86ba8
commit e21abec918
5 changed files with 386 additions and 348 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
**__pycache__/
datasource/
old-src/

View File

@@ -1,11 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<content url="file://$MODULE_DIR$">
<sourceFolder url="file://$MODULE_DIR$/src" isTestSource="false" />
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.11 (mdm-to-neo4j) (2)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PackageRequirementsSettings">
<option name="requirementsPath" value="$USER_HOME$/PycharmProjects/mdm-to-neo4j/requirements.txt" />
<option name="requirementsPath" value="$MODULE_DIR$/requirements.txt" />
</component>
</module>

View File

@@ -4,6 +4,7 @@ import logging
from neo4j import GraphDatabase, Driver
from pandas import DataFrame
from run import args
from typing import List, Dict
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -62,147 +63,121 @@ class Neo4jConnection:
logger.debug("Connection to Neo4j database closed")
self.__connection = None
def create_study_node(self, data: DataFrame, modelid):
def execute_batch_query(self, query: str, data: List[Dict], **params) -> None:
"""Execute batch query for better performance"""
try:
if isinstance(data, list) and len(data) > 0: # Only execute if there's data, change if data to ...
self.__connection.execute_query(query, data=data, **params)
logger.debug(f"Executed batch query with {len(data)} records")
else:
self.__connection.execute_query(query, data=data, **params)
logger.debug("Executed batch query with no additional data")
except Exception as e:
logger.error(f"Failed to execute batch query: {e}")
raise
def create_study_node(self, data: List[Dict], modelid: str) -> None:
query = """
UNWIND $data AS row
MERGE (st:Study {OID: row.oid, ModelID: $modelid})
SET st.Name = row.name,
st.Description = row.descr,
st.Protocol = row.protocol
"""
based on https://neo4j.com/docs/python-manual/current/query-simple/
self.execute_batch_query(query, data, modelid=modelid)
def create_studyevent_node(self, data: List[Dict], studyoid: str, modelid: str):
query = """
UNWIND $data AS row
MERGE (se:StudyEvent {OID: row.OID, Name: row.Name, Type: row.Type, StudyOID: $study_oid, ModelID: $modelid})
"""
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MERGE (st:Study {OID: $studyoid, Name: $studyname, Description: $studydescr, Protocol: $studyprotocol, ModelID: $modelid})",
studyoid=row['oid'], studyname=row['name'], studydescr=row['descr'], studyprotocol=row["protocol"], modelid=modelid
)
logger.debug(f"Inserted Study node with OID {row['oid']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
self.execute_batch_query(query, data, study_oid=studyoid, modelid=modelid)
def create_studyevent_node(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query("MERGE (:StudyEvent {OID: $oid, Name: $name, Type: $type, StudyOID: $study_oid, ModelID: $modelid})",
oid=row['OID'], name=row['Name'], type=row['Type'], study_oid=studyoid, modelid=modelid
)
logger.debug(f"Inserted StudyEvent node with OID {row['OID']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_form_node(self, data: List[Dict], studyoid: str, modelid: str):
query = """
UNWIND $data AS row
MERGE (f:Form {OID: row.OID, Name: row.Name, StudyOID: $study_oid, ModelID: $modelid})
"""
self.execute_batch_query(query, data, study_oid=studyoid, modelid=modelid)
def create_form_node(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MERGE (:Form {OID: $oid, Name: $name, StudyOID: $studyoid, ModelID: $modelid})",
oid=row['OID'], name=row['Name'], studyoid=studyoid, modelid=modelid)
logger.debug(f"Inserted Form node with OID {row['OID']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_itemgroup_node(self, data: List[Dict], studyoid: str, modelid):
query = """
UNWIND $data AS row
MERGE (ig:ItemGroup {OID: row.OID, Name: row.Name, StudyOID: $study_oid, ModelID: $modelid})
"""
self.execute_batch_query(query, data, study_oid=studyoid, modelid=modelid)
def create_itemgroup_node(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MERGE (:ItemGroup {OID: $oid, Name: $name, StudyOID: $studyoid, ModelID: $modelid})",
oid=row['OID'], name=row['Name'], studyoid=studyoid, modelid=modelid)
logger.debug(f"Inserted ItemGroup node with OID {row['OID']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_item_node(self, data: List[Dict], studyoid: str, modelid):
query = """
UNWIND $data AS row
MERGE (i:Item {OID: row.itemoid, Name: row.name, DataType: row.datatype, Question: row.question, StudyOID: $studyoid, ModelID: $modelid})
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_item_node(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MERGE (:Item {OID: $oid, Name: $name, DataType: $datatype, Question: $question, StudyOID: $studyoid, ModelID: $modelid})",
oid=row['itemoid'], name=row['name'], datatype=row['datatype'], question=row['question'],
studyoid=studyoid, modelid=modelid)
logger.debug(f"Inserted Item node with OID {row['itemoid']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_alias_node(self, data: List[Dict], studyoid: str, modelid: str) -> None:
query = """
UNWIND $data AS row
MERGE (a:Alias {Context: row.end, Name: row.end_two, ItemOID: row.start, StudyOID: $studyoid, ModelID: $modelid})
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_alias_node(self, data: DataFrame,
studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MERGE (:Alias {Context: $context, Name: $name, ItemOID: $item, StudyOID: $study, ModelID: $modelid})",
context=row['end'], name=row['end_two'], item=row['start'], study=studyoid, modelid=modelid)
logger.debug(f"Inserted Alias node with name {row['end_two']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_measurementunit_node(self, data: DataFrame):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MERGE (:MeasurementUnit {OID: $oid, Name: $name, Symbol: $symbol})",
oid=row['oid'], name=row['name'], symbol=row['symbol'])
logger.debug(f"Inserted MeasurementUnit node with name {row['name']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_measurementunit_node(self, data: List[Dict]):
query = """
UNWIND $data AS row
MERGE (m:MeasurementUnit {OID: row.oid, Name: row.name, Symbol: row.symbol})
"""
self.execute_batch_query(query, data)
def create_basedef_node(self, studyoid: str, modelid):
conn = self.__connection
try:
conn.execute_query(
"MERGE (:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid})", studyoid=studyoid, modelid=modelid)
logger.debug(f"Inserted BasicDefinitions node for study {studyoid}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
query = """
MERGE (b:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid})
"""
self.execute_batch_query(query, data=[], studyoid=studyoid, modelid=modelid)
def create_rangecheck_node(self, data: DataFrame):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MERGE (:RangeCheck {Comparator: $comp, Constraint: $cons, CheckValue: $val})",
comp=row['comparator'], cons=row['constraint'], val=row['value'])
logger.debug(f"Inserted RangeCheck node")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_rangecheck_node(self, data: List[Dict]):
query = """
UNWIND $data AS row
MERGE (rc:RangeCheck {Comparator: row.comparator, Constraint: row.constraint, CheckValue: row.value})
"""
self.execute_batch_query(query, data)
def create_codelist_node(self, data: DataFrame, studyoid, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MERGE (:CodeList {OID: $oid, Name: $name, DataType: $type, StudyOID: $studyoid, ModelID: $modelid})",
oid=row['OID'], name=row['Name'], type=row['DataType'], studyoid=studyoid, modelid=modelid)
logger.debug(f"Inserted CodeList node")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_codelist_node(self, data: List[Dict], studyoid, modelid):
query = """
UNWIND $data AS row
MERGE (cl:CodeList {OID: row.OID, Name: row.Name, DataType: row.DataType, StudyOID: $studyoid, ModelID: $modelid})
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_clitem_node(self, data: DataFrame):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MERGE (:CodeListItem {CodedValue: $cv, Decode: $dc})",
cv=row['codedvalue'], dc=row['decode'])
logger.debug(f"Inserted CodeListItem node")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_clitem_node(self, data: List[Dict]):
query = """
UNWIND $data AS row
MERGE (cli:CodeListItem {CodedValue: row.codedvalue, Decode: row.decode})
"""
self.execute_batch_query(query, data)
def create_study_to_studyevent_rel(self, studyoid: str, modelid):
# Batch relationship creation methods
def create_relationships_batch(self, query: str, data: List[Dict], **params) -> None: # todo: remove it not needed
"""Generic batch relationship creation"""
self.execute_batch_query(query, data, **params)
def create_study_to_studyevent_rel(self, studyoid: str, modelid: str) -> None:
query = """
MATCH (start:Study {OID: $studyoid, ModelID: $modelid})
WITH start, start.OID as oid, start.ModelID as mid
MATCH (end:StudyEvent)
WHERE end.StudyOID = oid AND end.ModelID = mid
WITH start, end
MERGE (start)-[:STUDY_HAS_STUDYEVENT]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data=[], studyoid=studyoid, modelid=modelid)
def create_study_to_studyevent_rel_old(self, studyoid: str, modelid):
conn = self.__connection
try:
conn.execute_query(
"MATCH (start:Study {OID: $studyoid, ModelID: $modelid}), (end:StudyEvent {StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:STUDY_HAS_STUDYEVENT]->(end)-[:BACKTRACK]->(start)",
"MATCH (start:Study {OID: $studyoid, ModelID: $modelid}),"
" (end:StudyEvent {StudyOID: $studyoid, ModelID: $modelid})"
"MERGE (start)-[:STUDY_HAS_STUDYEVENT]->(end)-[:BACKTRACK]->(start)",
studyoid=studyoid, modelid=modelid
)
logger.debug(f"Inserted rels betw Study and StudyEvent")
@@ -210,166 +185,105 @@ class Neo4jConnection:
logger.error(f"Failed to populate Neo4j database: {e}")
raise
def create_itemgroup_to_item_rel(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:ItemGroup {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:Item {OID: $endoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:ITEMGROUP_HAS_ITEM]->(end)-[:BACKTRACK]->(start)",
startoid=row['start'], endoid=row['end'], studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw ItemGroup {row['start']} and Item {row['end']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_itemgroup_to_item_rel(self, data: List[Dict], studyoid: str, modelid):
query = """
UNWIND $data AS row
MATCH (start:ItemGroup {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:Item {OID: row.end, StudyOID: $studyoid, ModelID: $modelid})
MERGE (start)-[:ITEMGROUP_HAS_ITEM]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_form_to_itemgroup_rel(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:Form {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:ItemGroup {OID: $endoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:FORM_HAS_ITEMGROUP]->(end)-[:BACKTRACK]->(start)",
startoid=row['start'], endoid=row['end'], studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw Form {row['start']} and ItemGroup {row['end']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_form_to_itemgroup_rel(self, data: List[Dict], studyoid: str, modelid):
query = """
UNWIND $data AS row
MATCH (start:Form {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:ItemGroup {OID: row.end, StudyOID: $studyoid, ModelID: $modelid})
MERGE (start)-[:FORM_HAS_ITEMGROUP]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_studyevent_to_form_rel(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:StudyEvent {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:Form {OID: $endoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:STUDYEVENT_HAS_FORM]->(end)-[:BACKTRACK]->(start)",
startoid=row['start'], endoid=row['end'], studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw StudyEvent {row['start']} and Form {row['end']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_studyevent_to_form_rel(self, data: List[Dict], studyoid: str, modelid):
query = """
UNWIND $data AS row
MATCH (start:StudyEvent {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:Form {OID: row.end, StudyOID: $studyoid, ModelID: $modelid})
MERGE (start)-[:STUDYEVENT_HAS_FORM]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_item_to_alias_rel(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:Item {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:Alias {Context: $endcontext, Name: $endname, ItemOID: $startoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:ITEM_HAS_ALIAS]->(end)-[:BACKTRACK]->(start)",
startoid=row['start'], endcontext=row['end'], endname=row['end_two'], studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw Item {row['start']} and Alias {row['end']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_item_to_alias_rel(self, data: List[Dict], studyoid: str, modelid):
query = """
UNWIND $data AS row
MATCH (start:Item {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:Alias {Context: row.end, Name: row.end_two, ItemOID: row.start, StudyOID: $studyoid, ModelID: $modelid})
MERGE (start)-[:ITEM_HAS_ALIAS]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_itemgroup_to_alias_rel(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:ItemGroup {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:Alias {Context: $endcontext, Name: $endname}) MERGE (start)-[:ITEMGROUP_HAS_ALIAS]->(end)-[:BACKTRACK]->(start)",
startoid=row['start'], endcontext=row['end'], endname=row['end_two'], studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw ItemGroup {row['start']} and Alias {row['end']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_itemgroup_to_alias_rel(self, data: List[Dict], studyoid: str, modelid):
query = """
UNWIND $data AS row
MATCH (start:ItemGroup {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:Alias {Context: row.end, Name: row.end_two})
MERGE (start)-[:ITEMGROUP_HAS_ALIAS]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_item_to_measurement_rel(self, data: DataFrame, studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:Item {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:MeasurementUnit {OID: $endoid, Name: $endname}) MERGE (start)-[:ITEM_HAS_MEASUREMENTUNIT]->(end)-[:BACKTRACK]->(start)",
startoid=row['start'], endoid=row['endoid'], endname=row['endname'], studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw Item {row['start']} and MeasurementUnit {row['endname']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_item_to_measurement_rel(self, data: List[Dict], studyoid: str, modelid):
query = """
UNWIND $data AS row
MATCH (start:Item {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:MeasurementUnit {OID: row.endoid, Name: row.endname})
MERGE (start)-[:ITEM_HAS_MEASUREMENTUNIT]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_basedef_to_study_rel(self, studyoid: str, modelid):
conn = self.__connection
try:
conn.execute_query(
"MATCH (start:Study {OID: $studyoid, ModelID: $modelid}), (end:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:HAS_BASEDEF]->(end)",
studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw Study {studyoid} and BasicDefinitions")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
query = """
MATCH (start:Study {OID: $studyoid, ModelID: $modelid}), (end:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid})
MERGE (start)-[:HAS_BASEDEF]->(end)
"""
self.execute_batch_query(query, data=[], studyoid=studyoid, modelid=modelid)
def create_basedef_to_measurement_rel(self, data: DataFrame,
studyoid: str, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid}), (end:MeasurementUnit {OID: $endoid, Name: $endname, Symbol: $endsymbol}) MERGE (start)-[:BASEDEF_HAS_MEASUREMENTUNIT]->(end)",
endoid=row['oid'], endname=row['name'], endsymbol=row['symbol'], studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw BasicDefinitions and MeasurementUnit {row['name']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_basedef_to_measurement_rel(self, data: List[Dict], studyoid: str, modelid):
query = """
UNWIND $data AS row
MATCH (start:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid}), (end:MeasurementUnit {OID: row.oid, Name: row.name, Symbol: row.symbol})
MERGE (start)-[:BASEDEF_HAS_MEASUREMENTUNIT]->(end)
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_item_to_rangecheck_rel(self, data: DataFrame): #TODO check for Item properties (missing StudyOID?)
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:Item {OID: $itemoid}), (end:RangeCheck {Comparator: $comp, Constraint: $cons, CheckValue: $cvalue}) MERGE (start)-[:ITEM_HAS_RANGECHECK]->(end)-[:BACKTRACK]->(start)",
itemoid=row['start'], comp=row['endcomparator'], cons=row['endconstraint'], cvalue=row['endvalue'])
logger.debug(
f"Inserted rels betw Item and RangeCheck")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_item_to_rangecheck_rel(self, data: List[Dict]): # TODO check for Item properties (missing StudyOID?)
query = """
UNWIND $data AS row
MATCH (start:Item {OID: row.start}), (end:RangeCheck {Comparator: row.endcomparator, Constraint: row.endconstraint, CheckValue: row.endvalue})
MERGE (start)-[:ITEM_HAS_RANGECHECK]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data)
def create_item_to_codelist_rel(self, data: DataFrame, studyoid, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:Item {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:CodeList {OID: $endoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:ITEM_HAS_CODELIST]->(end)-[:BACKTRACK]->(start)",
startoid=row['start'], endoid=row['end'], studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw Item {row['start']} and CodeList {row['end']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_item_to_codelist_rel(self, data: List[Dict], studyoid, modelid):
query = """
UNWIND $data AS row
MATCH (start:Item {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:CodeList {OID: row.end, StudyOID: $studyoid, ModelID: $modelid})
MERGE (start)-[:ITEM_HAS_CODELIST]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
def create_codelist_to_codelistitem_rel(self, data: DataFrame, studyoid, modelid):
conn = self.__connection
try:
for index, row in data.iterrows():
conn.execute_query(
"MATCH (start:CodeList {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:CodeListItem {CodedValue: $cv, Decode: $dc}) MERGE (start)-[:CODELIST_HAS_CODELISTITEM]->(end)-[:BACKTRACK]->(start)",
startoid=row['start'], cv=row['endcodedvalue'], dc=row['enddecode'], studyoid=studyoid, modelid=modelid)
logger.debug(
f"Inserted rels betw CodeList {row['start']} and CodeListItem {row['endcodedvalue']}")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def create_codelist_to_codelistitem_rel(self, data: List[Dict], studyoid, modelid):
query = """
UNWIND $data AS row
MATCH (start:CodeList {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:CodeListItem {CodedValue: row.endcodedvalue, Decode: row.enddecode})
MERGE (start)-[:CODELIST_HAS_CODELISTITEM]->(end)-[:BACKTRACK]->(start)
"""
self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid)
# postprocessing with cypher queries
def postprocess_set_label_post(self):
conn = self.__connection
try:
conn.execute_query(
"MATCH (n:Alias) WHERE n.Context contains '[' and n.Context contains ',' and n.Context contains ']' set n :Post"
)
logger.debug(f"Inserted post property in alias nodes")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def postprocess_connect_postcoord_aliases(self, tuples):
conn = self.__connection
try:
conn.execute_query(
query = """
MATCH (n:Alias)
WHERE n.Context CONTAINS '[' AND n.Context CONTAINS ',' AND n.Context CONTAINS ']'
SET n:Post
"""
self.execute_batch_query(query, data=[])
logger.debug(f"Inserted post property in alias nodes")
def postprocess_connect_postcoord_aliases(self, data):
query = """
UNWIND $data AS row
MATCH (i:Item {OID: row.item_oid})
WITH row, i
@@ -377,8 +291,16 @@ class Neo4jConnection:
MATCH (a2:Post {StudyOID: row.study_oid, Context: row.ontology + '[' + row.x + ',' + row.y + ']'})--(i)
WHERE a1 <> a2
MERGE (a1)-[rel:COMPOSITE]->(a2)
""", data=tuples)
"""
self.execute_batch_query(query, data)
logger.debug(f"Inserted rels btw post-coordinated aliases")
except Exception as e:
logger.error(f"Failed to execute query: {e}")
raise
def postprocess_add_nctid_to_study_nodes(self):
query = """
MATCH (n:Study) WHERE NOT n.OID =~ '.*(NCT\\d{8}).*'
WITH n, apoc.text.regexGroups(n.Name, 'NCT\\d{8}') AS matches
WHERE size(matches) > 0
SET n.NCT_ID = matches[0][0]
"""
self.execute_batch_query(query, data=[])
logger.debug(f"Inserted NCT identifier to Study nodes")

View File

@@ -5,7 +5,7 @@ import atexit
import signal
import logging
MDM2NEO4J_VERSION: str = "0.1"
MDM2NEO4J_VERSION: str = "0.2"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -17,6 +17,7 @@ parser = argparse.ArgumentParser()
parser.add_argument('-c', '--conf', required=True, type=str,
help='Configuration file with database connection parameters')
parser.add_argument('-f', '--files', required=True, type=str, help='Directory with xml files')
parser.add_argument('-b', '--batch-size', required=False, type=int, default=500, help='Number of files per batch')
# parse parameters
args = parser.parse_args()

View File

@@ -517,104 +517,215 @@ def dict_to_list_of_tuples(mydictionary):
return data
def parse_xml(
path_to_datafiles): # creates XmlProcessor object for each xml file
def parse_xml(path_to_datafiles: str, batch_size: int = 500): # creates XmlProcessor object for each xml file
neo4jconnection_object = db.Neo4jConnection()
for file in os.listdir(path_to_datafiles):
if file.endswith(".xml"):
xml_files = [file for file in os.listdir(path_to_datafiles) if file.endswith(".xml")]
total_files = len(xml_files)
logger.info(F"Found {total_files} xml files to process")
for batch_start in range(0, total_files, batch_size):
batch_end = min(batch_start + batch_size, total_files)
batch_files = xml_files[batch_start:batch_end]
logger.info(f"Processing batch {batch_start // batch_end + 1}: " f"{batch_start + 1}-{batch_end} of {total_files} files")
processed_files = 0
failed_files = 0
for file in batch_files:
filename = os.path.splitext(file)[0]
filepath = os.path.join(path_to_datafiles, file)
try:
logger.info(f"Processing file: {filename}.xml")
with open(filepath) as fp:
parsed_xml = BeautifulSoup(fp, "xml")
# create XmlProcessor object for parsed_xml files
xmlprocessor_object = XmlProcessor(parsed_xml)
# do stuff with the object
study_oid = xmlprocessor_object.get_studyoid()
# create nodes
try:
# extract data in batches
study_data = xmlprocessor_object.extract_study_data()
neo4jconnection_object.create_study_node(study_data, filename)
itemgroup_data = xmlprocessor_object.extract_itemgroup_data()
neo4jconnection_object.create_itemgroup_node(itemgroup_data, study_oid, filename)
item_data = xmlprocessor_object.extract_item_data()
neo4jconnection_object.create_item_node(item_data, study_oid, filename)
studyevent_data = xmlprocessor_object.extract_studyevent_data()
neo4jconnection_object.create_studyevent_node(studyevent_data, study_oid, filename)
form_data = xmlprocessor_object.extract_form_data()
neo4jconnection_object.create_form_node(form_data, study_oid, filename)
alias_data = xmlprocessor_object.extract_item_to_alias_data()
neo4jconnection_object.create_alias_node(alias_data, study_oid, filename)
measurement_data = xmlprocessor_object.extract_measurement_data()
neo4jconnection_object.create_measurementunit_node(measurement_data)
rangecheck_data = xmlprocessor_object.extract_rangecheck_data()
neo4jconnection_object.create_rangecheck_node(rangecheck_data)
codelist_data = xmlprocessor_object.extract_codelist_data()
neo4jconnection_object.create_codelist_node(codelist_data, study_oid, filename)
codelistitem_data = xmlprocessor_object.extract_codelistitem_data()
neo4jconnection_object.create_clitem_node(codelistitem_data)
# create nodes in batches
if not study_data.empty:
neo4jconnection_object.create_study_node(study_data.to_dict('records'), filename)
if not itemgroup_data.empty:
neo4jconnection_object.create_itemgroup_node(itemgroup_data.to_dict('records'), study_oid, filename)
if not item_data.empty:
neo4jconnection_object.create_item_node(item_data.to_dict('records'), study_oid, filename)
if not studyevent_data.empty:
neo4jconnection_object.create_studyevent_node(studyevent_data.to_dict('records'), study_oid, filename)
if not form_data.empty:
neo4jconnection_object.create_form_node(form_data.to_dict('records'), study_oid, filename)
if not alias_data.empty:
neo4jconnection_object.create_alias_node(alias_data.to_dict('records'), study_oid, filename)
if not measurement_data.empty:
neo4jconnection_object.create_measurementunit_node(measurement_data.to_dict('records'))
if not rangecheck_data.empty:
neo4jconnection_object.create_rangecheck_node(rangecheck_data.to_dict('records'))
if not codelist_data.empty:
neo4jconnection_object.create_codelist_node(codelist_data.to_dict('records'), study_oid, filename)
if not codelistitem_data.empty:
neo4jconnection_object.create_clitem_node(codelistitem_data.to_dict('records'))
if parsed_xml.findAll('BasicDefinitions'):
neo4jconnection_object.create_basedef_node(study_oid, filename)
logger.info(f"Inserted all nodes for file {filename}.xml")
except:
print(f"Nodes for file {filename}.xml could not be inserted")
# Create relationships in batches
### previous version ###
#for file in os.listdir(path_to_datafiles):
# if file.endswith(".xml"):
# filename = os.path.splitext(file)[0]
# filepath = os.path.join(path_to_datafiles, file)
# with open(filepath) as fp:
# parsed_xml = BeautifulSoup(fp, "xml")
# create XmlProcessor object for parsed_xml files
# xmlprocessor_object = XmlProcessor(parsed_xml)
# do stuff with the object
# study_oid = xmlprocessor_object.get_studyoid()
# create nodes
# try:
# study_data = xmlprocessor_object.extract_study_data()
# neo4jconnection_object.create_study_node(study_data, filename)
# itemgroup_data = xmlprocessor_object.extract_itemgroup_data()
# neo4jconnection_object.create_itemgroup_node(itemgroup_data, study_oid, filename)
# item_data = xmlprocessor_object.extract_item_data()
# neo4jconnection_object.create_item_node(item_data, study_oid, filename)
# studyevent_data = xmlprocessor_object.extract_studyevent_data()
# neo4jconnection_object.create_studyevent_node(studyevent_data, study_oid, filename)
# form_data = xmlprocessor_object.extract_form_data()
# neo4jconnection_object.create_form_node(form_data, study_oid, filename)
# alias_data = xmlprocessor_object.extract_item_to_alias_data()
# neo4jconnection_object.create_alias_node(alias_data, study_oid, filename)
# measurement_data = xmlprocessor_object.extract_measurement_data()
# neo4jconnection_object.create_measurementunit_node(measurement_data)
# rangecheck_data = xmlprocessor_object.extract_rangecheck_data()
# neo4jconnection_object.create_rangecheck_node(rangecheck_data)
# codelist_data = xmlprocessor_object.extract_codelist_data()
# neo4jconnection_object.create_codelist_node(codelist_data, study_oid, filename)
# codelistitem_data = xmlprocessor_object.extract_codelistitem_data()
# neo4jconnection_object.create_clitem_node(codelistitem_data)
# if parsed_xml.findAll('BasicDefinitions'):
# neo4jconnection_object.create_basedef_node(study_oid, filename)
# logger.info(f"Inserted all nodes for file {filename}.xml")
# except:
# print(f"Nodes for file {filename}.xml could not be inserted")
# create rels
try:
# Create relationships in batches
logger.info(f"Creating relationships for {filename}.xml...")
# Get data for rels
itemgroup_to_item_data = xmlprocessor_object.extract_itemgroup_to_item_data
neo4jconnection_object.create_itemgroup_to_item_rel(itemgroup_to_item_data, study_oid, filename)
item_to_alias_data = xmlprocessor_object.extract_item_to_alias_data()
neo4jconnection_object.create_item_to_alias_rel(item_to_alias_data, study_oid, filename)
itemgroup_to_alias_data = xmlprocessor_object.extract_itemgroup_to_alias_data()
neo4jconnection_object.create_itemgroup_to_alias_rel(itemgroup_to_alias_data, study_oid, filename)
form_to_itemgroup_data = xmlprocessor_object.extract_form_to_itemgroup_data()
neo4jconnection_object.create_form_to_itemgroup_rel(form_to_itemgroup_data, study_oid, filename)
studyevent_to_form_data = xmlprocessor_object.extract_studyevent_to_form_data()
neo4jconnection_object.create_studyevent_to_form_rel(studyevent_to_form_data, study_oid, filename)
item_to_measurement_data = xmlprocessor_object.extract_item_to_measurement_data()
item_to_rangecheck_data = xmlprocessor_object.extract_item_to_rangecheck_data()
item_to_cl_data = xmlprocessor_object.extract_item_to_codelist_data()
cl_to_clitem_data = xmlprocessor_object.extract_codelist_to_codelistitem_data()
# Batch creation of relationships
if not itemgroup_to_item_data.empty:
neo4jconnection_object.create_itemgroup_to_item_rel(itemgroup_to_item_data.to_dict('records'), study_oid, filename)
if not item_to_alias_data.empty:
neo4jconnection_object.create_item_to_alias_rel(item_to_alias_data.to_dict('records'), study_oid, filename)
if not itemgroup_to_alias_data.empty:
neo4jconnection_object.create_itemgroup_to_alias_rel(itemgroup_to_alias_data.to_dict('records'), study_oid, filename)
if not form_to_itemgroup_data.empty:
neo4jconnection_object.create_form_to_itemgroup_rel(form_to_itemgroup_data.to_dict('records'), study_oid, filename)
if not studyevent_to_form_data.empty:
neo4jconnection_object.create_studyevent_to_form_rel(studyevent_to_form_data.to_dict('records'), study_oid, filename)
if study_oid:
neo4jconnection_object.create_study_to_studyevent_rel(study_oid, filename)
item_to_measurement_data = xmlprocessor_object.extract_item_to_measurement_data()
neo4jconnection_object.create_item_to_measurement_rel(item_to_measurement_data, study_oid, filename)
if not item_to_measurement_data.empty:
neo4jconnection_object.create_item_to_measurement_rel(item_to_measurement_data.to_dict('records'), study_oid, filename)
item_to_rangecheck_data = xmlprocessor_object.extract_item_to_rangecheck_data()
neo4jconnection_object.create_item_to_rangecheck_rel(item_to_rangecheck_data)
if not item_to_rangecheck_data.empty:
neo4jconnection_object.create_item_to_rangecheck_rel(item_to_rangecheck_data.to_dict('records'))
item_to_cl_data = xmlprocessor_object.extract_item_to_codelist_data()
neo4jconnection_object.create_item_to_codelist_rel(item_to_cl_data, study_oid, filename)
if not item_to_cl_data.empty:
neo4jconnection_object.create_item_to_codelist_rel(item_to_cl_data.to_dict('records'), study_oid, filename)
cl_to_clitem_data = xmlprocessor_object.extract_codelist_to_codelistitem_data()
neo4jconnection_object.create_codelist_to_codelistitem_rel(cl_to_clitem_data, study_oid, filename)
if not cl_to_clitem_data.empty:
neo4jconnection_object.create_codelist_to_codelistitem_rel(cl_to_clitem_data.to_dict('records'), study_oid, filename)
# rels to and from BasicDefinitions node
neo4jconnection_object.create_basedef_to_study_rel(study_oid, filename)
neo4jconnection_object.create_basedef_to_measurement_rel(measurement_data, study_oid, filename)
except:
print(f"Relationships for file {filename}.xml could not be inserted")
if not measurement_data.empty:
neo4jconnection_object.create_basedef_to_measurement_rel(measurement_data.to_dict('records'), study_oid, filename)
logger.info(f"Inserted all rels for file {filename}.xml")
# postprocess
# postprocessing
logger.info(f"Post-processing for file {filename}.xml")
neo4jconnection_object.postprocess_set_label_post()
dictionary = xmlprocessor_object.prepare_tuple_for_postcoord_rels()
datatuple = dict_to_list_of_tuples(dictionary)
if datatuple:
neo4jconnection_object.postprocess_connect_postcoord_aliases(datatuple)
logger.info(f"Post-processing done for file {filename}.xml")
processed_files += 1
except Exception as e:
logger.error(f"Error processing file {filename}.xml: {e}")
failed_files += 1
continue
# Log batch completion
logger.info(f"Batch completed: {processed_files} successful, {failed_files} failed")
# postprocess all Study nodes
logger.info(f"Final post-processing step")
neo4jconnection_object.postprocess_add_nctid_to_study_nodes()
logger.info(f"Batches completed")