import copy
import re
from troppo.omics.id_converter import searchNomenclature, idConverter
import numpy as np
from typing import Union, Sequence
from numbers import Number
import warnings
import pandas as pd
from . import GenericReader
from . import HpaReader
from . import ProbeReader
class OmicsContainer:
    """
    Container for a single omics profile plus the metadata describing it.

    An OmicsContainer stores a ``{gene_id: expression value}`` dictionary together with the omics type, the
    tissue/patient condition the data was obtained from and the nomenclature of the identifiers.

    To create a usable container either:
        a) pass ``data`` (a dict or a reader object) directly to the constructor, or
        b) create an empty container and call ``.load()`` with a dict or a previously created reader object
           (HpaReader, ProbeReader, GenericReader).

    Once loaded, the container supports:
        a) Id conversion (``convertIds``)
        b) Value conversion (``convertValues``)
        c) Filtering by id / regular expression (``filterById``) or by value (``filterByValue``)
        d) Removal of missing values (``dropNA``)

    Attributes
    ----------
    otype: str
        The type of omics data stored in the container
    condition: str
        The condition from where the data was obtained
    data: dict
        The data stored in the container, as {gene_id: expression value}
    nomenclature: str
        The nomenclature used for the gene ids
    """

    def __init__(self, omicstype: str = None, condition: str = None, data: dict = None, nomenclature: str = None):
        self.otype = omicstype
        self.condition = condition
        self.nomenclature = nomenclature
        if data is None:
            self.data = {}
        else:
            self.load(data)

    def load(self, arg: 'Union[dict, HpaReader, ProbeReader, GenericReader]', **kwargs):
        """
        Loads data into the OmicsContainer object. Data can be loaded from a dictionary or from a reader object.

        When no nomenclature was supplied, it is inferred from the loaded ids with ``searchNomenclature``.

        Parameters
        ----------
        arg: dict or reader object
            The data to be loaded. A dict is stored as-is (not copied); any other object must expose a
            ``load(**kwargs)`` method whose return value becomes the container data.
        kwargs: dict
            The keyword arguments to be passed to the reader object
        """
        if isinstance(arg, dict):
            self.data = arg
        else:
            self.data = arg.load(**kwargs)
        if self.nomenclature is None:
            self.nomenclature = searchNomenclature(list(self.data.keys()))

    def convertValues(self, mapping: dict):
        """
        Converts the stored expression values based on a valid user supplied mapping.

        IMPORTANT: Will not work if the values contain NAs

        Mapping shall be a dictionary of either:
            - old value (may it be string or numeric): new value (may it be string or numeric)
            - tuple of (lower bound, upper bound) of old value: new value (numeric, string)

        The kind of conversion is decided from the type of the first key/value of the mapping. When several
        ranges contain the same value, the last matching range of the mapping wins.

        Parameters
        ----------
        mapping: dict
            a dictionary containing the mapping between the values to be converted and the desired values

        Raises
        ------
        Exception
            If the mapping is empty, malformed, or does not cover the values stored in the container.
        """
        invalid_msg = 'Supplied mapping is not valid for the intended conversion'
        malformed_msg = 'Supplied mapping is not valid. Please supply a valid mapping with {oldval:newval}'

        # An empty mapping cannot describe any conversion (and has no "first key" to inspect).
        if not mapping:
            raise Exception(malformed_msg)

        if self._isNumeric():
            key_type = type(next(iter(mapping)))
            value_type = type(next(iter(mapping.values())))
            if key_type is tuple:
                # range to numeric/text
                if not self._mapIsValid('r2n', mapping):
                    raise Exception(invalid_msg)
                new_values = {}
                for gene, value in self.data.items():
                    for bounds, target in mapping.items():
                        if float(bounds[0]) <= value <= float(bounds[1]):
                            new_values[gene] = target
                self.data = new_values
            elif key_type in (int, float):
                if value_type in (int, float):
                    # numeric to numeric
                    if not self._mapIsValid('n2n', mapping):
                        raise Exception(invalid_msg)
                    self.data = {gene: float(mapping[value]) for gene, value in self.data.items()}
                elif value_type is str:
                    # numeric to discrete
                    if not self._mapIsValid('n2d', mapping):
                        raise Exception(invalid_msg)
                    self.data = {gene: mapping[value] for gene, value in self.data.items()}
                else:
                    # previously this case fell through and reported success without converting anything
                    raise Exception(malformed_msg)
            else:
                # avoid cases where the mapping does not follow {oldval:newval}
                raise Exception(malformed_msg)
        else:
            # discrete to integer/float
            if not self._mapIsValid('d2n', mapping):
                raise Exception(invalid_msg)
            new_map = copy.deepcopy(mapping)  # avoid a not intended effect where this would change the user mapping
            new_map[np.nan] = np.nan  # np.NaN alias no longer exists in NumPy >= 2.0
            self.data = {gene: float(new_map[value]) for gene, value in self.data.items()}
        print('Value conversion is complete!')

    def convertIds(self, new: str):
        """
        Redefines the ids (keys) on the data attribute.

        The conversion is delegated to ``idConverter``; entries for which it returns no match are dropped.
        Nothing is changed when the container has no nomenclature or when ``idConverter`` returns None.

        Parameters
        ----------
        new: string
            designation of the new id according to hgnc
        """
        if self.nomenclature is None:
            print('No valid nomenclature was found. Please reload your data carefully')
            return

        # Convert once and reuse the result instead of calling the converter twice.
        converted = idConverter(self.data.keys(), self.nomenclature, new)
        if converted is None:
            return

        # Distinct loop names: reusing ``new`` here would overwrite the target nomenclature.
        new_data = {new_id: self.data[old_id] for old_id, new_id in converted.items()}
        lost = len(self.data) - len(new_data)
        print(
            'ID conversion is complete! {0} entries were lost due to inexistent match in the HGNC platform'.format(
                lost))
        self.nomenclature = new
        self.set_data(new_data)

    def dropNA(self):  # irreversible once done
        """
        Removes, in place, every entry whose expression value is NaN.

        Non-numeric values (e.g. discrete string levels) are never considered missing.
        """
        # Collect the keys first: deleting while iterating over the dict raises RuntimeError.
        missing = [gene for gene, value in self.data.items() if self._isNA(value)]
        for gene in missing:
            del self.data[gene]

    def filterByValue(self, op: str, threshold: Union[int, float, tuple, str]) -> 'OmicsContainer':
        """
        Filters the stored values to match a user defined filter.

        'above' and 'under' use the > and < operators, while 'between' uses <= and >=. An unsupported operation
        or a malformed 'between' threshold prints a message and yields an unfiltered copy.

        Parameters
        ----------
        op: string
            one of (above, under, between) for numeric data; 'levels' (or its alias 'oneof') for discrete data.
            Case-insensitive.
        threshold: int, float, tuple, string
            numeric threshold for above and under, tuple of (lowerbound, upperbound) for between, a single level
            (string) or a collection of levels for the levels operation

        Returns
        -------
        OmicsContainer:
            a new OmicsContainer object is returned once this filter is applied. Original instance remains unchanged.
        """
        new_values = copy.deepcopy(self.data)
        operation = op.lower()
        if self._isNumeric():
            if operation == 'above':
                new_values = {k: v for k, v in new_values.items() if v > threshold}
            elif operation == 'under':
                new_values = {k: v for k, v in new_values.items() if v < threshold}
            elif operation == 'between':
                try:
                    lower, upper = threshold[0], threshold[1]
                except (IndexError, TypeError):
                    # TypeError covers scalar thresholds, which are not subscriptable
                    print('Threshold for between operation must be a tuple of two elements')
                else:
                    new_values = {k: v for k, v in new_values.items() if lower <= v <= upper}
            else:
                print('Please input a valid operation for numeric filtering: \'above\', \'under\' or \'between\'')
        else:
            if operation in ('levels', 'oneof'):
                # A bare string is one level; ``v in 'High'`` would otherwise be a substring test.
                levels = (threshold,) if isinstance(threshold, str) else threshold
                new_values = {k: v for k, v in new_values.items() if v in levels}
            else:
                print('Discrete filtering only supports \'levels\' operation')
        return self.__createNew(new_values)

    def filterById(self, regex: str) -> 'OmicsContainer':
        """
        Filters the data attribute to contain genes that match a regular expression or string supplied by the user.

        Matching is case-insensitive and uses ``search`` (the pattern may match anywhere in the id). If a
        TypeError occurs while compiling or matching, a message is printed and an unfiltered copy is returned.

        Parameters
        ----------
        regex: string
            regular expression or string to be contained in the ids (keys) of the data attr.

        Returns
        -------
        OmicsContainer:
            a new OmicsContainer object is returned once this filter is applied. Original instance remains unchanged.
        """
        new_values = copy.deepcopy(self.data)
        try:
            exp = re.compile(regex, re.IGNORECASE)
            new_values = {k: v for k, v in new_values.items() if exp.search(k) is not None}
        except TypeError:
            print('Regex must be a string')
        return self.__createNew(new_values)

    @staticmethod
    def _isNA(value) -> bool:
        """Returns True when value is a numeric NaN; values ``np.isnan`` cannot handle are not NA."""
        try:
            return bool(np.isnan(value))
        except TypeError:
            return False

    def _isNumeric(self):
        """
        Tells whether the container holds numeric (True) or discrete (False) values.

        The decision is based on the first value that is not NaN, so discrete data containing missing values is
        not mistaken for numeric data. NumPy numeric scalars count as numeric; booleans do not. Empty or
        all-NaN data is reported as numeric.
        """
        for value in self.data.values():
            if self._isNA(value):
                continue
            return isinstance(value, Number) and not isinstance(value, bool)
        return True

    def _mapIsValid(self, task: str, mapping: dict) -> bool:
        """
        Checks if a supplied mapping is valid for the values currently stored, namely if all fields are present
        (case-sensitive), and if all mapped values have the expected type.

        Parameters
        ----------
        task: string
            one of (n2n, n2d, d2n, r2n)
        mapping: dict
            the mapping to be validated

        Returns
        -------
        bool:
            True if the mapping is valid, False otherwise (including for an unknown task)
        """
        unique = set(self.data.values())
        if task in ('n2n', 'd2n'):
            numeric_targets = [x for x in mapping.values() if type(x) in (int, float)]
            return set(mapping.keys()) == unique and len(numeric_targets) == len(unique)
        if task == 'n2d':
            text_targets = [x for x in mapping.values() if type(x) is str]
            return set(mapping.keys()) == unique and len(text_targets) == len(unique)
        if task == 'r2n':
            if any(type(x) not in (float, int, str) for x in mapping.values()):
                return False
            # every stored value has to fall inside at least one of the supplied ranges
            return all(
                any(float(bounds[0]) <= value <= float(bounds[1]) for bounds in mapping.keys())
                for value in unique
            )
        return False

    def __createNew(self, new_values: dict) -> 'OmicsContainer':
        """
        Creates a new OmicsContainer object with the same omics type, condition and nomenclature as the original
        one, but with a new data attribute.

        Parameters
        ----------
        new_values: dict
            the new data attribute to be used in the new OmicsContainer object

        Returns
        -------
        OmicsContainer:
            the new OmicsContainer object. Original instance remains unchanged.
        """
        # Carry the nomenclature over so the derived container can still be used with convertIds.
        newOC = OmicsContainer(omicstype=self.get_OmicsType(), condition=self.get_Condition(),
                               nomenclature=self.get_Nomenclature())
        newOC.set_data(new_values)
        return newOC

    # setters
    def set_type(self, newType: str):
        """Sets the omics type of the container."""
        self.otype = newType

    def set_condition(self, newCond: str):
        """Sets the condition of the container."""
        self.condition = newCond

    def set_data(self, newData: dict):
        """Replaces the stored data without touching the nomenclature."""
        self.data = newData

    # getters
    def __len__(self):
        return len(self.data)

    def get_OmicsType(self):
        """Returns the omics type of the container."""
        return self.otype

    def get_Condition(self):
        """Returns the condition of the container."""
        return self.condition

    def get_Data(self):
        """Returns the stored {gene_id: expression value} dictionary."""
        return self.data

    def get_Nomenclature(self):
        """Returns the nomenclature of the stored ids."""
        return self.nomenclature

    def get_integrated_data_map(self, model_reader: 'Union[HpaReader, ProbeReader, GenericReader]',
                                and_func=min, or_func=max):
        """
        Integrates the stored omics data with a metabolic model, producing reaction scores.

        The scores are obtained from ``model_reader.get_reaction_scores`` using the container data and the
        supplied AND/OR replacement functions. Only proteomics and transcriptomics data are supported.

        Parameters
        ----------
        model_reader:
            a cobamp AbstractModelObjectReader object; it must provide ``get_reaction_scores``
        and_func:
            the mathematical function to replace the "AND" operator present in the Gene-Protein-Rules
        or_func:
            the mathematical function to replace the "OR" operator present in the Gene-Protein-Rules

        Returns
        -------
        OmicsDataMap:
            an OmicsDataMap object of type 'ReactionDataMap' holding the scores computed by the model reader

        Raises
        ------
        Exception
            If the omics type of the container is not set or not supported.
        """
        # A container created without an omics type (otype is None) is reported as unsupported
        # instead of failing with AttributeError on ``None.lower()``.
        omics_type = self.otype.lower() if isinstance(self.otype, str) else None
        if omics_type not in ('proteomics', 'transcriptomics'):
            raise Exception('Omics data type not yet supported')

        scores = model_reader.get_reaction_scores(self.get_Data(), or_fx=or_func, and_fx=and_func)
        return OmicsDataMap(scores, 'ReactionDataMap')

    def print_values(self):
        """Prints every id and its expression value, one per line."""
        print('Gene Symbol >>> Exp Value')
        for k, v in self.data.items():
            print("{0} >>> {1}".format(k, v))

    def __str__(self):
        return str('{0} container\n'
                   'Condition: {1}\n'
                   'Nomenclature: {2}\n'
                   '{3} Expression Values'.format(self.otype, self.condition, self.nomenclature, len(self.data)))
