2025-04-16 22:12:19 +02:00

887 lines
31 KiB
Python

#!/usr/bin/env python
#
# Copyright 2021, Heidelberg University Clinic
#
# File author(s): Sebastian Lobentanzer
# ...
#
# Distributed under MIT licence, see the file `LICENSE`.
#
"""
BioCypher 'ontology' module. Contains classes and functions to handle parsing
and representation of single ontologies as well as their hybridisation and
other advanced operations.
"""
import os
from ._logger import logger
logger.debug(f"Loading module {__name__}.")
from typing import Optional
from datetime import datetime
from rdflib import Graph
from rdflib.extras.external_graph_libs import rdflib_to_networkx_digraph
import rdflib
import networkx as nx
from ._misc import (
to_list,
to_lower_sentence_case,
create_tree_visualisation,
sentencecase_to_pascalcase,
)
from ._mapping import OntologyMapping
class OntologyAdapter:
    """
    Class that represents an ontology to be used in the Biocypher framework. Can
    read from a variety of formats, including OWL, OBO, and RDF/XML. The
    ontology is represented by a networkx.DiGraph object; an RDFlib graph is
    also kept. By default, the DiGraph reverses the label and identifier of the
    nodes, such that the node name in the graph is the human-readable label. The
    edges are oriented from child to parent.

    Labels are formatted in lower sentence case and underscores are replaced by
    spaces. Identifiers are taken as defined and the prefixes are removed by
    default.
    """

    # Format string handed to rdflib for RDF/XML serialisations.
    _RDF_XML = "application/rdf+xml"

    # Explicit `ontology_file_format` values -> rdflib parser format. "obo" is
    # handled separately because it is recognised but not implemented.
    _EXPLICIT_FORMATS = {
        "owl": _RDF_XML,
        "rdf": _RDF_XML,
        "application/rdf+xml": _RDF_XML,
        "ttl": "ttl",
    }

    # File suffix -> rdflib parser format, used when no format is passed.
    _SUFFIX_FORMATS = {
        ".owl": _RDF_XML,
        ".rdf": _RDF_XML,
        ".ttl": "ttl",
    }

    def __init__(
        self,
        ontology_file: str,
        root_label: str,
        ontology_file_format: Optional[str] = None,
        head_join_node_label: Optional[str] = None,
        merge_nodes: Optional[bool] = True,
        switch_label_and_id: bool = True,
        remove_prefixes: bool = True,
    ):
        """
        Initialize the OntologyAdapter class.

        Args:
            ontology_file (str): Path to the ontology file. Can be local or
                remote.

            root_label (str): The label of the root node in the ontology. In
                case of a tail ontology, this is the tail join node.

            ontology_file_format (str): The format of the ontology file. One
                of "owl", "rdf", "ttl" or "application/rdf+xml" ("obo" is
                recognised but not yet supported). If no format is passed, it
                is determined from the file suffix.

            head_join_node_label (str): Optional variable to store the label of
                the node in the head ontology that should be used to join to
                the root node of the tail ontology. Defaults to None.

            merge_nodes (bool): If True, head and tail join nodes will be
                merged, using the label of the head join node. If False, the
                tail join node will be attached as a child of the head join
                node.

            switch_label_and_id (bool): If True, the node names in the graph
                will be the human-readable labels. If False, the node names
                will be the identifiers. Defaults to True.

            remove_prefixes (bool): If True, the prefixes of the identifiers
                will be removed. Defaults to True.
        """
        logger.info(f"Instantiating OntologyAdapter class for {ontology_file}.")

        # All settings must be stored before loading: `_load_rdf_graph` reads
        # `self._format` and `_rdf_to_nx` reads `self._rdf_graph` and
        # `self._remove_prefixes`.
        self._ontology_file = ontology_file
        self._root_label = root_label
        self._format = ontology_file_format
        self._merge_nodes = merge_nodes
        self._head_join_node = head_join_node_label
        self._switch_label_and_id = switch_label_and_id
        self._remove_prefixes = remove_prefixes

        self._rdf_graph = self._load_rdf_graph(ontology_file)
        self._nx_graph = self._rdf_to_nx(
            self._rdf_graph, root_label, switch_label_and_id
        )

    def _rdf_to_nx(
        self,
        _rdf_graph: rdflib.Graph,
        root_label: str,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Convert an RDF graph into the networkx representation.

        Extracts the inheritance triples, converts them to a networkx graph,
        attaches labels, renames the nodes and finally restricts the graph to
        the root node and everything that inherits from it.

        Args:
            _rdf_graph (rdflib.Graph): The RDF graph to convert
            root_label (str): The label of the root node in the ontology
            switch_label_and_id (bool): If True, id and label are switched
            rename_nodes (bool): If True, labels are reformatted (underscores
                to spaces, lower sentence case)

        Returns:
            nx.DiGraph: The ontology as an independent networkx graph
        """
        one_to_one_triples, one_to_many_dict = self._get_relevant_rdf_triples(
            _rdf_graph
        )
        nx_graph = self._convert_to_nx(one_to_one_triples, one_to_many_dict)
        nx_graph = self._add_labels_to_nodes(nx_graph, switch_label_and_id)
        nx_graph = self._change_nodes_to_biocypher_format(
            nx_graph, switch_label_and_id, rename_nodes
        )
        nx_graph = self._get_all_ancestors(
            nx_graph, root_label, switch_label_and_id, rename_nodes
        )
        # `_get_all_ancestors` returns a subgraph view; materialise it.
        return nx.DiGraph(nx_graph)

    def _get_relevant_rdf_triples(self, g: rdflib.Graph) -> tuple:
        """Collect the inheritance information from the RDF graph.

        Args:
            g (rdflib.Graph): The RDF graph

        Returns:
            tuple: The one to one inheritance graph and the multiple
                inheritance dictionary
        """
        one_to_one_inheritance_graph = self._get_one_to_one_inheritance_triples(
            g
        )
        intersection = self._get_multiple_inheritance_dict(g)
        return one_to_one_inheritance_graph, intersection

    def _get_one_to_one_inheritance_triples(
        self, g: rdflib.Graph
    ) -> rdflib.Graph:
        """Get the one to one inheritance triples from the RDF graph.

        Only `subClassOf` triples whose subject has a label are kept.

        Args:
            g (rdflib.Graph): The RDF graph

        Returns:
            rdflib.Graph: The one to one inheritance graph
        """
        one_to_one_inheritance_graph = Graph()
        for s, p, o in g.triples((None, rdflib.RDFS.subClassOf, None)):
            if self.has_label(s, g):
                one_to_one_inheritance_graph.add((s, p, o))
        return one_to_one_inheritance_graph

    def _get_multiple_inheritance_dict(self, g: rdflib.Graph) -> dict:
        """Get the multiple inheritance dictionary from the RDF graph.

        Args:
            g (rdflib.Graph): The RDF graph

        Returns:
            dict: Maps each `intersectionOf` node to a dict with the keys
                "child_name" and "parent_node_names". Intersection nodes
                without a child are skipped.
        """
        intersection = {}
        for node, _, first_node_of_intersection_list in g.triples(
            (None, rdflib.OWL.intersectionOf, None)
        ):
            parents = self._retrieve_rdf_linked_list(
                first_node_of_intersection_list
            )

            # If several subjects match, the last one yielded wins.
            child_name = None
            for s_, _, _ in g.triples((None, rdflib.RDFS.subClassOf, node)):
                child_name = s_

            # Handle Snomed CT post coordinated expressions
            if not child_name:
                for s_, _, _ in g.triples(
                    (None, rdflib.OWL.equivalentClass, node)
                ):
                    child_name = s_

            if child_name:
                intersection[node] = {
                    "child_name": child_name,
                    "parent_node_names": parents,
                }
        return intersection

    def has_label(self, node: rdflib.URIRef, g: rdflib.Graph) -> bool:
        """Does the node have a label in g?

        Args:
            node (rdflib.URIRef): The node to check
            g (rdflib.Graph): The graph to check in

        Returns:
            bool: True if the node has a label, False otherwise
        """
        return (node, rdflib.RDFS.label, None) in g

    def _retrieve_rdf_linked_list(self, subject: rdflib.URIRef) -> list:
        """Retrieves a linked list from RDF.

        Example RDF list with the items [item1, item2]:
        list_node - first -> item1
        list_node - rest -> list_node2
        list_node2 - first -> item2
        list_node2 - rest -> nil

        The list is walked iteratively and every list node is visited at most
        once, so a malformed (cyclic) `rest` chain terminates instead of
        recursing without bound.

        Args:
            subject (rdflib.URIRef): One list_node of the RDF list

        Returns:
            list: The items of the RDF list
        """
        g = self._rdf_graph
        rdf_list = []
        visited = set()
        pending = [subject]
        while pending:
            list_node = pending.pop()
            if list_node in visited:
                continue
            visited.add(list_node)

            for _, _, item in g.triples((list_node, rdflib.RDF.first, None)):
                rdf_list.append(item)

            rest_nodes = [
                rest
                for _, _, rest in g.triples((list_node, rdflib.RDF.rest, None))
                if rest != rdflib.RDF.nil
            ]
            # Reversed so the first `rest` is popped (and expanded) first,
            # keeping depth-first order.
            pending.extend(reversed(rest_nodes))
        return rdf_list

    def _convert_to_nx(
        self, one_to_one: rdflib.Graph, one_to_many: dict
    ) -> nx.DiGraph:
        """Convert the one to one and one to many inheritance graphs to networkx.

        Args:
            one_to_one (rdflib.Graph): The one to one inheritance graph
            one_to_many (dict): The one to many inheritance dictionary

        Returns:
            nx.DiGraph: The networkx graph
        """
        nx_graph = rdflib_to_networkx_digraph(
            one_to_one, edge_attrs=lambda s, p, o: {}, calc_weights=False
        )
        for key, value in one_to_many.items():
            nx_graph.add_edges_from(
                [
                    (value["child_name"], parent)
                    for parent in value["parent_node_names"]
                ]
            )
            # The intersection node itself is replaced by the direct
            # child -> parent edges added above.
            if key in nx_graph.nodes:
                nx_graph.remove_node(key)
        return nx_graph

    def _add_labels_to_nodes(
        self, nx_graph: nx.DiGraph, switch_label_and_id: bool
    ) -> nx.DiGraph:
        """Add labels to the nodes in the networkx graph.

        Args:
            nx_graph (nx.DiGraph): The networkx graph
            switch_label_and_id (bool): If True, id and label are switched

        Returns:
            nx.DiGraph: The networkx graph with labels
        """
        # Iterate over a snapshot because nodes may be removed in the loop.
        for node in list(nx_graph.nodes):
            nx_id, nx_label = self._get_nx_id_and_label(
                node, switch_label_and_id
            )
            if nx_id == "none":
                # remove node if it has no id
                nx_graph.remove_node(node)
                continue
            nx_graph.nodes[node]["label"] = nx_label
        return nx_graph

    def _change_nodes_to_biocypher_format(
        self,
        nx_graph: nx.DiGraph,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Change the nodes in the networkx graph to BioCypher format:
            - remove the prefix of the identifier
            - switch id and label
            - adapt the labels (replace _ with space and convert to lower
              sentence case)

        Args:
            nx_graph (nx.DiGraph): The networkx graph
            switch_label_and_id (bool): If True, id and label are switched
            rename_nodes (bool): If True, the nodes are renamed

        Returns:
            nx.DiGraph: The networkx ontology graph in BioCypher format
        """
        mapping = {
            node: self._get_nx_id_and_label(
                node, switch_label_and_id, rename_nodes
            )[0]
            for node in nx_graph.nodes
        }
        # copy=False relabels the passed graph in place.
        return nx.relabel_nodes(nx_graph, mapping, copy=False)

    def _get_all_ancestors(
        self,
        renamed: nx.DiGraph,
        root_label: str,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Get all ancestors of the root node in the networkx graph.

        Because edges point from child to parent, the graph-theoretic
        ancestors of the root are the classes that inherit from it.

        Args:
            renamed (nx.DiGraph): The renamed networkx graph
            root_label (str): The label of the root node in the ontology
            switch_label_and_id (bool): If True, id and label are switched
            rename_nodes (bool): If True, the nodes are renamed

        Returns:
            nx.DiGraph: The filtered networkx graph (a subgraph view)
        """
        root = self._get_nx_id_and_label(
            self._find_root_label(self._rdf_graph, root_label),
            switch_label_and_id,
            rename_nodes,
        )[0]
        ancestors = nx.ancestors(renamed, root)
        ancestors.add(root)
        return renamed.subgraph(ancestors)

    def _get_nx_id_and_label(
        self, node, switch_id_and_label: bool, rename_nodes: bool = True
    ) -> tuple[str, str]:
        """Rename node id and label for nx graph.

        Args:
            node (str): The node to rename
            switch_id_and_label (bool): If True, switch id and label
            rename_nodes (bool): If True, underscores in the label are
                replaced by spaces and the label is converted to lower
                sentence case

        Returns:
            tuple[str, str]: The renamed node id and label
        """
        node_id_str = self._remove_prefix(str(node))
        # A missing label yields None, i.e. the string "None" here.
        node_label_str = str(self._rdf_graph.value(node, rdflib.RDFS.label))
        if rename_nodes:
            node_label_str = node_label_str.replace("_", " ")
            node_label_str = to_lower_sentence_case(node_label_str)

        if switch_id_and_label:
            return node_label_str, node_id_str
        return node_id_str, node_label_str

    def _find_root_label(self, g, root_label):
        """Find the subject whose label equals the root label.

        Args:
            g (rdflib.Graph): The RDF graph to search
            root_label (str): The label of the root node

        Returns:
            The subject of the first label triple matching `root_label`

        Raises:
            ValueError: If no label in the graph equals `root_label`
        """
        # Loop through all labels in the ontology
        for label_subject, _, label_in_ontology in g.triples(
            (None, rdflib.RDFS.label, None)
        ):
            # If the label is the root label, its subject is the root node
            if str(label_in_ontology) == root_label:
                return label_subject

        labels_in_ontology = [
            str(label_in_ontology)
            for _, _, label_in_ontology in g.triples(
                (None, rdflib.RDFS.label, None)
            )
        ]
        raise ValueError(
            f"Could not find root node with label '{root_label}'. "
            f"The ontology contains the following labels: {labels_in_ontology}"
        )

    def _remove_prefix(self, uri: str) -> str:
        """
        Remove the prefix of a URI. URIs can contain either "#" or "/" as a
        separator between the prefix and the local name. The prefix is
        everything before the last separator. The URI is returned unchanged if
        prefix removal is disabled.
        """
        if self._remove_prefixes:
            return uri.rsplit("#", 1)[-1].rsplit("/", 1)[-1]
        return uri

    def _load_rdf_graph(self, ontology_file):
        """
        Load the ontology into an RDFlib graph. The ontology file can be in
        OWL, OBO, or RDF/XML format.
        """
        g = rdflib.Graph()
        g.parse(ontology_file, format=self._get_format(ontology_file))
        return g

    def _get_format(self, ontology_file):
        """
        Get the format of the ontology file.

        An explicitly configured format takes precedence; otherwise the format
        is derived from the file suffix.

        Args:
            ontology_file (str): Path to the ontology file

        Returns:
            str: The format string to pass to the RDFlib parser

        Raises:
            NotImplementedError: If the format is OBO.
            ValueError: If the format is unknown or cannot be derived.
        """
        if self._format:
            if self._format == "obo":
                raise NotImplementedError("OBO format not yet supported")
            try:
                return self._EXPLICIT_FORMATS[self._format]
            except (KeyError, TypeError):
                raise ValueError(
                    f"Could not determine format of ontology file {ontology_file}"
                ) from None

        if ontology_file.endswith(".obo"):
            raise NotImplementedError("OBO format not yet supported")

        for suffix, rdf_format in self._SUFFIX_FORMATS.items():
            if ontology_file.endswith(suffix):
                return rdf_format

        raise ValueError(
            f"Could not determine format of ontology file {ontology_file}"
        )

    def get_nx_graph(self):
        """
        Get the networkx graph representing the ontology.
        """
        return self._nx_graph

    def get_rdf_graph(self):
        """
        Get the RDFlib graph representing the ontology.
        """
        return self._rdf_graph

    def get_root_node(self):
        """
        Get root node in the ontology.

        Returns:
            root_node: If _switch_label_and_id is True, the root node label is
                returned, otherwise the root node id is returned (None if no
                node carries the root label).
        """
        root_label = to_lower_sentence_case(self._root_label.replace("_", " "))

        if self._switch_label_and_id:
            return root_label

        # Node names are identifiers: look the root up via its label attribute.
        for node, data in self.get_nx_graph().nodes(data=True):
            if "label" in data and data["label"] == root_label:
                return node
        return None

    def get_ancestors(self, node_label):
        """
        Get the ancestors of a node in the ontology.

        Returns an iterator over the nodes reachable from `node_label` in
        depth-first preorder, as produced by `nx.dfs_preorder_nodes`.
        """
        return nx.dfs_preorder_nodes(self._nx_graph, node_label)

    def get_head_join_node(self):
        """
        Get the head join node of the ontology.
        """
        return self._head_join_node
class Ontology:
    """
    A class that represents the ontological "backbone" of a BioCypher knowledge
    graph. The ontology can be built from a single resource, or hybridised from
    a combination of resources, with one resource being the "head" ontology,
    while an arbitrary number of other resources can become "tail" ontologies at
    arbitrary fusion points inside the "head" ontology.
    """

    def __init__(
        self,
        head_ontology: dict,
        ontology_mapping: Optional["OntologyMapping"] = None,
        tail_ontologies: Optional[dict] = None,
    ):
        """
        Initialize the Ontology class.

        Args:
            head_ontology (dict): Configuration of the head ontology. The
                keys "url" and "root_node" are required; "format" and
                "switch_label_and_id" are optional.

            ontology_mapping (OntologyMapping): Optional mapping whose
                `extended_schema` is used to extend the ontology and to add
                properties. Defaults to None.

            tail_ontologies (dict): Optional dictionary of tail ontology
                configurations. Each value requires the keys "url",
                "tail_join_node" and "head_join_node"; "format",
                "merge_nodes" and "switch_label_and_id" are optional.
                Defaults to None.
        """
        self._head_ontology_meta = head_ontology
        self.mapping = ontology_mapping
        self._tail_ontology_meta = tail_ontologies

        self._tail_ontologies = None
        self._nx_graph = None

        # keep track of nodes that have been extended
        self._extended_nodes = set()

        self._main()

    def _main(self) -> None:
        """
        Main method to be run on instantiation. Loads the ontologies, joins
        them, and returns the hybrid ontology. Loads only the head ontology
        if nothing else is given. Adds user extensions and properties from
        the mapping.
        """
        self._load_ontologies()

        if self._tail_ontologies:
            for adapter in self._tail_ontologies.values():
                head_join_node = self._get_head_join_node(adapter)
                self._join_ontologies(adapter, head_join_node)
        else:
            self._nx_graph = self._head_ontology.get_nx_graph()

        # Both steps read `self.mapping.extended_schema`, so they only run
        # when a mapping was provided.
        if self.mapping:
            self._extend_ontology()

            # experimental: add connections of disjoint classes to entity
            # self._connect_biolink_classes()

            self._add_properties()

    def _load_ontologies(self) -> None:
        """
        For each ontology, load the OntologyAdapter object and store it as an
        instance variable (head) or a dictionary (tail).
        """
        logger.info("Loading ontologies...")

        head_meta = self._head_ontology_meta
        self._head_ontology = OntologyAdapter(
            ontology_file=head_meta["url"],
            root_label=head_meta["root_node"],
            ontology_file_format=head_meta.get("format", None),
            switch_label_and_id=head_meta.get("switch_label_and_id", True),
        )

        if self._tail_ontology_meta:
            self._tail_ontologies = {}
            for key, value in self._tail_ontology_meta.items():
                self._tail_ontologies[key] = OntologyAdapter(
                    ontology_file=value["url"],
                    root_label=value["tail_join_node"],
                    head_join_node_label=value["head_join_node"],
                    ontology_file_format=value.get("format", None),
                    merge_nodes=value.get("merge_nodes", True),
                    switch_label_and_id=value.get("switch_label_and_id", True),
                )

    def _get_head_join_node(self, adapter: "OntologyAdapter") -> str:
        """
        Tries to find the head join node of the given ontology adapter in the
        head ontology. If the join node is not found, the method will raise an
        error.

        Args:
            adapter (OntologyAdapter): The ontology adapter of which to find the
                join node in the head ontology.

        Returns:
            str: The name of the head join node in the head ontology graph.

        Raises:
            ValueError: If the head join node is not in the head ontology.
        """
        head_join_node = None
        user_defined_head_join_node_label = adapter.get_head_join_node()
        head_join_node_label_in_bc_format = to_lower_sentence_case(
            user_defined_head_join_node_label.replace("_", " ")
        )
        head_graph = self._head_ontology.get_nx_graph()

        if self._head_ontology._switch_label_and_id:
            head_join_node = head_join_node_label_in_bc_format
        else:
            # Node names are identifiers: search by the label attribute.
            for node_id, data in head_graph.nodes(data=True):
                if (
                    "label" in data
                    and data["label"] == head_join_node_label_in_bc_format
                ):
                    head_join_node = node_id
                    break

        if head_join_node not in head_graph.nodes:
            # Rebuild the head ontology without renaming so the error lists
            # the node names as they appear in the source.
            head_ontology = self._head_ontology._rdf_to_nx(
                self._head_ontology.get_rdf_graph(),
                self._head_ontology._root_label,
                self._head_ontology._switch_label_and_id,
                rename_nodes=False,
            )
            raise ValueError(
                f"Head join node '{head_join_node}' not found in head ontology. "
                f"The head ontology contains the following nodes: {head_ontology.nodes}."
            )
        return head_join_node

    def _join_ontologies(
        self, adapter: "OntologyAdapter", head_join_node
    ) -> None:
        """
        Joins the ontologies by adding the tail ontology as a subgraph to the
        head ontology at the specified join nodes.

        Args:
            adapter (OntologyAdapter): The ontology adapter of the tail ontology
                to be added to the head ontology.

            head_join_node: The node in the head ontology at which the tail
                ontology is attached.
        """
        if self._nx_graph is None:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        tail_join_node = adapter.get_root_node()
        tail_ontology = adapter.get_nx_graph()

        # subtree of tail ontology at join node; edges point child -> parent,
        # so the search runs on the reversed graph and is flipped back
        tail_ontology_subtree = nx.dfs_tree(
            tail_ontology.reverse(), tail_join_node
        ).reverse()

        # transfer node attributes from tail ontology to subtree
        for node in tail_ontology_subtree.nodes:
            tail_ontology_subtree.nodes[node].update(tail_ontology.nodes[node])

        # if merge_nodes is False, create parent of tail join node from head
        # join node
        if not adapter._merge_nodes:
            # add head join node from head ontology to tail ontology subtree
            # as parent of tail join node
            tail_ontology_subtree.add_node(
                head_join_node,
                **self._head_ontology.get_nx_graph().nodes[head_join_node],
            )
            tail_ontology_subtree.add_edge(tail_join_node, head_join_node)

        # else rename tail join node to match head join node if necessary
        elif tail_join_node != head_join_node:
            tail_ontology_subtree = nx.relabel_nodes(
                tail_ontology_subtree, {tail_join_node: head_join_node}
            )

        # combine head ontology and tail subtree
        self._nx_graph = nx.compose(self._nx_graph, tail_ontology_subtree)

    def _extend_ontology(self) -> None:
        """
        Adds the user extensions to the ontology. Tries to find the parent in
        the ontology, adds it if necessary, and adds the child and a directed
        edge from child to parent. Can handle multiple parents.

        Raises:
            ValueError: If a schema entry has neither an `is_a` definition nor
                a known `synonym_for` target and is not in the ontology.
        """
        if self._nx_graph is None:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        for key, value in self.mapping.extended_schema.items():
            if not value.get("is_a"):
                if self._nx_graph.has_node(value.get("synonym_for")):
                    continue

                if not self._nx_graph.has_node(key):
                    raise ValueError(
                        f"Node {key} not found in ontology, but also has no "
                        "inheritance definition. Please check your schema for "
                        "spelling errors, first letter not in lower case, use of underscores, a missing `is_a` definition (SubClassOf a root node), or missing labels in class or super-classes."
                    )
                continue

            # Walk the `is_a` chain: each parent becomes the child of the
            # next one. The list is iterated rather than consumed with
            # pop(0), so a list shared with the schema is not emptied.
            child = key
            for parent in to_list(value.get("is_a")):
                if parent not in self._nx_graph.nodes:
                    self._add_user_extension_node(parent)

                if child not in self._nx_graph.nodes:
                    self._add_user_extension_node(child)

                self._nx_graph.add_edge(child, parent)
                child = parent

    def _add_user_extension_node(self, node) -> None:
        """Add a node to the graph and mark it as a user extension."""
        self._nx_graph.add_node(node)
        self._nx_graph.nodes[node]["label"] = sentencecase_to_pascalcase(node)
        self._nx_graph.nodes[node]["user_extension"] = True
        self._extended_nodes.add(node)

    def _connect_biolink_classes(self) -> None:
        """
        Experimental: Adds edges from disjoint classes to the entity node.
        """
        if self._nx_graph is None:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        if "entity" not in self._nx_graph.nodes:
            return

        # biolink classes that are disjoint from entity
        disjoint_classes = [
            "frequency qualifier mixin",
            "chemical entity to entity association mixin",
            "ontology class",
            "relationship quantifier",
            "physical essence or occurrent",
            "gene or gene product",
            "subject of investigation",
        ]

        for node in disjoint_classes:
            # `.get` returns the attribute dict, so a missing node and a node
            # without attributes are treated alike.
            if not self._nx_graph.nodes.get(node):
                self._nx_graph.add_node(node)
                self._nx_graph.nodes[node][
                    "label"
                ] = sentencecase_to_pascalcase(node)

            self._nx_graph.add_edge(node, "entity")

    def _add_properties(self) -> None:
        """
        For each entity in the mapping, update the ontology with the properties
        specified in the mapping. Updates synonym information in the graph,
        setting the synonym as the primary node label.

        Raises:
            ValueError: If a `synonym_for` target is not in the ontology.
        """
        for key, value in self.mapping.extended_schema.items():
            if key in self._nx_graph.nodes:
                self._nx_graph.nodes[key].update(value)

            synonym_for = value.get("synonym_for")
            if synonym_for:
                # change node label to synonym
                if synonym_for not in self._nx_graph.nodes:
                    raise ValueError(
                        f'Node {value["synonym_for"]} not found in ontology.'
                    )

                self._nx_graph = nx.relabel_nodes(
                    self._nx_graph, {synonym_for: key}
                )

    def get_ancestors(self, node_label: str) -> "nx.DiGraph":
        """
        Get the ancestors of a node in the ontology.

        Args:
            node_label (str): The label of the node in the ontology.

        Returns:
            nx.DiGraph: The depth-first search tree rooted at `node_label`,
                as produced by `nx.dfs_tree`. Its nodes are the given node
                and everything reachable from it along child -> parent edges.
        """
        return nx.dfs_tree(self._nx_graph, node_label)

    def show_ontology_structure(self, to_disk: str = None, full: bool = False):
        """
        Show the ontology structure using treelib or write to GRAPHML file.

        Args:
            to_disk (str): If specified, the ontology structure will be saved
                to disk as a GRAPHML file at the location (directory) specified
                by the `to_disk` string, to be opened in your favourite graph
                visualisation tool.

            full (bool): If True, the full ontology structure will be shown,
                including all nodes and edges. If False, only the nodes and
                edges that are relevant to the extended schema will be shown.

        Returns:
            The tree visualisation if `to_disk` is not given, otherwise True
            after the GRAPHML file has been written.

        Raises:
            ValueError: If a partial view is requested without a schema, or if
                the ontology has not been loaded.
        """
        # The mapping is optional; without one there is no schema to filter by.
        schema = self.mapping.extended_schema if self.mapping else None

        if not full and not schema:
            raise ValueError(
                "You are attempting to visualise a subset of the loaded "
                "ontology, but have not provided a schema configuration. "
                "To display a partial ontology graph, please provide a schema "
                "configuration file; to visualise the full graph, please use "
                "the parameter `full = True`."
            )

        if not self._nx_graph:
            raise ValueError("Ontology not loaded.")

        if not self._tail_ontologies:
            msg = f"Showing ontology structure based on {self._head_ontology._ontology_file}"
        else:
            msg = f"Showing ontology structure based on {len(self._tail_ontology_meta)+1} ontologies: "

        logger.info(msg)

        if not full:
            # set of leaves and their intermediate parents up to the root
            filter_nodes = set(schema.keys())
            for node in schema.keys():
                filter_nodes.update(self.get_ancestors(node).nodes)

            # filter graph
            G = self._nx_graph.subgraph(filter_nodes)
        else:
            G = self._nx_graph

        if not to_disk:
            # create tree
            tree = create_tree_visualisation(G)

            # add synonym information
            for node, entry in (schema or {}).items():
                if not isinstance(entry, dict):
                    continue
                synonym = entry.get("synonym_for")
                if synonym:
                    tree.nodes[node].tag = f"{node} = {synonym}"

            logger.info(f"\n{tree}")
            return tree

        # rename every node to its label and keep the former id as label;
        # relabelling once with the full mapping copies the graph a single
        # time and cannot confuse a new name with a not-yet-renamed node
        renaming = {
            node: G.nodes[node].get("label") or node for node in G.nodes
        }
        G = nx.relabel_nodes(G, renaming)

        for node, label in renaming.items():
            attributes = G.nodes[label]
            attributes["label"] = node

            # convert lists/dicts to strings for vis only
            for attrib, attrib_value in list(attributes.items()):
                if isinstance(attrib_value, (list, dict)):
                    attributes[attrib] = str(attrib_value)

        path = os.path.join(to_disk, "ontology_structure.graphml")
        logger.info(f"Writing ontology structure to {path}.")
        nx.write_graphml(G, path)
        return True

    def get_dict(self) -> dict:
        """
        Returns a dictionary compatible with a BioCypher node for compatibility
        with the Neo4j driver.
        """
        # NOTE(review): "schema" is a literal placeholder string, not the
        # actual extended schema; confirm whether consumers rely on it.
        return {
            "node_id": self._get_current_id(),
            "node_label": "BioCypher",
            "properties": {
                "schema": "self.ontology_mapping.extended_schema",
            },
        }

    def _get_current_id(self):
        """
        Instantiate a version ID for the current session. For now does simple
        versioning using datetime.

        Can later implement incremental versioning, versioning from
        config file, or manual specification via argument.
        """
        now = datetime.now()
        return now.strftime("v%Y%m%d-%H%M%S")