Source code for troppo.methods.reconstruction.corda

from copy import deepcopy
from numbers import Number

import pandas as pd

from collections import OrderedDict

import numpy as np
from numpy import array, zeros, sqrt, vstack, where, floor, random, unique, \
    apply_along_axis, nan, logical_or, int_, append

from cobamp.core.models import ConstraintBasedModel
from time import time

from cobamp.core.optimization import Solution
from troppo.methods.base import ContextSpecificModelReconstructionAlgorithm, PropertiesReconstruction

from pathos.multiprocessing import cpu_count
from pathos.pools import _ProcessPool

from troppo.utilities.list import is_list


def _init_corda_worker(corso_fba, constraint, constrainby, costfx, costbase, ntimes, eps, lb):
    global _corso_fba, _constraint, _constrainby, _costfx, _costbase, _ntimes, _eps, _lb

    _corso_fba = corso_fba
    _constraint = constraint
    _constrainby = constrainby
    _costfx = costfx
    _costbase = costbase
    _ntimes = ntimes
    _eps = eps
    _lb = lb


def _corda_dependent_reactions_iteration(rx):
    global _corso_fba, _constraint, _constrainby, _costfx, _costbase
    global _ntimes, _forward, _eps, _lb
    dependent, to_delete = _corda_dependent_rxs_per_sense(rx, _constraint, True)

    if _lb[rx] < 0:
        bkw_dep, to_del_bkw = _corda_dependent_rxs_per_sense(rx, -_constraint, False)

        dependent = dependent | bkw_dep
        to_delete = to_del_bkw & to_delete

    return rx, (dependent, to_delete)


def _corda_dependent_rxs_per_sense(rx, constraint, forward):
    global _corso_fba, _constrainby, _costfx, _costbase
    of_dict = {rx: 1}
    cost = _costbase + _costfx()
    flux, corso_sol = _corso_fba.optimize_corso(cost, of_dict, not forward, constraint, _constrainby, eps=_eps)

    dependent = abs(corso_sol.x()) > _eps
    to_del = not dependent.any()
    if not to_del:
        for i in range(_ntimes - 1):
            cost = _costbase + _costfx()
            flux, corso_sol = _corso_fba.optimize_corso(cost, of_dict,
                                                        not forward, constraint,
                                                        _constrainby, eps=_eps, flux1=flux)
            dependent = (abs(corso_sol.x()) > _eps) | dependent
    else:
        dependent = zeros(dependent.shape).astype(bool)
    return dependent, to_del


