#!/usr/bin/env python

#
# Copyright 2021, Heidelberg University Clinic
#
# File author(s): Sebastian Lobentanzer
#                 ...
#
# Distributed under MIT licence, see the file `LICENSE`.
#
"""
BioCypher 'translation' module. Responsible for translating between the raw
input data and the BioCypherNode and BioCypherEdge objects.
"""
from ._logger import logger

logger.debug(f"Loading module {__name__}.")

from typing import Any, Union, Optional
from collections.abc import Iterable, Generator

from more_itertools import peekable

from . import _misc
from ._create import BioCypherEdge, BioCypherNode, BioCypherRelAsNode
from ._ontology import Ontology

# NOTE(review): `BiolinkAdapter` is not defined in the visible part of this
# module; confirm it is still provided, otherwise `from ... import *` fails.
__all__ = ["BiolinkAdapter", "Translator"]


class Translator:
    """
    Class responsible for executing the translation process that is
    configured in the schema_config.yaml file. Creates a mapping dictionary
    from that file, and, given nodes and edges, translates them into
    BioCypherNodes and BioCypherEdges. During this process, can also filter
    the properties of the entities if the schema_config.yaml file specifies
    a property whitelist or blacklist.

    Provides utility functions for translating between input and output
    labels and cypher queries.
    """

    def __init__(self, ontology: "Ontology", strict_mode: bool = False):
        """
        Args:
            ontology: Ontology object; its `mapping.extended_schema` is read
                to obtain the per-class configuration (input labels,
                property filters, representation, preferred id).

            strict_mode: If True, the translator will raise an error if
                input data do not carry source, licence, and version
                information (edges are only checked for source and
                licence).
        """

        self.ontology = ontology
        self.strict_mode = strict_mode

        # record nodes without biolink type configured in schema_config.yaml
        self.notype = {}

        # mapping functionality for translating terms and queries
        self.mappings = {}
        self.reverse_mappings = {}

        self._update_ontology_types()

    def translate_nodes(
        self,
        node_tuples: Iterable,
    ) -> Generator[BioCypherNode, None, None]:
        """
        Translates input node representation to a representation that
        conforms to the schema of the given BioCypher graph. For now
        requires explicit statement of node type on pass.

        Args:
            node_tuples (iterable of tuples): collection of 3-tuples
                `(id, type, properties)` representing individual nodes by
                their unique id, a type in the original database notation
                (translated to the corresponding BioCypher notation), and
                a dictionary of properties.

        Yields:
            One BioCypherNode per input node whose type is configured in
            the schema; nodes of unknown type are recorded via
            `_record_no_type` and skipped.

        Raises:
            ValueError: in strict mode, if a node lacks one of the
                `source`, `licence`, or `version` properties.
        """

        self._log_begin_translate(node_tuples, "nodes")

        # strict mode requirements (loop-invariant)
        required_props = ("source", "licence", "version")

        for _id, _type, _props in node_tuples:
            if self.strict_mode:
                # rename 'license' to 'licence' in _props (modifies the
                # dictionary passed in by the caller)
                if _props.get("license"):
                    _props["licence"] = _props.pop("license")

                for prop in required_props:
                    if prop not in _props:
                        raise ValueError(
                            f"Property `{prop}` missing from node {_id}. "
                            "Strict mode is enabled, so this is not allowed."
                        )

            # find the ontology class that represents the input node type
            _ontology_class = self._get_ontology_mapping(_type)

            if not _ontology_class:
                self._record_no_type(_type, _id)
                continue

            # filter properties for those specified in schema_config if any
            _filtered_props = self._filter_props(_ontology_class, _props)

            # preferred id
            _preferred_id = self._get_preferred_id(_ontology_class)

            yield BioCypherNode(
                node_id=_id,
                node_label=_ontology_class,
                preferred_id=_preferred_id,
                properties=_filtered_props,
            )

        self._log_finish_translate("nodes")

    def _get_preferred_id(self, _bl_type: str) -> str:
        """
        Returns the preferred id for the given ontology class, or "id" if
        the class is not in the schema or has no `preferred_id` entry.
        """

        schema_entry = self.ontology.mapping.extended_schema.get(_bl_type, {})

        return schema_entry.get("preferred_id", "id")

    def _filter_props(self, bl_type: str, props: dict) -> dict:
        """
        Filters properties for those specified in schema_config if any.

        If the schema entry has a `properties` whitelist, only whitelisted
        keys are kept and whitelisted keys absent from the result are added
        with value None. If it has `exclude_properties`, those keys are
        dropped. Without either, `props` is returned unchanged (same
        object).

        Note that in strict mode a non-empty whitelist is extended in place
        with `source`, `licence`, and `version`, i.e., the dictionary
        stored in the schema itself is updated.

        Args:
            bl_type: ontology class; must be a key of the extended schema.

            props: the input properties of the node or edge.

        Returns:
            The filtered property dictionary.
        """

        schema_entry = self.ontology.mapping.extended_schema[bl_type]

        # `or` guards against an explicit null value in the schema
        filter_props = schema_entry.get("properties", {}) or {}

        # strict mode: add required properties (only if there is a whitelist)
        if self.strict_mode and filter_props:
            filter_props.update(
                {"source": "str", "licence": "str", "version": "str"},
            )

        exclude_props = schema_entry.get("exclude_properties", []) or []

        if isinstance(exclude_props, str):
            exclude_props = [exclude_props]

        if not filter_props and not exclude_props:
            return props

        filtered_props = {
            k: v
            for k, v in props.items()
            if (not filter_props or k in filter_props)
            and k not in exclude_props
        }

        # add missing whitelisted properties with default values
        for k in filter_props:
            filtered_props.setdefault(k, None)

        return filtered_props

    def translate_edges(
        self,
        edge_tuples: Iterable,
    ) -> Generator[Union[BioCypherEdge, BioCypherRelAsNode], None, None]:
        """
        Translates input edge representation to a representation that
        conforms to the schema of the given BioCypher graph. For now
        requires explicit statement of edge type on pass.

        Args:
            edge_tuples (iterable of tuples): collection of tuples
                `(id, source, target, type, properties)` representing
                source and target of an interaction via their unique ids
                as well as the type of interaction in the original
                database notation, which is translated to BioCypher
                notation using the schema configuration. The legacy
                4-tuple form without the leading edge id is detected from
                the first element and also accepted.

        Yields:
            A BioCypherEdge, or a BioCypherRelAsNode if the schema
            represents the edge type as a node. Edges of unknown type are
            recorded via `_record_no_type` and skipped.

        Raises:
            ValueError: in strict mode, if an edge lacks the `source` or
                `licence` property.
        """

        self._log_begin_translate(edge_tuples, "edges")

        # legacy: deal with 4-tuples (no edge id)
        # TODO remove for performance reasons once safe
        edge_tuples = peekable(edge_tuples)

        # a default avoids StopIteration on empty input, which would
        # surface as RuntimeError inside this generator
        first = edge_tuples.peek(None)

        if first is not None and len(first) == 4:
            edge_tuples = (
                (None, src, tar, typ, props)
                for src, tar, typ, props in edge_tuples
            )

        for _id, _src, _tar, _type, _props in edge_tuples:
            # check for strict mode requirements
            if self.strict_mode:
                for prop in ("source", "licence"):
                    if prop not in _props:
                        raise ValueError(
                            f"Edge {_id if _id else (_src, _tar)} does not "
                            f"have a `{prop}` property. "
                            "This is required in strict mode."
                        )

            # match the input label (_type) to
            # an ontology label from schema_config
            bl_type = self._get_ontology_mapping(_type)

            if not bl_type:
                self._record_no_type(_type, (_src, _tar))
                continue

            # filter properties for those specified in schema_config if any
            _filtered_props = self._filter_props(bl_type, _props)

            schema_entry = self.ontology.mapping.extended_schema[bl_type]
            rep = schema_entry["represented_as"]

            if rep == "node":
                if _id:
                    # if it brings its own ID, use it
                    node_id = _id

                else:
                    # source target concat
                    node_id = (
                        str(_src)
                        + "_"
                        + str(_tar)
                        + "_"
                        + "_".join(str(v) for v in _filtered_props.values())
                    )

                n = BioCypherNode(
                    node_id=node_id,
                    node_label=bl_type,
                    properties=_filtered_props,
                )

                # directionality check TODO generalise to account for
                # different descriptions of directionality or find a
                # more consistent solution for indicating directionality
                # (equality rather than identity, so values comparing
                # equal to True, such as 1, also count as directed)
                if _filtered_props.get("directed") == True:  # noqa: E712
                    l1 = "IS_SOURCE_OF"
                    l2 = "IS_TARGET_OF"

                elif _filtered_props.get(
                    "src_role",
                ) and _filtered_props.get("tar_role"):
                    l1 = _filtered_props.get("src_role")
                    l2 = _filtered_props.get("tar_role")

                else:
                    l1 = l2 = "IS_PART_OF"

                e_s = BioCypherEdge(
                    source_id=_src,
                    target_id=node_id,
                    relationship_label=l1,
                    # additional here
                )

                e_t = BioCypherEdge(
                    source_id=_tar,
                    target_id=node_id,
                    relationship_label=l2,
                    # additional here
                )

                yield BioCypherRelAsNode(n, e_s, e_t)

            else:
                edge_label = schema_entry.get("label_as_edge")

                if edge_label is None:
                    edge_label = bl_type

                yield BioCypherEdge(
                    relationship_id=_id,
                    source_id=_src,
                    target_id=_tar,
                    relationship_label=edge_label,
                    properties=_filtered_props,
                )

        self._log_finish_translate("edges")

    def _record_no_type(self, _type: Any, what: Any) -> None:
        """
        Records the type of a node or edge that is not represented in the
        schema_config by incrementing its counter in `self.notype`.
        """

        logger.debug(f"No ontology type defined for `{_type}`: {what}")

        self.notype[_type] = self.notype.get(_type, 0) + 1

    def get_missing_biolink_types(self) -> dict:
        """
        Returns a dictionary of types that were not represented in the
        schema_config, mapping each type to the number of times it was
        encountered.
        """

        return self.notype

    @staticmethod
    def _log_begin_translate(_input: Iterable, what: str):
        """
        Logs the start of a translation; includes the number of items if
        the input has a length.
        """

        n = f"{len(_input)} " if hasattr(_input, "__len__") else ""

        logger.debug(f"Translating {n}{what} to BioCypher")

    @staticmethod
    def _log_finish_translate(what: str):
        """
        Logs the end of a translation.
        """

        logger.debug(f"Finished translating {what} to BioCypher.")

    def _update_ontology_types(self):
        """
        Creates a dictionary to translate from input labels to ontology
        labels. If multiple input labels, creates mapping for each. Also
        registers the term translation mappings for each labelled class.
        """

        self._ontology_mapping = {}

        for key, value in self.ontology.mapping.extended_schema.items():
            labels = value.get("input_label") or value.get("label_in_input")

            if not labels:
                # classes without an input label cannot occur in the input;
                # registering them would put a `None` key into the
                # translation mappings and break `translate`
                continue

            if isinstance(labels, str):
                self._ontology_mapping[labels] = key

            elif isinstance(labels, list):
                for label in labels:
                    self._ontology_mapping[label] = key

            if value.get("label_as_edge"):
                self._add_translation_mappings(labels, value["label_as_edge"])

            else:
                self._add_translation_mappings(labels, key)

    def _get_ontology_mapping(self, label: str) -> Optional[str]:
        """
        For each given input type ("input_label" or "label_in_input"), find
        the corresponding ontology class in the mapping created from the
        `schema_config.yaml`.

        Args:
            label: The input type to find (`input_label` or
                `label_in_input` in `schema_config.yaml`).

        Returns:
            The ontology class, or None if the label is not configured.
        """

        return self._ontology_mapping.get(label, None)

    def translate_term(self, term):
        """
        Translate a single term. Returns None if the term is unknown.
        """

        return self.mappings.get(term, None)

    def reverse_translate_term(self, term):
        """
        Reverse translate a single term. Returns None if the term is
        unknown.
        """

        return self.reverse_mappings.get(term, None)

    def translate(self, query):
        """
        Translate a cypher query. Only translates labels as of now.
        """

        for key, translated in self.mappings.items():
            query = query.replace(":" + key, ":" + translated)

        return query

    def reverse_translate(self, query):
        """
        Reverse translate a cypher query. Only translates labels as of now.

        Raises:
            NotImplementedError: if a label in the query maps back to
                multiple input labels.
        """

        for key, original in self.reverse_mappings.items():
            a = ":" + key + ")"
            b = ":" + key + "]"

            # TODO this conditional probably does not cover all cases
            if a not in query and b not in query:
                continue

            if isinstance(original, list):
                raise NotImplementedError(
                    "Reverse translation of multiple inputs not "
                    "implemented yet. Many-to-one mappings are "
                    "not reversible. "
                    f"({key} -> {original})",
                )

            query = query.replace(a, ":" + original + ")").replace(
                b, ":" + original + "]"
            )

        return query

    def _add_translation_mappings(self, original_name, biocypher_name):
        """
        Add translation mappings for a label and name. We use here the
        PascalCase version of the BioCypher name, since sentence case is
        not useful for Cypher queries.

        Args:
            original_name: input label or list of input labels.

            biocypher_name: BioCypher name (sentence case) or list thereof.
        """

        if isinstance(original_name, list):
            for on in original_name:
                self.mappings[on] = self.name_sentence_to_pascal(
                    biocypher_name,
                )

        else:
            self.mappings[original_name] = self.name_sentence_to_pascal(
                biocypher_name,
            )

        if isinstance(biocypher_name, list):
            for bn in biocypher_name:
                self.reverse_mappings[
                    self.name_sentence_to_pascal(bn)
                ] = original_name

        else:
            self.reverse_mappings[
                self.name_sentence_to_pascal(biocypher_name)
            ] = original_name

    @staticmethod
    def name_sentence_to_pascal(name: str) -> str:
        """
        Converts a name in sentence case to pascal case. Dot-separated
        names are converted part by part and joined with dots again.
        """

        # split on dots if dot is present
        if "." in name:
            return ".".join(
                [_misc.sentencecase_to_pascalcase(n) for n in name.split(".")],
            )

        else:
            return _misc.sentencecase_to_pascalcase(name)