# Source code for troppo.tasks.core

import warnings
from typing import Iterable

from cobamp.core.models import ConstraintBasedModel
from cobamp.core.optimization import Solution, BatchOptimizer
from cobamp.utilities.context import CommandHistory
from cobamp.wrappers.external_wrappers import get_model_reader, AbstractObjectReader
from pathos.multiprocessing import cpu_count

# Default number of workers used by TaskEvaluator.batch_evaluate: the CPU count reported by pathos.
MP_THREADS = cpu_count()
# Positive infinity constant; not referenced by the code visible in this module.
INF = float('inf')
# Default numerical tolerance; not referenced by the code visible in this module
# (Task.evaluate_solution declares its own 1e-6 default) -- presumably used by importers, TODO confirm.
DEFAULT_EPS = 1e-6


class Task(object):
    """
    A task is a set of constraints that can be applied to a model. It is defined by a set of reactions, a set of
    constraints on the fluxes of these reactions, a set of constraints on the inflow and outflow of metabolites,
    and a set of mandatory activities that must be present in the solution. A task can be evaluated on a model to
    determine if the model satisfies the task.

    Parameters
    ----------
    should_fail: bool
        If True, the task is satisfied when the optimisation does NOT reach an optimal status
    reaction_dict: dict
        Extra reactions, ``{'r1': ({'m1': -1, 'm2': 2}, (lb, ub)), ...}``
    inflow_dict: dict
        Metabolite inflows, ``{'m3': (lb, ub), ...}``
    outflow_dict: dict
        Metabolite outflows, ``{'m5': (lb, ub), ...}``
    flux_constraints: dict
        Bounds imposed on reactions of the model, ``{'R1': (lb, ub), ...}``
    mandatory_activity: list
        Reaction identifiers whose flux is reported as active/inactive by ``evaluate_solution``
    name: str
        Task name (used to build the names of the task reactions)
    annotations: dict
        Free-form metadata; the ``description`` key is shown by ``__repr__``

    Keyword arguments that are not listed above are ignored.
    """

    # Default value of every supported keyword argument (insertion order is the assignment order).
    __defaults__ = {
        'should_fail': False,
        'reaction_dict': {},
        'inflow_dict': {},
        'outflow_dict': {},
        'flux_constraints': {},
        'mandatory_activity': [],
        'name': 'default_task',
        'annotations': {}
    }

    # Expected container/scalar type of every attribute (drives id_replace and __repr__).
    __types__ = {
        'reaction_dict': dict,
        'inflow_dict': dict,
        'outflow_dict': dict,
        'should_fail': bool,
        'name': str,
        'flux_constraints': dict,
        'annotations': dict,
        'mandatory_activity': list
    }

    def __init__(self, **kwargs):
        """
        reaction_dict: rxd = {'r1':({'m1':-1, 'm2':2}, (lb, ub)), ... }
        inflow_dict: ifd = {'m3':(1,1), ... }
        outflow_dict: ofd = {'m5':(5,5), ... }
        """
        for key, default in self.__defaults__.items():
            if key in kwargs:
                value = kwargs[key]
            elif isinstance(default, (dict, list)):
                # Each instance needs its own container: handing out the class-level default would let one
                # task's edits leak into every other task (and into the defaults themselves).
                value = default.copy()
            else:
                value = default
            setattr(self, key, value)

    def combine(self, other, add: bool = True):
        """
        Compute the attributes of the task resulting from combining this task with another one. Neither operand
        is modified.

        Parameters
        ----------
        other: Task
            The task to combine with
        add: bool
            If True, the bounds of `other` are added to the bounds of this task. If False, they are subtracted.
            In both cases the amount applied to a bound is clamped so that it is never negative.

        Returns
        -------
        tuple: (should_fail, reaction_dict, flux_constraints, inflow_dict, outflow_dict, mandatory_activity,
        name, annotations) for the combined task. `mandatory_activity` is returned as a set.

        Raises
        ------
        AssertionError: If `other` is not a Task or if both tasks have different failure criteria
        """
        assert isinstance(other, Task), 'Could not apply + operator between types Task and ' + str(type(other))
        assert self.should_fail == other.should_fail, 'Tasks with different failure criteria cannot be added'

        def bound_dict_add(lst):
            merged = {}
            for dct in lst:
                for key, bounds in dct.items():
                    if key not in merged:
                        # Copy mutable bounds so the operands never share state with the result.
                        merged[key] = bounds if isinstance(bounds, tuple) else list(bounds)
                        continue
                    current = merged[key]
                    lower = current[0] + max((bounds[0] if add else -bounds[0]), 0)
                    # NOTE(review): on subtraction the upper bound is shifted by -bounds[0] (not -bounds[1]).
                    # Kept as found, since the intended semantics could not be confirmed from this file.
                    upper = current[1] + max((bounds[1] if add else -bounds[0]), 0)
                    # Build a new container instead of using `+=` on the stored one: tuples do not support
                    # item assignment and lists would otherwise be mutated inside the operand task.
                    merged[key] = (lower, upper) if isinstance(current, tuple) else [lower, upper]
            return merged

        shfail = self.should_fail
        # On identifier clashes the reaction defined by `other` wins.
        rx_dict = dict(self.reaction_dict)
        rx_dict.update(other.reaction_dict)
        flx_dict = bound_dict_add([self.flux_constraints, other.flux_constraints])
        in_dict = bound_dict_add([self.inflow_dict, other.inflow_dict])
        out_dict = bound_dict_add([self.outflow_dict, other.outflow_dict])
        mnd_act = set(self.mandatory_activity) | set(other.mandatory_activity)
        name = (' minus ' if not add else ' plus ').join([self.name, other.name])

        annotations = {}
        if self.name not in self.annotations:
            annotations[self.name] = self.annotations
        annotations[other.name] = other.annotations

        return shfail, rx_dict, flx_dict, in_dict, out_dict, mnd_act, name, annotations

    def combine_inplace(self, other, add: bool = True):
        """
        Combine another task into this one, overwriting the attributes of this task with the result of
        `combine`.

        Parameters
        ----------
        other: Task
            The task to combine with
        add: bool
            If True, `other` is added to this task. If False, it is subtracted.

        Returns
        -------
        None: This task is modified in place
        """
        shfail, rx_dict, flx_dict, in_dict, out_dict, mnd_act, name, annotations = self.combine(other, add)
        self.should_fail = shfail
        self.reaction_dict = rx_dict
        self.flux_constraints = flx_dict
        self.outflow_dict = out_dict
        self.inflow_dict = in_dict
        self.mandatory_activity = mnd_act
        self.name = name
        self.annotations = annotations

    def _combined_task(self, other, add):
        # Build a brand new Task from the tuple returned by combine (shared by __add__ and __sub__).
        shfail, rx_dict, flx_dict, in_dict, out_dict, mnd_act, name, annotations = self.combine(other, add)
        return Task(should_fail=shfail, reaction_dict=rx_dict, flux_constraints=flx_dict, inflow_dict=in_dict,
                    outflow_dict=out_dict, mandatory_activity=mnd_act, name=name, annotations=annotations)

    def __add__(self, other):
        return self._combined_task(other, True)

    def __sub__(self, other):
        return self._combined_task(other, False)

    def __iadd__(self, other):
        self.combine_inplace(other, True)
        return self

    def __isub__(self, other):
        self.combine_inplace(other, False)
        return self

    @property
    def should_fail(self):
        return self.__should_fail

    @should_fail.setter
    def should_fail(self, value):
        self.__should_fail = value

    @property
    def reaction_dict(self):
        return self.__reaction_dict

    @reaction_dict.setter
    def reaction_dict(self, value):
        self.__reaction_dict = value

    @property
    def flux_constraints(self):
        return self.__flux_constraints

    @flux_constraints.setter
    def flux_constraints(self, value):
        self.__flux_constraints = value

    @property
    def outflow_dict(self):
        return self.__outflow_dict

    @outflow_dict.setter
    def outflow_dict(self, value):
        self.__outflow_dict = value

    @property
    def inflow_dict(self):
        return self.__inflow_dict

    @inflow_dict.setter
    def inflow_dict(self, value):
        self.__inflow_dict = value

    @property
    def name(self):
        return self.__name

    @name.setter
    def name(self, value):
        self.__name = value

    @property
    def annotations(self):
        return self.__annotations

    @annotations.setter
    def annotations(self, value):
        self.__annotations = value

    @property
    def mandatory_activity(self):
        return self.__mandatory_activity

    @mandatory_activity.setter
    def mandatory_activity(self, value):
        self.__mandatory_activity = value

    def id_replace(self, func):
        """
        Replace the identifiers in the task with the result of a function applied to the identifiers. Keys of
        every dict attribute except `annotations` and items of every list attribute are replaced in place.

        Parameters
        ----------
        func: function
            The function to apply to the identifiers
        """
        for prop, typ in self.__types__.items():
            prop_data = getattr(self, prop)
            if typ is list:
                setattr(self, prop, [func(k) for k in prop_data])
            elif typ is dict and prop != 'annotations':
                setattr(self, prop, {func(k): v for k, v in prop_data.items()})

    def get_add_reaction_args(self, model: ConstraintBasedModel, closed: bool = False) -> (dict, set):
        """
        Get the arguments to add the reactions to the model. Reactions whose name already exists in the model
        are skipped.

        Parameters
        ----------
        model: ConstraintBasedModel
            The model to add the reactions to
        closed: bool
            Whether to add the reactions as closed, with (0, 0) bounds, or with the bounds defined in the task

        Returns
        -------
        dict, set: A dict mapping each new reaction name to [stoichiometry, bounds, name], and the set of
        reaction names it contains
        """
        existing = set(model.reaction_names)
        reactions = {}

        # reaction_dict - task specific reactions, prefixed with the task name
        for k, v in self.reaction_dict.items():
            reaction_name = '_'.join([self.name, k, 'task_reaction'])
            if reaction_name not in existing:
                reactions[reaction_name] = [v[0], (0, 0) if closed else v[1], reaction_name]

        # inflow_dict / outflow_dict - drains producing (+1) or consuming (-1) a single metabolite
        for flows, suffix, coefficient in ((self.inflow_dict, 'inflow', 1), (self.outflow_dict, 'outflow', -1)):
            for k, v in flows.items():
                sink_name = '_'.join([k, suffix])
                if sink_name not in existing:
                    reactions[sink_name] = [{k: coefficient}, (0, 0) if closed else v, sink_name]

        return reactions, set(reactions.keys())

    def get_add_reaction_cmds(self, model: ConstraintBasedModel, closed: bool = False) -> (CommandHistory, set):
        """
        Get the commands to add the reactions to the model

        Parameters
        ----------
        model: ConstraintBasedModel
            The model to add the reactions to
        closed: bool
            Whether to add the reactions as closed or open

        Returns
        -------
        CommandHistory, set: The commands to add the reactions to the model, and the set of reactions that
        would be added. No command is queued when there is nothing to add.
        """
        command_history = CommandHistory()
        args, added_rx = self.get_add_reaction_args(model, closed)
        ordered_rx_names = list(added_rx)
        if ordered_rx_names:
            # Every entry is [stoichiometry, bounds, name]; the name column duplicates ordered_rx_names.
            arg, bnd, _ = zip(*(args[k] for k in ordered_rx_names))
            command_history.queue_command(model.add_reactions,
                                          {'args': list(arg), 'bounds': list(bnd), 'names': ordered_rx_names})
        return command_history, added_rx

    def get_task_bounds(self) -> dict:
        """
        Get the bounds for the task

        Returns
        -------
        dict: Bounds of the task reactions, inflows, outflows and flux constraints, keyed by reaction name.
        Flux constraints take precedence when names clash.
        """
        master_dict = {'_'.join([self.name, k, 'task_reaction']): v[1] for k, v in self.reaction_dict.items()}
        master_dict.update({'_'.join([k, 'inflow']): v for k, v in self.inflow_dict.items()})
        master_dict.update({'_'.join([k, 'outflow']): v for k, v in self.outflow_dict.items()})
        master_dict.update({k: (v[0], v[1]) for k, v in self.flux_constraints.items()})
        return master_dict

    @property
    def involved_reactions(self) -> set:
        """
        Get the set of reactions involved in the task

        Returns
        -------
        set: The reactions referenced by the flux constraints and by the mandatory activities
        """
        return set(self.flux_constraints.keys()) | set(self.mandatory_activity)

    def apply_evaluate(self, model: ConstraintBasedModel) -> (bool, dict):
        """
        Apply the task to the model and evaluate the solution

        Parameters
        ----------
        model: ConstraintBasedModel
            The model to apply the task to

        Returns
        -------
        bool, dict: Whether the task was satisfied, and the expected activity of the mandatory reactions.
        (False, {}) is returned without optimising when the task refers to reactions missing from the model.
        """
        involved_reactions_in_model = not (self.involved_reactions - set(model.reaction_names))
        task_evaluation, expected = False, {}
        if involved_reactions_in_model:
            with model as task_model:
                commands, _ = self.get_add_reaction_cmds(task_model)
                commands.execute_all(True)
                for k, v in self.get_task_bounds().items():
                    lb, ub = v
                    task_model.set_reaction_bounds(k, lb=lb, ub=ub)
                task_evaluation, expected = self.evaluate_solution(task_model.optimize())
        return task_evaluation, expected

    def evaluate_solution(self, sol: Solution, ftol: float = 1e-6) -> (bool, dict):
        """
        Evaluate the solution to the task

        Parameters
        ----------
        sol: Solution
            The solution to evaluate
        ftol: float
            Absolute fluxes above this value are considered active

        Returns
        -------
        bool, dict: Whether the task was satisfied, and the activity of each mandatory reaction. The mandatory
        activities are only reported; they do not influence the boolean.
        """
        is_optimal = sol.status() == 'optimal'
        expected_activity = {k: abs(sol[k]) > ftol for k in self.mandatory_activity}
        task_eval = (not is_optimal) if self.should_fail else is_optimal
        return task_eval, expected_activity

    def __repr__(self):
        sized = [k for k, t in self.__types__.items() if t in [dict, list] and len(getattr(self, k)) > 0]
        info = ' ; '.join([k + ': ' + str(len(getattr(self, k))) for k in sized])
        outcome = ' expecting failure' if self.should_fail else ' expecting success'
        desc = ''
        if hasattr(self, 'annotations') and 'description' in self.annotations:
            desc = self.annotations['description']
        return "Task '" + self.name + "'" + outcome + ":" + info + ' -- ' + desc