class OmicsDataMap:
    """
    Stores integrated omics data, matching a given metabolic model

    Attributes
    ----------
    _mapType: str
        The type of map stored in the object
    _scores: dict
        The scores stored in the object, as {id: score}; a score may be None
    """

    def __init__(self, scores, mapType):
        self._mapType = mapType
        self._scores = scores

    # getters
    def __len__(self):
        return len(self._scores)

    def __str__(self):
        # __str__ must return a str; returning the scores dict itself makes str(obj)/print(obj) raise TypeError.
        return str(self._scores)

    def mapType(self):
        """Returns the type of map stored in the object."""
        return self._mapType

    def get_scores(self):
        """Returns the stored scores dictionary."""
        return self._scores

    def select(self, op: str, threshold: Number) -> Union[set, None]:
        """
        Selects the ids whose score is under or above a threshold. Above and under operations use the
        >= and <= operators, respectively. Entries whose score is None are never selected.

        Parameters
        ----------
        op: str
            either "above" or "under" (case-insensitive) determining which scores shall be chosen
        threshold: Number
            a real number (Python or NumPy scalar; booleans are rejected) to compare the scores against

        Returns
        -------
        set or None:
            a set of ids whose scores are above or under the threshold; None (after printing a message) when
            the threshold is not numeric or the operation is unknown
        """
        # isinstance(..., Number) also accepts NumPy scalars; bool is excluded as it was never accepted.
        if isinstance(threshold, bool) or not isinstance(threshold, Number):
            print('Select threshold must be numeric!')
            return None

        operation = op.lower()
        if operation == 'above':
            return {key for key, score in self._scores.items() if score is not None and score >= threshold}
        if operation == 'under':
            return {key for key, score in self._scores.items() if score is not None and score <= threshold}

        print('Select operation must be either \'above\' or \'under\'')
        return None

    # setters
    def set_scores(self, newScores: dict):
        """
        Sets the scores attribute to a new dictionary

        Parameters
        ----------
        newScores: dict
            the new scores to be set
        """
        self._scores = newScores
