import numpy as np
from itertools import chain
from numpy.core._multiarray_umath import ndarray
from cobamp.core.linear_systems import GenericLinearSystem, VAR_CONTINUOUS, VAR_BINARY
from cobamp.core.optimization import LinearSystemOptimizer
from troppo.methods.base import ContextSpecificModelReconstructionAlgorithm, PropertiesReconstruction
[docs]class IMATProperties(PropertiesReconstruction):
"""
Properties for IMAT algorithm
Parameters
----------
exp_vector : np.ndarray or list
The vector of expression values
exp_thresholds : tuple
The thresholds for the expression values
core : list, optional
The core reactions, by default None
tolerance : float, optional
The tolerance, by default 1e-8
epsilon : float, optional
The epsilon, by default 1
"""
def __init__(self, exp_vector: np.ndarray or list, exp_thresholds: tuple or list or ndarray,
core: ndarray or list or tuple = None, tolerance: float = 1e-8, epsilon: int or float = 1):
new_mandatory = {
'exp_vector': lambda x: isinstance(x, list) and len(x) > 0 or isinstance(x, ndarray),
'exp_thresholds': lambda x: type(x) in (tuple, list, ndarray) and type(x[0]) in [float, int] and type(
x[1]) in [float, int]
}
new_optional = {
'core': lambda x: type(x) in [ndarray, list, tuple],
'tolerance': float,
'epsilon': lambda x: type(x) in [int, float]
}
super().__init__()
self.add_new_properties(new_mandatory, new_optional)
self['exp_vector'] = exp_vector
self['exp_thresholds'] = exp_thresholds
if core:
self['core'] = core
if tolerance:
self['tolerance'] = tolerance
if epsilon:
self['epsilon'] = epsilon
[docs] @staticmethod
def from_integrated_scores(scores: list, **kwargs) -> 'IMATProperties':
"""
Create IMAT properties from integrated scores
Parameters
----------
scores: list
The list of integrated scores
kwargs: dict
Additional arguments
Returns
-------
IMATProperties: The IMAT properties
"""
return IMATProperties(exp_vector=scores, **kwargs)
[docs]class IMAT(ContextSpecificModelReconstructionAlgorithm):
"""
IMAT algorithm
Parameters
----------
S : ndarray
The stoichiometric matrix
lb : ndarray
The lower bounds
ub : ndarray
The upper bounds
properties : IMATProperties
The IMAT properties
Attributes
----------
sol : LinearSystemOptimizer
The solution
"""
properties_class = IMATProperties
[docs] @staticmethod
def empty_matrix(r, c):
return np.zeros((r, c))
def __init__(self, S: ndarray, lb: ndarray, ub: ndarray, properties: IMATProperties):
super().__init__(S, lb, ub, properties)
self.S = np.array(S)
self.lb, self.ub = np.array(lb), np.array(ub)
self.properties = properties
self.sol = None
[docs] def run_imat(self):
"""
Run the IMAT algorithm
Returns
-------
solution: Solution
Instance with the solution to the IMAT problem
"""
exp_vector = self.properties['exp_vector']
exp_lb, exp_ub = self.properties['exp_thresholds']
core = self.properties['core']
epsilon = self.properties['epsilon']
high_idx = (np.where(np.array(exp_vector) >= exp_ub)[0]).astype(int)
low_idx = (np.where((np.array(exp_vector) >= 0) & (np.array(exp_vector) < exp_lb))[0]).astype(int)
if core:
high_idx = np.union1d(high_idx, np.array(core))
lso, lsystem = self.generate_imat_problem(self.S, self.lb, self.ub, high_idx, low_idx, epsilon)
solution = lso.optimize()
return solution
[docs] def run(self):
"""
Run the algorithm and return a list of reactions to keep in the final model.
Returns
-------
list: The list of reactions to keep in the final model
"""
tol = self.properties['tolerance']
solution = self.run_imat()
to_keep = np.where(abs(solution.x())[:self.S.shape[1]] >= tol)[0]
self.sol = solution
if solution.status() != 'optimal':
print('Solution was not optimal')
return to_keep
[docs] def generate_imat_problem(self, S, lb, ub, high_idx, low_idx, epsilon):
"""
Generate the IMAT problem
Parameters
----------
S: ndarray
The stoichiometric matrix
lb: ndarray
The lower bounds
ub: ndarray
The upper bounds
high_idx: ndarray
The high index
low_idx: ndarray
The low index
epsilon: float or int
The epsilon value
Returns
-------
lso: LinearSystemOptimizer
The linear system optimizer
lsystem: GenericLinearSystem
The linear system
"""
m, n = S.shape
nh, nl = len(high_idx), len(low_idx)
h_ident, l_ident = self.empty_matrix(nh, n), self.empty_matrix(nl, n)
h_lb, h_ub, l_lb, l_ub = self.empty_matrix(nh, nh), self.empty_matrix(nh, nh), \
self.empty_matrix(nl, nl), self.empty_matrix(nl, nl)
h_diag, l_diag = np.diag_indices_from(h_lb), np.diag_indices_from(l_lb)
if nh > 0:
h_ident[(np.array(range(nh)), high_idx)] = 1
h_lb[h_diag] = lb[high_idx] - epsilon
h_ub[h_diag] = ub[high_idx] + epsilon
if nl > 0:
l_ident[(np.array(range(nl)), low_idx)] = 1
l_lb[l_diag] = lb[low_idx]
l_ub[l_diag] = ub[low_idx]
rows = [
[S, self.empty_matrix(m, nh + nh + nl)],
[h_ident, h_lb, self.empty_matrix(nh, nh + nl)],
[h_ident, self.empty_matrix(nh, nh), h_ub, self.empty_matrix(nh, nl)],
[l_ident, self.empty_matrix(nl, nh * 2), l_lb],
[l_ident, self.empty_matrix(nl, nh * 2), l_ub]
]
A = np.vstack(list(map(np.hstack, rows)))
b_lb = [0] * m + list(lb[high_idx]) + [None] * nh + list(lb[low_idx]) + [None] * nl
b_ub = [0] * m + [None] * nh + list(ub[high_idx]) + [None] * nl + list(ub[low_idx])
A_lb, A_ub = np.concatenate([lb, np.array([0] * (2 * nh + nl))]), np.concatenate(
[ub, np.array([1] * (2 * nh + nl))])
A_vt = [VAR_CONTINUOUS] * n + [VAR_BINARY] * (2 * nh + nl)
## TODO: Move this to optimization on cobamp
prefix_maker = lambda cd: list([cd[0] + str(i) for i in range(cd[1])])
A_names = list(chain(*list(map(prefix_maker, [('V', n), ('Hpos', nh), ('Hneg', nh), ('L', nl)]))))
lsystem = GenericLinearSystem(S=A, var_types=A_vt, lb=A_lb, ub=A_ub, b_lb=b_lb, b_ub=b_ub, var_names=A_names)
lso = LinearSystemOptimizer(lsystem)
A_f = np.zeros((A.shape[1]))
A_f[n:] = 1
lsystem.set_objective(A_f, False)
# DEBUG LINES
# print(high_idx, low_idx)
# lsystem.write_to_lp('imat.lp')
return lso, lsystem