Source code for troppo.methods.reconstruction.fastcore

import numpy as np
from scipy.sparse import lil_matrix, hstack, eye, vstack
from cobamp.core.linear_systems import GenericLinearSystem, VAR_CONTINUOUS, SteadyStateLinearSystem
from cobamp.core.optimization import LinearSystemOptimizer
from troppo.methods.base import ContextSpecificModelReconstructionAlgorithm, PropertiesReconstruction


# TODO make a generic LPProblem, then update with the necessary information for the LP7 and LP9 - this should improve
#  the performance of the algorithm

# TODO reformulate LP7/9 into a uniform structure and change bounds/objective functions

[docs]class FastcoreProperties(PropertiesReconstruction): """ Properties for FASTcore algorithm. Parameters ---------- core: list List of reactions that are considered core, as determined by the integrated scores flux_threshold: float Flux threshold for the algorithm solver: str Solver to be used """ def __init__(self, core: list, flux_threshold: float = 1e-4, solver: str = None): new_mandatory = {'core': lambda x: isinstance(x, list) and len(x) > 0, 'core_idx': lambda x: isinstance(x, list) and len(x) > 0, 'solver': lambda x: isinstance(x, str)} new_optional = {} super().__init__() # self.base_mandatory['method'] = MethodsReconstruction.FASTCORE self.add_new_properties(new_mandatory, new_optional) self['flux_threshold'] = flux_threshold self['core'] = list(core) # TODO change this later, this is only for testing self['core_idx'] = list(core) self['solver'] = solver
[docs] @staticmethod def from_integrated_scores(scores: list, **kwargs) -> 'FastcoreProperties': """ Creates a FastcoreProperties object from integrated scores. Parameters ---------- scores: list Integrated scores kwargs: dict Additional arguments Returns ------- FastcoreProperties FastcoreProperties object """ # TODO change later, idk why core_idx is here return FastcoreProperties(core=scores, **dict({k: v for k, v in kwargs.items() if k not in ['core', 'core_idx']}))
[docs]class FASTcore(ContextSpecificModelReconstructionAlgorithm): """ FASTcore algorithm for context-specific model reconstruction. Parameters ---------- S: numpy.ndarray Stoichiometric matrix lb: numpy.ndarray Lower bounds ub: numpy.ndarray Upper bounds properties: FastcoreProperties Properties for the algorithm """ properties_class = FastcoreProperties def __init__(self, S: np.ndarray, lb: np.ndarray, ub: np.ndarray, properties: FastcoreProperties): super().__init__(S, lb, ub, properties) self.LP3problem = None self.S = np.array(S) self.model_lb, self.model_ub = np.array(lb), np.array(ub) self.lb, self.ub = np.array(lb), np.array(ub) self.properties = properties self.n_metabolites, self.n_reactions = self.S.shape self.metabolites, self.reactions = np.array([i for i in range(self.n_metabolites)]), np.array( [i for i in range(self.n_reactions)]) self.counter = 0 self.LPproblem, self.lsoLP3, self.lsoLP7, self.lsoLP9 = None, None, None, None
[docs] def reverse_irreversible_reactions_in_reverse_direction(self, irrev_reverse_idx: np.ndarray): """ Identifies the irreversible reactions in the reverse direction and returns the S matrix with the signals for the metabolites reversed, a vector with the upper bounds as reversed form the lower bounds and a vector with the lower bounds as the reverse of the upper ones. Parameters ---------- irrev_reverse_idx: numpy.ndarray Indices of the irreversible reactions in the reverse direction Returns ------- S: numpy.ndarray Modified stoichiometric matrix ub: numpy.ndarray Modified upper bounds lb: numpy.ndarray Modified lower bounds """ if irrev_reverse_idx.size > 0: # if they exist self.S[:, irrev_reverse_idx] = -self.S[:, irrev_reverse_idx] temp = self.ub[irrev_reverse_idx] self.ub[irrev_reverse_idx] = -self.lb[irrev_reverse_idx] self.lb[irrev_reverse_idx] = -temp return
[docs] def generate_LP3_problem(self): """ Generates the LP3 problem. """ self.LP3problem = SteadyStateLinearSystem(self.S, self.lb, self.ub, ['V' + str(i) for i in range(self.S.shape[1])], solver=self.properties['solver']) self.lsoLP3 = LinearSystemOptimizer(self.LP3problem)
[docs] def LP3(self, J: np.ndarray, basis=None) -> dict: """ LP3 problem. Parameters ---------- J: numpy.ndarray Indices of the reactions to be considered basis: None Not used Returns ------- dict Dictionary with the solution of the LP3 problem """ if basis is None: pass # objective function f = np.zeros(self.n_reactions) f[J] = -1 self.LP3problem.set_objective(f, minimize=True) self.LP3problem.model.configuration.presolve = False solution = self.lsoLP3.optimize() return {i: k[1] for i, k in enumerate(solution.var_values().items()) if i <= self.n_reactions - 1}
[docs] def LP7(self, J, basis=None): """ LP7 problem. Parameters ---------- J: numpy.ndarray Indices of the reactions to be considered basis: None Not used Returns ------- dict Dictionary with the solution of the LP3 problem """ # TODO implement basis (from CPLEX) to improve the speed of the algorithm print('LP7') if basis is None: pass nJ = J.size # number of irreversible reactions # m,n and m2,n2 are the same since the matrix S does not change throughout the algorithm, so we'll be using # self.n_metabolites and self.n_reactions # objective function f = -np.concatenate([np.zeros(self.n_reactions), np.ones(nJ)]) # equalities Aeq = hstack([self.S, lil_matrix((self.n_metabolites, nJ))]) beq = np.zeros(self.n_metabolites) # inequalities Ij = lil_matrix((nJ, self.n_reactions)) # nJ x n_reactions Ij[tuple([list(range(nJ)), J])] = -1 # Ij(sub2ind(size(Ij),(1:nj)',J(:))) = -1; from the original code Aineq = hstack([Ij, eye(nJ)]) bineq = np.zeros((nJ,)) # bounds lb = np.concatenate([self.lb, -np.inf * np.ones((nJ,))]) ub = np.concatenate([self.ub, np.ones((nJ,)) * self.properties['flux_threshold']]) A = vstack([Aeq, Aineq]) b_lb, b_ub = np.concatenate([beq, [None] * nJ]), np.concatenate([beq, bineq]) # b_lb, b_ub = np.concatenate([beq, -np.inf * np.ones((nJ, ))]), np.concatenate([beq, bineq]) problem = GenericLinearSystem(A.toarray(), VAR_CONTINUOUS, lb, ub, b_lb, b_ub, ['V' + str(i) for i in range(A.shape[1])], solver=self.properties['solver']) lso = LinearSystemOptimizer(problem) problem.set_objective(f, True) problem.model.configuration.presolve = False # problem.model.configuration.lpmethod = 'dual' solution = lso.optimize() solution.x().sum() print(solution.objective_value()) if solution.status() != 'optimal': print('Warning, Solution is not optimal') if solution: return {i: k[1] for i, k in enumerate(solution.var_values().items()) if i <= self.n_reactions - 1} else: return [np.nan] * self.n_reactions
[docs] def LP9(self, K, P): """ LP9 problem. Parameters ---------- K: numpy.ndarray Indices of the reactions to be considered P: numpy.ndarray Indices of the reactions to be considered Returns ------- dict Dictionary with the solution of the LP3 problem """ print('LP9') scalingFactor = 1e5 # V = [] if K.size == 0 or P.size == 0: return np.array([]) nP = P.size nK = K.size # objective f = np.concatenate([np.zeros(self.n_reactions), np.ones(nP)]) # equalities Aeq = hstack([self.S, lil_matrix((self.n_metabolites, nP))]) beq = np.zeros(self.n_metabolites) # inequalities Ip = lil_matrix((nP, self.n_reactions)) Ip[tuple([list(range(nP)), P])] = 1 Ik = lil_matrix((nK, self.n_reactions)) Ik[tuple([list(range(nK)), K])] = 1 Aineq = vstack([hstack([Ip, -eye(nP)]), hstack([-Ip, -eye(nP)]), hstack([-Ik, lil_matrix((nK, nP))])]) bineq = np.concatenate( [np.zeros((2 * nP,)), -np.ones((nK,)) * self.properties['flux_threshold'] * scalingFactor]) # bounds lb = np.concatenate([self.lb, np.zeros((nP,))]) * scalingFactor ub = np.concatenate([self.ub, np.array( list(map(max, zip(np.abs(self.model_lb[P]), np.abs(self.model_ub[P])))))]) * scalingFactor A = vstack([Aeq, Aineq]) b_lb, b_ub = np.concatenate([beq, [None] * (2 * nP + nK)]), np.concatenate([beq, bineq]) problem = GenericLinearSystem(A.toarray(), VAR_CONTINUOUS, lb, ub, b_lb, b_ub, ['V' + str(i) for i in range(A.shape[1])], solver=self.properties['solver']) lso = LinearSystemOptimizer(problem) problem.set_objective(f, True) solution = lso.optimize() print(solution.objective_value()) if solution.status() != 'optimal': print('Warning, Solution is not optimal') if solution: return {i: k[1] for i, k in enumerate(solution.var_values().items()) if i <= self.n_reactions - 1} else: return [np.nan] * self.n_reactions
[docs] def findSparseMode(self, J: np.ndarray or tuple, P: np.ndarray, singleton: bool, basis=None) -> np.ndarray: """ Find sparse mode. Parameters ---------- J: numpy.ndarray or tuple Indices of the reactions to be considered P: numpy.ndarray Indices of the reactions to be considered singleton: bool If True, the algorithm will consider only one reaction at a time basis List of basis vectors Returns ------- numpy.ndarray """ if J.size == 0: return np.array([], dtype=int) # supp = [] if basis is None: basis = [] print('before LP7') if singleton: V = self.LP7(J[0], basis) else: # V, basis = self.LP7(J, basis) V = self.LP7(J, basis) K = np.array([rJ for rJ in J if V[rJ] >= 0.99 * self.properties['flux_threshold']]) print('done LP7') if K.size > 0: V = self.LP9(K, P) Supp = np.array([i for i, k in V.items() if (np.abs(k) >= 0.99 * self.properties['flux_threshold']) and i <= self.n_reactions - 1]) print('done LP9') return Supp else: return np.array([])
[docs] def preprocessing(self) -> (np.ndarray, np.ndarray, np.ndarray, np.ndarray): """ Preprocessing of the model. Returns ------- np.ndarray, np.ndarray, np.ndarray, np.ndarray """ irreversible_reactions_idx_to_change = np.where(self.model_ub <= 0)[0] self.reverse_irreversible_reactions_in_reverse_direction(irreversible_reactions_idx_to_change) irreversible_reactions_idx = np.where(self.model_lb >= 0)[0] singleton = False J = np.intersect1d(self.properties['core_idx'], irreversible_reactions_idx) # irreversible reactions in core print('J size' + str(len(J))) print(J) P = np.setdiff1d(self.reactions, self.properties['core_idx']) # non-core reactions supp = self.findSparseMode(J, P, singleton) if np.setdiff1d(J, supp).size > 0: raise Exception('Inconsistent irreversible core reactions \n\tImpossible to build model') return np.setdiff1d(self.properties['core_idx'], supp), supp, P, irreversible_reactions_idx
[docs] def fastcore(self) -> list: """ Fastcore algorithm. Returns ------- list List with the reaction IDs of the reactions in the resulting model """ flipped = False singleton = False J, A, P, irreversible_reactions_idx = self.preprocessing() print(J.size, A.size) while J.size > 0: P = np.setdiff1d(P, A) supp = self.findSparseMode(J, P, singleton) # print(A, supp) A = np.union1d(A, supp).astype(int) print(J.size, A.size) if np.intersect1d(J, A) != np.array([]): J = np.setdiff1d(J, A) flipped = False else: if singleton: JiRev = np.setdiff1d(J[0], irreversible_reactions_idx) else: JiRev = np.setdiff1d(J, irreversible_reactions_idx) if flipped or JiRev == np.array([]): if singleton: print('Error: Global network is not consistent') print(J) return sorted(np.union1d(A, J)) else: flipped = False singleton = True else: self.reverse_irreversible_reactions_in_reverse_direction(JiRev) flipped = True print('Flipped') print(J.size, A.size) return sorted(A)
[docs] def run(self): """ Run Fastcore algorithm. Returns ------- list List with the reaction IDs of the reactions in the resulting model """ return self.fastcore()