class TaskEvaluator(object):
    """
    A task evaluator is a wrapper around a model that allows the evaluation of tasks on the model. It can be
    used to evaluate a single task, or to evaluate a batch of tasks under several sets of bound changes.

    The model is supplied either through `model` or through the `S`, `lb`, `ub`, `reaction_names` and
    `metabolite_names` keyword arguments.

    Parameters
    ----------
    model: ConstraintBasedModel, AbstractObjectReader or any object accepted by get_model_reader
        The model to evaluate tasks on
    tasks: Iterable[Task]
        The tasks to evaluate on the model
    solver: str
        The solver to use for the model
    S: np.ndarray
        The stoichiometric matrix
    lb: np.ndarray
        The lower bounds for the reactions
    ub: np.ndarray
        The upper bounds for the reactions
    reaction_names: np.ndarray
        The reaction names
    metabolite_names: np.ndarray
        The metabolite names

    Raises
    ------
    ValueError: If neither `model` nor `lb` is supplied
    """

    def __init__(self, **kwargs):
        solver = kwargs.get('solver')

        if 'model' in kwargs:
            model_obj = kwargs['model']
            if isinstance(model_obj, ConstraintBasedModel):
                self.model = model_obj
            elif isinstance(model_obj, AbstractObjectReader):
                self.model = model_obj.to_cobamp_cbm(solver if solver is not None else True)
            else:
                self.model = get_model_reader(model_obj).to_cobamp_cbm(solver)
        elif 'lb' in kwargs:
            S, lb, ub, rxn, mtn = [kwargs[k] for k in ['S', 'lb', 'ub', 'reaction_names', 'metabolite_names']]
            bounds = list(zip(lb, ub))
            self.model = ConstraintBasedModel(S, bounds, rxn, mtn, True, solver)
        else:
            # Fail here rather than with an obscure AttributeError on self.model a few lines below.
            raise ValueError("TaskEvaluator requires either a 'model' or the 'S', 'lb', 'ub', "
                             "'reaction_names' and 'metabolite_names' keyword arguments")

        self.__tasks = {}
        # Bounds of the model before any task is applied; restored whenever tasks are disabled.
        self.__original_bounds = {k: v for k, v in zip(self.model.reaction_names, self.model.bounds)}
        # Maps each reaction added on behalf of tasks to the names of the tasks that need it.
        self.__task_rxs = {}
        self.__activated_task = None

        if 'tasks' in kwargs:
            self.tasks = kwargs['tasks']

        self.history = None

    @property
    def current_task(self):
        """Name of the task currently applied to the model, or None."""
        return self.__activated_task

    @current_task.setter
    def current_task(self, value: str):
        # Always start from the original bounds with every task reaction closed.
        self.__disable_tasks(self.model)
        if value is not None:
            self.__enable_task(value, self.model)

    def __enable_task(self, name: str, model: ConstraintBasedModel):
        # Apply the bounds of the named task to the model and record it as active.
        bounds = self.__tasks[name].get_task_bounds()
        for k, v in bounds.items():
            lb, ub = v
            model.set_reaction_bounds(k, lb=lb, ub=ub)
        self.__activated_task = name

    def __disable_tasks(self, model: ConstraintBasedModel):
        # Restore the original bounds and close every reaction that was added for a task.
        for k, v in self.__original_bounds.items():
            lb, ub = v
            model.set_reaction_bounds(k, lb=lb, ub=ub)
        for k in self.__task_rxs:
            model.set_reaction_bounds(k, lb=0, ub=0)
        self.__activated_task = None

    def __apply(self, func):
        # Run func inside the model's context manager.
        with self.model as amodel:
            return func(amodel)

    def evaluate(self, context_function=None, flux_distribution_func=None) -> (bool, Solution):
        """
        Evaluate the current task on the model

        Parameters
        ----------
        context_function: function
            A function to apply to the model prior to evaluation. When supplied, the evaluation runs inside
            the model's context manager.
        flux_distribution_func: function
            A function to apply to the model to get the flux distribution

        Returns
        -------
        bool, Solution, dict: Whether the task was satisfied, the solution to the model and the activity of the
        mandatory reactions. None is returned (with a warning) when no task is active.
        """

        def apply_eval(model):
            return self.__inner_evaluate(model, context_function, flux_distribution_func)

        if context_function is None:
            return apply_eval(self.model)
        return self.__apply(apply_eval)

    def batch_evaluate(self, bound_changes: list, threads: int = MP_THREADS, output_sol: bool = False,
                       mp_batch_size: int = 5000) -> dict:
        """
        Evaluate every loaded task under every set of bound changes

        Parameters
        ----------
        bound_changes: list
            A sequence of dicts, each mapping reaction names to (lb, ub). Every dict is applied on top of the
            bounds of each task.
        threads: int
            The number of threads to use
        output_sol: bool
            Whether to output the solution
        mp_batch_size: int
            The maximum number of optimisations submitted to the batch optimizer at once

        Returns
        -------
        dict: ``{index of the bound change: {task name: (satisfied, expected activity, solution or None)}}``
        """
        self.current_task = None
        cobamp_model = self.model
        rx_index = cobamp_model.map_labels['reaction']

        bc_names, bc_runs = [], []
        for name, task in self.__tasks.items():
            task_bounds = task.get_task_bounds()
            for i, bc in enumerate(bound_changes):
                # The bound change overrides the task bounds where both refer to the same reaction.
                fd = dict(task_bounds)
                fd.update(bc)
                bc_names.append((i, name))
                bc_runs.append({rx_index[x]: y for x, y in fd.items()})

        n_runs = len(bc_runs)
        objective_sense = [False] * n_runs
        objective_coef = [{0: 1}] * n_runs

        cobamp_model.initialize_optimizer()
        bopt = BatchOptimizer(linear_system=cobamp_model.model, threads=threads)

        res_dict = {i: {} for i in range(len(bound_changes))}
        for start in range(0, n_runs, mp_batch_size):
            stop = start + mp_batch_size
            sols = bopt.batch_optimize(bc_runs[start:stop], objective_coef[start:stop],
                                       objective_sense[start:stop])
            for (i, tn), sol in zip(bc_names[start:stop], sols):
                truth, expected = self.__tasks[tn].evaluate_solution(sol)
                res_dict[i][tn] = (truth, expected, sol if output_sol else None)
        return res_dict

    @staticmethod
    def batch_function(task: Task, params: dict) -> (bool, Solution):
        """
        Activate a task on the evaluator stored in `params` and evaluate it

        Parameters
        ----------
        task: Task or str
            The task to evaluate, or its name. The task must already be loaded in the evaluator.
        params: dict
            Must contain the evaluator under 'tev'; may contain 'context_func' and 'flux_distribution_func'

        Returns
        -------
        The value returned by TaskEvaluator.evaluate
        """
        # Tasks are registered by name, so a Task object has to be translated into its name.
        params['tev'].current_task = getattr(task, 'name', task)
        cfunc, fdfunc = [params.get(k) for k in ['context_func', 'flux_distribution_func']]
        return params['tev'].evaluate(cfunc, fdfunc)

    def __inner_evaluate(self, model: ConstraintBasedModel, context_func, flux_distribution_func) -> (
            bool, Solution):
        """
        Evaluate the current task on the model

        Parameters
        ----------
        model: ConstraintBasedModel
            The model to evaluate the task on
        context_func: function
            A function to apply to the model prior to evaluation
        flux_distribution_func: function
            A function to apply to the model to get the flux distribution

        Returns
        -------
        bool, Solution, dict: Whether the task was satisfied, the solution to the model and the activity of the
        mandatory reactions. None is returned (with a warning) when no task is active.
        """
        if self.__activated_task is None:
            warnings.warn('No task is currently active. A loaded task must be activated prior to evaluation '
                          'using the current_task setter (.current_task = task_name')
            return None

        task_to_eval = self.__tasks[self.__activated_task]
        if context_func is not None:
            context_func(model)

        involved_reactions_in_model = not (task_to_eval.involved_reactions - set(model.reaction_names))

        if flux_distribution_func is not None:
            sol = flux_distribution_func(model)
        else:
            _, nflows = task_to_eval.get_add_reaction_args(model)
            # Set the objective on the model being evaluated (the original always used self.model).
            model.set_objective({k: 1 for k in nflows})
            sol = model.optimize()

        evaluation, expected = task_to_eval.evaluate_solution(sol)
        if not involved_reactions_in_model:
            # NOTE(review): the message says the task is evaluated as False, but the result of
            # evaluate_solution is returned unchanged -- confirm which of the two is intended.
            warnings.warn('Task ' + task_to_eval.name + ' has references to missing reactions and was evaluated as '
                                                        'False by default')
        return evaluation, sol, expected

    @property
    def tasks(self) -> Iterable[Task]:
        """
        Get the tasks

        Returns
        -------
        list: The names of the loaded tasks
        """
        return list(self.__tasks.keys())

    @tasks.setter
    def tasks(self, value: Iterable[Task]):
        """
        Replace the loaded tasks

        Parameters
        ----------
        value: Iterable[Task]
            The tasks to set
        """
        # `value` is traversed twice below, which would silently skip the reactions of a generator.
        value = list(value)
        for tn in self.tasks:
            self.__remove_task(tn)
        self.__tasks = {k.name: k for k in value}
        self.__populate_tasks(value)

    def __remove_task(self, task_name: str):
        """
        Remove a task from the evaluator, deleting the reactions that no other task needs from the model

        Parameters
        ----------
        task_name: str
            The name of the task to remove
        """
        to_remove = []
        for rx_name, owners in self.__task_rxs.items():
            if task_name in owners:
                owners.remove(task_name)
                # A reaction is only deleted once its last owner is gone.
                if not owners:
                    to_remove.append(rx_name)
        if to_remove:
            self.model.remove_reactions(to_remove)
        self.__task_rxs = {k: v for k, v in self.__task_rxs.items() if len(v) > 0}
        self.__tasks = {k: v for k, v in self.__tasks.items() if k != task_name}

    def __populate_task(self, task: Task):
        """
        Add the reactions of a single task to the model (closed) and register the task as their owner

        Parameters
        ----------
        task: Task
            The task to populate
        """
        cmds, rxs = task.get_add_reaction_cmds(self.model, True)
        involved_reactions_in_model = not (task.involved_reactions - set(self.model.reaction_names))
        cmds.execute_all(True)
        for k in rxs:
            self.__task_rxs.setdefault(k, []).append(task.name)
        if not involved_reactions_in_model:
            warnings.warn(
                'Task object with name ' + task.name + ' refers to reactions that are not present in the model. '
                                                       'This task will be loaded but will never evaluate as True')

    def __populate_tasks(self, tasks: Iterable[Task]):
        """
        Add the reactions of several tasks to the model (closed) with a single call and register the tasks as
        their owners

        Parameters
        ----------
        tasks: Iterable[Task]
            The tasks to populate
        """
        k_names = ['args', 'bounds', 'names']
        add_rx_args = {k: [] for k in k_names}
        for task in tasks:
            arg_list, rxs_to_add = task.get_add_reaction_args(self.model, True)
            for k in rxs_to_add:
                if k in self.__task_rxs:
                    # Already queued by a previous task (e.g. a shared drain): only register the new owner.
                    self.__task_rxs[k].append(task.name)
                    continue
                self.__task_rxs[k] = [task.name]
                for kp, vp in zip(k_names, arg_list[k]):
                    add_rx_args[kp].append(vp)
        if add_rx_args['names']:
            self.model.add_reactions(**add_rx_args)
