diff --git a/.gitignore b/.gitignore index c067e50..1b19f97 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ **__pycache__/ datasource/ +old-src/ diff --git a/.idea/mdm-to-neo4j.iml b/.idea/mdm-to-neo4j.iml index e1ccf9b..05c31d9 100644 --- a/.idea/mdm-to-neo4j.iml +++ b/.idea/mdm-to-neo4j.iml @@ -1,11 +1,14 @@ - + + + + - \ No newline at end of file diff --git a/src/database_connector/database_connector.py b/src/database_connector/database_connector.py index c909083..22848d3 100644 --- a/src/database_connector/database_connector.py +++ b/src/database_connector/database_connector.py @@ -4,6 +4,7 @@ import logging from neo4j import GraphDatabase, Driver from pandas import DataFrame from run import args +from typing import List, Dict logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -62,147 +63,121 @@ class Neo4jConnection: logger.debug("Connection to Neo4j database closed") self.__connection = None - def create_study_node(self, data: DataFrame, modelid): + def execute_batch_query(self, query: str, data: List[Dict], **params) -> None: + """Execute batch query for better performance""" + try: + if isinstance(data, list) and len(data) > 0: # Only execute if there's data, change if data to ... + self.__connection.execute_query(query, data=data, **params) + logger.debug(f"Executed batch query with {len(data)} records") + else: + self.__connection.execute_query(query, data=data, **params) + logger.debug("Executed batch query with no additional data") + except Exception as e: + logger.error(f"Failed to execute batch query: {e}") + raise + + def create_study_node(self, data: List[Dict], modelid: str) -> None: + query = """ + UNWIND $data AS row + MERGE (st:Study {OID: row.oid, ModelID: $modelid}) + SET st.Name = row.name, + st.Description = row.descr, + st.Protocol = row.protocol """ - based on https://neo4j.com/docs/python-manual/current/query-simple/ + self.execute_batch_query(query, data, modelid=modelid) + + def create_studyevent_node(self, data: List[Dict], studyoid: str, modelid: str): + query = """ + UNWIND $data AS row + MERGE (se:StudyEvent {OID: row.OID, Name: row.Name, Type: row.Type, StudyOID: $study_oid, ModelID: $modelid}) """ - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MERGE (st:Study {OID: $studyoid, Name: $studyname, Description: $studydescr, Protocol: $studyprotocol, ModelID: $modelid})", - studyoid=row['oid'], studyname=row['name'], studydescr=row['descr'], studyprotocol=row["protocol"], modelid=modelid - ) - logger.debug(f"Inserted Study node with OID {row['oid']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + self.execute_batch_query(query, data, study_oid=studyoid, modelid=modelid) - def create_studyevent_node(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query("MERGE (:StudyEvent {OID: $oid, Name: $name, Type: $type, StudyOID: $study_oid, ModelID: $modelid})", - oid=row['OID'], name=row['Name'], type=row['Type'], study_oid=studyoid, modelid=modelid - ) - logger.debug(f"Inserted StudyEvent node with OID {row['OID']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_form_node(self, data: List[Dict], studyoid: str, modelid: str): + query = """ + UNWIND $data AS row + MERGE (f:Form {OID: row.OID, Name: row.Name, StudyOID: $study_oid, ModelID: $modelid}) + """ + self.execute_batch_query(query, data, study_oid=studyoid, modelid=modelid) - def create_form_node(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MERGE (:Form {OID: $oid, Name: $name, StudyOID: $studyoid, ModelID: $modelid})", - oid=row['OID'], name=row['Name'], studyoid=studyoid, modelid=modelid) - logger.debug(f"Inserted Form node with OID {row['OID']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_itemgroup_node(self, data: List[Dict], studyoid: str, modelid): + query = """ + UNWIND $data AS row + MERGE (ig:ItemGroup {OID: row.OID, Name: row.Name, StudyOID: $study_oid, ModelID: $modelid}) + """ + self.execute_batch_query(query, data, study_oid=studyoid, modelid=modelid) - def create_itemgroup_node(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MERGE (:ItemGroup {OID: $oid, Name: $name, StudyOID: $studyoid, ModelID: $modelid})", - oid=row['OID'], name=row['Name'], studyoid=studyoid, modelid=modelid) - logger.debug(f"Inserted ItemGroup node with OID {row['OID']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_item_node(self, data: List[Dict], studyoid: str, modelid): + query = """ + UNWIND $data AS row + MERGE (i:Item {OID: row.itemoid, Name: row.name, DataType: row.datatype, Question: row.question, StudyOID: $studyoid, ModelID: $modelid}) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) - def create_item_node(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MERGE (:Item {OID: $oid, Name: $name, DataType: $datatype, Question: $question, StudyOID: $studyoid, ModelID: $modelid})", - oid=row['itemoid'], name=row['name'], datatype=row['datatype'], question=row['question'], - studyoid=studyoid, modelid=modelid) - logger.debug(f"Inserted Item node with OID {row['itemoid']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_alias_node(self, data: List[Dict], studyoid: str, modelid: str) -> None: + query = """ + UNWIND $data AS row + MERGE (a:Alias {Context: row.end, Name: row.end_two, ItemOID: row.start, StudyOID: $studyoid, ModelID: $modelid}) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) - def create_alias_node(self, data: DataFrame, - studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MERGE (:Alias {Context: $context, Name: $name, ItemOID: $item, StudyOID: $study, ModelID: $modelid})", - context=row['end'], name=row['end_two'], item=row['start'], study=studyoid, modelid=modelid) - logger.debug(f"Inserted Alias node with name {row['end_two']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise - - def create_measurementunit_node(self, data: DataFrame): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MERGE (:MeasurementUnit {OID: $oid, Name: $name, Symbol: $symbol})", - oid=row['oid'], name=row['name'], symbol=row['symbol']) - logger.debug(f"Inserted MeasurementUnit node with name {row['name']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_measurementunit_node(self, data: List[Dict]): + query = """ + UNWIND $data AS row + MERGE (m:MeasurementUnit {OID: row.oid, Name: row.name, Symbol: row.symbol}) + """ + self.execute_batch_query(query, data) def create_basedef_node(self, studyoid: str, modelid): + query = """ + MERGE (b:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid}) + """ + self.execute_batch_query(query, data=[], studyoid=studyoid, modelid=modelid) + + def create_rangecheck_node(self, data: List[Dict]): + query = """ + UNWIND $data AS row + MERGE (rc:RangeCheck {Comparator: row.comparator, Constraint: row.constraint, CheckValue: row.value}) + """ + self.execute_batch_query(query, data) + + def create_codelist_node(self, data: List[Dict], studyoid, modelid): + query = """ + UNWIND $data AS row + MERGE (cl:CodeList {OID: row.OID, Name: row.Name, DataType: row.DataType, StudyOID: $studyoid, ModelID: $modelid}) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) + + def create_clitem_node(self, data: List[Dict]): + query = """ + UNWIND $data AS row + MERGE (cli:CodeListItem {CodedValue: row.codedvalue, Decode: row.decode}) + """ + self.execute_batch_query(query, data) + + # Batch relationship creation methods + def create_relationships_batch(self, query: str, data: List[Dict], **params) -> None: # todo: remove it not needed + """Generic batch relationship creation""" + self.execute_batch_query(query, data, **params) + + def create_study_to_studyevent_rel(self, studyoid: str, modelid: str) -> None: + query = """ + MATCH (start:Study {OID: $studyoid, ModelID: $modelid}) + WITH start, start.OID as oid, start.ModelID as mid + MATCH (end:StudyEvent) + WHERE end.StudyOID = oid AND end.ModelID = mid + WITH start, end + MERGE (start)-[:STUDY_HAS_STUDYEVENT]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data=[], studyoid=studyoid, modelid=modelid) + + def create_study_to_studyevent_rel_old(self, studyoid: str, modelid): conn = self.__connection try: conn.execute_query( - "MERGE (:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid})", studyoid=studyoid, modelid=modelid) - logger.debug(f"Inserted BasicDefinitions node for study {studyoid}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise - - def create_rangecheck_node(self, data: DataFrame): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MERGE (:RangeCheck {Comparator: $comp, Constraint: $cons, CheckValue: $val})", - comp=row['comparator'], cons=row['constraint'], val=row['value']) - logger.debug(f"Inserted RangeCheck node") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise - - def create_codelist_node(self, data: DataFrame, studyoid, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MERGE (:CodeList {OID: $oid, Name: $name, DataType: $type, StudyOID: $studyoid, ModelID: $modelid})", - oid=row['OID'], name=row['Name'], type=row['DataType'], studyoid=studyoid, modelid=modelid) - logger.debug(f"Inserted CodeList node") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise - - def create_clitem_node(self, data: DataFrame): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MERGE (:CodeListItem {CodedValue: $cv, Decode: $dc})", - cv=row['codedvalue'], dc=row['decode']) - logger.debug(f"Inserted CodeListItem node") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise - - def create_study_to_studyevent_rel(self, studyoid: str, modelid): - conn = self.__connection - try: - conn.execute_query( - "MATCH (start:Study {OID: $studyoid, ModelID: $modelid}), (end:StudyEvent {StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:STUDY_HAS_STUDYEVENT]->(end)-[:BACKTRACK]->(start)", + "MATCH (start:Study {OID: $studyoid, ModelID: $modelid})," + " (end:StudyEvent {StudyOID: $studyoid, ModelID: $modelid})" + "MERGE (start)-[:STUDY_HAS_STUDYEVENT]->(end)-[:BACKTRACK]->(start)", studyoid=studyoid, modelid=modelid ) logger.debug(f"Inserted rels betw Study and StudyEvent") @@ -210,175 +185,122 @@ class Neo4jConnection: logger.error(f"Failed to populate Neo4j database: {e}") raise - def create_itemgroup_to_item_rel(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:ItemGroup {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:Item {OID: $endoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:ITEMGROUP_HAS_ITEM]->(end)-[:BACKTRACK]->(start)", - startoid=row['start'], endoid=row['end'], studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw ItemGroup {row['start']} and Item {row['end']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_itemgroup_to_item_rel(self, data: List[Dict], studyoid: str, modelid): + query = """ + UNWIND $data AS row + MATCH (start:ItemGroup {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:Item {OID: row.end, StudyOID: $studyoid, ModelID: $modelid}) + MERGE (start)-[:ITEMGROUP_HAS_ITEM]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) - def create_form_to_itemgroup_rel(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:Form {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:ItemGroup {OID: $endoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:FORM_HAS_ITEMGROUP]->(end)-[:BACKTRACK]->(start)", - startoid=row['start'], endoid=row['end'], studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw Form {row['start']} and ItemGroup {row['end']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_form_to_itemgroup_rel(self, data: List[Dict], studyoid: str, modelid): + query = """ + UNWIND $data AS row + MATCH (start:Form {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:ItemGroup {OID: row.end, StudyOID: $studyoid, ModelID: $modelid}) + MERGE (start)-[:FORM_HAS_ITEMGROUP]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) - def create_studyevent_to_form_rel(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:StudyEvent {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:Form {OID: $endoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:STUDYEVENT_HAS_FORM]->(end)-[:BACKTRACK]->(start)", - startoid=row['start'], endoid=row['end'], studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw StudyEvent {row['start']} and Form {row['end']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_studyevent_to_form_rel(self, data: List[Dict], studyoid: str, modelid): + query = """ + UNWIND $data AS row + MATCH (start:StudyEvent {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:Form {OID: row.end, StudyOID: $studyoid, ModelID: $modelid}) + MERGE (start)-[:STUDYEVENT_HAS_FORM]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) - def create_item_to_alias_rel(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:Item {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:Alias {Context: $endcontext, Name: $endname, ItemOID: $startoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:ITEM_HAS_ALIAS]->(end)-[:BACKTRACK]->(start)", - startoid=row['start'], endcontext=row['end'], endname=row['end_two'], studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw Item {row['start']} and Alias {row['end']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_item_to_alias_rel(self, data: List[Dict], studyoid: str, modelid): + query = """ + UNWIND $data AS row + MATCH (start:Item {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:Alias {Context: row.end, Name: row.end_two, ItemOID: row.start, StudyOID: $studyoid, ModelID: $modelid}) + MERGE (start)-[:ITEM_HAS_ALIAS]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) - def create_itemgroup_to_alias_rel(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:ItemGroup {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:Alias {Context: $endcontext, Name: $endname}) MERGE (start)-[:ITEMGROUP_HAS_ALIAS]->(end)-[:BACKTRACK]->(start)", - startoid=row['start'], endcontext=row['end'], endname=row['end_two'], studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw ItemGroup {row['start']} and Alias {row['end']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_itemgroup_to_alias_rel(self, data: List[Dict], studyoid: str, modelid): + query = """ + UNWIND $data AS row + MATCH (start:ItemGroup {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:Alias {Context: row.end, Name: row.end_two}) + MERGE (start)-[:ITEMGROUP_HAS_ALIAS]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) - def create_item_to_measurement_rel(self, data: DataFrame, studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:Item {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:MeasurementUnit {OID: $endoid, Name: $endname}) MERGE (start)-[:ITEM_HAS_MEASUREMENTUNIT]->(end)-[:BACKTRACK]->(start)", - startoid=row['start'], endoid=row['endoid'], endname=row['endname'], studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw Item {row['start']} and MeasurementUnit {row['endname']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_item_to_measurement_rel(self, data: List[Dict], studyoid: str, modelid): + query = """ + UNWIND $data AS row + MATCH (start:Item {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:MeasurementUnit {OID: row.endoid, Name: row.endname}) + MERGE (start)-[:ITEM_HAS_MEASUREMENTUNIT]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) def create_basedef_to_study_rel(self, studyoid: str, modelid): - conn = self.__connection - try: - conn.execute_query( - "MATCH (start:Study {OID: $studyoid, ModelID: $modelid}), (end:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:HAS_BASEDEF]->(end)", - studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw Study {studyoid} and BasicDefinitions") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + query = """ + MATCH (start:Study {OID: $studyoid, ModelID: $modelid}), (end:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid}) + MERGE (start)-[:HAS_BASEDEF]->(end) + """ + self.execute_batch_query(query, data=[], studyoid=studyoid, modelid=modelid) - def create_basedef_to_measurement_rel(self, data: DataFrame, - studyoid: str, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid}), (end:MeasurementUnit {OID: $endoid, Name: $endname, Symbol: $endsymbol}) MERGE (start)-[:BASEDEF_HAS_MEASUREMENTUNIT]->(end)", - endoid=row['oid'], endname=row['name'], endsymbol=row['symbol'], studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw BasicDefinitions and MeasurementUnit {row['name']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_basedef_to_measurement_rel(self, data: List[Dict], studyoid: str, modelid): + query = """ + UNWIND $data AS row + MATCH (start:BasicDefinitions {StudyOID: $studyoid, ModelID: $modelid}), (end:MeasurementUnit {OID: row.oid, Name: row.name, Symbol: row.symbol}) + MERGE (start)-[:BASEDEF_HAS_MEASUREMENTUNIT]->(end) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) - def create_item_to_rangecheck_rel(self, data: DataFrame): #TODO check for Item properties (missing StudyOID?) - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:Item {OID: $itemoid}), (end:RangeCheck {Comparator: $comp, Constraint: $cons, CheckValue: $cvalue}) MERGE (start)-[:ITEM_HAS_RANGECHECK]->(end)-[:BACKTRACK]->(start)", - itemoid=row['start'], comp=row['endcomparator'], cons=row['endconstraint'], cvalue=row['endvalue']) - logger.debug( - f"Inserted rels betw Item and RangeCheck") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_item_to_rangecheck_rel(self, data: List[Dict]): # TODO check for Item properties (missing StudyOID?) + query = """ + UNWIND $data AS row + MATCH (start:Item {OID: row.start}), (end:RangeCheck {Comparator: row.endcomparator, Constraint: row.endconstraint, CheckValue: row.endvalue}) + MERGE (start)-[:ITEM_HAS_RANGECHECK]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data) - def create_item_to_codelist_rel(self, data: DataFrame, studyoid, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:Item {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:CodeList {OID: $endoid, StudyOID: $studyoid, ModelID: $modelid}) MERGE (start)-[:ITEM_HAS_CODELIST]->(end)-[:BACKTRACK]->(start)", - startoid=row['start'], endoid=row['end'], studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw Item {row['start']} and CodeList {row['end']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_item_to_codelist_rel(self, data: List[Dict], studyoid, modelid): + query = """ + UNWIND $data AS row + MATCH (start:Item {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:CodeList {OID: row.end, StudyOID: $studyoid, ModelID: $modelid}) + MERGE (start)-[:ITEM_HAS_CODELIST]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) - def create_codelist_to_codelistitem_rel(self, data: DataFrame, studyoid, modelid): - conn = self.__connection - try: - for index, row in data.iterrows(): - conn.execute_query( - "MATCH (start:CodeList {OID: $startoid, StudyOID: $studyoid, ModelID: $modelid}), (end:CodeListItem {CodedValue: $cv, Decode: $dc}) MERGE (start)-[:CODELIST_HAS_CODELISTITEM]->(end)-[:BACKTRACK]->(start)", - startoid=row['start'], cv=row['endcodedvalue'], dc=row['enddecode'], studyoid=studyoid, modelid=modelid) - logger.debug( - f"Inserted rels betw CodeList {row['start']} and CodeListItem {row['endcodedvalue']}") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def create_codelist_to_codelistitem_rel(self, data: List[Dict], studyoid, modelid): + query = """ + UNWIND $data AS row + MATCH (start:CodeList {OID: row.start, StudyOID: $studyoid, ModelID: $modelid}), (end:CodeListItem {CodedValue: row.endcodedvalue, Decode: row.enddecode}) + MERGE (start)-[:CODELIST_HAS_CODELISTITEM]->(end)-[:BACKTRACK]->(start) + """ + self.execute_batch_query(query, data, studyoid=studyoid, modelid=modelid) # postprocessing with cypher queries def postprocess_set_label_post(self): - conn = self.__connection - try: - conn.execute_query( - "MATCH (n:Alias) WHERE n.Context contains '[' and n.Context contains ',' and n.Context contains ']' set n :Post" - ) - logger.debug(f"Inserted post property in alias nodes") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + query = """ + MATCH (n:Alias) + WHERE n.Context CONTAINS '[' AND n.Context CONTAINS ',' AND n.Context CONTAINS ']' + SET n:Post + """ + self.execute_batch_query(query, data=[]) + logger.debug(f"Inserted post property in alias nodes") - def postprocess_connect_postcoord_aliases(self, tuples): - conn = self.__connection - try: - conn.execute_query( - """ - UNWIND $data AS row - MATCH (i:Item {OID: row.item_oid}) - WITH row, i - MATCH (a1:Post {StudyOID: row.study_oid, Context: row.ontology + '[' + row.x + ',' + 1 + ']'})--(i) - MATCH (a2:Post {StudyOID: row.study_oid, Context: row.ontology + '[' + row.x + ',' + row.y + ']'})--(i) - WHERE a1 <> a2 - MERGE (a1)-[rel:COMPOSITE]->(a2) - """, data=tuples) - logger.debug(f"Inserted rels btw post-coordinated aliases") - except Exception as e: - logger.error(f"Failed to execute query: {e}") - raise + def postprocess_connect_postcoord_aliases(self, data): + query = """ + UNWIND $data AS row + MATCH (i:Item {OID: row.item_oid}) + WITH row, i + MATCH (a1:Post {StudyOID: row.study_oid, Context: row.ontology + '[' + row.x + ',' + 1 + ']'})--(i) + MATCH (a2:Post {StudyOID: row.study_oid, Context: row.ontology + '[' + row.x + ',' + row.y + ']'})--(i) + WHERE a1 <> a2 + MERGE (a1)-[rel:COMPOSITE]->(a2) + """ + self.execute_batch_query(query, data) + logger.debug(f"Inserted rels btw post-coordinated aliases") + + def postprocess_add_nctid_to_study_nodes(self): + query = """ + MATCH (n:Study) WHERE NOT n.OID =~ '.*(NCT\\d{8}).*' + WITH n, apoc.text.regexGroups(n.Name, 'NCT\\d{8}') AS matches + WHERE size(matches) > 0 + SET n.NCT_ID = matches[0][0] + """ + self.execute_batch_query(query, data=[]) + logger.debug(f"Inserted NCT identifier to Study nodes") diff --git a/src/run.py b/src/run.py index 4994804..d18c4f9 100644 --- a/src/run.py +++ b/src/run.py @@ -5,7 +5,7 @@ import atexit import signal import logging -MDM2NEO4J_VERSION: str = "0.1" +MDM2NEO4J_VERSION: str = "0.2" logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -17,6 +17,7 @@ parser = argparse.ArgumentParser() parser.add_argument('-c', '--conf', required=True, type=str, help='Configuration file with database connection parameters') parser.add_argument('-f', '--files', required=True, type=str, help='Directory with xml files') +parser.add_argument('-b', '--batch-size', required=False, type=int, default=500, help='Number of files per batch') # parse parameters args = parser.parse_args() diff --git a/src/xml_processor/xml_processor.py b/src/xml_processor/xml_processor.py index 84765ce..d7d12a4 100644 --- a/src/xml_processor/xml_processor.py +++ b/src/xml_processor/xml_processor.py @@ -517,104 +517,215 @@ def dict_to_list_of_tuples(mydictionary): return data -def parse_xml( - path_to_datafiles): # creates XmlProcessor object for each xml file +def parse_xml(path_to_datafiles: str, batch_size: int = 500): # creates XmlProcessor object for each xml file neo4jconnection_object = db.Neo4jConnection() - for file in os.listdir(path_to_datafiles): - if file.endswith(".xml"): + + xml_files = [file for file in os.listdir(path_to_datafiles) if file.endswith(".xml")] + total_files = len(xml_files) + + logger.info(F"Found {total_files} xml files to process") + + for batch_start in range(0, total_files, batch_size): + batch_end = min(batch_start + batch_size, total_files) + batch_files = xml_files[batch_start:batch_end] + + logger.info(f"Processing batch {batch_start // batch_end + 1}: " f"{batch_start + 1}-{batch_end} of {total_files} files") + + processed_files = 0 + failed_files = 0 + + for file in batch_files: filename = os.path.splitext(file)[0] filepath = os.path.join(path_to_datafiles, file) - with open(filepath) as fp: - parsed_xml = BeautifulSoup(fp, "xml") + + try: + logger.info(f"Processing file: {filename}.xml") + + with open(filepath) as fp: + parsed_xml = BeautifulSoup(fp, "xml") + # create XmlProcessor object for parsed_xml files xmlprocessor_object = XmlProcessor(parsed_xml) # do stuff with the object study_oid = xmlprocessor_object.get_studyoid() + # extract data in batches + study_data = xmlprocessor_object.extract_study_data() + itemgroup_data = xmlprocessor_object.extract_itemgroup_data() + item_data = xmlprocessor_object.extract_item_data() + studyevent_data = xmlprocessor_object.extract_studyevent_data() + form_data = xmlprocessor_object.extract_form_data() + alias_data = xmlprocessor_object.extract_item_to_alias_data() + measurement_data = xmlprocessor_object.extract_measurement_data() + rangecheck_data = xmlprocessor_object.extract_rangecheck_data() + codelist_data = xmlprocessor_object.extract_codelist_data() + codelistitem_data = xmlprocessor_object.extract_codelistitem_data() + + # create nodes in batches + if not study_data.empty: + neo4jconnection_object.create_study_node(study_data.to_dict('records'), filename) + + if not itemgroup_data.empty: + neo4jconnection_object.create_itemgroup_node(itemgroup_data.to_dict('records'), study_oid, filename) + + if not item_data.empty: + neo4jconnection_object.create_item_node(item_data.to_dict('records'), study_oid, filename) + + if not studyevent_data.empty: + neo4jconnection_object.create_studyevent_node(studyevent_data.to_dict('records'), study_oid, filename) + + if not form_data.empty: + neo4jconnection_object.create_form_node(form_data.to_dict('records'), study_oid, filename) + + if not alias_data.empty: + neo4jconnection_object.create_alias_node(alias_data.to_dict('records'), study_oid, filename) + + if not measurement_data.empty: + neo4jconnection_object.create_measurementunit_node(measurement_data.to_dict('records')) + + if not rangecheck_data.empty: + neo4jconnection_object.create_rangecheck_node(rangecheck_data.to_dict('records')) + + if not codelist_data.empty: + neo4jconnection_object.create_codelist_node(codelist_data.to_dict('records'), study_oid, filename) + + if not codelistitem_data.empty: + neo4jconnection_object.create_clitem_node(codelistitem_data.to_dict('records')) + + if parsed_xml.findAll('BasicDefinitions'): + neo4jconnection_object.create_basedef_node(study_oid, filename) + + logger.info(f"Inserted all nodes for file {filename}.xml") + + # Create relationships in batches + + ### previous version ### + #for file in os.listdir(path_to_datafiles): + # if file.endswith(".xml"): + # filename = os.path.splitext(file)[0] + # filepath = os.path.join(path_to_datafiles, file) + # with open(filepath) as fp: + # parsed_xml = BeautifulSoup(fp, "xml") + # create XmlProcessor object for parsed_xml files + # xmlprocessor_object = XmlProcessor(parsed_xml) + # do stuff with the object + # study_oid = xmlprocessor_object.get_studyoid() + # create nodes - try: - study_data = xmlprocessor_object.extract_study_data() - neo4jconnection_object.create_study_node(study_data, filename) + # try: + # study_data = xmlprocessor_object.extract_study_data() + # neo4jconnection_object.create_study_node(study_data, filename) - itemgroup_data = xmlprocessor_object.extract_itemgroup_data() - neo4jconnection_object.create_itemgroup_node(itemgroup_data, study_oid, filename) + # itemgroup_data = xmlprocessor_object.extract_itemgroup_data() + # neo4jconnection_object.create_itemgroup_node(itemgroup_data, study_oid, filename) - item_data = xmlprocessor_object.extract_item_data() - neo4jconnection_object.create_item_node(item_data, study_oid, filename) + # item_data = xmlprocessor_object.extract_item_data() + # neo4jconnection_object.create_item_node(item_data, study_oid, filename) - studyevent_data = xmlprocessor_object.extract_studyevent_data() - neo4jconnection_object.create_studyevent_node(studyevent_data, study_oid, filename) + # studyevent_data = xmlprocessor_object.extract_studyevent_data() + # neo4jconnection_object.create_studyevent_node(studyevent_data, study_oid, filename) - form_data = xmlprocessor_object.extract_form_data() - neo4jconnection_object.create_form_node(form_data, study_oid, filename) + # form_data = xmlprocessor_object.extract_form_data() + # neo4jconnection_object.create_form_node(form_data, study_oid, filename) - alias_data = xmlprocessor_object.extract_item_to_alias_data() - neo4jconnection_object.create_alias_node(alias_data, study_oid, filename) + # alias_data = xmlprocessor_object.extract_item_to_alias_data() + # neo4jconnection_object.create_alias_node(alias_data, study_oid, filename) - measurement_data = xmlprocessor_object.extract_measurement_data() - neo4jconnection_object.create_measurementunit_node(measurement_data) + # measurement_data = xmlprocessor_object.extract_measurement_data() + # neo4jconnection_object.create_measurementunit_node(measurement_data) - rangecheck_data = xmlprocessor_object.extract_rangecheck_data() - neo4jconnection_object.create_rangecheck_node(rangecheck_data) + # rangecheck_data = xmlprocessor_object.extract_rangecheck_data() + # neo4jconnection_object.create_rangecheck_node(rangecheck_data) - codelist_data = xmlprocessor_object.extract_codelist_data() - neo4jconnection_object.create_codelist_node(codelist_data, study_oid, filename) + # codelist_data = xmlprocessor_object.extract_codelist_data() + # neo4jconnection_object.create_codelist_node(codelist_data, study_oid, filename) - codelistitem_data = xmlprocessor_object.extract_codelistitem_data() - neo4jconnection_object.create_clitem_node(codelistitem_data) + # codelistitem_data = xmlprocessor_object.extract_codelistitem_data() + # neo4jconnection_object.create_clitem_node(codelistitem_data) - if parsed_xml.findAll('BasicDefinitions'): - neo4jconnection_object.create_basedef_node(study_oid, filename) + # if parsed_xml.findAll('BasicDefinitions'): + # neo4jconnection_object.create_basedef_node(study_oid, filename) - logger.info(f"Inserted all nodes for file {filename}.xml") + # logger.info(f"Inserted all nodes for file {filename}.xml") - except: - print(f"Nodes for file {filename}.xml could not be inserted") + # except: + # print(f"Nodes for file {filename}.xml could not be inserted") # create rels - try: - itemgroup_to_item_data = xmlprocessor_object.extract_itemgroup_to_item_data - neo4jconnection_object.create_itemgroup_to_item_rel(itemgroup_to_item_data, study_oid, filename) - item_to_alias_data = xmlprocessor_object.extract_item_to_alias_data() - neo4jconnection_object.create_item_to_alias_rel(item_to_alias_data, study_oid, filename) + # Create relationships in batches + logger.info(f"Creating relationships for {filename}.xml...") - itemgroup_to_alias_data = xmlprocessor_object.extract_itemgroup_to_alias_data() - neo4jconnection_object.create_itemgroup_to_alias_rel(itemgroup_to_alias_data, study_oid, filename) + # Get data for rels + itemgroup_to_item_data = xmlprocessor_object.extract_itemgroup_to_item_data + item_to_alias_data = xmlprocessor_object.extract_item_to_alias_data() + itemgroup_to_alias_data = xmlprocessor_object.extract_itemgroup_to_alias_data() + form_to_itemgroup_data = xmlprocessor_object.extract_form_to_itemgroup_data() + studyevent_to_form_data = xmlprocessor_object.extract_studyevent_to_form_data() + item_to_measurement_data = xmlprocessor_object.extract_item_to_measurement_data() + item_to_rangecheck_data = xmlprocessor_object.extract_item_to_rangecheck_data() + item_to_cl_data = xmlprocessor_object.extract_item_to_codelist_data() + cl_to_clitem_data = xmlprocessor_object.extract_codelist_to_codelistitem_data() - form_to_itemgroup_data = xmlprocessor_object.extract_form_to_itemgroup_data() - neo4jconnection_object.create_form_to_itemgroup_rel(form_to_itemgroup_data, study_oid, filename) + # Batch creation of relationships - studyevent_to_form_data = xmlprocessor_object.extract_studyevent_to_form_data() - neo4jconnection_object.create_studyevent_to_form_rel(studyevent_to_form_data, study_oid, filename) + if not itemgroup_to_item_data.empty: + neo4jconnection_object.create_itemgroup_to_item_rel(itemgroup_to_item_data.to_dict('records'), study_oid, filename) + if not item_to_alias_data.empty: + neo4jconnection_object.create_item_to_alias_rel(item_to_alias_data.to_dict('records'), study_oid, filename) + + if not itemgroup_to_alias_data.empty: + neo4jconnection_object.create_itemgroup_to_alias_rel(itemgroup_to_alias_data.to_dict('records'), study_oid, filename) + + if not form_to_itemgroup_data.empty: + neo4jconnection_object.create_form_to_itemgroup_rel(form_to_itemgroup_data.to_dict('records'), study_oid, filename) + + if not studyevent_to_form_data.empty: + neo4jconnection_object.create_studyevent_to_form_rel(studyevent_to_form_data.to_dict('records'), study_oid, filename) + + if study_oid: neo4jconnection_object.create_study_to_studyevent_rel(study_oid, filename) - item_to_measurement_data = xmlprocessor_object.extract_item_to_measurement_data() - neo4jconnection_object.create_item_to_measurement_rel(item_to_measurement_data, study_oid, filename) + if not item_to_measurement_data.empty: + neo4jconnection_object.create_item_to_measurement_rel(item_to_measurement_data.to_dict('records'), study_oid, filename) - item_to_rangecheck_data = xmlprocessor_object.extract_item_to_rangecheck_data() - neo4jconnection_object.create_item_to_rangecheck_rel(item_to_rangecheck_data) + if not item_to_rangecheck_data.empty: + neo4jconnection_object.create_item_to_rangecheck_rel(item_to_rangecheck_data.to_dict('records')) - item_to_cl_data = xmlprocessor_object.extract_item_to_codelist_data() - neo4jconnection_object.create_item_to_codelist_rel(item_to_cl_data, study_oid, filename) + if not item_to_cl_data.empty: + neo4jconnection_object.create_item_to_codelist_rel(item_to_cl_data.to_dict('records'), study_oid, filename) - cl_to_clitem_data = xmlprocessor_object.extract_codelist_to_codelistitem_data() - neo4jconnection_object.create_codelist_to_codelistitem_rel(cl_to_clitem_data, study_oid, filename) + if not cl_to_clitem_data.empty: + neo4jconnection_object.create_codelist_to_codelistitem_rel(cl_to_clitem_data.to_dict('records'), study_oid, filename) - # rels to and from BasicDefinitions node - neo4jconnection_object.create_basedef_to_study_rel(study_oid, filename) - neo4jconnection_object.create_basedef_to_measurement_rel(measurement_data, study_oid, filename) - - except: - print(f"Relationships for file {filename}.xml could not be inserted") + # rels to and from BasicDefinitions node + neo4jconnection_object.create_basedef_to_study_rel(study_oid, filename) + if not measurement_data.empty: + neo4jconnection_object.create_basedef_to_measurement_rel(measurement_data.to_dict('records'), study_oid, filename) logger.info(f"Inserted all rels for file {filename}.xml") - # postprocess + # postprocessing + logger.info(f"Post-processing for file {filename}.xml") neo4jconnection_object.postprocess_set_label_post() dictionary = xmlprocessor_object.prepare_tuple_for_postcoord_rels() datatuple = dict_to_list_of_tuples(dictionary) - neo4jconnection_object.postprocess_connect_postcoord_aliases(datatuple) + if datatuple: + neo4jconnection_object.postprocess_connect_postcoord_aliases(datatuple) logger.info(f"Post-processing done for file {filename}.xml") + processed_files += 1 + + except Exception as e: + logger.error(f"Error processing file {filename}.xml: {e}") + failed_files += 1 + continue + + # Log batch completion + logger.info(f"Batch completed: {processed_files} successful, {failed_files} failed") + + # postprocess all Study nodes + logger.info(f"Final post-processing step") + neo4jconnection_object.postprocess_add_nctid_to_study_nodes() + logger.info(f"Batches completed")