Skip to content

Commit

Permalink
Fix memory usage (#751)
Browse files Browse the repository at this point in the history
* clear cache of flow.tree

* categorical time period dtype

* add pydantic for tests

* use time_label_dtype only when available

* allow missing taz skim_dict

* recover tree when needed

* predigitized time periods

* pass sh_tree back again for tracing

* better error message
  • Loading branch information
jpn-- authored Nov 12, 2023
1 parent 180dcca commit 460203c
Show file tree
Hide file tree
Showing 12 changed files with 147 additions and 53 deletions.
2 changes: 1 addition & 1 deletion activitysim/abm/models/parking_location_choice.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ def parking_location(
if "trip_period" not in trips_merged_df:
# TODO: resolve this to the skim time period index not the label, it will be faster
trips_merged_df["trip_period"] = network_los.skim_time_period_label(
trips_merged_df[proposed_trip_departure_period]
trips_merged_df[proposed_trip_departure_period], as_cat=True
)
model_settings["TRIP_DEPARTURE_PERIOD"] = "trip_period"

Expand Down
2 changes: 1 addition & 1 deletion activitysim/abm/models/trip_mode_choice.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def trip_mode_choice(
# setup skim keys
assert "trip_period" not in trips_merged
trips_merged["trip_period"] = network_los.skim_time_period_label(
trips_merged.depart
trips_merged.depart, as_cat=True
)

orig_col = "origin"
Expand Down
16 changes: 10 additions & 6 deletions activitysim/abm/models/util/logsums.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ def compute_logsums(
# FIXME - are we ok with altering choosers (so caller doesn't have to set these)?
if (in_period_col is not None) and (out_period_col is not None):
choosers["in_period"] = network_los.skim_time_period_label(
choosers[in_period_col]
choosers[in_period_col], as_cat=True
)
choosers["out_period"] = network_los.skim_time_period_label(
choosers[out_period_col]
choosers[out_period_col], as_cat=True
)
elif ("in_period" not in choosers.columns) and (
"out_period" not in choosers.columns
Expand All @@ -92,17 +92,21 @@ def compute_logsums(
and tour_purpose in model_settings["OUT_PERIOD"]
):
choosers["in_period"] = network_los.skim_time_period_label(
model_settings["IN_PERIOD"][tour_purpose]
model_settings["IN_PERIOD"][tour_purpose],
as_cat=True,
broadcast_to=choosers.index,
)
choosers["out_period"] = network_los.skim_time_period_label(
model_settings["OUT_PERIOD"][tour_purpose]
model_settings["OUT_PERIOD"][tour_purpose],
as_cat=True,
broadcast_to=choosers.index,
)
else:
choosers["in_period"] = network_los.skim_time_period_label(
model_settings["IN_PERIOD"]
model_settings["IN_PERIOD"], as_cat=True, broadcast_to=choosers.index
)
choosers["out_period"] = network_los.skim_time_period_label(
model_settings["OUT_PERIOD"]
model_settings["OUT_PERIOD"], as_cat=True, broadcast_to=choosers.index
)
else:
logger.error("Choosers table already has columns 'in_period' and 'out_period'.")
Expand Down
8 changes: 6 additions & 2 deletions activitysim/abm/models/util/mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,12 @@ def run_tour_mode_choice_simulate(
assert ("in_period" not in choosers) and ("out_period" not in choosers)
in_time = skims["in_time_col_name"]
out_time = skims["out_time_col_name"]
choosers["in_period"] = network_los.skim_time_period_label(choosers[in_time])
choosers["out_period"] = network_los.skim_time_period_label(choosers[out_time])
choosers["in_period"] = network_los.skim_time_period_label(
choosers[in_time], as_cat=True
)
choosers["out_period"] = network_los.skim_time_period_label(
choosers[out_time], as_cat=True
)

expressions.annotate_preprocessors(
state, choosers, locals_dict, skims, model_settings, trace_label
Expand Down
50 changes: 34 additions & 16 deletions activitysim/abm/models/util/vectorize_tour_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ def dedupe_alt_tdd(state: workflow.State, alt_tdd, tour_purpose, trace_label):

logger.info("tdd_alt_segments specified for representative logsums")

if tdd_segments is not None:
# apply categorical dtypes
tdd_segments["time_period"] = tdd_segments["time_period"].astype(
alt_tdd["out_period"].dtype
)

with chunk.chunk_log(
state, tracing.extend_trace_label(trace_label, "dedupe_alt_tdd")
) as chunk_sizer:
Expand Down Expand Up @@ -328,11 +334,12 @@ def compute_tour_scheduling_logsums(
assert "out_period" not in alt_tdd
assert "in_period" not in alt_tdd

# FIXME:MEMORY
# These two lines each generate a massive array of strings,
# using a bunch of RAM and slowing things down.
alt_tdd["out_period"] = network_los.skim_time_period_label(alt_tdd["start"])
alt_tdd["in_period"] = network_los.skim_time_period_label(alt_tdd["end"])
alt_tdd["out_period"] = network_los.skim_time_period_label(
alt_tdd["start"], as_cat=True
)
alt_tdd["in_period"] = network_los.skim_time_period_label(
alt_tdd["end"], as_cat=True
)

alt_tdd["duration"] = alt_tdd["end"] - alt_tdd["start"]

Expand Down Expand Up @@ -383,17 +390,28 @@ def compute_tour_scheduling_logsums(

# tracing.log_runtime(model_name=trace_label, start_time=t0)

# redupe - join the alt_tdd_period logsums to alt_tdd to get logsums for alt_tdd
logsums = (
pd.merge(
alt_tdd.reset_index(),
deduped_alt_tdds.reset_index(),
on=[index_name] + redupe_columns,
how="left",
)
.set_index(index_name)
.logsums
)
logsums = pd.Series(data=0, index=alt_tdd.index, dtype=np.float64)
left_on = [alt_tdd.index]
right_on = [deduped_alt_tdds.index]
for i in redupe_columns:
if (
alt_tdd[i].dtype == "category"
and alt_tdd[i].dtype.ordered
and alt_tdd[i].dtype == deduped_alt_tdds[i].dtype
):
left_on += [alt_tdd[i].cat.codes]
right_on += [deduped_alt_tdds[i].cat.codes]
else:
left_on += [alt_tdd[i].to_numpy()]
right_on += [deduped_alt_tdds[i].to_numpy()]

logsums.iloc[:] = pd.merge(
pd.DataFrame(index=alt_tdd.index),
deduped_alt_tdds.logsums,
left_on=left_on,
right_on=right_on,
how="left",
).logsums.to_numpy()
chunk_sizer.log_df(trace_label, "logsums", logsums)

del deduped_alt_tdds
Expand Down
12 changes: 8 additions & 4 deletions activitysim/abm/tables/landuse.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ def land_use(state: workflow.State):

sharrow_enabled = state.settings.sharrow
if sharrow_enabled:
err_msg = (
"a zero-based land_use index is required for sharrow,\n"
"try adding `recode_pipeline_columns: true` to your settings file."
)
# when using sharrow, the land use file must be organized (either in raw
# form or via recoding) so that the index is zero-based and contiguous
assert df.index.is_monotonic_increasing
assert df.index[0] == 0
assert df.index[-1] == len(df.index) - 1
assert df.index.dtype.kind == "i"
assert df.index.is_monotonic_increasing, err_msg
assert df.index[0] == 0, err_msg
assert df.index[-1] == len(df.index) - 1, err_msg
assert df.index.dtype.kind == "i", err_msg

# try to make life easy for everybody by keeping everything in canonical order
# but as long as coalesce_pipeline doesn't sort tables it coalesces, it might not stay in order
Expand Down
36 changes: 29 additions & 7 deletions activitysim/core/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ def skims_mapping(
parking_col_name=None,
zone_layer=None,
primary_origin_col_name=None,
predigitized_time_periods=False,
):
logger.info("loading skims_mapping")
logger.info(f"- orig_col_name: {orig_col_name}")
Expand Down Expand Up @@ -337,6 +338,10 @@ def skims_mapping(
),
)
else:
if predigitized_time_periods:
time_rel = "_code ->"
else:
time_rel = " @"
return dict(
# TODO:SHARROW: organize dimensions.
odt_skims=skim_dataset,
Expand All @@ -347,16 +352,16 @@ def skims_mapping(
relationships=(
f"df._orig_col_name -> odt_skims.{odim}",
f"df._dest_col_name -> odt_skims.{ddim}",
"df.out_period @ odt_skims.time_period",
f"df.out_period{time_rel} odt_skims.time_period",
f"df._dest_col_name -> dot_skims.{odim}",
f"df._orig_col_name -> dot_skims.{ddim}",
"df.in_period @ dot_skims.time_period",
f"df.in_period{time_rel} dot_skims.time_period",
f"df._orig_col_name -> odr_skims.{odim}",
f"df._dest_col_name -> odr_skims.{ddim}",
"df.in_period @ odr_skims.time_period",
f"df.in_period{time_rel} odr_skims.time_period",
f"df._dest_col_name -> dor_skims.{odim}",
f"df._orig_col_name -> dor_skims.{ddim}",
"df.out_period @ dor_skims.time_period",
f"df.out_period{time_rel} dor_skims.time_period",
f"df._orig_col_name -> od_skims.{odim}",
f"df._dest_col_name -> od_skims.{ddim}",
),
Expand Down Expand Up @@ -525,6 +530,15 @@ def new_flow(

cache_dir = state.filesystem.get_sharrow_cache_dir()
logger.debug(f"flow.cache_dir: {cache_dir}")
predigitized_time_periods = False
if "out_period" in choosers and "in_period" in choosers:
if (
choosers["out_period"].dtype == "category"
and choosers["in_period"].dtype == "category"
):
choosers["out_period_code"] = choosers["out_period"].cat.codes
choosers["in_period_code"] = choosers["in_period"].cat.codes
predigitized_time_periods = True
skims_mapping_ = skims_mapping(
state,
orig_col_name,
Expand All @@ -534,6 +548,7 @@ def new_flow(
parking_col_name=parking_col_name,
zone_layer=zone_layer,
primary_origin_col_name=primary_origin_col_name,
predigitized_time_periods=predigitized_time_periods,
)
if size_term_mapping is None:
size_term_mapping = {}
Expand Down Expand Up @@ -774,6 +789,9 @@ def apply_flow(
it ever again, but having a reference to it available later can be useful
in debugging and tracing. Flows are cached and reused anyway, so it is
generally not important to delete this at any point to free resources.
tree : sharrow.DataTree
The tree data used to compute the flow result. It is seperate from the
flow to prevent it from being cached with the flow.
"""
if sh is None:
return None, None
Expand All @@ -800,7 +818,7 @@ def apply_flow(
logger.error(f"error in apply_flow: {err!s}")
if required:
raise
return None, None
return None, None, None
else:
raise
with logtime(f"{flow.name}.load", trace_label or ""):
Expand All @@ -822,7 +840,9 @@ def apply_flow(
logger.error(f"error in apply_flow: {err!s}")
if required:
raise
return None, flow
tree = flow.tree
flow.tree = None
return None, flow, tree
raise
except Exception as err:
logger.error(f"error in apply_flow: {err!s}")
Expand All @@ -833,4 +853,6 @@ def apply_flow(
# Detecting compilation activity when in production mode is a bug
# that should be investigated.
tracing.timing_notes.add(f"compiled:{flow.name}")
return flow_result, flow
tree = flow.tree
flow.tree = None
return flow_result, flow, tree
13 changes: 8 additions & 5 deletions activitysim/core/interaction_simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def replace_in_index_level(mi, level, *repls):

timelogger.mark("sharrow preamble", True, logger, trace_label)

sh_util, sh_flow = apply_flow(
sh_util, sh_flow, sh_tree = apply_flow(
state,
spec_sh,
df,
Expand All @@ -187,10 +187,13 @@ def replace_in_index_level(mi, level, *repls):
index=df.index if extra_data is None else None,
)
chunk_sizer.log_df(trace_label, "sh_util", None) # hand off to caller
if sharrow_enabled != "test":
# if not testing sharrow, we are done with this object now.
del sh_util

timelogger.mark("sharrow flow", True, logger, trace_label)
else:
sh_util, sh_flow = None, None
sh_util, sh_flow, sh_tree = None, None, None
timelogger.mark("sharrow flow", False)

if (
Expand Down Expand Up @@ -404,9 +407,9 @@ def to_series(x):
if sh_flow is not None and trace_rows is not None and trace_rows.any():
assert type(trace_rows) == np.ndarray
sh_utility_fat = sh_flow.load_dataarray(
# sh_flow.tree.replace_datasets(
# df=df.iloc[trace_rows],
# ),
sh_tree.replace_datasets(
df=df.iloc[trace_rows],
),
dtype=np.float32,
)
sh_utility_fat = sh_utility_fat[trace_rows, :]
Expand Down
24 changes: 21 additions & 3 deletions activitysim/core/los.py
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,9 @@ def get_tappairs3d(self, otap, dtap, dim3, key):

return s.values

def skim_time_period_label(self, time_period, fillna=None):
def skim_time_period_label(
self, time_period, fillna=None, as_cat=False, broadcast_to=None
):
"""
convert time period times to skim time period labels (e.g. 9 -> 'AM')
Expand Down Expand Up @@ -873,6 +875,14 @@ def skim_time_period_label(self, time_period, fillna=None):
assert 0 == model_time_window_min % period_minutes
total_periods = model_time_window_min / period_minutes

try:
time_label_dtype = self.skim_dicts["taz"].time_label_dtype
except (KeyError, AttributeError):
# if the "taz" skim_dict is missing, or if using old SkimDict
# instead of SkimDataset, this labeling shortcut is unavailable.
time_label_dtype = str
as_cat = False

# FIXME - eventually test and use np version always?
if np.isscalar(time_period):
bin = (
Expand All @@ -888,6 +898,12 @@ def skim_time_period_label(self, time_period, fillna=None):
result = self.skim_time_periods["labels"].get(bin, default=default)
else:
result = self.skim_time_periods["labels"][bin]
if broadcast_to is not None:
result = pd.Series(
data=result,
index=broadcast_to,
dtype=time_label_dtype if as_cat else str,
)
else:
result = pd.cut(
time_period,
Expand All @@ -898,8 +914,10 @@ def skim_time_period_label(self, time_period, fillna=None):
if fillna is not None:
default = self.skim_time_periods["labels"][fillna]
result = result.fillna(default)
result = result.astype(str)

if as_cat:
result = result.astype(time_label_dtype)
else:
result = result.astype(str)
return result

def get_tazs(self, state):
Expand Down
6 changes: 3 additions & 3 deletions activitysim/core/simulate.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ def eval_utilities(
locals_dict.update(state.get_global_constants())
if locals_d is not None:
locals_dict.update(locals_d)
sh_util, sh_flow = apply_flow(
sh_util, sh_flow, sh_tree = apply_flow(
state,
spec_sh,
choosers,
Expand Down Expand Up @@ -652,7 +652,7 @@ def eval_utilities(
if sh_flow is not None:
try:
data_sh = sh_flow.load(
sh_flow.tree.replace_datasets(
sh_tree.replace_datasets(
df=choosers.iloc[offsets],
),
dtype=np.float32,
Expand Down Expand Up @@ -731,7 +731,7 @@ def eval_utilities(
)
print(f"{sh_util.shape=}")
print(misses)
_sh_flow_load = sh_flow.load()
_sh_flow_load = sh_flow.load(sh_tree)
print("possible problematic expressions:")
for expr_n, expr in enumerate(exprs):
closeness = np.isclose(
Expand Down
Loading

0 comments on commit 460203c

Please sign in to comment.