medax_pipeline/biocypher/_translate.py
2025-04-16 22:12:19 +02:00

481 lines
16 KiB
Python

#!/usr/bin/env python
#
# Copyright 2021, Heidelberg University Clinic
#
# File author(s): Sebastian Lobentanzer
# ...
#
# Distributed under MIT licence, see the file `LICENSE`.
#
"""
BioCypher 'translation' module. Responsible for translating between the raw
input data and the BioCypherNode and BioCypherEdge objects.
"""
from ._logger import logger
logger.debug(f"Loading module {__name__}.")
from typing import Any, Union, Optional
from collections.abc import Iterable, Generator
from more_itertools import peekable
from . import _misc
from ._create import BioCypherEdge, BioCypherNode, BioCypherRelAsNode
from ._ontology import Ontology
__all__ = ["BiolinkAdapter", "Translator"]
class Translator:
    """
    Class responsible for executing the translation process that is
    configured in the schema_config.yaml file. Creates a mapping dictionary
    from that file, and, given nodes and edges, translates them into
    BioCypherNodes and BioCypherEdges. During this process, can also filter
    the properties of the entities if the schema_config.yaml file specifies a
    property whitelist or blacklist.

    Provides utility functions for translating between input and output labels
    and cypher queries.
    """

    def __init__(self, ontology: "Ontology", strict_mode: bool = False):
        """
        Args:
            ontology:
                Ontology object. Its ``mapping.extended_schema`` dictionary
                is read to build the input-label mappings and to look up
                the per-type settings (``properties``,
                ``exclude_properties``, ``preferred_id``,
                ``represented_as``, ``label_as_edge``).

            strict_mode:
                If True, the translator raises a ``ValueError`` when nodes
                do not carry ``source``, ``licence`` and ``version``
                properties, or when edges do not carry ``source`` and
                ``licence`` properties.
        """
        self.ontology = ontology
        self.strict_mode = strict_mode

        # record nodes without biolink type configured in schema_config.yaml
        self.notype = {}

        # mapping functionality for translating terms and queries
        self.mappings = {}
        self.reverse_mappings = {}

        self._update_ontology_types()

    def translate_nodes(
        self,
        node_tuples: Iterable,
    ) -> Generator["BioCypherNode", None, None]:
        """
        Translates input node representation to a representation that
        conforms to the schema of the given BioCypher graph. For now
        requires explicit statement of node type on pass.

        Args:
            node_tuples (list of tuples): collection of ``(id, type,
                properties)`` tuples representing individual nodes by their
                unique id and a type that is translated from the original
                database notation to the corresponding BioCypher notation.

        Yields:
            One BioCypherNode per input tuple whose type is configured in
            the schema; tuples with an unknown type are counted in
            ``self.notype`` and skipped.

        Raises:
            ValueError: in strict mode, if a node lacks one of the
                ``source``, ``licence`` or ``version`` properties.
        """
        self._log_begin_translate(node_tuples, "nodes")

        required_props = ("source", "licence", "version")

        for _id, _type, _props in node_tuples:
            # check for strict mode requirements
            if self.strict_mode:
                # rename 'license' to 'licence' in _props (done in place on
                # the caller's dictionary)
                if _props.get("license"):
                    _props["licence"] = _props.pop("license")

                for prop in required_props:
                    if prop not in _props:
                        raise ValueError(
                            f"Property `{prop}` missing from node {_id}. "
                            "Strict mode is enabled, so this is not allowed."
                        )

            # find the node in leaves that represents ontology node type
            _ontology_class = self._get_ontology_mapping(_type)

            if not _ontology_class:
                self._record_no_type(_type, _id)
                continue

            # filter properties for those specified in schema_config if any
            _filtered_props = self._filter_props(_ontology_class, _props)

            # preferred id
            _preferred_id = self._get_preferred_id(_ontology_class)

            yield BioCypherNode(
                node_id=_id,
                node_label=_ontology_class,
                preferred_id=_preferred_id,
                properties=_filtered_props,
            )

        self._log_finish_translate("nodes")

    def _get_preferred_id(self, _bl_type: str) -> str:
        """
        Returns the preferred id for the given Biolink type, or ``"id"`` if
        the type is unknown or has no ``preferred_id`` configured.
        """
        schema_entry = self.ontology.mapping.extended_schema.get(_bl_type, {})
        return schema_entry.get("preferred_id", "id")

    def _filter_props(self, bl_type: str, props: dict) -> dict:
        """
        Filters properties for those specified in schema_config if any.

        Args:
            bl_type: ontology class whose schema entry is consulted.
            props: raw input properties.

        Returns:
            ``props`` itself when the schema entry defines neither a
            ``properties`` whitelist nor ``exclude_properties``. Otherwise a
            new dictionary holding only the permitted properties, in which
            every whitelisted property that is absent (or excluded) is
            present with value ``None``.
        """
        schema_entry = self.ontology.mapping.extended_schema[bl_type]

        # Work on a copy: the strict-mode additions below must not leak into
        # the shared schema configuration. `or {}` also covers a `properties`
        # key that is present but empty (None).
        filter_props = dict(schema_entry.get("properties") or {})

        # strict mode: add required properties (only if there is a whitelist)
        if self.strict_mode and filter_props:
            filter_props.update(
                {"source": "str", "licence": "str", "version": "str"},
            )

        exclude_props = schema_entry.get("exclude_properties") or []
        if isinstance(exclude_props, str):
            exclude_props = [exclude_props]

        # nothing configured: pass the input through untouched
        if not filter_props and not exclude_props:
            return props

        if filter_props:
            filtered_props = {
                k: v
                for k, v in props.items()
                if k in filter_props and k not in exclude_props
            }
        else:
            filtered_props = {
                k: v for k, v in props.items() if k not in exclude_props
            }

        # add missing properties with default values
        for k in filter_props:
            filtered_props.setdefault(k, None)

        return filtered_props

    def translate_edges(
        self,
        edge_tuples: Iterable,
    ) -> Generator[Union["BioCypherEdge", "BioCypherRelAsNode"], None, None]:
        """
        Translates input edge representation to a representation that
        conforms to the schema of the given BioCypher graph. For now
        requires explicit statement of edge type on pass.

        Args:
            edge_tuples (list of tuples):
                collection of ``(id, source, target, type, properties)``
                tuples representing source and target of an interaction via
                their unique ids as well as the type of interaction in the
                original database notation, which is translated to
                BioCypher notation using the schema mapping. The leading id
                is optional: legacy 4-tuples without it are accepted
                (detected from the first tuple only).

        Yields:
            A BioCypherEdge, or a BioCypherRelAsNode when the schema entry
            has ``represented_as: node``. Tuples with an unknown type are
            counted in ``self.notype`` and skipped.

        Raises:
            ValueError: in strict mode, if an edge lacks the ``source`` or
                ``licence`` property.
        """
        self._log_begin_translate(edge_tuples, "edges")

        # legacy: deal with 4-tuples (no edge id)
        # TODO remove for performance reasons once safe
        edge_tuples = peekable(edge_tuples)

        # A bare peek() raises StopIteration on empty input, which would
        # surface as RuntimeError inside this generator; use a default.
        first = edge_tuples.peek(None)
        if first is not None and len(first) == 4:
            # stay lazy: do not materialise the whole input
            edge_tuples = (
                (None, src, tar, typ, props)
                for src, tar, typ, props in edge_tuples
            )

        for _id, _src, _tar, _type, _props in edge_tuples:
            # check for strict mode requirements
            if self.strict_mode:
                for prop in ("source", "licence"):
                    if prop not in _props:
                        raise ValueError(
                            f"Edge {_id if _id else (_src, _tar)} does not "
                            f"have a `{prop}` property. "
                            "This is required in strict mode."
                        )

            # match the input label (_type) to
            # a Biolink label from schema_config
            bl_type = self._get_ontology_mapping(_type)

            if not bl_type:
                self._record_no_type(_type, (_src, _tar))
                continue

            # filter properties for those specified in schema_config if any
            _filtered_props = self._filter_props(bl_type, _props)

            schema_entry = self.ontology.mapping.extended_schema[bl_type]
            rep = schema_entry["represented_as"]

            if rep == "node":
                if _id:
                    # if it brings its own ID, use it
                    node_id = _id
                else:
                    # source target concat, followed by all property values
                    node_id = (
                        str(_src)
                        + "_"
                        + str(_tar)
                        + "_"
                        + "_".join(str(v) for v in _filtered_props.values())
                    )

                n = BioCypherNode(
                    node_id=node_id,
                    node_label=bl_type,
                    properties=_filtered_props,
                )

                # directionality check TODO generalise to account for
                # different descriptions of directionality or find a
                # more consistent solution for indicating directionality
                if _filtered_props.get("directed") == True:  # noqa: E712
                    l1 = "IS_SOURCE_OF"
                    l2 = "IS_TARGET_OF"
                elif _filtered_props.get(
                    "src_role",
                ) and _filtered_props.get("tar_role"):
                    l1 = _filtered_props.get("src_role")
                    l2 = _filtered_props.get("tar_role")
                else:
                    l1 = l2 = "IS_PART_OF"

                # both helper edges point at the relationship node
                e_s = BioCypherEdge(
                    source_id=_src,
                    target_id=node_id,
                    relationship_label=l1,
                    # additional here
                )
                e_t = BioCypherEdge(
                    source_id=_tar,
                    target_id=node_id,
                    relationship_label=l2,
                    # additional here
                )

                yield BioCypherRelAsNode(n, e_s, e_t)

            else:
                # fall back to the ontology class name if no explicit edge
                # label is configured
                edge_label = schema_entry.get("label_as_edge")
                if edge_label is None:
                    edge_label = bl_type

                yield BioCypherEdge(
                    relationship_id=_id,
                    source_id=_src,
                    target_id=_tar,
                    relationship_label=edge_label,
                    properties=_filtered_props,
                )

        self._log_finish_translate("edges")

    def _record_no_type(self, _type: Any, what: Any) -> None:
        """
        Records the type of a node or edge that is not represented in the
        schema_config by incrementing its counter in ``self.notype``.
        """
        logger.debug(f"No ontology type defined for `{_type}`: {what}")

        self.notype[_type] = (self.notype.get(_type) or 0) + 1

    def get_missing_biolink_types(self) -> dict:
        """
        Returns a dictionary of types that were not represented in the
        schema_config, mapping each type to the number of times it was seen.
        """
        return self.notype

    @staticmethod
    def _log_begin_translate(_input: Iterable, what: str):
        """
        Logs the start of a translation run, including the number of items
        when the input has a length.
        """
        n = f"{len(_input)} " if hasattr(_input, "__len__") else ""
        logger.debug(f"Translating {n}{what} to BioCypher")

    @staticmethod
    def _log_finish_translate(what: str):
        """
        Logs the end of a translation run.
        """
        logger.debug(f"Finished translating {what} to BioCypher.")

    def _update_ontology_types(self):
        """
        Creates a dictionary to translate from input labels to ontology labels.
        If multiple input labels, creates mapping for each. Also fills the
        term translation mappings used by `translate` / `reverse_translate`.
        """
        self._ontology_mapping = {}

        for key, value in self.ontology.mapping.extended_schema.items():
            labels = value.get("input_label") or value.get("label_in_input")

            # Entries without an input label have nothing to translate from;
            # registering them would create a `None` key in the mappings,
            # which breaks the string handling in `translate`.
            if not labels:
                continue

            if isinstance(labels, str):
                self._ontology_mapping[labels] = key
            elif isinstance(labels, list):
                for label in labels:
                    self._ontology_mapping[label] = key

            if value.get("label_as_edge"):
                self._add_translation_mappings(labels, value["label_as_edge"])
            else:
                self._add_translation_mappings(labels, key)

    def _get_ontology_mapping(self, label: str) -> Optional[str]:
        """
        For each given input type ("input_label" or "label_in_input"), find the
        corresponding ontology class in the leaves dictionary (from the
        `schema_config.yaml`).

        Args:
            label:
                The input type to find (`input_label` or `label_in_input` in
                `schema_config.yaml`).

        Returns:
            The ontology class name, or None if the label is not configured.
        """
        return self._ontology_mapping.get(label, None)

    def translate_term(self, term):
        """
        Translate a single term. Returns None for unknown terms.
        """
        return self.mappings.get(term, None)

    def reverse_translate_term(self, term):
        """
        Reverse translate a single term. Returns None for unknown terms.
        """
        return self.reverse_mappings.get(term, None)

    def translate(self, query):
        """
        Translate a cypher query. Only translates labels as of now, by plain
        text replacement of ``:<input label>`` with ``:<BioCypher label>``.
        """
        for key, target in self.mappings.items():
            query = query.replace(":" + key, ":" + target)
        return query

    def reverse_translate(self, query):
        """
        Reverse translate a cypher query. Only translates labels as of
        now.

        Raises:
            NotImplementedError: if the query uses a label that several
                input labels map to (many-to-one mappings are not
                reversible).
        """
        for key, original in self.reverse_mappings.items():
            a = ":" + key + ")"
            b = ":" + key + "]"
            # TODO this conditional probably does not cover all cases
            if a not in query and b not in query:
                continue

            if isinstance(original, list):
                raise NotImplementedError(
                    "Reverse translation of multiple inputs not "
                    "implemented yet. Many-to-one mappings are "
                    "not reversible. "
                    f"({key} -> {original})",
                )

            query = query.replace(a, ":" + original + ")").replace(
                b, ":" + original + "]"
            )
        return query

    def _add_translation_mappings(self, original_name, biocypher_name):
        """
        Add translation mappings for a label and name. We use here the
        PascalCase version of the BioCypher name, since sentence case is
        not useful for Cypher queries.

        Args:
            original_name: input label, or list of input labels.
            biocypher_name: BioCypher name, or list of names.
        """
        # NOTE(review): a list-valued `biocypher_name` is handed to
        # `name_sentence_to_pascal` as a whole here, exactly as before;
        # confirm whether list values can actually occur.
        pascal_name = self.name_sentence_to_pascal(biocypher_name)

        if isinstance(original_name, list):
            for on in original_name:
                self.mappings[on] = pascal_name
        else:
            self.mappings[original_name] = pascal_name

        if isinstance(biocypher_name, list):
            for bn in biocypher_name:
                self.reverse_mappings[
                    self.name_sentence_to_pascal(bn)
                ] = original_name
        else:
            self.reverse_mappings[pascal_name] = original_name

    @staticmethod
    def name_sentence_to_pascal(name: str) -> str:
        """
        Converts a name in sentence case to pascal case. Dot-separated names
        are converted part by part and re-joined with dots.
        """
        # split on dots if dot is present
        if "." in name:
            return ".".join(
                _misc.sentencecase_to_pascalcase(n) for n in name.split(".")
            )
        return _misc.sentencecase_to_pascalcase(name)