Source code for troppo.methods.reconstruction.tINIT


import scipy.sparse as sprs
import numpy as np
from numpy import int_
from itertools import chain

from cobamp.core.linear_systems import GenericLinearSystem, VAR_CONTINUOUS, VAR_BINARY
from cobamp.core.optimization import LinearSystemOptimizer

from cobamp.core.models import make_irreversible_model_raven
from troppo.methods.base import ContextSpecificModelReconstructionAlgorithm, PropertiesReconstruction
from troppo.utilities.list import is_list_else_empty, if_none_return_list


[docs]class tINITProperties(PropertiesReconstruction): """ Properties for the tINIT algorithm. Parameters ---------- reactions_scores : list The scores for each reaction. present_metabolites : list, default=[] The metabolites that are present in the model. essential_reactions : list, default=[] The reactions that are essential in the model. production_weight : float, default=0.5 The weight of the production reactions. allow_excretion : bool, default=False Whether to allow excretion. no_reverse_loops : bool, default=False Whether to allow reverse loops. solver : str, default=None The solver to be used. """ def __init__(self, reactions_scores: list, present_metabolites: list = None, essential_reactions: list = None, production_weight: float = 0.5, allow_excretion: bool = False, no_reverse_loops: bool = False, solver: str = None): if present_metabolites is None: present_metabolites = [] if essential_reactions is None: essential_reactions = [] # TODO: check later if the params input might be necessary to include for the troppo new_mandatory = { 'solver': lambda y: isinstance(y, str) } new_optional = { 'reactions_scores': lambda y: is_list_else_empty(y), 'present_metabolites': lambda y: is_list_else_empty(y), 'essential_reactions': lambda y: is_list_else_empty(y), 'production_weight': lambda y: isinstance(y, float), 'allow_excretion': lambda y: isinstance(y, bool), 'no_reverse_loops': lambda y: isinstance(y, bool), } super().__init__() self.add_new_properties(new_mandatory, new_optional) properties_dict_list = ["reactions_scores", "present_metabolites", "essential_reactions", "production_weight", "allow_excretion", "no_reverse_loops", "solver"] properties_list = [reactions_scores, if_none_return_list(present_metabolites), if_none_return_list(essential_reactions), if_none_return_list(production_weight), allow_excretion, no_reverse_loops, solver] prop_dict = dict(zip(properties_dict_list, properties_list)) [self.add_if_not_none(*k) for k in prop_dict.items()]
[docs] @staticmethod def from_integrated_scores(scores: list, **kwargs: dict) -> 'tINITProperties': """ Creates the properties object from integrated scores. Parameters ---------- scores: list The integrated scores for each reaction. kwargs: dict The other properties. Returns ------- tINITProperties The properties object. """ return tINITProperties(reactions_scores=scores, **kwargs)
[docs]class tINIT(ContextSpecificModelReconstructionAlgorithm): """ The tINIT algorithm. Parameters ---------- S : np.ndarray The stoichiometric matrix. lb : np.ndarray The lower bounds for the reactions. ub : np.ndarray The upper bounds for the reactions. properties : tINITProperties The properties for the algorithm. """ properties_class = tINITProperties def __init__(self, S: np.ndarray, lb: np.ndarray, ub: np.ndarray, properties: tINITProperties): super().__init__(S, lb, ub, properties) self.S = np.array(S) self.lb = np.array(lb) self.ub = np.array(ub) self.properties = properties self.n_metabolites, self.n_reactions = self.S.shape self.no_reverse_loops = None self.allow_excretion = None self.production_weight = None self.essential_reactions = None self.present_metabolites = None self.present_metabolites_unlisted = None self.metabolites_production = None self.reaction_scores = None self.bwd_idx = None self.fwd_idx = None self.rev_map = None self.irreversible_ub = None self.irreversible_lb = None self.essential_reactions_idx = None self.essential_reversible_reactions = None self.reversible_reactions = None self.problem_a = None self.problem_c = None self.problem_buc = None self.met_ub = None self.rev_ub = None self.problem_blc = None self.irreversible_b = None self.problem_bux = None self.problem_blx = None self.n_rev_bounds = None self.irreversible_S = None self.n_net_production = None self.net_production = None self.non_essential = None self.v_vector_reactions = None self.n_non_essential_reactions = None self.n_essential_reactions = None self.n_reactions_irrev = None self.n_metabolites_irrev = None
[docs] def preprocessing(self): """ Preprocessing of the algorithm. Returns ------- """ self.reaction_scores = np.array(self.properties['reactions_scores']) self.present_metabolites = np.array(self.properties['present_metabolites']) self.essential_reactions_idx = np.array(self.properties['essential_reactions']) self.essential_reactions = np.zeros(self.n_reactions).astype(bool) if self.essential_reactions_idx.size != 0: self.essential_reactions[self.essential_reactions_idx] = True self.production_weight = self.properties['production_weight'] self.allow_excretion = self.properties['allow_excretion'] self.no_reverse_loops = self.properties['no_reverse_loops'] if self.present_metabolites.size != np.unique( self.present_metabolites.size): # in theory, this should not even happen once, only if by mistake print('There are repeated metabolites in the list. We\' apply a unique method and will be used ') self.present_metabolites = np.unique(self.present_metabolites) self.present_metabolites_unlisted = np.array( list(chain(*map(lambda y: [y] if not isinstance(y, list) else y, self.present_metabolites)))) if self.present_metabolites_unlisted.size > 0: self.metabolites_production = np.ones(self.present_metabolites_unlisted.size) * -1 # self.metabolites_production[self.present_metabolites] = -1 else: self.metabolites_production = np.array([]) self.present_metabolites = np.array([]) self.present_metabolites_unlisted = np.array([]) # TODO: have a function to check if the model is unconstrained # TODO: change the reversible stuff later to the new to be implemented functions self.reversible_reactions = np.where(self.lb < 0)[0] # change this TODO: should this also have self.ub > 0? # self.reversible_reactions = np.where((np.logical_and(self.lb < 0, self.ub>0)))[0] self.essential_reversible_reactions = self.essential_reactions_idx[ np.isin(self.essential_reactions_idx, self.reversible_reactions)] self.essential_reactions_idx = np.setdiff1d(self.essential_reactions_idx, self.essential_reversible_reactions) # convert the model to irreversible self.irreversible_S, self.irreversible_lb, self.irreversible_ub, self.rev_map = make_irreversible_model_raven( self.S, self.lb, self.ub, False) self.reaction_scores = np.hstack([self.reaction_scores, self.reaction_scores[ self.reversible_reactions]]) if self.no_reverse_loops: self.fwd_idx = self.reversible_reactions self.bwd_idx = np.array( [self.rev_map[i][1] for i in self.reversible_reactions]) # TODO: check this part of no reverse loops else: self.fwd_idx = self.essential_reversible_reactions # self.bwd_idx = np.array([i + (self.n_reactions) for i in self.essential_reversible_reactions]) self.bwd_idx = np.array([self.rev_map[i][1] for i in self.essential_reversible_reactions]) # remove the essential reactions from the self.reactions_scores if len(self.essential_reactions_idx) > 0: self.reaction_scores = np.delete(self.reaction_scores, self.essential_reactions_idx) # this part will create fake metabolites for the self.present_metabolites to insert on the self.irreversible_S # TODO: this should also be a function in a superclass if self.present_metabolites_unlisted.size > 0: metabolites_to_add = [i + self.irreversible_S.shape[0] for i in range(self.present_metabolites_unlisted.size)] pre_shape_irreversible_iterable = self.irreversible_S.shape[0] self.irreversible_S = np.vstack( [self.irreversible_S, np.zeros([self.present_metabolites_unlisted.size, self.irreversible_S.shape[1]])]) # change the values for the new added metabolites # TODO: check this for to make sure that the unlisted present metabolites have # the same location as the listed present metabolites for metabolites in self.present_metabolites: if type(metabolites) in [list, tuple]: # check the reactions for all the metabolites in the list/tuple temp_S = self.S[metabolites, ] temp_idx, temp_stoi = np.nonzero(temp_S), temp_S[np.nonzero(temp_S)] unique_reactions = np.unique(temp_idx[1]) unique_stoi = [np.sum(temp_S[:, i]) for i in range(unique_reactions.size)] for metabolite in metabolites: self.irreversible_S[pre_shape_irreversible_iterable, temp_idx] = unique_stoi pre_shape_irreversible_iterable = pre_shape_irreversible_iterable + 1 # temp_values = self.S[metabolite,] # temp_idx, temp_stoi = np.nonzero(temp_values), temp_values[np.nonzero(temp_values)] else: temp_values = self.S[metabolites, ] temp_idx, temp_stoi = np.nonzero(temp_values), temp_values[np.nonzero(temp_values)] self.irreversible_S[pre_shape_irreversible_iterable, temp_idx] = temp_stoi pre_shape_irreversible_iterable = pre_shape_irreversible_iterable + 1 # TODO: CHECK THIS LATER TO SEE IF THIS DOESN'T FUCKING MESS UP # TODO: for this block, check if the values are updated throughout the modifications, # but I do not think so and its the way it should be # useful numbers self.n_metabolites_irrev, self.n_reactions_irrev = self.irreversible_S.shape self.n_essential_reactions = self.essential_reactions_idx.size self.n_non_essential_reactions = self.n_reactions_irrev - self.n_essential_reactions self.non_essential = np.setdiff1d(list(range(self.irreversible_S.shape[1])), self.essential_reactions_idx) self.v_vector_reactions = list(self.rev_map.keys()) for rev_r in self.reversible_reactions: self.v_vector_reactions.append(str(rev_r) + '_r') revs = np.setdiff1d(self.non_essential, list(self.rev_map.keys())) # revs_names = [] # for r in range(revs.size): # for k, v in self.rev_map.items(): # if isinstance(v, (tuple, list)): # if revs[r] in v: # revs_names.append(str(k) + '_r') revs_names = [str(k) + '_r' for k, v in self.rev_map.items() if isinstance(v, (tuple, list)) and v[1] in revs] self.non_essential = self.non_essential.astype(str) revs = revs.astype(str) for r in range(revs.size): temp = np.where(self.non_essential == revs[r])[0] self.non_essential[temp] = revs_names[r] # TODO: The blocks below this will need a rework as larger models will use too much memory # non-essential reactions to produce a fake metabolite ids_to_keep = np.setdiff1d(np.arange(self.n_reactions_irrev), self.essential_reactions_idx) matrix_to_add = sprs.eye(self.n_reactions_irrev, format='csc')[ids_to_keep, :] self.irreversible_S = sprs.vstack( [self.irreversible_S, matrix_to_add]) # for non-essential, but with a stoichiometry of 1000 temp = sprs.eye(self.n_non_essential_reactions, format='csc') * 1000 new_temp = sprs.vstack([np.zeros([self.n_metabolites_irrev, self.n_non_essential_reactions]), temp]) self.irreversible_S = sprs.hstack( [self.irreversible_S, new_temp]) if self.production_weight != 0: zmat = sprs.csc_matrix(np.zeros([self.n_non_essential_reactions + self.present_metabolites_unlisted.size, self.n_metabolites_irrev - self.present_metabolites_unlisted.size])) temp2 = sprs.vstack( [sprs.eye(self.n_metabolites_irrev - self.present_metabolites_unlisted.size, format='csc') * -1, zmat]) self.irreversible_S = sprs.hstack([self.irreversible_S, temp2]) self.n_net_production = self.n_metabolites_irrev - self.present_metabolites_unlisted.size self.net_production = np.setdiff1d(list(range(self.n_metabolites_irrev)), self.present_metabolites_unlisted) else: self.net_production = [] self.n_net_production = 0 if self.fwd_idx.size > 0: self.n_rev_bounds = self.fwd_idx.size I = sprs.csc_matrix(sprs.eye(self.n_reactions_irrev) * -1) temp = sprs.vstack([I[self.fwd_idx, ], I[self.bwd_idx, ]]) # padding temp = sprs.hstack([temp, sprs.csc_matrix( np.zeros([temp.shape[0], self.irreversible_S.shape[1] - self.n_reactions_irrev]))]) temp = sprs.hstack([temp, sprs.eye(self.n_rev_bounds * 2, format='csc') * 1000]) temp = sprs.vstack([temp, sprs.hstack( [sprs.csc_matrix(np.zeros([self.n_rev_bounds, self.irreversible_S.shape[1]])), sprs.eye(self.n_rev_bounds, format='csc') * -1, sprs.eye(self.n_rev_bounds, format='csc') * -1])]) self.irreversible_S = sprs.hstack( [self.irreversible_S, sprs.csc_matrix(np.zeros([self.irreversible_S.shape[0], self.n_rev_bounds * 2]))]) self.irreversible_S = sprs.vstack([self.irreversible_S, temp]) else: self.n_rev_bounds = 0
[docs] def build_problem(self): """ Builds the problem to be optimized Returns ------- """ # Preparation of the problem to be optimized self.problem_blx = np.hstack([self.irreversible_lb, np.zeros( self.n_non_essential_reactions + self.n_net_production + (self.n_rev_bounds * 2))]) # TODO: check if this in float; if not, we have to change the dtype of problem_blx, # otherwise 0.1 will no exist (will be 0) if self.essential_reactions_idx.size > 0: self.problem_blx[self.essential_reactions_idx] = np.array( list(map(lambda y: y if y > 0.1 else 0.1, self.problem_blx[self.essential_reactions_idx]))) self.problem_bux = np.hstack([self.irreversible_ub, np.ones( self.n_non_essential_reactions + self.n_net_production + self.n_rev_bounds * 2)]) self.irreversible_b = np.zeros(self.n_metabolites_irrev) self.problem_blc = np.hstack( [self.irreversible_b, np.ones(self.n_non_essential_reactions), np.zeros(self.n_rev_bounds * 2), np.ones(self.n_rev_bounds) * -1]) # this part of the code will take care of excretion and reverse loops, if they are true on the parameters if self.no_reverse_loops: self.rev_ub = np.zeros(self.n_rev_bounds) # TODO this is just a fix; REALLY FIX THIS LATER for i in range(self.essential_reversible_reactions.size): self.rev_ub[i] = -1 else: self.rev_ub = np.ones(self.n_rev_bounds) * -1 if self.allow_excretion: self.met_ub = np.array([None] * self.n_metabolites_irrev) else: self.met_ub = np.zeros(self.n_metabolites_irrev) self.problem_buc = np.hstack( [self.met_ub, np.ones(self.n_non_essential_reactions) * 1000, np.ones(self.n_rev_bounds * 2) * 999.9, self.rev_ub]) self.problem_c = np.hstack([np.zeros(self.n_reactions_irrev), self.reaction_scores, np.ones(self.n_net_production) * self.production_weight * -1, np.zeros(self.n_rev_bounds * 2)]) self.problem_a = self.irreversible_S self.problem_a = self.problem_a.astype(np.float32)
[docs] def solve_problem(self): """ Solves the problem Returns ------- """ # len_reactions = [len(self.irreversible_lb), self.n_non_essential_reactions, self.n_net_production, # self.n_rev_bounds, self.n_rev_bounds] # prefix = ['v_', 'ne_', 'net_prod_', 'revF_', 'revB_'] # # rx_names_problem2 = list(chain(*[[k + str(i) for i in range(v)] for k, v in zip(prefix, len_reactions)])) len_reactions = [np.array(self.v_vector_reactions), self.non_essential, self.net_production, self.fwd_idx, self.fwd_idx] prefix = ['v_', 'ne_', 'net_prod_', 'revF_', 'revB_'] rx_names_problem = list(chain(*[[k + str(i) for i in v] for k, v in zip(prefix, len_reactions)])) start_index = self.n_metabolites_irrev - self.present_metabolites_unlisted.size problem = GenericLinearSystem(self.problem_a.toarray(), VAR_CONTINUOUS, self.problem_blx, self.problem_bux, self.problem_blc, self.problem_buc, rx_names_problem, self.properties['solver']) ## TODO: Remove this later if self.properties['solver'] == 'CPLEX': problem.model.problem.parameters.mip.tolerances.mipgap.set(1e-3) elif self.properties['solver'] == 'GUROBI': problem.model.problem.Params.MIPGap = 1e-3 problem.model.configuration.tolerances.feasibility = 1e-8 problem.model.configuration.tolerances.optimality = 1e-8 problem.model.configuration.verbosity = 3 lso = LinearSystemOptimizer(problem) problem.write_to_lp('tINIT_test.lp') if self.present_metabolites_unlisted.size > 0: for i in range(self.present_metabolites_unlisted.size): self.problem_buc[start_index + i] = 1 problem.set_constraint_bounds(problem.model.constraints, self.problem_blc, self.problem_buc) problem.set_objective(self.problem_c, True) solution = lso.optimize() if solution.status() != 'optimal': self.problem_buc[start_index + i] = 0 else: self.metabolites_production[i] = 1 allInt = np.hstack( [list(range(self.n_reactions_irrev, self.n_reactions_irrev + self.n_non_essential_reactions)), list( range(self.irreversible_S.shape[1] - (self.n_rev_bounds * 2) + 1, self.irreversible_S.shape[1]))]) # return lso.optimize() problem.set_variable_types([problem.model.variables[int_(i)] for i in allInt], VAR_BINARY) problem.set_objective(self.problem_c, True) problem.write_to_lp('tINIT_test.lp') # print(problem.model.to_lp()) solution = lso.optimize() print(solution.objective_value()) if solution.status() != 'optimal': print('The problem is infeasible') # return solution # TODO improve return # return solution # used_reactions = np.array([]) # for k, v in solution.var_values().items(): print(k, v) used_reactions = np.array([int_(k.split('_')[1]) for k in list(solution.var_values().keys()) if 'ne_' in k and solution.var_values()[k] < 0.1]) # for k, v in solution.var_values().items(): # if 'ne_' in k: # # if '_r' in k: # # if v < 1 - 1e-6: # if v < 0.1: # used_reactions = np.append(used_reactions, int_(k.split('_')[1])) return np.append(used_reactions, self.essential_reactions_idx)
[docs] def run_tINIT(self) -> np.array: """ Runs the tINIT algorithm Returns ------- np.array An array of the reactions that are used in the model """ self.preprocessing() self.build_problem() res = self.solve_problem() return np.unique(np.int_(np.sort(res)))
[docs] def run(self) -> np.array: return self.run_tINIT()
if __name__ == '__main__': import numpy as np s_matrix = np.array([[1, -1, 0, 0, -1, 0, -1, 0, 0], [0, 1, -1, 0, 0, 0, 0, 0, 0], [0, 1, 0, 1, -1, 0, 0, 0, 0], [0, 0, 0, 0, 0, 1, -1, 0, 0], [0, 0, 0, 0, 0, 0, 1, -1, 0], [0, 0, 0, 0, 1, 0, 0, 1, -1]]) a = s_matrix[2, ] n_a_idx, n_a = np.nonzero(a), a[np.nonzero(a)] x = s_matrix[[1, 2, 3], ] n_x_idx, n_x = np.nonzero(x), x[np.nonzero(x)] unique_reac = np.unique(n_x_idx[1]) z = np.array([np.sum(x[:, i]) for i in range(unique_reac.size)], dtype=float) z = np.vstack([z, z, z, z, z, z]) z[[0, 2, 4], ] = np.apply_along_axis(lambda y: [i if i > 0.1 else 0.1 for i in y], 1, z[[0, 2, 4], ]) l = (lambda y: [i if i > 0.1 else 0.1 for i in y]) l([0, 1, 2])