From ee77f558124657981e44b9bd634d1e152f798b35 Mon Sep 17 00:00:00 2001 From: Jeff Doyle Date: Sat, 13 Oct 2018 00:00:40 -0400 Subject: [PATCH] passing tests --- .pylintrc | 16 ++ .travis.yml | 2 +- .../test/test_vectorize_tour_scheduling.py | 9 +- activitysim/abm/test/test_misc.py | 28 +- activitysim/abm/test/test_pipeline.py | 49 ++-- activitysim/core/inject.py | 21 ++ activitysim/core/orca/orca.py | 3 +- activitysim/core/pipeline.py | 10 +- activitysim/core/test/test_assign.py | 6 +- activitysim/core/test/test_inject_defaults.py | 12 +- activitysim/core/test/test_logit.py | 6 +- activitysim/core/test/test_pipeline.py | 37 ++- activitysim/core/test/test_tracing.py | 28 +- activitysim/core/tracing.py | 35 --- example/simulation.py | 20 +- example_mp/simulation.py | 13 +- example_mp/tasks.py | 269 ++++++++---------- setup.py | 6 +- 18 files changed, 269 insertions(+), 301 deletions(-) create mode 100644 .pylintrc diff --git a/.pylintrc b/.pylintrc new file mode 100644 index 000000000..cef093dda --- /dev/null +++ b/.pylintrc @@ -0,0 +1,16 @@ +[MESSAGES CONTROL] +disable=locally-disabled, + # False positive for type annotations with typing module + # invalid-sequence-index, + # False positive for OK test methods names and few other places + invalid-name, + # False positive for test file classes and methods + missing-docstring + +[REPORTS] +# Simplify pylint reports +reports=no + +[SIMILARITIES] +min-similarity-lines=10 +ignore-docstrings=yes diff --git a/.travis.yml b/.travis.yml index 415d9a5c7..114187ad2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -15,7 +15,7 @@ install: - | conda create -q -n test-environment python=$TRAVIS_PYTHON_VERSION cytoolz numpy pandas pip pytables pyyaml toolz psutil - source activate test-environment -- pip install orca openmatrix zbox +- pip install openmatrix zbox future - pip install pytest pytest-cov coveralls pycodestyle - pip install sphinx numpydoc sphinx_rtd_theme - pip install . diff --git a/activitysim/abm/models/util/test/test_vectorize_tour_scheduling.py b/activitysim/abm/models/util/test/test_vectorize_tour_scheduling.py index 18c051413..f3405f699 100644 --- a/activitysim/abm/models/util/test/test_vectorize_tour_scheduling.py +++ b/activitysim/abm/models/util/test/test_vectorize_tour_scheduling.py @@ -5,7 +5,6 @@ import pytest import pandas as pd import numpy as np -import orca import pandas.util.testing as pdt @@ -17,7 +16,7 @@ def test_vts(): - orca.add_injectable("settings", {}) + inject.add_injectable("settings", {}) # note: need 0 duration tour on one end of day to guarantee at least one available tour alts = pd.DataFrame({ @@ -25,7 +24,7 @@ def test_vts(): "end": [1, 4, 5, 6] }) alts['duration'] = alts.end - alts.start - orca.add_injectable("tdd_alts", alts) + inject.add_injectable("tdd_alts", alts) current_tour_person_ids = pd.Series(['b', 'c'], index=['d', 'e']) @@ -54,13 +53,13 @@ def test_vts(): persons = pd.DataFrame({ "income": [20, 30, 25] }, index=[1, 2, 3]) - orca.add_table('persons', persons) + inject.add_table('persons', persons) spec = pd.DataFrame({"Coefficient": [1.2]}, index=["income"]) spec.index.name = "Expression" - orca.add_injectable("check_for_variability", True) + inject.add_injectable("check_for_variability", True) tdd_choices = vectorize_tour_scheduling(tours, persons, alts, spec) diff --git a/activitysim/abm/test/test_misc.py b/activitysim/abm/test/test_misc.py index 573b3aa49..380b43a3b 100644 --- a/activitysim/abm/test/test_misc.py +++ b/activitysim/abm/test/test_misc.py @@ -5,46 +5,40 @@ import tempfile import numpy as np -import orca import pytest import yaml -# orca injectables complicate matters because the decorators are executed at module load time -# and since py.test collects modules and loads them at the start of a run -# if a test method does something that has a lasting side-effect, then that side effect -# will carry over not just to subsequent test functions, but to subsequently called modules -# for instance, columns added with add_column will remain attached to orca tables -# pytest-xdist allows us to run py.test with the --boxed option which runs every function -# with a brand new python interpreter +from activitysim.core import inject -# Also note that the following import statement has the side-effect of registering injectables: + +# The following import statement has the side-effect of registering injectables: from .. import __init__ def test_misc(): - orca.clear_cache() + inject.clear_cache() with pytest.raises(RuntimeError) as excinfo: - orca.get_injectable("configs_dir") + inject.get_injectable("configs_dir") assert "directory does not exist" in str(excinfo.value) with pytest.raises(RuntimeError) as excinfo: - orca.get_injectable("data_dir") + inject.get_injectable("data_dir") assert "directory does not exist" in str(excinfo.value) with pytest.raises(RuntimeError) as excinfo: - orca.get_injectable("output_dir") + inject.get_injectable("output_dir") assert "directory does not exist" in str(excinfo.value) configs_dir = os.path.join(os.path.dirname(__file__), 'configs_test_misc') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) - settings = orca.get_injectable("settings") + settings = inject.get_injectable("settings") assert isinstance(settings, dict) data_dir = os.path.join(os.path.dirname(__file__), 'data') - orca.add_injectable("data_dir", data_dir) + inject.add_injectable("data_dir", data_dir) # default values if not specified in settings - assert orca.get_injectable("chunk_size") == 0 + assert inject.get_injectable("chunk_size") == 0 diff --git a/activitysim/abm/test/test_pipeline.py b/activitysim/abm/test/test_pipeline.py index 64ecfe265..2da6c204b 100644 --- a/activitysim/abm/test/test_pipeline.py +++ b/activitysim/abm/test/test_pipeline.py @@ -6,7 +6,6 @@ import logging import numpy as np -import orca import pandas as pd import pandas.util.testing as pdt import pytest @@ -35,7 +34,7 @@ def teardown_function(func): - orca.clear_cache() + inject.clear_cache() inject.reinject_decorated_tables() @@ -64,7 +63,7 @@ def inject_settings(configs_dir, households_sample_size, chunk_size=None, if check_for_variability is not None: settings['check_for_variability'] = check_for_variability - orca.add_injectable("settings", settings) + inject.add_injectable("settings", settings) return settings @@ -72,17 +71,17 @@ def inject_settings(configs_dir, households_sample_size, chunk_size=None, def test_rng_access(): configs_dir = os.path.join(os.path.dirname(__file__), 'configs') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) output_dir = os.path.join(os.path.dirname(__file__), 'output') - orca.add_injectable("output_dir", output_dir) + inject.add_injectable("output_dir", output_dir) data_dir = os.path.join(os.path.dirname(__file__), 'data') - orca.add_injectable("data_dir", data_dir) + inject.add_injectable("data_dir", data_dir) inject_settings(configs_dir, households_sample_size=HOUSEHOLDS_SAMPLE_SIZE) - orca.clear_cache() + inject.clear_cache() inject.add_injectable('rng_base_seed', 0) @@ -91,7 +90,7 @@ def test_rng_access(): rng = pipeline.get_rn_generator() pipeline.close_pipeline() - orca.clear_cache() + inject.clear_cache() def regress_mini_auto(): @@ -149,22 +148,20 @@ def regress_mini_mtf(): def test_mini_pipeline_run(): configs_dir = os.path.join(os.path.dirname(__file__), 'configs') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) output_dir = os.path.join(os.path.dirname(__file__), 'output') - orca.add_injectable("output_dir", output_dir) + inject.add_injectable("output_dir", output_dir) data_dir = os.path.join(os.path.dirname(__file__), 'data') - orca.add_injectable("data_dir", data_dir) + inject.add_injectable("data_dir", data_dir) inject_settings(configs_dir, households_sample_size=HOUSEHOLDS_SAMPLE_SIZE) - orca.clear_cache() + inject.clear_cache() tracing.config_logger() - # assert len(orca.get_table("households").index) == HOUSEHOLDS_SAMPLE_SIZE - _MODELS = [ 'initialize_landuse', 'compute_accessibility', @@ -198,7 +195,7 @@ def test_mini_pipeline_run(): assert "not in checkpoints" in str(excinfo.value) pipeline.close_pipeline() - orca.clear_cache() + inject.clear_cache() close_handlers() @@ -210,17 +207,17 @@ def test_mini_pipeline_run2(): # when we restart pipeline configs_dir = os.path.join(os.path.dirname(__file__), 'configs') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) output_dir = os.path.join(os.path.dirname(__file__), 'output') - orca.add_injectable("output_dir", output_dir) + inject.add_injectable("output_dir", output_dir) data_dir = os.path.join(os.path.dirname(__file__), 'data') - orca.add_injectable("data_dir", data_dir) + inject.add_injectable("data_dir", data_dir) inject_settings(configs_dir, households_sample_size=HOUSEHOLDS_SAMPLE_SIZE) - orca.clear_cache() + inject.clear_cache() # should be able to get this BEFORE pipeline is opened checkpoints_df = pipeline.get_checkpoints() @@ -249,7 +246,7 @@ def test_mini_pipeline_run2(): assert len(checkpoints_df.index) == prev_checkpoint_count pipeline.close_pipeline() - orca.clear_cache() + inject.clear_cache() def full_run(resume_after=None, chunk_size=0, @@ -257,13 +254,13 @@ def full_run(resume_after=None, chunk_size=0, trace_hh_id=None, trace_od=None, check_for_variability=None): configs_dir = os.path.join(os.path.dirname(__file__), '..', '..', '..', 'example', 'configs') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) data_dir = os.path.join(os.path.dirname(__file__), 'data') - orca.add_injectable("data_dir", data_dir) + inject.add_injectable("data_dir", data_dir) output_dir = os.path.join(os.path.dirname(__file__), 'output') - orca.add_injectable("output_dir", output_dir) + inject.add_injectable("output_dir", output_dir) settings = inject_settings( configs_dir, @@ -273,7 +270,7 @@ def full_run(resume_after=None, chunk_size=0, trace_od=trace_od, check_for_variability=check_for_variability) - orca.clear_cache() + inject.clear_cache() tracing.config_logger() @@ -284,10 +281,6 @@ def full_run(resume_after=None, chunk_size=0, tours = pipeline.get_table('tours') tour_count = len(tours.index) - # pipeline.close_pipeline() - # - # orca.clear_cache() - return tour_count diff --git a/activitysim/core/inject.py b/activitysim/core/inject.py index 3cfcbf56b..da31db36e 100644 --- a/activitysim/core/inject.py +++ b/activitysim/core/inject.py @@ -1,3 +1,6 @@ +# ActivitySim +# See full license in LICENSE.txt. + import logging import pandas as pd @@ -84,6 +87,7 @@ def add_table(table_name, table, cache=False): return orca.add_table(table_name, table, cache=cache) +#fixme remove? def add_column(table_name, column_name, column, cache=False): return orca.add_column(table_name, column_name, column, cache=cache) @@ -114,6 +118,13 @@ def get_injectable(name, default=_NO_DEFAULT): return default +def remove_injectable(name): + + #fixme + #del orca.orca._INJECTABLES[name] + orca.orca._INJECTABLES.pop(name, None) + + def reinject_decorated_tables(): """ reinject the decorated tables (and columns) @@ -141,6 +152,10 @@ def reinject_decorated_tables(): orca.add_injectable(name, args['func'], cache=args['cache']) +def clear_cache(): + return orca.clear_cache() + + def set_step_args(args=None): assert isinstance(args, dict) or args is None @@ -156,3 +171,9 @@ def get_step_arg(arg_name, default=_NO_DEFAULT): raise "step arg '%s' not found and no default" % arg_name return args.get(arg_name, default) + + +def dump_state(): + + print "_DECORATED_STEPS", _DECORATED_STEPS.keys() + print "orca._STEPS", orca.orca._STEPS.keys() diff --git a/activitysim/core/orca/orca.py b/activitysim/core/orca/orca.py index f5dc0c37b..20514c9a6 100644 --- a/activitysim/core/orca/orca.py +++ b/activitysim/core/orca/orca.py @@ -24,7 +24,7 @@ from collections import namedtuple warnings.filterwarnings('ignore', category=tables.NaturalNameWarning) -logger = logging.getLogger('orca') +logger = logging.getLogger(__name__) _TABLES = {} _COLUMNS = {} @@ -1978,7 +1978,6 @@ def run(steps, iter_vars=None, data_out=None, out_interval=1, 'running iteration {} with iteration value {!r}'.format( i, var)) - t1 = time.time() for j, step_name in enumerate(steps): add_injectable('iter_step', iter_step(j, step_name)) with log_start_finish( diff --git a/activitysim/core/pipeline.py b/activitysim/core/pipeline.py index 345bd2fad..abaf435c9 100644 --- a/activitysim/core/pipeline.py +++ b/activitysim/core/pipeline.py @@ -1,3 +1,6 @@ +# ActivitySim +# See full license in LICENSE.txt. + import os import datetime as dt @@ -224,6 +227,7 @@ def rewrap(table_name, df=None): for column_name in orca.list_columns_for_table(table_name): # logger.debug("pop %s.%s: %s" % (table_name, column_name, t.column_type(column_name))) + #fixme orca.orca._COLUMNS.pop((table_name, column_name), None) # remove from orca's table list @@ -501,7 +505,6 @@ def last_checkpoint(): name of last checkpoint """ - #fixme if not _PIPELINE.is_open: raise RuntimeError("Pipeline is not open!") @@ -551,10 +554,11 @@ def run(models, resume_after=None): if resume_after == '_': resume_after = _PIPELINE.last_checkpoint[CHECKPOINT_NAME] - logger.info("Setting resume_after to %s" % (resume_after, )) + + if resume_after: + logger.info('resume_after %s' % resume_after) if resume_after in models: models = models[models.index(resume_after) + 1:] - #bug # preload any bulky injectables (e.g. skims) not in pipeline if orca.is_injectable('preload_injectables'): diff --git a/activitysim/core/test/test_assign.py b/activitysim/core/test/test_assign.py index 27946f32b..f073bf236 100644 --- a/activitysim/core/test/test_assign.py +++ b/activitysim/core/test/test_assign.py @@ -12,8 +12,6 @@ import pandas.util.testing as pdt import pytest -import orca - from .. import assign from .. import tracing from .. import inject @@ -30,7 +28,7 @@ def close_handlers(): def teardown_function(func): - orca.clear_cache() + inject.clear_cache() inject.reinject_decorated_tables() @@ -159,7 +157,7 @@ def test_assign_variables_failing(capsys, data): close_handlers() output_dir = os.path.join(os.path.dirname(__file__), 'output') - orca.add_injectable("output_dir", output_dir) + inject.add_injectable("output_dir", output_dir) tracing.config_logger(basic=True) diff --git a/activitysim/core/test/test_inject_defaults.py b/activitysim/core/test/test_inject_defaults.py index 0d78aa725..c8bd73083 100644 --- a/activitysim/core/test/test_inject_defaults.py +++ b/activitysim/core/test/test_inject_defaults.py @@ -15,13 +15,13 @@ def teardown_function(func): - orca.clear_cache() + inject.clear_cache() inject.reinject_decorated_tables() def test_defaults(): - orca.clear_cache() + inject.clear_cache() with pytest.raises(RuntimeError) as excinfo: inject.get_injectable("configs_dir") @@ -37,13 +37,11 @@ def test_defaults(): assert "directory does not exist" in str(excinfo.value) configs_dir = os.path.join(os.path.dirname(__file__), 'configs_test_defaults') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) - settings = orca.get_injectable("settings") + settings = inject.get_injectable("settings") assert isinstance(settings, dict) data_dir = os.path.join(os.path.dirname(__file__), 'data') - orca.add_injectable("data_dir", data_dir) + inject.add_injectable("data_dir", data_dir) - # default values if not specified in settings - assert orca.get_injectable("chunk_size") == 0 diff --git a/activitysim/core/test/test_logit.py b/activitysim/core/test/test_logit.py index c9a5af3e7..c23d24acb 100644 --- a/activitysim/core/test/test_logit.py +++ b/activitysim/core/test/test_logit.py @@ -5,13 +5,13 @@ import numpy as np import pandas as pd -import orca import pandas.util.testing as pdt import pytest from ..simulate import eval_variables from .. import logit +from .. import inject @pytest.fixture(scope='module') @@ -22,10 +22,10 @@ def data_dir(): def add_canonical_dirs(): configs_dir = os.path.join(os.path.dirname(__file__), 'configs') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) output_dir = os.path.join(os.path.dirname(__file__), 'output') - orca.add_injectable("output_dir", output_dir) + inject.add_injectable("output_dir", output_dir) # this is lifted straight from urbansim's test_mnl.py diff --git a/activitysim/core/test/test_pipeline.py b/activitysim/core/test/test_pipeline.py index 61670811b..a0274e38b 100644 --- a/activitysim/core/test/test_pipeline.py +++ b/activitysim/core/test/test_pipeline.py @@ -5,19 +5,16 @@ import tempfile import logging -import numpy as np -import orca -import pandas as pd import pandas.util.testing as pdt import pytest -import yaml - -import extensions from activitysim.core import tracing from activitysim.core import pipeline from activitysim.core import inject +#from . import extensions +from .extensions import steps + # set the max households for all tests (this is to limit memory use on travis) HOUSEHOLDS_SAMPLE_SIZE = 100 HH_ID = 961042 @@ -25,25 +22,25 @@ def setup(): - orca.orca._INJECTABLES.pop('skim_dict', None) - orca.orca._INJECTABLES.pop('skim_stack', None) + inject.remove_injectable('skim_dict') + inject.remove_injectable('skim_stack') configs_dir = os.path.join(os.path.dirname(__file__), 'configs') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) output_dir = os.path.join(os.path.dirname(__file__), 'output') - orca.add_injectable("output_dir", output_dir) + inject.add_injectable("output_dir", output_dir) data_dir = os.path.join(os.path.dirname(__file__), 'data') - orca.add_injectable("data_dir", data_dir) + inject.add_injectable("data_dir", data_dir) - orca.clear_cache() + inject.clear_cache() tracing.config_logger() def teardown_function(func): - orca.clear_cache() + inject.clear_cache() inject.reinject_decorated_tables() @@ -61,6 +58,13 @@ def test_pipeline_run(): setup() + #fixme + inject.add_step('step1', steps.step1) + inject.add_step('step2', steps.step2) + inject.add_step('step3', steps.step3) + inject.add_step('step_add_col', steps.step_add_col) + inject.dump_state() + _MODELS = [ 'step1', 'step2', @@ -102,6 +106,13 @@ def test_pipeline_checkpoint_drop(): setup() + #fixme + inject.add_step('step1', steps.step1) + inject.add_step('step2', steps.step2) + inject.add_step('step3', steps.step3) + inject.add_step('step_add_col', steps.step_add_col) + inject.add_step('step_forget_tab', steps.step_forget_tab) + _MODELS = [ 'step1', '_step2', diff --git a/activitysim/core/test/test_tracing.py b/activitysim/core/test/test_tracing.py index 2fffca60a..285df8862 100644 --- a/activitysim/core/test/test_tracing.py +++ b/activitysim/core/test/test_tracing.py @@ -6,10 +6,10 @@ import pytest -import orca import pandas as pd -from .. import tracing as tracing +from .. import tracing +from .. import inject def close_handlers(): @@ -24,13 +24,13 @@ def close_handlers(): def add_canonical_dirs(): - orca.clear_cache() + inject.clear_cache() configs_dir = os.path.join(os.path.dirname(__file__), 'configs') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) output_dir = os.path.join(os.path.dirname(__file__), 'output') - orca.add_injectable("output_dir", output_dir) + inject.add_injectable("output_dir", output_dir) def test_config_logger(capsys): @@ -98,8 +98,8 @@ def test_register_households(capsys): df = pd.DataFrame({'zort': ['a', 'b', 'c']}, index=[1, 2, 3]) - orca.add_injectable('traceable_tables', ['households']) - orca.add_injectable("trace_hh_id", 5) + inject.add_injectable('traceable_tables', ['households']) + inject.add_injectable("trace_hh_id", 5) tracing.register_traceable_table('households', df) out, err = capsys.readouterr() @@ -124,11 +124,11 @@ def test_register_tours(capsys): tracing.config_logger() - orca.add_injectable('traceable_tables', ['households', 'tours']) - orca.add_injectable('traceable_table_refs', None) + inject.add_injectable('traceable_tables', ['households', 'tours']) + inject.add_injectable('traceable_table_refs', None) # in case another test injected this - orca.add_injectable("trace_tours", []) + inject.add_injectable("trace_tours", []) tours_df = pd.DataFrame({'zort': ['a', 'b', 'c']}, index=[10, 11, 12]) tours_df.index.name = 'tour_id' @@ -140,7 +140,7 @@ def test_register_tours(capsys): assert "can't find a registered table to slice table 'tours' index name 'tour_id'" in out - orca.add_injectable("trace_hh_id", 3) + inject.add_injectable("trace_hh_id", 3) households_df = pd.DataFrame({'dzing': ['a', 'b', 'c']}, index=[1, 2, 3]) households_df.index.name = 'household_id' tracing.register_traceable_table('households', households_df) @@ -159,7 +159,7 @@ def test_register_tours(capsys): print out # don't consume output # should be tracing tour with tour_id 3 - assert orca.get_injectable('trace_tours') == [12] + assert inject.get_injectable('trace_tours') == [12] close_handlers() @@ -205,10 +205,10 @@ def test_basic(capsys): close_handlers() configs_dir = os.path.join(os.path.dirname(__file__), 'configs') - orca.add_injectable("configs_dir", configs_dir) + inject.add_injectable("configs_dir", configs_dir) output_dir = os.path.join(os.path.dirname(__file__), 'output') - orca.add_injectable("output_dir", output_dir) + inject.add_injectable("output_dir", output_dir) # remove existing handlers or basicConfig is a NOP logging.getLogger().handlers = [] diff --git a/activitysim/core/tracing.py b/activitysim/core/tracing.py index e8e76157a..bd11fc8a5 100644 --- a/activitysim/core/tracing.py +++ b/activitysim/core/tracing.py @@ -122,8 +122,6 @@ def config_logger(basic=False): if not basic: log_config_file = config.config_file_path(LOGGING_CONF_FILE_NAME, mandatory=False) - #print "log_config_file", log_config_file - if log_config_file: with open(log_config_file) as f: config_dict = yaml.load(f) @@ -452,37 +450,6 @@ def get_trace_target(df, slicer): return target_ids, column -def slice_canonically(df, slicer, label, warn_if_empty=False): - """ - Slice dataframe by traced household or person id dataframe and write to CSV - - Parameters - ---------- - df: pandas.DataFrame - dataframe to slice - slicer: str - name of column or index to use for slicing - label: str - tracer name - only used to report bad slicer - - Returns - ------- - sliced subset of dataframe - """ - - target_ids, column = get_trace_target(df, slicer) - - if target_ids is not None: - df = slice_ids(df, target_ids, column) - - if warn_if_empty and df.shape[0] == 0: - column_name = column or slicer - logger.warn("slice_canonically: no rows in %s with %s == %s" - % (label, column_name, target_ids)) - - return df - - def trace_targets(df, slicer=None): target_ids, column = get_trace_target(df, slicer) @@ -573,8 +540,6 @@ def trace_df(df, label, slicer=None, columns=None, Nothing """ - #df = slice_canonically(df, slicer, label, warn_if_empty) - target_ids, column = get_trace_target(df, slicer) if target_ids is not None: diff --git a/example/simulation.py b/example/simulation.py index b163fd1a7..89b5e8ae5 100644 --- a/example/simulation.py +++ b/example/simulation.py @@ -1,20 +1,12 @@ -import os -import logging -import time -from random import randint +# ActivitySim +# See full license in LICENSE.txt. -import numpy as np -import multiprocessing as mp +import logging -from activitysim.core import inject +from activitysim import abm from activitysim.core import tracing - -from activitysim.core.tracing import print_elapsed_time from activitysim.core.config import handle_standard_args from activitysim.core.config import setting - - -from activitysim import abm from activitysim.core import pipeline logger = logging.getLogger('activitysim') @@ -30,8 +22,6 @@ def run(): tracing.config_logger() tracing.delete_csv_files() - t0 = print_elapsed_time() - MODELS = setting('models') # If you provide a resume_after argument to pipeline.run @@ -47,8 +37,6 @@ def run(): # tables will no longer be available after pipeline is closed pipeline.close_pipeline() - t0 = print_elapsed_time("all models", t0) - if __name__ == '__main__': run() diff --git a/example_mp/simulation.py b/example_mp/simulation.py index fd41db0a4..3d8789cf8 100644 --- a/example_mp/simulation.py +++ b/example_mp/simulation.py @@ -1,7 +1,9 @@ -import os -import sys +# ActivitySim +# See full license in LICENSE.txt. + +from __future__ import print_function + import logging -import multiprocessing from activitysim.core import inject from activitysim.core import tracing @@ -46,10 +48,11 @@ def cleanup_output_files(): cleanup_output_files() run_list = tasks.get_run_list() - with open(config.output_file_path('run_list.txt'), 'w') as file: - tasks.print_run_list(run_list, file) + with open(config.output_file_path('run_list.txt'), 'w') as f: + tasks.print_run_list(run_list, f) # tasks.print_run_list(run_list) + # bug if run_list['multiprocess']: logger.info("run multiprocess simulation") diff --git a/example_mp/tasks.py b/example_mp/tasks.py index 1320ac643..b1ed1bd68 100644 --- a/example_mp/tasks.py +++ b/example_mp/tasks.py @@ -1,18 +1,20 @@ +# ActivitySim +# See full license in LICENSE.txt. from __future__ import print_function +from future.utils import iteritems + import sys import os import time import logging -import yaml - +import multiprocessing as mp from collections import OrderedDict -from collections import Iterable +import yaml import numpy as np import pandas as pd -import multiprocessing as mp from activitysim.core import inject from activitysim.core import tracing @@ -53,7 +55,7 @@ def pipeline_table_keys(pipeline_store, checkpoint_name=None): # hdf5 key is / # FIXME - pathologically knows the format used by pipeline.pipeline_table_key() checkpoint_tables = {table_name: table_name + '/' + checkpoint_name - for table_name, checkpoint_name in checkpoint_tables.iteritems()} + for table_name, checkpoint_name in iteritems(checkpoint_tables)} # checkpoint name and series mapping table name to hdf5 key for tables in that checkpoint return checkpoint_name, checkpoint_tables @@ -68,10 +70,10 @@ def build_slice_rules(slice_info, tables): if primary_slicer not in tables: raise RuntimeError("primary slice table '%s' not in pipeline" % primary_slicer) - logger.debug("build_slice_rules tables %s" % tables.keys()) - logger.debug("build_slice_rules primary_slicer %s" % primary_slicer) - logger.debug("build_slice_rules slicer_table_names %s" % slicer_table_names) - logger.debug("build_slice_rules slicer_table_exceptions %s" % slicer_table_exceptions) + logger.debug("build_slice_rules tables %s", tables.keys()) + logger.debug("build_slice_rules primary_slicer %s", primary_slicer) + logger.debug("build_slice_rules slicer_table_names %s", slicer_table_names) + logger.debug("build_slice_rules slicer_table_exceptions %s", slicer_table_exceptions) # dict mapping slicer table_name to index name # (also presumed to be name of ref col name in referencing table) @@ -79,7 +81,7 @@ def build_slice_rules(slice_info, tables): # build slice rules for loaded tables slice_rules = {} - for table_name, df in tables.iteritems(): + for table_name, df in iteritems(tables): rule = {} if table_name == primary_slicer: @@ -96,7 +98,7 @@ def build_slice_rules(slice_info, tables): # if df has a column with same name as the ref_col (index) of a slicer? try: source, ref_col = next((t, c) - for t, c in slicer_ref_cols.iteritems() + for t, c in iteritems(slicer_ref_cols) if c in df.columns) # then we can use that table to slice this df rule = {'slice_by': 'column', @@ -111,10 +113,8 @@ def build_slice_rules(slice_info, tables): slice_rules[table_name] = rule - #print("## rule %s: %s" % (table_name, rule)) - for table_name in slice_rules: - logger.debug("%s: %s" % (table_name, slice_rules[table_name])) + logger.debug("%s: %s", table_name, slice_rules[table_name]) return slice_rules @@ -128,7 +128,7 @@ def apportion_pipeline(sub_job_names, slice_info): # get last checkpoint from first job pipeline pipeline_path = config.build_output_file_path(pipeline_file_name) - logger.debug("apportion_pipeline pipeline_path: %s" % pipeline_path) + logger.debug("apportion_pipeline pipeline_path: %s", pipeline_path) # load all tables from pipeline with pd.HDFStore(pipeline_path, mode='r') as pipeline_store: @@ -144,13 +144,13 @@ def apportion_pipeline(sub_job_names, slice_info): raise RuntimeError("slicer table %s not found in pipeline" % table_name) # load all tables from pipeline - for table_name, hdf5_key in hdf5_keys.iteritems(): + for table_name, hdf5_key in iteritems(hdf5_keys): # new checkpoint for all tables the same checkpoints_df[table_name] = checkpoint_name # load the dataframe tables[table_name] = pipeline_store[hdf5_key] - logger.debug("loaded table %s %s" % (table_name, tables[table_name].shape)) + logger.debug("loaded table %s %s", table_name, tables[table_name].shape) # keep only the last row of checkpoints and patch the last checkpoint name checkpoints_df = checkpoints_df.tail(1).copy() @@ -169,14 +169,14 @@ def apportion_pipeline(sub_job_names, slice_info): # remove existing file try: os.unlink(pipeline_path) - except OSError as e: + except OSError: pass with pd.HDFStore(pipeline_path, mode='a') as pipeline_store: # remember sliced_tables so we can cascade slicing to other tables sliced_tables = {} - for table_name, rule in slice_rules.iteritems(): + for table_name, rule in iteritems(slice_rules): df = tables[table_name] @@ -203,12 +203,12 @@ def apportion_pipeline(sub_job_names, slice_info): hdf5_key = pipeline.pipeline_table_key(table_name, checkpoint_name) - logger.debug("writing %s (%s) to %s in %s" % - (table_name, sliced_tables[table_name].shape, hdf5_key, pipeline_path)) + logger.debug("writing %s (%s) to %s in %s", + table_name, sliced_tables[table_name].shape, hdf5_key, pipeline_path) pipeline_store[hdf5_key] = sliced_tables[table_name] - logger.debug("writing checkpoints (%s) to %s in %s" % - (checkpoints_df.shape, pipeline.CHECKPOINT_TABLE_NAME, pipeline_path)) + logger.debug("writing checkpoints (%s) to %s in %s", + checkpoints_df.shape, pipeline.CHECKPOINT_TABLE_NAME, pipeline_path) pipeline_store[pipeline.CHECKPOINT_TABLE_NAME] = checkpoints_df @@ -216,7 +216,7 @@ def coalesce_pipelines(sub_process_names, slice_info): pipeline_file_name = inject.get_injectable('pipeline_file_name') - logger.debug("coalesce_pipelines to: %s" % pipeline_file_name) + logger.debug("coalesce_pipelines to: %s", pipeline_file_name) # tables that are identical in every pipeline and so don't need to be concatenated @@ -230,39 +230,39 @@ def coalesce_pipelines(sub_process_names, slice_info): # hdf5_keys is a dict mapping table_name to pipeline hdf5_key checkpoint_name, hdf5_keys = pipeline_table_keys(pipeline_store) - for table_name, hdf5_key in hdf5_keys.iteritems(): - logger.debug("loading table %s %s" % (table_name, hdf5_key)) + for table_name, hdf5_key in iteritems(hdf5_keys): + logger.debug("loading table %s %s", table_name, hdf5_key) tables[table_name] = pipeline_store[hdf5_key] # use slice rules followed by apportion_pipeline to identify singleton tables slice_rules = build_slice_rules(slice_info, tables) - singleton_table_names = [t for t, rule in slice_rules.iteritems() if rule['slice_by'] is None] + singleton_table_names = [t for t, rule in iteritems(slice_rules) if rule['slice_by'] is None] singleton_tables = {t: tables[t] for t in singleton_table_names} - omnibus_keys = {t: k for t, k in hdf5_keys.iteritems() if t not in singleton_table_names} + omnibus_keys = {t: k for t, k in iteritems(hdf5_keys) if t not in singleton_table_names} - logger.debug("coalesce_pipelines to: %s" % pipeline_file_name) - logger.debug("singleton_table_names: %s" % singleton_table_names) - logger.debug("omnibus_keys: %s" % omnibus_keys) + logger.debug("coalesce_pipelines to: %s", pipeline_file_name) + logger.debug("singleton_table_names: %s", singleton_table_names) + logger.debug("omnibus_keys: %s", omnibus_keys) # concat omnibus tables from all sub_processes omnibus_tables = {table_name: [] for table_name in omnibus_keys} for process_name in sub_process_names: pipeline_path = config.build_output_file_path(pipeline_file_name, use_prefix=process_name) - logger.info("coalesce pipeline %s" % pipeline_path) + logger.info("coalesce pipeline %s", pipeline_path) with pd.HDFStore(pipeline_path, mode='r') as pipeline_store: - for table_name, hdf5_key in omnibus_keys.iteritems(): + for table_name, hdf5_key in iteritems(omnibus_keys): omnibus_tables[table_name].append(pipeline_store[hdf5_key]) pipeline.open_pipeline() for table_name in singleton_tables: df = singleton_tables[table_name] - logger.info("adding singleton table %s %s" % (table_name, df.shape)) + logger.info("adding singleton table %s %s", table_name, df.shape) pipeline.replace_table(table_name, df) for table_name in omnibus_tables: df = pd.concat(omnibus_tables[table_name], sort=False) - logger.info("adding omnibus table %s %s" % (table_name, df.shape)) + logger.info("adding omnibus table %s %s", table_name, df.shape) pipeline.replace_table(table_name, df) pipeline.add_checkpoint(checkpoint_name) @@ -298,7 +298,7 @@ def allocate_shared_skim_buffer(): def setup_injectables_and_logging(injectables): - for k, v in injectables.iteritems(): + for k, v in iteritems(injectables): inject.add_injectable(k, v) inject.add_injectable("is_sub_task", True) @@ -323,14 +323,14 @@ def mp_run_simulation(queue, injectables, step_info, resume_after, **kwargs): process_name = mp.current_process().name if num_processes > 1: pipeline_prefix = process_name - logger.info("injecting pipeline_file_prefix '%s'" % pipeline_prefix) + logger.info("injecting pipeline_file_prefix '%s'", pipeline_prefix) inject.add_injectable("pipeline_file_prefix", pipeline_prefix) setup_injectables_and_logging(injectables) - logger.info("mp_run_simulation %s num_processes %s" % (process_name, num_processes)) + logger.info("mp_run_simulation %s num_processes %s", process_name, num_processes) if resume_after: - logger.info('resume_after %s' % resume_after) + logger.info('resume_after %s', resume_after) inject.add_injectable('skim_buffer', skim_buffer) inject.add_injectable("chunk_size", chunk_size) @@ -339,48 +339,32 @@ def mp_run_simulation(queue, injectables, step_info, resume_after, **kwargs): # if they specified a resume_after model, check to make sure it is checkpointed if resume_after not in pipeline.get_checkpoints()[pipeline.CHECKPOINT_NAME].values: # if not checkpointed, then fall back to last checkpoint - logger.warn("resume_after checkpoint '%s' not in pipeline" % (resume_after, )) + logger.warn("resume_after checkpoint '%s' not in pipeline", resume_after) resume_after = '_' pipeline.open_pipeline(resume_after) last_checkpoint = pipeline.last_checkpoint() if last_checkpoint in models: - logger.info("Resuming model run list after %s" % (last_checkpoint, )) + logger.info("Resuming model run list after %s", last_checkpoint) models = models[models.index(last_checkpoint) + 1:] # preload any bulky injectables (e.g. skims) not in pipeline - t0 = tracing.print_elapsed_time() - preload_injectables = inject.get_injectable('preload_injectables', None) - if preload_injectables is not None: - t0 = tracing.print_elapsed_time('preload_injectables', t0) + inject.get_injectable('preload_injectables', None) t0 = tracing.print_elapsed_time() for model in models: t1 = tracing.print_elapsed_time() pipeline.run_model(model) + tracing.print_elapsed_time("run_model %s %s" % (step_label, model,), t1) queue.put({'model': model, 'time': time.time()-t1}) - t1 = tracing.print_elapsed_time("run_model %s %s" % (step_label, model,), t1) - - #logger.debug('#mem after %s, %s' % (model, util.memory_info())) - t0 = tracing.print_elapsed_time("run (%s models)" % len(models), t0) - ########################### - pipeline.close_pipeline() - #fixme - # try: - # run_simulation(models, resume_after) - # except Exception as e: - # print(e) - # logger.error("Error running simulation: %s" % (e,)) - # raise e - def mp_apportion_pipeline(injectables, sub_job_proc_names, slice_info): setup_injectables_and_logging(injectables) @@ -399,13 +383,13 @@ def mp_coalesce_pipelines(injectables, sub_job_proc_names, slice_info): def run_sub_task(p): - logger.info("running sub_process %s" % p.name) + logger.info("running sub_process %s", p.name) p.start() p.join() # logger.info('%s.exitcode = %s' % (p.name, p.exitcode)) if p.exitcode: - logger.error("Process %s returned exitcode %s" % (p.name, p.exitcode)) + logger.error("Process %s returned exitcode %s", p.name, p.exitcode) raise RuntimeError("Process %s returned exitcode %s" % (p.name, p.exitcode)) @@ -413,14 +397,13 @@ def run_sub_simulations(injectables, shared_skim_buffer, step_info, process_name step_name = step_info['name'] - logger.info('run_sub_simulations step %s models resume_after %s' % (step_name, resume_after)) + logger.info('run_sub_simulations step %s models resume_after %s', step_name, resume_after) # if not the first step, resume_after the last checkpoint from the previous step if resume_after is None and step_info['step_num'] > 0: resume_after = '_' num_simulations = len(process_names) - last_checkpoints = [None] * num_simulations procs = [] queues = [] @@ -432,32 +415,33 @@ def run_sub_simulations(injectables, shared_skim_buffer, step_info, process_name procs.append(p) queues.append(q) - def handle_queued_messages(): - for i, p, q in zip(range(num_simulations), procs, queues): - while not q.empty(): - msg = q.get(block=False) - model = msg['model'] - t = msg['time'] - logger.info("%s %s : %s" % (p.name, model, tracing.format_elapsed_time(t))) - if model[0] != '_': - last_checkpoints[i] = model - #update_journal(step_name, 'checkpoints', last_checkpoints) + def log_queued_messages(): + for i, process, queue in zip(range(num_simulations), procs, queues): + while not queue.empty(): + msg = queue.get(block=False) + logger.info("%s %s : %s", + process.name, + msg['model'], + tracing.format_elapsed_time(msg['time'])) + + def idle(seconds): + log_queued_messages() + for _ in range(seconds): + time.sleep(1) + log_queued_messages() stagger = 0 for p in procs: if stagger > 0: - logger.info("stagger process %s by %s seconds" % (p.name, step_info['stagger'])) - for i in range(stagger): - handle_queued_messages() - time.sleep(1) + logger.info("stagger process %s by %s seconds", p.name, stagger) + idle(stagger) stagger = step_info['stagger'] - logger.info("start process %s" % p.name) + logger.info("start process %s", p.name) p.start() while mp.active_children(): - handle_queued_messages() - time.sleep(1) - handle_queued_messages() + idle(1) + log_queued_messages() for p in procs: p.join() @@ -466,40 +450,31 @@ def handle_queued_messages(): error_count = 0 for p in procs: if p.exitcode: - logger.error("Process %s returned exitcode %s" % (p.name, p.exitcode)) + logger.error("Process %s returned exitcode %s", p.name, p.exitcode) error_count += 1 return error_count -def update_journal(step_name, key, value): - - run_status = inject.get_injectable('run_status', OrderedDict()) - if not run_status: - inject.add_injectable('run_status', run_status) - - run_status.setdefault(step_name, {'name': step_name})[key] = value - save_journal(run_status) - - def run_multiprocess(run_list, injectables): - #resume_after = run_list['resume_after'] resume_journal = run_list.get('resume_journal', {}) - - logger.info('setup shared skim data') - shared_skim_buffer = allocate_shared_skim_buffer() + run_status = OrderedDict() def skip(step_name, key): - already_did_this = resume_journal and resume_journal.get(step_name, {}).get(key, False) - if already_did_this: - logger.info("Skipping %s %s" % (step_name, key)) + logger.info("Skipping %s %s", step_name, key) time.sleep(1) - return already_did_this + def update_journal(step_name, key, value): + run_status.setdefault(step_name, {'name': step_name})[key] = value + save_journal(run_status) + + logger.info('setup shared skim data') + shared_skim_buffer = allocate_shared_skim_buffer() + # - mp_setup_skims run_sub_task( mp.Process(target=mp_setup_skims, name='mp_setup_skims', @@ -521,7 +496,7 @@ def skip(step_name, key): update_journal(step_name, 'sub_proc_names', sub_proc_names) - logger.info('running step %s with %s processes' % (step_name, num_processes,)) + logger.info('running step %s with %s processes', step_name, num_processes,) # - mp_apportion_pipeline if not skip(step_name, 'apportion'): @@ -562,8 +537,8 @@ def get_resume_journal(run_list): previous_journal = read_journal() if not previous_journal: - logger.error("empty journal for resume_after '%s'" % (resume_after,)) - raise RuntimeError("empty journal for resume_after '%s'" % (resume_after,)) + logger.error("empty journal for resume_after '%s'", resume_after) + raise RuntimeError("empty journal for resume_after '%s'" % resume_after) if resume_after == '_': resume_step_name = previous_journal.keys()[-1] @@ -576,14 +551,16 @@ def get_resume_journal(run_list): if resume_after in step['models']), None) if resume_step_name not in previous_steps: - logger.error("resume_after model '%s' not in journal" % (resume_after,)) - raise RuntimeError("resume_after model '%s' not in journal" % (resume_after,)) + logger.error("resume_after model '%s' not in journal", resume_after) + raise RuntimeError("resume_after model '%s' not in journal" % resume_after) # drop any previous_journal steps after resume_step for step in previous_steps[previous_steps.index(resume_step_name) + 1:]: del previous_journal[step] - multiprocess_step = next((step for step in run_list['multiprocess_steps'] if step['name']==resume_step_name), []) + multiprocess_step = next((step for step in run_list['multiprocess_steps'] + if step['name'] == resume_step_name), []) + print("resume_step_models", multiprocess_step['models']) if resume_after in multiprocess_step['models'][:-1]: @@ -624,7 +601,6 @@ def get_run_list(): if not models or not isinstance(models, list): raise RuntimeError('No models list in settings file') - if resume_after not in models + ['_', None]: raise RuntimeError("resume_after '%s' not in models list" % resume_after) if resume_after == models[-1]: @@ -637,8 +613,9 @@ def get_run_list(): multiprocess) # check step name, num_processes, chunk_size and presence of slice info + num_steps = len(multiprocess_steps) step_names = set() - for istep in range(len(multiprocess_steps)): + for istep in range(num_steps): step = multiprocess_steps[istep] step['step_num'] = istep @@ -662,15 +639,15 @@ def get_run_list(): if 'slice' in step: if num_processes == 0: - logger.info("Setting num_processes = %s for step %s" % - (num_processes, name)) + logger.info("Setting num_processes = %s for step %s", + num_processes, name) num_processes = mp.cpu_count() if num_processes == 1: raise RuntimeError("num_processes = 1 but found slice info for step %s" " in multiprocess_steps" % name) if num_processes > mp.cpu_count(): - logger.warn("num_processes setting (%s) greater than cpu count (%s" % - (num_processes, mp.cpu_count())) + logger.warn("num_processes setting (%s) greater than cpu count (%s", + num_processes, mp.cpu_count()) else: if num_processes == 0: num_processes = 1 @@ -695,9 +672,9 @@ def get_run_list(): multiprocess_steps[istep]['stagger'] = max(int(step.get('stagger', 0)), 0) # - determine index in models list of step starts - START = 'begin' + start_tag = 'begin' starts = [0] * len(multiprocess_steps) - for istep in range(len(multiprocess_steps)): + for istep in range(num_steps): step = multiprocess_steps[istep] name = step['name'] @@ -708,30 +685,30 @@ def get_run_list(): raise RuntimeError("missing tables list for step %s" " in multiprocess_steps" % istep) - start = step.get(START, None) + start = step.get(start_tag, None) if not name: raise RuntimeError("missing %s tag for step '%s' (%s)" " in multiprocess_steps" % - (START, name, istep)) + (start_tag, name, istep)) if start not in models: raise RuntimeError("%s tag '%s' for step '%s' (%s) not in models list" % - (START, start, name, istep)) + (start_tag, start, name, istep)) starts[istep] = models.index(start) if istep == 0 and starts[istep] != 0: raise RuntimeError("%s tag '%s' for first step '%s' (%s)" " is not first model in models list" % - (START, start, name, istep)) + (start_tag, start, name, istep)) if istep > 0 and starts[istep] <= starts[istep - 1]: raise RuntimeError("%s tag '%s' for step '%s' (%s)" " falls before that of prior step in models list" % - (START, start, name, istep)) + (start_tag, start, name, istep)) # - build step model lists starts.append(len(models)) # so last step gets remaining models in list - for istep in range(len(multiprocess_steps)): + for istep in range(num_steps): step_models = models[starts[istep]: starts[istep + 1]] if step_models[-1][0] == '_': @@ -742,6 +719,7 @@ def get_run_list(): run_list['multiprocess_steps'] = multiprocess_steps + # - add resume_journal if resume_after: resume_journal = get_resume_journal(run_list) if resume_journal: @@ -750,59 +728,59 @@ def get_run_list(): istep = len(resume_journal) - 1 multiprocess_steps[istep]['resume_after'] = resume_after - return run_list -def print_run_list(run_list, file=None): +def print_run_list(run_list, output_file=None): - if file is None: - file = sys.stdout + if output_file is None: + output_file = sys.stdout - print("resume_after:", run_list['resume_after'], file=file) - print("multiprocess:", run_list['multiprocess'], file=file) + print("resume_after:", run_list['resume_after'], file=output_file) + print("multiprocess:", run_list['multiprocess'], file=output_file) - print("models", file=file) + print("models", file=output_file) for m in run_list['models']: - print(" - ", m, file=file) + print(" - ", m, file=output_file) if run_list['multiprocess']: - print("\nmultiprocess_steps:", file=file) + print("\nmultiprocess_steps:", file=output_file) for step in run_list['multiprocess_steps']: - print(" step:", step['name'], file=file) + print(" step:", step['name'], file=output_file) for k in step: if isinstance(step[k], (list, )): - print(" ", k, file=file) + print(" ", k, file=output_file) for v in step[k]: - print(" -", v, file=file) + print(" -", v, file=output_file) else: - print(" %s: %s" % (k, step[k]), file=file) + print(" %s: %s" % (k, step[k]), file=output_file) if run_list.get('resume_journal'): - print("\nresume_journal:", file=file) - print_journal(run_list['resume_journal'], file) + print("\nresume_journal:", file=output_file) + print_journal(run_list['resume_journal'], output_file) else: - print("models", file=file) + print("models", file=output_file) for m in run_list['models']: - print(" - ", m, file=file) + print(" - ", m, file=output_file) -def print_journal(journal, file=None): - if file is None: - file = sys.stdout +def print_journal(journal, output_file=None): + + if output_file is None: + output_file = sys.stdout for step_name in journal: step = journal[step_name] - print(" step:", step_name, file=file) + print(" step:", step_name, file=output_file) for k in step: if isinstance(k, str): - print(" ", k, step[k], file=file) + print(" ", k, step[k], file=output_file) else: - print(" ", k, file=file) + print(" ", k, file=output_file) for v in step[k]: - print(" ", v, file=file) + print(" ", v, file=output_file) def journal_file_path(file_name=None): @@ -825,6 +803,7 @@ def save_journal(journal, file_name=None): journal = [step for step in journal.values()] yaml.dump(journal, f) + def is_sub_task(): return inject.get_injectable('is_sub_task', False) @@ -834,6 +813,7 @@ def if_sub_task(if_is, if_isnt): return if_is if is_sub_task() else if_isnt + def if_sub_task_opt(if_is, if_isnt): opt = { @@ -844,4 +824,3 @@ def if_sub_task_opt(if_is, if_isnt): 'NOTSET': logging.NOTSET, } return opt[if_is] if is_sub_task() else opt[if_isnt] - diff --git a/setup.py b/setup.py index d90ac1922..14a6187f8 100644 --- a/setup.py +++ b/setup.py @@ -24,12 +24,12 @@ install_requires=[ 'numpy >= 1.13.0', 'openmatrix >= 0.2.4', - #'orca >= 1.1', 'pandas >= 0.20.3', 'pyyaml >= 3.0', 'tables >= 3.3.0', - 'toolz >= 0.7', + 'toolz >= 0.8.1', 'zbox >= 1.2', - 'psutil >= 4.1' + 'psutil >= 4.1', + 'future >= 0.16.0' ] )