Source code for troppo.methods.reconstruction.swiftcore

from troppo.methods.base import PropertiesReconstruction, ContextSpecificModelReconstructionAlgorithm
import numpy as np
import scipy.linalg as linalg
import scipy.sparse as sprs
import scipy.sparse.linalg as sprslinalg

from cobamp.core.linear_systems import fix_backwards_irreversible_reactions, GenericLinearSystem, VAR_CONTINUOUS
from cobamp.core.optimization import LinearSystemOptimizer, Solution

EPS = 2 ^ -52


[docs]def find_core(S: np.ndarray, lb: np.ndarray, ub: np.ndarray, blocked: np.ndarray, weights: np.ndarray, solver: str) -> 'Solution': """ Find the core of the given model Parameters ---------- S: np.ndarray Stoichiometric matrix lb: np.ndarray Lower bounds ub: np.ndarray Upper bounds blocked: np.ndarray Blocked reactions weights: np.ndarray Weights solver: str The solver to use Returns ------- Solution : The solution of the optimization problem """ rev = (lb < 0) S = S.copy().astype(float) m, n = S.shape dense = np.zeros(n, ) if sum(blocked > 0) > 0: dense[blocked > 0] = np.random.normal(size=(sum(blocked > 0),)) non_null_weights = (abs(weights) > 2 ^ -52) k, l = non_null_weights & rev, non_null_weights & np.logical_not(rev) objective = np.concatenate([dense, weights[k], weights[l]]) nk, nl = sum(k), sum(l) t1, t2 = np.eye(n), np.eye(nk + nl) A = np.vstack([ np.hstack([S, np.zeros([m, nk + nl])]), np.hstack([t1[k, :], t2[rev[non_null_weights], :]]), np.hstack([-t1[non_null_weights, :], t2]) ]) b_lb = np.zeros(m + 2 * nk + nl, ) b_ub = b_lb.copy() b_ub[m:m + 2 * nk + nl] = 1e20 nlb = np.array([-1e20] * n) nlb[blocked > 0] = lb[blocked > 0] nlb[l] = 0 if sum(blocked > 0) < 1: nlb[l] = 1 fnlb = np.concatenate([nlb, np.array([-1e20] * (nk + nl))]) nub = np.array([1e20] * n) nub[blocked] = ub[blocked > 0] fnub = np.concatenate([nub, np.array([1e20] * (nk + nl))]) lsys = GenericLinearSystem(A, VAR_CONTINUOUS, fnlb, fnub, b_lb, b_ub, var_names=['x' + str(i) for i in range(A.shape[1])], solver=solver) lso = LinearSystemOptimizer(lsys) lsys.set_objective(coefficients=objective, minimize=False) return lso.optimize()
[docs]class SwiftcoreProperties(PropertiesReconstruction): def __init__(self, core: list, weights: list or np.ndarray, flux_threshold: int or float = 1e-4, solver: str = None): new_mandatory = {'core': lambda x: isinstance(x, list) and len(x) > 0, 'solver': lambda x: isinstance(x, str), 'weights': lambda x: isinstance(x, (list, np.ndarray)) and len(x) > 0} new_optional = {'flux_threshold': lambda x: isinstance(x, (int, float))} super().__init__() # self.base_mandatory['method'] = MethodsReconstruction.FASTCORE self.add_new_properties(new_mandatory, new_optional) self['flux_threshold'] = flux_threshold self['core'] = list(core) # TODO change this later, this is only for testing self['solver'] = solver self['weights'] = weights if weights is not None else np.array([])
[docs] @staticmethod def from_integrated_scores(scores, **kwargs): return SwiftcoreProperties(core=scores, **dict({k: v for k, v in kwargs.items() if k not in ['core']}))
[docs]class SWIFTCORE(ContextSpecificModelReconstructionAlgorithm): """ SWIFTCORE algorithm Parameters ---------- S: np.ndarray Stoichiometric matrix lb: np.ndarray Lower bounds ub: np.ndarray Upper bounds properties: SwiftcoreProperties The properties of the algorithm """ properties_class = SwiftcoreProperties def __init__(self, S: np.ndarray, lb: np.ndarray, ub: np.ndarray, properties: SwiftcoreProperties): super().__init__(S, lb, ub, properties) self.S = np.array(S) self.model_lb, self.model_ub = np.array(lb), np.array(ub) lbnorm, ubnorm = linalg.norm(self.model_lb, np.inf), linalg.norm(self.model_ub, np.inf) self.model_lb, self.model_ub = self.model_lb / lbnorm, self.model_ub / ubnorm self.properties = properties self.n_metabolites, self.n_reactions = self.S.shape self.Sf, self.lbf, self.ubf, fwd_irrev, bwd_irrev = \ fix_backwards_irreversible_reactions(self.S, self.model_lb, self.model_ub)
[docs] def run(self): """ Run the algorithm Returns ------- np.ndarray : The core of the model """ core = np.array(self.properties['core']) tol = self.properties['flux_threshold'] weights = np.array(self.properties['weights']) blocked = np.zeros(len(weights), ).astype(bool) weights[core] = 0 flux_sol = find_core(self.Sf, self.lbf, self.ubf, blocked, weights, self.properties['solver']) flux = flux_sol.x()[:self.Sf.shape[1]] weights[abs(flux) > tol] = 0 if len(weights) == self.Sf.shape[1]: blocked = np.isin(np.arange(self.Sf.shape[1]), core) blocked[abs(flux) > tol] = False else: _, D, V = sprs.linalg.svds(sprs.csc_matrix(self.Sf[:, weights == 0]), 10, which='SM') wv = linalg.norm(V[:, np.diag(D) < linalg.norm(self.Sf[:, weights == 0], 'fro') * EPS], np.inf, 1) blocked[weights == 0] = wv while sum(blocked > 0) > 0: n_blocked = sum(blocked) flux_sol = find_core(self.Sf, self.lbf, self.ubf, blocked, weights, self.properties['solver']) flux = flux_sol.x()[:self.Sf.shape[1]] weights[abs(flux) > tol] = 0 blocked[abs(flux) > tol] = 0 if 2 * sum(blocked) > n_blocked: weights = weights / 2 return np.where(weights == 0)[0]