[docs]class CORSOSolution(Solution): """ A solution to a COBRA FBA problem. """ def __init__(self, sol_max, sol_min, f, index_map, var_names, eps=1e-8): x = sol_min.x() rev = index_map[max(index_map) + 1:] nx = x[:max(index_map) + 1] nx[rev] = x[rev] - sol_min.x()[max(index_map) + 1:-1] nx[abs(nx) < eps] = 0 nvalmap = OrderedDict([(k, v) for k, v in zip(var_names, nx)]) super().__init__(nvalmap, [sol_max.status(), sol_min.status()], objective_value=f)
[docs]class CORSOModel(ConstraintBasedModel): def __init__(self, cbmodel, corso_element_names=('R_PSEUDO_CORSO', 'M_PSEUDO_CORSO'), solver=None): if not cbmodel.model: cbmodel.initialize_optimizer() self.cbmodel = cbmodel irrev_model, self.mapping = cbmodel.make_irreversible() S = irrev_model.get_stoichiometric_matrix() bounds = irrev_model.bounds self.cost_index_mapping = zeros(S.shape[1], dtype=int_) self.corso_rx, self.corso_mt = corso_element_names super().__init__(S, bounds, irrev_model.reaction_names, irrev_model.metabolite_names, solver=solver) self.add_metabolite(zeros(len(self.reaction_names)), self.corso_mt) self.add_reaction(zeros(len(self.metabolite_names)), (0, 0), self.corso_rx) self.original_bounds = deepcopy(self.bounds) for orx, nrx in self.mapping.items(): if isinstance(nrx, int): self.cost_index_mapping[nrx] = orx else: for nrx_split in nrx: self.cost_index_mapping[nrx_split] = orx
[docs] def solve_original_model(self, of_dict, minimize=False): self.cbmodel.set_objective(of_dict, minimize) sol = self.cbmodel.optimize() return sol
[docs] def set_costs(self, cost): true_cost = cost[self.cost_index_mapping] true_cost = append(true_cost, array([-1])) self.set_stoichiometric_matrix(true_cost.reshape(1, len(true_cost)), rows=[self.corso_mt])
# def set_reaction_bounds(self, index, **kwargs): # super().set_reaction_bounds(index, **kwargs) # self.original_bounds[self.decode_index(index, 'reaction')] = self.get_reaction_bounds(index)
[docs] def set_corso_objective(self): self.set_objective({self.corso_rx: 1}, True)
[docs] def optimize_corso(self, cost, of_dict, minimize=False, constraint=1, constraintby='val', eps=1e-6, flux1=None): if flux1 is None: flux1 = self.solve_original_model(of_dict, minimize) if abs(flux1.objective_value()) < eps: return flux1, flux1 f1_x = flux1.x() if constraintby == 'perc': # f1_f = flux1.x()[self.cbmodel.c != 0] f1_f = {idx: f1_x[idx] * (constraint / 100) for idx in of_dict.keys()} elif constraintby == 'val': if (flux1.objective_value() < constraint and not minimize) or ( flux1.objective_value() > constraint and minimize): raise Exception('Objective flux is not sufficient for the the set constraint value.') else: f1_f = {idx: constraint for idx in of_dict.keys()} else: raise Exception('Invalid constraint options.') self.set_reaction_bounds(self.corso_rx, lb=0, ub=1e20) # corso_of_dict = deepcopy(of_dict) # corso_of_dict[self.corso_rx] = 1 self.set_costs(cost) for rx in f1_f.keys(): true_idx = self.decode_index(rx, 'reaction') involved = self.mapping[true_idx] fluxval = f1_f[rx] # if isinstance(f1_f, ndarray) else f1_f if isinstance(involved, (int, int_)): self.set_reaction_bounds(involved, lb=fluxval, ub=fluxval, temporary=True) else: self.set_reaction_bounds(involved[0], lb=fluxval, ub=fluxval, temporary=True) self.set_reaction_bounds(involved[1], lb=0, ub=0, temporary=True) self.set_objective({self.corso_rx: 1}, True) sol = self.optimize() self.revert_to_original_bounds() return flux1, CORSOSolution(flux1, sol, sum([of_dict[k] * f1_f[k] for k in of_dict.keys()]), self.cost_index_mapping, self.cbmodel.reaction_names, eps=eps)
[docs]class CORDAProperties(PropertiesReconstruction): """ Properties reconstruction using CORDA Parameters ---------- high_conf_rx : list High confidence reactions medium_conf_rx : list Medium confidence reactions neg_conf_rx : list Negative confidence reactions pr_to_np : float Threshold to include NP reactions if PR reactions depend on them constraint : int Value of the constraint constrainby : str either 'val' (constrain fluxes by value) or 'perc' (constraint by percentage) om : float cost assigned to reactions when calculating dependencies ntimes : int Number of CORSO FBA simulations performed per dependency assessment nl : int or float Number of loops to perform solver : str Solver to use for optimization threads : int Number of threads to use """ CONSTRAINBY_VAL = 'val' CONSTRAINBY_PERC = 'perc' def __init__(self, high_conf_rx: list, medium_conf_rx: list, neg_conf_rx: list, pr_to_np: float = None, constraint: int = None, constrainby: str = None, om: float = None, ntimes: int = None, nl: int = None, solver: str = None, threads: int = None): new_mandatory = {k: is_list for k in ['high_conf_rx', 'medium_conf_rx', 'neg_conf_rx']} new_optional = { # 'met_tests': lambda x: is_list(x) or x is None, 'pr_to_np': lambda x: isinstance(x, Number), 'constraint': lambda x: isinstance(x, Number), 'constrainby': [self.CONSTRAINBY_VAL, self.CONSTRAINBY_PERC], 'om': lambda x: isinstance(x, Number), 'ntimes': lambda x: isinstance(x, int) and x > 0, 'nl': lambda x: isinstance(x, Number) and x >= 0, 'threads': int } super().__init__() self.add_new_properties(new_mandatory, new_optional) vars = [high_conf_rx, medium_conf_rx, neg_conf_rx, pr_to_np, constraint, constrainby, om, ntimes, nl, solver, threads] defaults = [None, None, None, 2, 1, CORDAProperties.CONSTRAINBY_VAL, 1e4, 5, 1e-2, 'CPLEX', cpu_count() - 1] names = ['high_conf_rx', 'medium_conf_rx', 'neg_conf_rx', 'pr_to_np', 'constraint', 'constrainby', 'om', 'ntimes', 'nl', 'solver', 'threads'] for v, k, d in zip(vars, names, defaults): self[k] = v if v is not None else d
[docs] @staticmethod def from_integrated_scores(scores: tuple, **kwargs) -> 'CORDAProperties': """ Create a CORDAProperties object from integrated scores Parameters ---------- scores: tuple Tuple of high, medium and negative confidence scores kwargs: dict Additional arguments to pass to the constructor Returns ------- CORDAProperties """ hi, med, neg = scores return CORDAProperties(hi, med, neg, **kwargs)
[docs]class CORDA(ContextSpecificModelReconstructionAlgorithm): """ CORDA algorithm Parameters ---------- S : array Stoichiometric matrix lb : array Lower bounds ub : array Upper bounds properties : CORDAProperties Properties object Attributes ---------- corso_fba : CORSOModel CORSO FBA model """ properties_class = CORDAProperties
[docs] @staticmethod def costfx_factory(nl, om, costbase): return lambda: nl * floor(om * random.rand(len(costbase), )) / om
def __init__(self, S, lb, ub, properties): super().__init__(S, lb, ub, properties) self.S = array(S) self.lb, self.ub = array(lb), array(ub) self.properties = properties rx_names, mt_names = ['V' + str(i) for i in range(S.shape[1])], ['M' + str(i) for i in range(S.shape[0])] cbmodel = ConstraintBasedModel(S, list(zip(lb, ub)), reaction_names=rx_names, metabolite_names=mt_names, optimizer=True, solver=properties['solver']) self._m, self._n = self.S.shape self.corso_fba = CORSOModel(cbmodel, solver=properties['solver'])
[docs] def run(self) -> np.ndarray: """ Set up of the variables to be used and calls the run_corda method. Returns ------- array Returns an Array of reaction categories """ rx_cat = zeros(self._n, ) rx_cat[self.properties['high_conf_rx']] = 1 rx_cat[self.properties['medium_conf_rx']] = 2 rx_cat[self.properties['neg_conf_rx']] = 3 constraint = self.properties['constraint'] constrainby = self.properties['constrainby'] nl = self.properties['nl'] ntimes = self.properties['ntimes'] om = self.properties['om'] pr_to_np = self.properties['pr_to_np'] threads = self.properties['threads'] return self.run_corda(rx_cat, constraint, constrainby, nl, ntimes, om, pr_to_np, threads)
[docs] def run_corda(self, rx_cat: np.ndarray, constraint: int, constrainby: str, nl: int, ntimes: int, om: float = 1e4, pr_to_np: float = 2, threads: int = cpu_count() - 1) -> np.ndarray: """ Run the CORDA algorithm Parameters ---------- rx_cat: np.ndarray Array of reaction categories constraint: int Value of the constraint constrainby: str either 'val' (constrain fluxes by value) or 'perc' (constraint by percentage) nl: int Number of loops to perform ntimes: int Number of CORSO FBA simulations performed per dependency assessment om: float Cost assigned to reactions when calculating dependencies pr_to_np: float Threshold to include NP reactions if PR reactions depend on them threads: int Number of threads to use Returns ------- np.ndarray Returns an Array of reaction categories """ def _corda_find_all_dependencies(reaction_list: np.ndarray) -> list: """ Find all dependencies in a list of reactions Parameters ---------- reaction_list: np.ndarray Array of reactions Returns ------- list List of tuples of dependent reactions and reactions to delete """ res_map = {r: i for i, r in enumerate(reaction_list)} true_threads = min((len(reaction_list) // 2) + 1, threads) result = [None] * len(reaction_list) rx_per_job = len(reaction_list) // threads pool = _ProcessPool( processes=true_threads, initializer=_init_corda_worker, initargs=(self.corso_fba, constraint, constrainby, costfx, costbase, ntimes, 1e-6, self.lb) ) for i, value in pool.imap_unordered(_corda_dependent_reactions_iteration, reaction_list, chunksize=rx_per_job): result[res_map[i]] = value pool.close() pool.join() return result rx_cat = array(rx_cat) costbase = zeros(self._n, ) print(pd.Series(rx_cat).value_counts()) costbase[rx_cat == 2] = sqrt(om) costbase[rx_cat == 3] = om costfx = self.costfx_factory(nl, om, costbase) def nested_dependent_rxs(rx: int) -> tuple: """ Find dependent reactions for a given reaction Parameters ---------- rx: int Reaction index Returns ------- tuple Dependent reactions and reactions to delete """ return self.find_dependent_reactions(rx, constraint, constrainby, costfx, costbase, ntimes, eps=1e-6) HC_reactions = where(rx_cat == 1)[0] print('Step 1 started') s1t = time() # print(sum(dep),pd.Series(rx_cat).value_counts()) if (threads and threads > 1) and (len(HC_reactions) / 2 > threads): res1 = _corda_find_all_dependencies(HC_reactions) else: res1 = list(map(nested_dependent_rxs, HC_reactions)) s1_deps, s1_block = list(zip(*res1)) rx_cat[logical_or.reduce(s1_deps)] = 1 if sum(s1_block) > 0: rx_cat[HC_reactions[s1_block]] = -1 print('\t- Finished in ', str(time() - s1t), 'seconds') print(pd.Series(rx_cat).value_counts()) # for rx in HC_reactions: # dep, to_del = self.find_dependent_reactions(rx, constraint, constrainby, costfx, costbase, ntimes, eps=1e-6) # rx_cat[dep] = 1 # if to_del: # rx_cat[rx] = -1 self.block_reactions_from_idxs(rx_cat) costbase = zeros(self._n, ) costbase[rx_cat == 3] = om PR_reactions = where(rx_cat == 2)[0] NP_reactions = where(rx_cat == 3)[0] costfx = self.costfx_factory(nl, om, costbase) print('Step 2 started') s1t = time() PR_NP = {} # def _step_two(rx): # dep, to_del = self.find_dependent_reactions(rx, constraint, constrainby, costfx, costbase, ntimes, eps=1e-6) # PR_NP[rx] = dep[NP_reactions] # if to_del: # rx_cat[rx] = -1 if (threads and threads > 1) and (len(PR_reactions) / 2 > threads): res2 = _corda_find_all_dependencies(PR_reactions) else: res2 = list(map(nested_dependent_rxs, PR_reactions)) s2_deps, s2_block = list(zip(*res2)) for rx, dep in zip(PR_reactions, s2_deps): PR_NP[rx] = dep[NP_reactions] if sum(s2_block) > 0: rx_cat[PR_reactions[s2_block]] = -1 print('\t- Finished in ', str(time() - s1t), 'seconds') print(pd.Series(rx_cat).value_counts()) self.block_reactions_from_idxs(rx_cat) PR_NP = [PR_NP[k] for k in sorted(PR_NP.keys()) if rx_cat[k] != -1] if len(PR_NP) > 0: PR_NP = vstack(PR_NP) NP_occurrence = apply_along_axis(sum, 0, PR_NP) np_to_pr_idx = NP_reactions[NP_occurrence > pr_to_np] if len(np_to_pr_idx) > 0: rx_cat[np_to_pr_idx] = 2 PR_NP = PR_NP[:, NP_occurrence > pr_to_np] if PR_NP.shape[1] > 0: PR_NP = vstack([zeros((len(np_to_pr_idx), PR_NP.shape[1]))]) else: PR_NP = array([]) # 2.2 PR_reactions = where(rx_cat == 2)[0] NP_reactions = where(rx_cat == 3)[0] PR_to_check_l8r = [] rx_cat[NP_reactions] = -1 self.block_reactions_from_idxs(rx_cat) res2 = [] for i, rx in enumerate(PR_reactions): to_del = self.check_if_blocked(rx) if to_del: # PR_to_check_l8r.append(rx) # if len(PR_NP) > 0 and len(np_to_pr_idx) > 0: # np_from_rx = where(PR_NP[i, :] > 0)[0] # if len(np_from_rx) == 0: # print('Undefined') # else: # for kn in np_from_rx: # res2.append(kn) rx_cat[rx] = -1 res2 = unique(sorted(array(res2))) PR_reactions = where(rx_cat == 2)[0] rx_cat[PR_reactions] = 1 # rescued = set(sorted(PR_to_check_l8r + res2.tolist())) ES_reactions = rx_cat == 1 OT_reactions = rx_cat == 0 # to_block = where(ES_reactions | OT_reactions)[0] # rx_cat[to_block] = -1 # TODO: check this self.block_reactions_from_idxs(rx_cat) costbase = zeros(self._n, ) costbase[OT_reactions] = om print('Step 3 started') s1t = time() ES_OT = {} # def _step_three(rx): # dep, to_del = self.find_dependent_reactions(rx, constraint, constrainby, costfx, costbase, ntimes, 1e-6) # ES_OT[rx] = dep[OT_reactions] # ES_temp = where(ES_reactions)[0] if (threads and threads > 1) and (len(ES_temp) / 2 > threads): res3 = _corda_find_all_dependencies(ES_temp) else: res3 = list(map(nested_dependent_rxs, ES_temp)) s3_deps, _ = list(zip(*res3)) for rx, dep in zip(where(ES_reactions)[0], s3_deps): ES_OT[rx] = dep[OT_reactions] ES_OT = [ES_OT[k] for k in sorted(ES_OT.keys()) if rx_cat[k] != -1] print('\t- Finished in ', str(time() - s1t), 'seconds') print(pd.Series(rx_cat).value_counts()) OT_reaction_ids = where(OT_reactions)[0] if ES_OT: ES_OT = vstack(ES_OT) if ES_OT.shape[1] > 0: OT_occurrence = apply_along_axis(sum, 0, ES_OT) ot_to_es_idx = OT_reaction_ids[OT_occurrence != 0] rx_cat[ot_to_es_idx] = 1 return rx_cat
[docs] def block_reactions_from_idxs(self, rxcat: np.ndarray): """ Block reactions from a list of indices Parameters ---------- rxcat: np.ndarray Array of reaction indices """ block_corso = lambda rx: self.corso_fba.set_reaction_bounds(rx, lb=0, ub=0) block_cbmodel = lambda rx: self.corso_fba.cbmodel.set_reaction_bounds(rx, lb=0, ub=0) to_block = where(rxcat == -1)[0] for reaction in to_block: self.do_function_for_reactions_on_both_models(reaction, block_cbmodel, block_corso)
[docs] def do_function_for_reactions_on_both_models(self, reaction: int, mfunction, corsofunction): """ Perform a function on both models Parameters ---------- reaction: int Reaction index mfunction: function Function to perform on the CBModel corsofunction: function Function to perform on the CORSOModel """ if isinstance(self.corso_fba.mapping[reaction], int): corsofunction(reaction) else: for rxsplit in self.corso_fba.mapping[reaction]: corsofunction(rxsplit) mfunction(reaction)
[docs] def find_reaction_limits(self, rx: int) -> tuple: """ Find the minimum and maximum flux for a given reaction Parameters ---------- rx: int Reaction index Returns ------- tuple Minimum and maximum flux """ self.corso_fba.cbmodel.set_objective({rx: 1}, True) fmin = self.corso_fba.cbmodel.optimize().objective_value() self.corso_fba.cbmodel.set_objective({rx: 1}, False) fmax = self.corso_fba.cbmodel.optimize().objective_value() return fmin, fmax
[docs] def check_if_blocked(self, rx: int) -> bool: """ Check if a reaction is blocked Parameters ---------- rx: int Reaction index Returns ------- bool True if the reaction is blocked, False otherwise """ fl = self.find_reaction_limits(rx) if fl[0] == nan and fl[1] == nan: return True else: return (abs(fl[0]) < 1e-6) and (abs(fl[1]) < 1e-6)
[docs] def find_dependent_reactions(self, rx: int, constraint: int, constrainby: str, costfx, costbase: np.ndarray, ntimes: int, eps: float) -> tuple: """ Find dependent reactions for a given reaction Parameters ---------- rx: int Reaction index constraint: int Value of the constraint constrainby: str either 'val' (constrain fluxes by value) or 'perc' (constraint by percentage) costfx: function Cost function costbase: np.ndarray Cost base ntimes: int Number of CORSO FBA simulations performed per dependency assessment eps: float Epsilon value Returns ------- tuple Dependent reactions and reactions to delete """ dependent, to_delete = self.__find_dependent_reactions(rx, constraint, constrainby, costfx, costbase, ntimes, True, eps) if self.lb[rx] < 0: bkw_dep, to_del_bkw = self.__find_dependent_reactions(rx, -constraint, constrainby, costfx, costbase, ntimes, False, eps) dependent = dependent | bkw_dep to_delete = to_del_bkw & to_delete return dependent, to_delete
def __find_dependent_reactions(self, rx: int, constraint: int, constrainby: str, costfx, costbase: np.ndarray, n_times: int, forward: bool, eps: float) -> tuple: """ Find dependent reactions for a given reaction Parameters ---------- rx: int Reaction index constraint: int Value of the constraint constrainby: str either 'val' (constrain fluxes by value) or 'perc' (constraint by percentage) costfx: function Cost function costbase: np.ndarray Cost base n_times: int Number of CORSO FBA simulations performed per dependency assessment forward: bool True if forward, False if backward eps: float Epsilon value Returns ------- tuple Dependent reactions and reactions to delete """ of_dict = {rx: 1} cost = costbase + costfx() # print(rx, cost) flux, corso_sol = self.corso_fba.optimize_corso(cost, of_dict, not forward, constraint, constrainby, eps=eps) dependent = abs(corso_sol.x()) > eps to_del = not dependent.any() if not to_del: for i in range(n_times - 1): cost = costbase + costfx() flux, corso_sol = self.corso_fba.optimize_corso(cost, of_dict, not forward, constraint, constrainby, eps=eps, flux1=flux) dependent = (abs(corso_sol.x()) > eps) | dependent else: dependent = zeros(dependent.shape).astype(bool) return dependent, to_del