# A two-dimensional block of numbers: either nested sequences ("list of lists") or a NumPy array.
lofl_array = Union[Sequence[Sequence[Number]], np.ndarray]


def has_valid_dims(rows: Sequence, cols: Sequence, data: lofl_array):
    """
    Checks if the data dimensions match the number of row and column labels.

    Parameters
    ----------
    rows: Sequence
        The row labels of the data
    cols: Sequence
        The column labels of the data
    data: lofl_array
        The data to be checked; nested sequences are accepted as well as NumPy arrays

    Returns
    -------
    bool, bool:
        The first value is True when data has shape (len(rows), len(cols)); the second is True when the
        reversed shape of data equals (len(rows), len(cols)), i.e. the data would fit after transposition.
        Both are True for square data whose side matches the label counts.
    """
    expected = (len(rows), len(cols))
    # np.shape works for arrays and nested sequences alike, unlike the ``.shape`` attribute.
    actual = tuple(np.shape(data))
    return (actual == expected), (actual[::-1] == expected)
class TabularContainer(object):
    """
    TabularContainer class to be used for the creation of objects that store tabular data and other useful information,
    such as its row and column labels. This class is meant to be used as a base class for other classes that store
    tabular data.

    Parameters
    ----------
    row_labels: Sequence[Union[str, int]]
        The row labels of the data
    column_labels: Sequence[str]
        The column labels of the data
    values: lofl_array
        The values of the data

    Attributes
    ----------
    data: pd.DataFrame
        The data stored in the container

    Raises
    ------
    IndexError
        If the label counts match neither the shape of the values nor its reverse.
    """

    def __init__(self, row_labels: Sequence[Union[str, int]], column_labels: Sequence[str], values: lofl_array):
        if not isinstance(values, np.ndarray):
            values = np.array(values)

        valid_norm, valid_txp = has_valid_dims(row_labels, column_labels, values)
        if not valid_norm:
            if not valid_txp:
                raise IndexError('row_labels or column_labels do not match the value dimensions.')
            # NOTE(review): the labels are swapped here while the values keep their orientation, so the
            # resulting frame is indexed by the original column_labels. The warning text says the values were
            # transposed - confirm which of the two is intended.
            row_labels, column_labels = column_labels, row_labels
            warnings.warn('Values have been transposed since the original labels did not match dimensions.')

        self.__data = pd.DataFrame(data=values, index=row_labels, columns=column_labels)

    @property
    def data(self):
        """The underlying pandas DataFrame."""
        return self.__data

    @data.setter
    def data(self, value: lofl_array):
        if not isinstance(value, pd.DataFrame):
            try:
                value = pd.DataFrame(value)
            except Exception as err:
                # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit are not swallowed.
                raise TypeError('data must be set as a pandas DataFrame or list of lists') from err
        self.__data = value

    def __getitem__(self, item: Union[str, int]):
        # Plain DataFrame indexing, i.e. column selection for a single label.
        return self.data[item]

    @property
    def column_names(self):
        """The column labels of the data."""
        return self.data.columns

    @column_names.setter
    def column_names(self, value: Union[dict, Sequence]):
        if isinstance(value, dict):
            # a dict renames only the columns it mentions
            self.__data = self.data.rename(columns=value)
        elif isinstance(value, (pd.Series, list, tuple)):
            # a sequence replaces all column labels
            self.__data.columns = value
        else:
            raise TypeError('value must be a dict, Series, list or tuple')

    @property
    def row_names(self):
        """The row labels of the data."""
        return self.data.index

    @row_names.setter
    def row_names(self, value: Union[dict, Sequence]):
        if isinstance(value, dict):
            # a dict renames only the rows it mentions
            self.__data = self.data.rename(index=value)
        elif isinstance(value, (pd.Series, list, tuple)):
            # a sequence replaces all row labels (this used to overwrite the column labels instead)
            self.__data.index = value
        else:
            raise TypeError('value must be a dict, Series, list or tuple')

    def drop(self, rows: Sequence = None, columns: Sequence = None):
        """
        Drops the given rows and columns from the data attribute

        Parameters
        ----------
        rows: Sequence
            The rows to be dropped
        columns: Sequence
            The columns to be dropped
        """
        self.data = self.data.drop(columns=columns, index=rows)