if __name__ == '__main__':
    # Small demonstration: serialise a task to JSON, then evaluate a series of flux constraints on a toy model.
    from numpy import array
    from troppo.tasks.task_io import JSONTaskIO

    # Toy network with 6 metabolites (rows) and 9 reactions (columns).
    S = array([[1, -1, 0, 0, -1, 0, -1, 0, 0],
               [0, 1, -1, 0, 0, 0, 0, 0, 0],
               [0, 1, 0, 1, -1, 0, 0, 0, 0],
               [0, 0, 0, 0, 0, 1, -1, 0, 0],
               [0, 0, 0, 0, 0, 0, 1, -1, 0],
               [0, 0, 0, 0, 1, 0, 0, 1, -1]])

    rx_names = ["R" + str(i) for i in range(1, 10)]
    mt_names = ["M" + str(i) for i in range(1, 7)]
    irrev = [0, 1, 2, 4, 5, 6, 7, 8]
    bounds = [(0 if i in irrev else -1000, 1000) for i in range(9)]

    cbm = ConstraintBasedModel(S, bounds, rx_names, mt_names)

    task = Task(
        should_fail=False,
        inflow_dict={'glucose': [0, 5]},
        outflow_dict={'etanol': [3, 5]},
        # 'm1 => 2 m2'
        reaction_dict={'r1': ({'m1': -1, 'm2': 2}, (0, 1000))}
    )
    print(JSONTaskIO().write_to_string([task, task]))

    # NOTE: this demo used to pass flow_dict={'M4': (-4, -4)} as well. Task does not define a `flow_dict`
    # field and silently ignored it, so it is omitted here; use inflow_dict/outflow_dict if a drain is wanted.
    tasks = [Task(should_fail=False, flux_constraints={'R2': (i, 10)}, name=str(i)) for i in range(11)]

    tev = TaskEvaluator(model=cbm, tasks=tasks)
    for task in tev.tasks:
        tev.current_task = task
        print(task, tev.evaluate())