from cobamp.core.linear_systems import IrreversibleLinearSystem
from cobamp.algorithms.kshortest import KShortestEnumerator
from cobamp.core.models import ConstraintBasedModel
from numpy import zeros, array, where
from collections import Counter
[docs]class SubEFMGapfill(object):
"""
Solve the gap-filling problem using an Elementary Flux Mode enumeration algorithm.
Parameters
----------
template_model: dict
The template model for the gap-filling problem. Must contain {S, lb, ub} and possibly rx_names/met_names.
task_reactions
Contains the task reactions.
subset_forced_reactions
Contains the indexes of the forced reactions' subset.
iterative: bool
Flag to determine whether the k-shortest algorithm behaviour.
max_time: int
Maximum time for the algorithm to run.
max_threads: int
Maximum number of threads to be used.
big_m: bool
Flag to determine whether to use big-M constraints.
big_m_value: int
Value of the big-M constraints.
solver: str
Solver to be used by the IrreversibleLinearPatternSystem.
at_most_n_sols: int
Maximum number of solutions to be returned.
populate_max_size: int
Maximum size of the population.
"""
def __init__(self, template_model: dict, task_reactions, subset_forced_reactions, iterative=True,
max_time: int = 0, max_threads: int = 0, big_m: bool = False, big_m_value: int = 1000,
solver: str = None, at_most_n_sols: int = 1, populate_max_size: int = None):
# media: dict - must contain the following keys - consumed, non_consumed, produced.
# Values can be set as empty lists if not needed
if isinstance(template_model, dict):
S, lb, ub = [template_model[k] for k in ['S', 'lb', 'ub']]
self.cb_model = ConstraintBasedModel(S=S, thermodynamic_constraints=list(zip(lb, ub)))
elif isinstance(template_model, ConstraintBasedModel):
self.cb_model = template_model
self.cb_model.make_irreversible()
self.task_reactions = task_reactions
self.subset_forced_reactions = subset_forced_reactions
self.__iterative = iterative
self.__max_time = max_time
self.__max_threads = max_threads
self.__big_m = big_m
self.__big_m_value = big_m_value
self.__at_most_n_sols = at_most_n_sols
self.__popmaxsz = populate_max_size
self.enumerated_solutions = []
Si = self.cb_model.get_stoichiometric_matrix()
lbi, ubi = list(zip(*self.cb_model.bounds))
lsys = IrreversibleLinearSystem(
S=Si, lb=array(lbi), ub=array(ubi), solver=solver)
self.enumerator_obj = KShortestEnumerator(
linear_system=lsys,
m_value=self.__big_m_value,
force_big_m=self.__big_m,
n_threads=self.__max_threads,
max_time=self.__max_time,
)
self.__mask_len = Si.shape[1]
[docs] def gapfill(self, missing_set, forced=(), non_forced=()):
"""
Gapfill the model using the task reactions and the subset of forced reactions.
Parameters
----------
missing_set
Dictionary containing the indexes of the missing reactions.
forced:
Dictionary containing the indexes of the forced reactions.
non_forced:
Dictionary containing the indexes of the non-forced reactions.
Returns
-------
List of solutions.
"""
self.enumerator_obj.reset_enumerator_state()
dvmap = self.enumerator_obj.get_model().get_dvar_mapping()
final_subset = missing_set | set(self.subset_forced_reactions)
subset_mask = zeros(self.__mask_len)
subset_mask[list(final_subset)] = 1
# forced_on = task reactions
# forced_off = non task reactions
n_ivars = len(self.enumerator_obj.model.get_dvars())
non_forced_rxs = set(non_forced)
non_forced_rxs |= set([dvmap[i][1] for i in non_forced if isinstance(dvmap[i], (list, tuple))])
io_rxs = set(forced)
io_rxs |= set([dvmap[i][1] for i in io_rxs if isinstance(dvmap[i], (list, tuple))])
non_io_rxs = set(self.task_reactions)
non_io_rxs |= set([dvmap[i][1] for i in non_io_rxs if isinstance(dvmap[i], (list, tuple))])
non_io_rxs -= io_rxs
io_rxs_mask, non_io_rxs_mask = zeros(n_ivars, ), zeros(n_ivars, )
io_mask_idx = array(list([int(i) for i in (io_rxs - non_forced_rxs)]))
non_io_mask_idx = array(list([int(i) for i in (non_io_rxs - non_forced_rxs)]))
if len(io_mask_idx):
io_rxs_mask[io_mask_idx] = 1
if len(non_io_mask_idx):
non_io_rxs_mask[non_io_mask_idx] = 1
self.enumerator_obj.set_indicator_activity(forced_on=io_rxs_mask, forced_off=non_io_rxs_mask)
self.enumerator_obj.set_objective_expression(subset_mask)
if self.__iterative:
enumerator = self.enumerator_obj.solution_iterator()
else:
enumerator = self.enumerator_obj.population_iterator(self.__popmaxsz)
solutions = []
has_next = True
while has_next and (len(solutions) < self.__at_most_n_sols):
try:
sol_result = next(enumerator)
if isinstance(sol_result, (list, tuple)):
solutions.extend(sol_result)
else:
solutions.append(sol_result)
except StopIteration as e:
has_next = False
self.enumerated_solutions = solutions
return [sol.attribute_value(sol.SIGNED_INDICATOR_SUM) for sol in solutions]
[docs]class CombinatorialEFMGapfill():
def __init__(self, template_model, media): # tasks, media, min_acceptable_tasks=0.5):
self.template = template_model
# self.pass_fx = pass_fx
if isinstance(self.template, ConstraintBasedModel):
self.S = self.template.get_stoichiometric_matrix()
self.lb, self.ub = map(array, self.template.get_bounds_as_list())
elif isinstance(self.template, dict):
self.S, self.lb, self.ub = [array(self.template[k]) for k in ['S', 'lb', 'ub']]
self.cbmodel = ConstraintBasedModel(self.S, list(zip(self.lb, self.ub)))
# self.tasks = tasks
self.media = media
# if isinstance(min_acceptable_tasks, int):
# self.__min_tasks = min_acceptable_tasks
# elif isinstance(min_acceptable_tasks, float) and (0 < min_acceptable_tasks <= 1):
# self.__min_tasks = (min_acceptable_tasks // len(self.tasks)) if (len(self.tasks) > 0) else 0
# else:
# raise ValueError('Invalid parameter `min_acceptable_tasks`')
[docs] def combinatorial_gapfill(self, sample_models, model_approval_fx):
partials = self.generate_partial_models(sample_models)
i = len(partials)
gfr, success = {}, False
while i >= 2 and not success:
m1, m0 = partials[i - 2:i]
gfr, success = self.gapfill_partial_pair(m0, m1, model_approval_fx)
i -= 1
return gfr, success
[docs] def generate_partial_models(self, sample_models):
c = Counter()
for mod in sample_models:
c.update(mod)
partials = [set([k for k, v in c.items() if v >= n]) for n in range(len(sample_models) + 1)]
return partials
[docs] def prune_model(self, to_keep):
tkid = list(to_keep)
S, lb, ub = self.S.copy()[:, tkid], self.lb.copy()[tkid], self.ub.copy()[tkid]
metabs = where(~(S == 0).all(axis=1))[0]
S = S[metabs, :]
return S, lb, ub, metabs
[docs] def gapfill_partial_pair(self, m0, m1, model_approval_fx):
assert len(m0) <= len(m1), 'Model 1 has less reactions than Model 0!'
Sp, lbp, ubp, metabs = self.prune_model(m1)
mapback = dict(zip(range(len(m1)), list(m1)))
mapback_rev = dict(zip(list(m1), range(len(m1))))
metab_mapback = dict(zip(list(metabs), range(len(metabs))))
valid_problem = False
try:
pruned_media = {k: [metab_mapback[i] for i in v] for k, v in self.media.items()}
valid_problem = True
except:
print('Partial model pair is unsuitable for the selected medium')
if valid_problem:
print('Attempting to gapfill partial model with', len(m0), 'reactions using', len(m1), 'sized template.')
algorithm = SubEFMGapfill({'S': Sp, 'lb': lbp, 'ub': ubp}, subset_forced_reactions={},
media=pruned_media, big_m=True, big_m_value=1000)
gfs = algorithm.gapfill(missing_set={mapback_rev[i] for i in list(m1 - m0)})
if len(gfs) > 0:
print('Found a solution!')
gfrx = set([k for k, v in gfs[0].items() if abs(v) > 1e-9])
tentative_m = m0 | {mapback[g] for g in gfrx}
print('Original model has', len(m0), 'while the gapfilled solution has', len(tentative_m))
is_valid_model = model_approval_fx(tentative_m)
if is_valid_model:
return tentative_m, True
else:
return {}, False
else:
return {}, False
else:
return {}, False
[docs]def simulate_context(model, context, objective, sense, sol_approval_fx):
ko_count = 0
ndim = range(model.get_stoichiometric_matrix().shape[1])
for r in ndim:
if r not in context:
model.set_reaction_bounds(r, lb=0, ub=0, temporary=True)
ko_count += 1
try:
print('Sanity check:', ko_count, 'knockouts applied.', len(set(ndim) - context), 'found.')
real_ko_count = 0
for i in ndim:
lb, ub = model.get_reaction_bounds(i)
if abs(lb) < 1e-9 and abs(ub) < 1e-9:
real_ko_count += 1
print('Sanity check:', real_ko_count, 'knockouts found within the CB Model')
sol = model.optimize(coef_dict=objective, minimize=sense)
print('Simulating model with', len(context), 'reactions...', sol.objective_value(), sol.status(),
sol_approval_fx(sol))
return sol_approval_fx(sol)
except:
pass
finally:
model.revert_to_original_bounds()
if __name__ == '__main__':
from urllib.request import urlretrieve
from cobra.io import read_sbml_model
from cobra.core import Reaction, Metabolite
from cobamp.wrappers.cobra import COBRAModelObjectReader
from random import sample, randint
BIOMASS_RX_NAME = 'BIOMASS_Ecoli_core_w_GAM'
BIOMASS_TRANS_NAME = 'BIOMASSt'
BIOMASS_DRAIN_NAME = 'EX_biomass_e'
BIOMASS_CYT_NAME = 'biomass_c'
BIOMASS_EXT_NAME = 'biomass_e'
def get_ecoli_core_model():
model_url = 'http://bigg.ucsd.edu/static/models/e_coli_core.xml'
model_path, _ = urlretrieve(model_url)
ecoli_model = read_sbml_model(model_path)
b_c, b_e = (Metabolite(BIOMASS_CYT_NAME, name='biomass (cytosol)'),
Metabolite(BIOMASS_EXT_NAME, name='biomass (extracellular)'))
ecoli_model.reactions.get_by_id(BIOMASS_RX_NAME).add_metabolites({b_c: 1})
b_trans = Reaction(BIOMASS_TRANS_NAME, name='Biomass transport')
b_trans.add_metabolites({b_c: -1, b_e: 1})
b_drain = Reaction(BIOMASS_DRAIN_NAME, name='Biomass drain')
b_drain.add_metabolites({b_e: -1})
ecoli_model.add_reactions([b_trans, b_drain])
return ecoli_model
def random_knockouts(model_template, biomass_reactions_set, drains_set):
exclude = biomass_reactions_set | drains_set
choices = set(r.id for r in model_template.reactions) - set(exclude)
missing_reaction_set = set(sample(choices, k=randint(len(choices) // 4, (3 * len(choices)) // 4)))
return missing_reaction_set
def simulate_model_with_kos(model_template, kos):
with model_template as m:
for rx in kos:
m.reactions.get_by_id(rx).knock_out()
sol = m.optimize()
return sol
model = get_ecoli_core_model()
objreader = COBRAModelObjectReader(model)
# s_matrix = objreader.get_stoichiometric_matrix()
# lower_bounds, upper_bounds = objreader.get_model_bounds(separate_list=True)
# template_model = {'S': s_matrix, 'lb': lower_bounds, 'ub': upper_bounds}
template_model = objreader.to_cobamp_cbm('CPLEX')
media = {'produced': [template_model.metabolite_names.index(BIOMASS_EXT_NAME)], 'non_consumed': [], 'consumed': []}
drains = set([r.id for r in model.boundary])
biomass_reactions = {BIOMASS_RX_NAME, BIOMASS_TRANS_NAME, BIOMASS_DRAIN_NAME}
missing_set = random_knockouts(model, biomass_reactions, drains)
final_subset = missing_set | biomass_reactions
before_gapfill = model.optimize()
missing_reactions = random_knockouts(model, biomass_reactions, drains)
print(len(missing_reactions), 'total reactions missing.')
after_missing_reactions = simulate_model_with_kos(model, missing_reactions)
print('After removing selected KOs:', after_missing_reactions.status, after_missing_reactions.objective_value)
missing_reaction_ids = [i for i, r_id in enumerate(template_model.reaction_names) if r_id in missing_set]
biomass_reaction_ids = [template_model.reaction_names.index(k) for k in biomass_reactions]
drain_reaction_ids = [template_model.reaction_names.index(k) for k in drains]
gapfiller = SubEFMGapfill(template_model, biomass_reaction_ids, media, iterative=True, big_m=True, big_m_value=1000)
gapfill_reactions = gapfiller.gapfill(missing_set=set(missing_reaction_ids), media=set(drain_reaction_ids))
gapfill_rx_ids = set([template_model.reaction_names[i] for i, v in gapfill_reactions[0].items() if abs(v) > 1e-6])
added_rx = set(missing_reactions) & gapfill_rx_ids
print(len(added_rx), 'gapfilled reactions:', ','.join(added_rx))
after_gapfilling = simulate_model_with_kos(model, set(missing_reactions) - gapfill_rx_ids)
print('After gapfilling with EFM:', after_gapfilling.status, after_gapfilling.objective_value)
# any_sols = []
# sub_ko = None