class IdentifierMapping(object):
    """
    Named lookup table used to translate identifiers between the columns of a mapping DataFrame.

    Parameters
    ----------
    type_name: str
        The name given to this mapping
    id_mapping_table: pd.DataFrame
        Table in which each column holds one kind of identifier
    """

    def __init__(self, type_name: str, id_mapping_table: pd.DataFrame):
        self.__id_map = id_mapping_table
        self.__name = type_name

    @property
    def name(self):
        """The name given to this mapping."""
        return self.__name

    def get_id_table(self, ids: Sequence[Union[str, int]], from_id):
        """
        Left-joins the supplied ids with the mapping table on the ``from_id`` column.

        Parameters
        ----------
        ids: Sequence[Union[str, int]]
            The identifiers to look up
        from_id:
            The mapping table column the identifiers belong to

        Returns
        -------
        pd.DataFrame:
            the rows of the mapping table matching each id; ids without a match get missing values
        """
        query = pd.Series(ids, name=from_id)
        return pd.merge(query, self.__id_map, on=from_id, how='left')

    def map_ids(self, ids: Sequence[Union[str, int]], from_id: Union[str, int], to_id: Union[str, int]):
        """
        Translates ids from the ``from_id`` column to the ``to_id`` column.

        Returns
        -------
        pd.Series:
            the ``to_id`` column of the table produced by ``get_id_table``
        """
        return self.get_id_table(ids, from_id)[to_id]
