#!/usr/bin/env python

#
# Copyright 2021, Heidelberg University Clinic
#
# File author(s): Sebastian Lobentanzer
#                 ...
#
# Distributed under MIT licence, see the file `LICENSE`.
#
"""
BioCypher 'ontology' module. Contains classes and functions to handle parsing
and representation of single ontologies as well as their hybridisation and
other advanced operations.
"""
import os

from ._logger import logger

logger.debug(f"Loading module {__name__}.")

from typing import Optional
from datetime import datetime

from rdflib import Graph
from rdflib.extras.external_graph_libs import rdflib_to_networkx_digraph
import rdflib
import networkx as nx

from ._misc import (
    to_list,
    to_lower_sentence_case,
    create_tree_visualisation,
    sentencecase_to_pascalcase,
)
from ._mapping import OntologyMapping


class OntologyAdapter:
    """
    Class that represents an ontology to be used in the Biocypher framework. Can
    read from a variety of formats, including OWL, OBO, and RDF/XML. The
    ontology is represented by a networkx.DiGraph object; an RDFlib graph is
    also kept. By default, the DiGraph reverses the label and identifier of the
    nodes, such that the node name in the graph is the human-readable label. The
    edges are oriented from child to parent. Labels are formatted in lower
    sentence case and underscores are replaced by spaces. Identifiers are taken
    as defined and the prefixes are removed by default.
    """

    def __init__(
        self,
        ontology_file: str,
        root_label: str,
        ontology_file_format: Optional[str] = None,
        head_join_node_label: Optional[str] = None,
        merge_nodes: Optional[bool] = True,
        switch_label_and_id: bool = True,
        remove_prefixes: bool = True,
    ):
        """
        Initialize the OntologyAdapter class.

        Loads the ontology file into an RDFlib graph and derives the networkx
        graph from it.

        Args:
            ontology_file (str): Path to the ontology file. Can be local or
                remote.

            root_label (str): The label of the root node in the ontology. In
                case of a tail ontology, this is the tail join node.

            ontology_file_format (str): The format of the ontology file. One
                of "owl", "rdf", "ttl", or "application/rdf+xml" ("obo" is
                recognised but not yet supported). If format is not passed,
                it is determined from the file extension.

            head_join_node_label (str): Optional variable to store the label of
                the node in the head ontology that should be used to join to
                the root node of the tail ontology. Defaults to None.

            merge_nodes (bool): If True, head and tail join nodes will be
                merged, using the label of the head join node. If False, the
                tail join node will be attached as a child of the head join
                node.

            switch_label_and_id (bool): If True, the node names in the graph
                will be the human-readable labels. If False, the node names
                will be the identifiers. Defaults to True.

            remove_prefixes (bool): If True, the prefixes of the identifiers
                will be removed. Defaults to True.
        """

        logger.info(f"Instantiating OntologyAdapter class for {ontology_file}.")

        self._ontology_file = ontology_file
        self._root_label = root_label
        self._format = ontology_file_format
        self._merge_nodes = merge_nodes
        self._head_join_node = head_join_node_label
        self._switch_label_and_id = switch_label_and_id
        self._remove_prefixes = remove_prefixes

        # the RDF graph must exist before the conversion, because several
        # helpers read labels from `self._rdf_graph`
        self._rdf_graph = self._load_rdf_graph(ontology_file)

        self._nx_graph = self._rdf_to_nx(
            self._rdf_graph, root_label, switch_label_and_id
        )

    def _rdf_to_nx(
        self,
        _rdf_graph: rdflib.Graph,
        root_label: str,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Convert an RDF graph into the networkx ontology graph.

        Args:
            _rdf_graph (rdflib.Graph): The RDF graph to convert

            root_label (str): The label of the root node in the ontology

            switch_label_and_id (bool): If True, id and label are switched

            rename_nodes (bool): If True, labels are converted to lower
                sentence case with underscores replaced by spaces

        Returns:
            nx.DiGraph: The graph restricted to the root node and all nodes
                with a directed path to it
        """
        one_to_one_triples, one_to_many_dict = self._get_relevant_rdf_triples(
            _rdf_graph
        )
        nx_graph = self._convert_to_nx(one_to_one_triples, one_to_many_dict)
        nx_graph = self._add_labels_to_nodes(nx_graph, switch_label_and_id)
        nx_graph = self._change_nodes_to_biocypher_format(
            nx_graph, switch_label_and_id, rename_nodes
        )
        nx_graph = self._get_all_ancestors(
            nx_graph, root_label, switch_label_and_id, rename_nodes
        )
        # `_get_all_ancestors` returns a subgraph view; materialise it
        return nx.DiGraph(nx_graph)

    def _get_relevant_rdf_triples(self, g: rdflib.Graph) -> tuple:
        """Collect the inheritance information of the RDF graph.

        Args:
            g (rdflib.Graph): The RDF graph

        Returns:
            tuple: The one to one inheritance graph and the multiple
                inheritance dictionary
        """
        one_to_one_inheritance_graph = self._get_one_to_one_inheritance_triples(
            g
        )
        intersection = self._get_multiple_inheritance_dict(g)
        return one_to_one_inheritance_graph, intersection

    def _get_one_to_one_inheritance_triples(
        self, g: rdflib.Graph
    ) -> rdflib.Graph:
        """Get the one to one inheritance triples from the RDF graph.

        Only `rdfs:subClassOf` triples whose subject has a label are kept.

        Args:
            g (rdflib.Graph): The RDF graph

        Returns:
            rdflib.Graph: The one to one inheritance graph
        """
        one_to_one_inheritance_graph = Graph()
        for s, p, o in g.triples((None, rdflib.RDFS.subClassOf, None)):
            if self.has_label(s, g):
                one_to_one_inheritance_graph.add((s, p, o))
        return one_to_one_inheritance_graph

    def _get_multiple_inheritance_dict(self, g: rdflib.Graph) -> dict:
        """Get the multiple inheritance dictionary from the RDF graph.

        Args:
            g (rdflib.Graph): The RDF graph

        Returns:
            dict: The multiple inheritance dictionary, keyed by the
                intersection node, with the entries "child_name" and
                "parent_node_names"
        """
        multiple_inheritance = g.triples(
            (None, rdflib.OWL.intersectionOf, None)
        )
        intersection = {}
        for (
            node,
            _has_multiple_parents,
            first_node_of_intersection_list,
        ) in multiple_inheritance:
            parents = self._retrieve_rdf_linked_list(
                first_node_of_intersection_list
            )
            # if several subclasses point at the intersection node, the last
            # one returned by the graph is used
            child_name = None
            for s_, _, _ in g.triples((None, rdflib.RDFS.subClassOf, node)):
                child_name = s_

            # Handle Snomed CT post coordinated expressions
            if not child_name:
                for s_, _, _ in g.triples(
                    (None, rdflib.OWL.equivalentClass, node)
                ):
                    child_name = s_

            if child_name:
                intersection[node] = {
                    "child_name": child_name,
                    "parent_node_names": parents,
                }
        return intersection

    def has_label(self, node: rdflib.URIRef, g: rdflib.Graph) -> bool:
        """Does the node have a label in g?

        Args:
            node (rdflib.URIRef): The node to check

            g (rdflib.Graph): The graph to check in

        Returns:
            bool: True if the node has a label, False otherwise
        """
        return (node, rdflib.RDFS.label, None) in g

    def _retrieve_rdf_linked_list(self, subject: rdflib.URIRef) -> list:
        """Recursively retrieves a linked list from RDF.

        Example RDF list with the items [item1, item2]:
        list_node - first -> item1
        list_node - rest -> list_node2
        list_node2 - first -> item2
        list_node2 - rest -> nil

        The list is read from `self._rdf_graph`.

        Args:
            subject (rdflib.URIRef): One list_node of the RDF list

        Returns:
            list: The items of the RDF list
        """
        g = self._rdf_graph
        rdf_list = []
        for _, _, o in g.triples((subject, rdflib.RDF.first, None)):
            rdf_list.append(o)
        for _, _, o in g.triples((subject, rdflib.RDF.rest, None)):
            if o != rdflib.RDF.nil:
                rdf_list.extend(self._retrieve_rdf_linked_list(o))
        return rdf_list

    def _convert_to_nx(
        self, one_to_one: rdflib.Graph, one_to_many: dict
    ) -> nx.DiGraph:
        """Convert the one to one and one to many inheritance graphs to
        networkx.

        Args:
            one_to_one (rdflib.Graph): The one to one inheritance graph

            one_to_many (dict): The one to many inheritance dictionary

        Returns:
            nx.DiGraph: The networkx graph
        """
        nx_graph = rdflib_to_networkx_digraph(
            one_to_one, edge_attrs=lambda s, p, o: {}, calc_weights=False
        )
        for key, value in one_to_many.items():
            # connect the child directly to each of its parents ...
            nx_graph.add_edges_from(
                [
                    (value["child_name"], parent)
                    for parent in value["parent_node_names"]
                ]
            )
            # ... and drop the intermediate intersection node
            if key in nx_graph.nodes:
                nx_graph.remove_node(key)
        return nx_graph

    def _add_labels_to_nodes(
        self, nx_graph: nx.DiGraph, switch_label_and_id: bool
    ) -> nx.DiGraph:
        """Add labels to the nodes in the networkx graph.

        Args:
            nx_graph (nx.DiGraph): The networkx graph

            switch_label_and_id (bool): If True, id and label are switched

        Returns:
            nx.DiGraph: The networkx graph with labels
        """
        # iterate over a snapshot, because nodes may be removed in the loop
        for node in list(nx_graph.nodes):
            nx_id, nx_label = self._get_nx_id_and_label(
                node, switch_label_and_id
            )
            if nx_id == "none":
                # remove node if it has no id
                # NOTE(review): presumably this is the stringified missing
                # label ("None") after lower sentence casing - confirm
                nx_graph.remove_node(node)
                continue

            nx_graph.nodes[node]["label"] = nx_label
        return nx_graph

    def _change_nodes_to_biocypher_format(
        self,
        nx_graph: nx.DiGraph,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Change the nodes in the networkx graph to BioCypher format:
            - remove the prefix of the identifier
            - switch id and label
            - adapt the labels (replace _ with space and convert to lower
                sentence case)

        Args:
            nx_graph (nx.DiGraph): The networkx graph

            switch_label_and_id (bool): If True, id and label are switched

            rename_nodes (bool): If True, the nodes are renamed

        Returns:
            nx.DiGraph: The networkx ontology graph in BioCypher format
                (relabelled in place)
        """
        mapping = {
            node: self._get_nx_id_and_label(
                node, switch_label_and_id, rename_nodes
            )[0]
            for node in nx_graph.nodes
        }
        renamed = nx.relabel_nodes(nx_graph, mapping, copy=False)
        return renamed

    def _get_all_ancestors(
        self,
        renamed: nx.DiGraph,
        root_label: str,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Get all ancestors of the root node in the networkx graph.

        With edges oriented from child to parent, these are all nodes that
        have a directed path to the root node.

        Args:
            renamed (nx.DiGraph): The renamed networkx graph

            root_label (str): The label of the root node in the ontology

            switch_label_and_id (bool): If True, id and label are switched

            rename_nodes (bool): If True, the nodes are renamed

        Returns:
            nx.DiGraph: The filtered networkx graph (a subgraph view that
                includes the root node itself)

        Raises:
            ValueError: If no node with the given root label exists.
        """
        root = self._get_nx_id_and_label(
            self._find_root_label(self._rdf_graph, root_label),
            switch_label_and_id,
            rename_nodes,
        )[0]
        ancestors = nx.ancestors(renamed, root)
        ancestors.add(root)
        filtered_graph = renamed.subgraph(ancestors)
        return filtered_graph

    def _get_nx_id_and_label(
        self, node, switch_id_and_label: bool, rename_nodes: bool = True
    ) -> tuple[str, str]:
        """Rename node id and label for nx graph.

        Args:
            node (str): The node to rename

            switch_id_and_label (bool): If True, switch id and label

            rename_nodes (bool): If True, underscores in the label are
                replaced by spaces and the label is converted to lower
                sentence case

        Returns:
            tuple[str, str]: The renamed node id and label
        """
        node_id_str = self._remove_prefix(str(node))
        # a missing label yields the string "None" here
        node_label_str = str(self._rdf_graph.value(node, rdflib.RDFS.label))
        if rename_nodes:
            node_label_str = node_label_str.replace("_", " ")
            node_label_str = to_lower_sentence_case(node_label_str)
        nx_id = node_label_str if switch_id_and_label else node_id_str
        nx_label = node_id_str if switch_id_and_label else node_label_str
        return nx_id, nx_label

    def _find_root_label(self, g, root_label):
        """Find the subject in `g` whose label equals `root_label`.

        Args:
            g (rdflib.Graph): The RDF graph to search

            root_label (str): The label of the root node

        Returns:
            The subject of the first matching label triple.

        Raises:
            ValueError: If no label in the ontology matches `root_label`.
        """
        label_pattern = (None, rdflib.RDFS.label, None)

        # Loop through all labels in the ontology; the first subject whose
        # label is the root label is the root node
        for label_subject, _, label_in_ontology in g.triples(label_pattern):
            if str(label_in_ontology) == root_label:
                return label_subject

        labels_in_ontology = [
            str(label_in_ontology)
            for _, _, label_in_ontology in g.triples(label_pattern)
        ]
        raise ValueError(
            f"Could not find root node with label '{root_label}'. "
            f"The ontology contains the following labels: {labels_in_ontology}"
        )

    def _remove_prefix(self, uri: str) -> str:
        """
        Remove the prefix of a URI. URIs can contain either "#" or "/" as a
        separator between the prefix and the local name. The prefix is
        everything before the last separator.

        The URI is returned unchanged if `remove_prefixes` was disabled.
        """
        if self._remove_prefixes:
            return uri.rsplit("#", 1)[-1].rsplit("/", 1)[-1]
        else:
            return uri

    def _load_rdf_graph(self, ontology_file):
        """
        Load the ontology into an RDFlib graph. The ontology file can be in
        OWL, OBO, or RDF/XML format.
        """
        g = rdflib.Graph()
        g.parse(ontology_file, format=self._get_format(ontology_file))
        return g

    def _get_format(self, ontology_file):
        """
        Get the format of the ontology file.

        An explicitly configured format takes precedence; otherwise the format
        is derived from the file extension.

        Returns:
            str: The format name handed to the RDFlib parser.

        Raises:
            NotImplementedError: For the OBO format.
            ValueError: If the format is not recognised.
        """
        rdf_xml = "application/rdf+xml"

        if self._format:
            # the parser format itself is accepted as well, as advertised in
            # the constructor documentation
            if self._format in ("owl", "rdf", rdf_xml):
                return rdf_xml
            if self._format == "obo":
                raise NotImplementedError("OBO format not yet supported")
            if self._format == "ttl":
                return self._format
            raise ValueError(
                f"Could not determine format of ontology file {ontology_file}"
            )

        if ontology_file.endswith((".owl", ".rdf")):
            return rdf_xml
        if ontology_file.endswith(".obo"):
            raise NotImplementedError("OBO format not yet supported")
        if ontology_file.endswith(".ttl"):
            return "ttl"
        raise ValueError(
            f"Could not determine format of ontology file {ontology_file}"
        )

    def get_nx_graph(self):
        """
        Get the networkx graph representing the ontology.
        """
        return self._nx_graph

    def get_rdf_graph(self):
        """
        Get the RDFlib graph representing the ontology.
        """
        return self._rdf_graph

    def get_root_node(self):
        """
        Get root node in the ontology.

        Returns:
            root_node: If _switch_label_and_id is True, the root node label is
                returned, otherwise the root node id is returned (None if no
                node carries the root label).
        """
        root_label = to_lower_sentence_case(self._root_label.replace("_", " "))

        if self._switch_label_and_id:
            # node names are the formatted labels
            return root_label

        # node names are the ids; search for the node carrying the label
        for node, data in self.get_nx_graph().nodes(data=True):
            if "label" in data and data["label"] == root_label:
                return node

        return None

    def get_ancestors(self, node_label):
        """
        Get the ancestors of a node in the ontology.

        Returns:
            An iterator over the nodes reached by a depth-first search from
            `node_label`, starting with the node itself.
        """
        return nx.dfs_preorder_nodes(self._nx_graph, node_label)

    def get_head_join_node(self):
        """
        Get the head join node of the ontology.
        """
        return self._head_join_node


class Ontology:
    """
    A class that represents the ontological "backbone" of a BioCypher knowledge
    graph. The ontology can be built from a single resource, or hybridised from
    a combination of resources, with one resource being the "head" ontology,
    while an arbitrary number of other resources can become "tail" ontologies at
    arbitrary fusion points inside the "head" ontology.
    """

    def __init__(
        self,
        head_ontology: dict,
        ontology_mapping: Optional["OntologyMapping"] = None,
        tail_ontologies: Optional[dict] = None,
    ):
        """
        Initialize the Ontology class.

        Args:
            head_ontology (dict): Configuration of the head ontology. Read
                keys: "url", "root_node", and optionally "format" and
                "switch_label_and_id".

            ontology_mapping (OntologyMapping): Optional mapping whose
                `extended_schema` is used to extend the ontology and to add
                properties. Defaults to None.

            tail_ontologies (dict): Optional dictionary of tail ontology
                configurations that will be joined to the head ontology. Read
                keys per entry: "url", "tail_join_node", "head_join_node",
                and optionally "format", "merge_nodes" and
                "switch_label_and_id". Defaults to None.
        """

        self._head_ontology_meta = head_ontology
        self.mapping = ontology_mapping
        self._tail_ontology_meta = tail_ontologies

        self._tail_ontologies = None
        self._nx_graph = None

        # keep track of nodes that have been extended
        self._extended_nodes = set()

        self._main()

    def _main(self) -> None:
        """
        Main method to be run on instantiation. Loads the ontologies, joins
        them, and returns the hybrid ontology. Loads only the head ontology
        if nothing else is given. Adds user extensions and properties from
        the mapping.
        """
        self._load_ontologies()

        if self._tail_ontologies:
            for adapter in self._tail_ontologies.values():
                head_join_node = self._get_head_join_node(adapter)
                self._join_ontologies(adapter, head_join_node)
        else:
            # work on a copy, as in the join and extension paths, so that
            # user extensions do not modify the head adapter's own graph
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        # extensions and properties both read the mapping's schema
        if self.mapping:
            self._extend_ontology()

            # experimental: add connections of disjoint classes to entity
            # self._connect_biolink_classes()

            self._add_properties()

    def _load_ontologies(self) -> None:
        """
        For each ontology, load the OntologyAdapter object and store it as an
        instance variable (head) or a dictionary (tail).
        """

        logger.info("Loading ontologies...")

        self._head_ontology = OntologyAdapter(
            ontology_file=self._head_ontology_meta["url"],
            root_label=self._head_ontology_meta["root_node"],
            ontology_file_format=self._head_ontology_meta.get("format", None),
            switch_label_and_id=self._head_ontology_meta.get(
                "switch_label_and_id", True
            ),
        )

        if self._tail_ontology_meta:
            self._tail_ontologies = {}
            for key, value in self._tail_ontology_meta.items():
                self._tail_ontologies[key] = OntologyAdapter(
                    ontology_file=value["url"],
                    root_label=value["tail_join_node"],
                    head_join_node_label=value["head_join_node"],
                    ontology_file_format=value.get("format", None),
                    merge_nodes=value.get("merge_nodes", True),
                    switch_label_and_id=value.get("switch_label_and_id", True),
                )

    def _get_head_join_node(self, adapter: OntologyAdapter) -> str:
        """
        Tries to find the head join node of the given ontology adapter in the
        head ontology. If the join node is not found, the method will raise an
        error.

        Args:
            adapter (OntologyAdapter): The ontology adapter of which to find
                the join node in the head ontology.

        Returns:
            str: The name of the join node in the head ontology graph.

        Raises:
            ValueError: If the join node is not part of the head ontology.
        """

        head_join_node = None
        user_defined_head_join_node_label = adapter.get_head_join_node()
        head_join_node_label_in_bc_format = to_lower_sentence_case(
            user_defined_head_join_node_label.replace("_", " ")
        )

        if self._head_ontology._switch_label_and_id:
            # node names are the formatted labels
            head_join_node = head_join_node_label_in_bc_format
        else:
            # node names are the ids; search for the node carrying the label
            for node_id, data in self._head_ontology.get_nx_graph().nodes(
                data=True
            ):
                if (
                    "label" in data
                    and data["label"] == head_join_node_label_in_bc_format
                ):
                    head_join_node = node_id
                    break

        if head_join_node not in self._head_ontology.get_nx_graph().nodes:
            # rebuild the head ontology without renaming to report the node
            # names as they are defined in the ontology file
            head_ontology = self._head_ontology._rdf_to_nx(
                self._head_ontology.get_rdf_graph(),
                self._head_ontology._root_label,
                self._head_ontology._switch_label_and_id,
                rename_nodes=False,
            )
            raise ValueError(
                f"Head join node '{head_join_node}' not found in head ontology. "
                f"The head ontology contains the following nodes: {head_ontology.nodes}."
            )
        return head_join_node

    def _join_ontologies(
        self, adapter: OntologyAdapter, head_join_node
    ) -> None:
        """
        Joins the ontologies by adding the tail ontology as a subgraph to the
        head ontology at the specified join nodes.

        Args:
            adapter (OntologyAdapter): The ontology adapter of the tail
                ontology to be added to the head ontology.

            head_join_node: The node of the head ontology at which the tail
                ontology is attached.
        """

        if not self._nx_graph:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        tail_join_node = adapter.get_root_node()
        tail_ontology = adapter.get_nx_graph()

        # subtree of tail ontology at join node
        tail_ontology_subtree = nx.dfs_tree(
            tail_ontology.reverse(), tail_join_node
        ).reverse()

        # transfer node attributes from tail ontology to subtree
        for node in tail_ontology_subtree.nodes:
            tail_ontology_subtree.nodes[node].update(tail_ontology.nodes[node])

        # if merge_nodes is False, create parent of tail join node from head
        # join node
        if not adapter._merge_nodes:
            # add head join node from head ontology to tail ontology subtree
            # as parent of tail join node
            tail_ontology_subtree.add_node(
                head_join_node,
                **self._head_ontology.get_nx_graph().nodes[head_join_node],
            )
            tail_ontology_subtree.add_edge(tail_join_node, head_join_node)

        # else rename tail join node to match head join node if necessary
        elif tail_join_node != head_join_node:
            tail_ontology_subtree = nx.relabel_nodes(
                tail_ontology_subtree, {tail_join_node: head_join_node}
            )

        # combine head ontology and tail subtree
        self._nx_graph = nx.compose(self._nx_graph, tail_ontology_subtree)

    def _add_user_extension_node(self, node) -> None:
        """
        Adds a node to the ontology graph and marks it as user extension.

        Args:
            node: The name of the node to add.
        """
        self._nx_graph.add_node(node)
        self._nx_graph.nodes[node]["label"] = sentencecase_to_pascalcase(node)

        # mark node as user extension
        self._nx_graph.nodes[node]["user_extension"] = True
        self._extended_nodes.add(node)

    def _extend_ontology(self) -> None:
        """
        Adds the user extensions to the ontology. Tries to find the parent in
        the ontology, adds it if necessary, and adds the child and a directed
        edge from child to parent. Can handle multiple parents.

        Raises:
            ValueError: If a schema entry has neither an `is_a` definition nor
                a known synonym, and is not part of the ontology.
        """

        if not self._nx_graph:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        for key, value in self.mapping.extended_schema.items():
            if not value.get("is_a"):
                # synonyms of existing nodes are handled in `_add_properties`
                if self._nx_graph.has_node(value.get("synonym_for")):
                    continue

                if not self._nx_graph.has_node(key):
                    raise ValueError(
                        f"Node {key} not found in ontology, but also has no "
                        "inheritance definition. Please check your schema for "
                        "spelling errors, first letter not in lower case, use of underscores, a missing `is_a` definition (SubClassOf a root node), or missing labels in class or super-classes."
                    )

                continue

            # multiple parents form a chain: key -> first -> second -> ...
            # Iterating (instead of popping) leaves the schema entry intact.
            child = key
            for parent in to_list(value.get("is_a")):
                if parent not in self._nx_graph.nodes:
                    self._add_user_extension_node(parent)

                if child not in self._nx_graph.nodes:
                    self._add_user_extension_node(child)

                self._nx_graph.add_edge(child, parent)

                child = parent

    def _connect_biolink_classes(self) -> None:
        """
        Experimental: Adds edges from disjoint classes to the entity node.
        """

        if not self._nx_graph:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        if "entity" not in self._nx_graph.nodes:
            return

        # biolink classes that are disjoint from entity
        disjoint_classes = [
            "frequency qualifier mixin",
            "chemical entity to entity association mixin",
            "ontology class",
            "relationship quantifier",
            "physical essence or occurrent",
            "gene or gene product",
            "subject of investigation",
        ]

        for node in disjoint_classes:
            if not self._nx_graph.nodes.get(node):
                self._nx_graph.add_node(node)
                self._nx_graph.nodes[node][
                    "label"
                ] = sentencecase_to_pascalcase(node)

            self._nx_graph.add_edge(node, "entity")

    def _add_properties(self) -> None:
        """
        For each entity in the mapping, update the ontology with the properties
        specified in the mapping. Updates synonym information in the graph,
        setting the synonym as the primary node label.

        Raises:
            ValueError: If the target of a synonym is not part of the
                ontology.
        """

        for key, value in self.mapping.extended_schema.items():
            if key in self._nx_graph.nodes:
                self._nx_graph.nodes[key].update(value)

            synonym_for = value.get("synonym_for")
            if synonym_for:
                # change node label to synonym
                if synonym_for not in self._nx_graph.nodes:
                    raise ValueError(
                        f'Node {value["synonym_for"]} not found in ontology.'
                    )

                self._nx_graph = nx.relabel_nodes(
                    self._nx_graph, {synonym_for: key}
                )

    def get_ancestors(self, node_label: str) -> nx.DiGraph:
        """
        Get the ancestors of a node in the ontology.

        Args:
            node_label (str): The label of the node in the ontology.

        Returns:
            nx.DiGraph: The depth-first search tree rooted at the node; its
                nodes are the node itself and its ancestors.
        """

        return nx.dfs_tree(self._nx_graph, node_label)

    def show_ontology_structure(
        self, to_disk: Optional[str] = None, full: bool = False
    ):
        """
        Show the ontology structure using treelib or write to GRAPHML file.

        Args:

            to_disk (str): If specified, the ontology structure will be saved
                to disk as a GRAPHML file at the location (directory) specified
                by the `to_disk` string, to be opened in your favourite graph
                visualisation tool.

            full (bool): If True, the full ontology structure will be shown,
                including all nodes and edges. If False, only the nodes and
                edges that are relevant to the extended schema will be shown.

        Returns:
            The tree visualisation if `to_disk` is not given, True after
            writing the GRAPHML file otherwise.

        Raises:
            ValueError: If a partial view is requested without a schema
                configuration, or if no ontology is loaded.
        """

        # the mapping is optional; treat a missing mapping as an empty schema
        schema = self.mapping.extended_schema if self.mapping else {}

        if not full and not schema:
            raise ValueError(
                "You are attempting to visualise a subset of the loaded "
                "ontology, but have not provided a schema configuration. "
                "To display a partial ontology graph, please provide a schema "
                "configuration file; to visualise the full graph, please use "
                "the parameter `full = True`."
            )

        if not self._nx_graph:
            raise ValueError("Ontology not loaded.")

        if not self._tail_ontologies:
            msg = f"Showing ontology structure based on {self._head_ontology._ontology_file}"

        else:
            msg = f"Showing ontology structure based on {len(self._tail_ontology_meta)+1} ontologies: "

        logger.info(msg)

        if not full:
            # set of leaves and their intermediate parents up to the root
            filter_nodes = set(schema.keys())

            for node in schema.keys():
                filter_nodes.update(self.get_ancestors(node).nodes)

            # filter graph
            G = self._nx_graph.subgraph(filter_nodes)

        else:
            G = self._nx_graph

        if not to_disk:
            # create tree
            tree = create_tree_visualisation(G)

            # add synonym information
            for node, entry in schema.items():
                if not isinstance(entry, dict):
                    continue
                if entry.get("synonym_for"):
                    tree.nodes[node].tag = (
                        f"{node} = " f"{entry.get('synonym_for')}"
                    )

            logger.info(f"\n{tree}")

            return tree

        else:
            # convert lists/dicts to strings for vis only
            # (iterates the nodes of the original graph; `G` is rebound to a
            # relabelled copy in every iteration)
            for node in G.nodes:
                # rename node and use former id as label
                label = G.nodes[node].get("label")

                if not label:
                    label = node

                G = nx.relabel_nodes(G, {node: label})
                G.nodes[label]["label"] = node

                for attrib in G.nodes[label]:
                    if isinstance(G.nodes[label][attrib], (list, dict)):
                        G.nodes[label][attrib] = str(G.nodes[label][attrib])

            path = os.path.join(to_disk, "ontology_structure.graphml")

            logger.info(f"Writing ontology structure to {path}.")

            nx.write_graphml(G, path)

            return True

    def get_dict(self) -> dict:
        """
        Returns a dictionary compatible with a BioCypher node for compatibility
        with the Neo4j driver.
        """

        d = {
            "node_id": self._get_current_id(),
            "node_label": "BioCypher",
            "properties": {
                # NOTE(review): this is a string literal, not the schema
                # itself - confirm whether the actual schema should be stored
                "schema": "self.ontology_mapping.extended_schema",
            },
        }

        return d

    def _get_current_id(self):
        """
        Instantiate a version ID for the current session. For now does simple
        versioning using datetime.

        Can later implement incremental versioning, versioning from
        config file, or manual specification via argument.
        """

        now = datetime.now()
        return now.strftime("v%Y%m%d-%H%M%S")