Task performance evaluation

Troppo includes a module that allows for the evaluation of metabolic tasks performance. The class responsible for this is called TaskEvaluator. This class is a wrapper around a model that allows the evaluation of tasks on the model. It can be used to evaluate a single task, or to evaluate a batch of tasks on a batch of models.

To initialize a TaskEvaluator object, you need to pass a model object and the tasks to evaluate. The tasks should be instances of the Task class, which is a simple data structure that contains the following fields:

  • reaction_dict: a dictionary with the reaction identifiers as keys and a dictionary with the metabolites and their respective stoichiometry as values. (eg. rxd = {‘r1’:({‘m1’:-1, ‘m2’:2}, (lb, ub)), … })

  • inflow_dict: a dictionary with the metabolite identifiers as keys and the inflow rate as values. (eg. inflow = {‘m1’:(1, 1), … })

  • outflow_dict: a dictionary with the metabolite identifiers as keys and the outflow rate as values. (eg. outflow = {‘m5’:(5, 5), … })

Setup the model and the tasks

#imports
from troppo.tasks.core import TaskEvaluator
from troppo.tasks.task_io import JSONTaskIO
import pandas as pd
from json import JSONEncoder, JSONDecoder
from cobamp.utilities.parallel import batch_run

from numpy import log
import re

#load the model
task_model = read_sbml_model('data/Recon3D_301_consistent.xml')

#load the reconstruction results as a dictionary
fastcore_res_dict = pd.read_csv('data/r3d_compact_ccle_bc_fastcore.csv',
                                index_col=[0, 1]).T.to_dict()

#get only first sample
sample = list(fastcore_res_dict.keys())[0]
fastcore_res_dict = {sample: fastcore_res_dict[sample]}

TASKS_PATH = 'data/nl2019_tasks_r3d_compact.json'

# parse tasks from a previously existing JSON
# the supplied file contains tasks adapted from the publication of Richelle et. al, 2019
task_list = [t for t in JSONTaskIO().read_task(TASKS_PATH) if len((set(t.inflow_dict) |
             set(t.outflow_dict)) - set([m.id for m in task_model.metabolites])) == 0]

for task in task_list:
    task.inflow_dict = {k: v if k not in task.outflow_dict.keys() else [-1000, 1000] for k, v in
                        task.inflow_dict.items()}
    task.outflow_dict = {k: v for k, v in task.outflow_dict.items()
                         if k not in task.inflow_dict.items()}
for task in task_list:
    task.mandatory_activity = []

# tasks should be evaluated without open boundary reactions. We can easily close them on
# the COBRA model
for k in task_model.boundary:
    k.knock_out()

# get the names of all reactions in the model - this will be useful further on
all_reactions = set([r.id for r in task_model.reactions])

Evaluate a set of tasks

# create a structure to hold all of these results - a dictionary
task_eval_results = {}

# for each k (tuple with algorithm and sample information) and result (dict with reaction presences)
for k, result in fastcore_res_dict.items():
    # using with statements to change the COBRA model temporarily
    # this is done to knock-out reaction not appearing the FASTCORE result
    with task_model as context_specific_model:
        # get reactions included in the sample-specific model
        protected = set([k for k, v in result.items() if v])
        # get reactions except the protected ones
        to_remove = all_reactions - protected
        for rid in to_remove:
            # knock-out reactions not in the model
            context_specific_model.reactions.get_by_id(rid).knock_out()

        # create a task evaluator instance with the context specific model and the supplied
        # task list and solver
        task_eval = TaskEvaluator(model=context_specific_model, tasks=task_list, solver='CPLEX')

        # get task names (for future reference)
        task_names = task_eval.tasks

        # use the batch_function from the TaskEvaluator class (takes the name of a loaded task,
        # a params dictionary with the task evaluator associated to the 'tev' key) and set the
        # amount of threads to be used
        batch_res_tasks = batch_run(TaskEvaluator.batch_function, task_names,
                                    {'tev': task_eval}, threads=1)
    # each element in the list of results in batch_res_tasks is a tuple of length 3:
    # 0 - boolean flag representing the task evaluation
    # 1 - Solution instance used to evaluate the task
    # 2 - A dictionary with reactions supposed to be active mapped to True/False
    # according to that criterion

    # keep only items 0 and 2 of the task result - we don't need the flux distribution
    task_csm_res = {k: (v[0], v[2]) for k, v in dict(zip(task_names, batch_res_tasks)).items()}
    print(k, len(protected), len([v for k, v in task_csm_res.items() if v[0]]), 'tasks completed.')
    # assign this dictionary to it's sample on the master results dictionary
    task_eval_results[k] = task_csm_res

Save the results

# save these results for later analysis as a JSON file
with open('data/r3d_compact_task_results_ccle_bc_new_nodrains_only_feas.json', 'w') as f:
    f.write(JSONEncoder().encode([(k, v) for k, v in task_eval_results.items()]))