class OmicsMeasurementSet(TabularContainer):
    """
    Tabular set of omics measurements, with samples as row labels and features as column labels.

    Parameters
    ----------
    sample_labels: Sequence[Union[str, int]]
        The labels of the samples (passed on as row labels)
    feature_labels: Sequence[str]
        The labels of the measured features (passed on as column labels)
    values: lofl_array
        The measured values
    """

    def __init__(self, sample_labels: Sequence[Union[str, int]], feature_labels: Sequence[str], values: lofl_array):
        super().__init__(sample_labels, feature_labels, values)

    def to_omics_container(self, sample_id):
        """
        Builds an OmicsContainer from the row labelled ``sample_id``.

        The container is created without an omics type, with ``sample_id`` as its condition and the row
        converted to a dictionary as its data.
        """
        sample_values = self.data.loc[sample_id, :].to_dict()
        return OmicsContainer(None, condition=sample_id, data=sample_values)
class TypedOmicsMeasurementSet(OmicsMeasurementSet):
    """
    OmicsMeasurementSet bound to an IdentifierMapping, which allows its feature ids to be converted.

    Parameters
    ----------
    sample_labels: Sequence[Union[str, int]]
        The labels of the samples
    feature_labels: Sequence[str]
        The labels of the measured features
    values: lofl_array
        The measured values
    omics_type: IdentifierMapping
        The identifier mapping used to convert feature ids and to name the omics type
    """

    def __init__(self, sample_labels: Sequence[Union[str, int]], feature_labels: Sequence[str], values: lofl_array,
                 omics_type: IdentifierMapping):
        super().__init__(sample_labels, feature_labels, values)
        self.omics_type = omics_type

    @property
    def omics_type(self) -> IdentifierMapping:
        """The identifier mapping associated with this measurement set."""
        return self.__omics_type

    @omics_type.setter
    def omics_type(self, value: IdentifierMapping):
        self.__omics_type = value

    def convert_feature_ids(self, from_id, to_id):
        """
        Replaces the column labels with their ``to_id`` equivalents and drops the columns left without one.

        Parameters
        ----------
        from_id:
            The mapping column the current feature ids belong to
        to_id:
            The mapping column holding the desired feature ids
        """
        current_ids = self.data.columns.to_list()
        self.column_names = self.omics_type.map_ids(current_ids, from_id, to_id)
        # features with no match in the mapping end up with a missing label and are discarded
        unmapped = self.data.columns.isna()
        self.data = self.data.loc[:, ~unmapped]

    def to_omics_container(self, sample_id):
        """
        Builds an OmicsContainer from the row labelled ``sample_id``.

        The container takes the name of the identifier mapping as its omics type, ``sample_id`` as its
        condition and 'custom' as its nomenclature.
        """
        sample_values = self.data.loc[sample_id, :].to_dict()
        return OmicsContainer(omicstype=self.omics_type.name, condition=sample_id,
                              data=sample_values, nomenclature='custom')
if __name__ == '__main__':
    # Small demonstration: two samples measured over four features labelled in German,
    # converted to French labels through a toy identifier table.
    samples = ['sampleA', 'sampleB']
    features = ['eins', 'zwei', 'drei', 'vier']
    vals = [
        [0, 5, 7, 8],
        [7, 6, 2, 9]]

    # One row per number, one column per language ('vier' is deliberately absent).
    id_map = pd.DataFrame([
        ['one', 'uno', 'eins', 'un', 'um'],
        ['two', 'dos', 'zwei', 'deux', 'dois'],
        ['three', 'tres', 'drei', 'trois', 'tres']
    ], columns=['en', 'es', 'de', 'fr', 'pt'])
    id_map_obj = IdentifierMapping('numbers', id_map)

    exp_set = OmicsMeasurementSet(samples, features, vals)
    typed_exp_set = TypedOmicsMeasurementSet(samples, features, vals, id_map_obj)

    typed_exp_set.convert_feature_ids('de', 'fr')
    oc = typed_exp_set.to_omics_container('sampleA')
    oc.get_Condition()