From 9c5d05760e998de1175b0087f2e58232f688503f Mon Sep 17 00:00:00 2001 From: Jeff Newman Date: Thu, 15 Feb 2024 11:51:02 -0600 Subject: [PATCH] remove code duplication --- activitysim/abm/models/util/cdap.py | 227 ---------------------------- 1 file changed, 227 deletions(-) diff --git a/activitysim/abm/models/util/cdap.py b/activitysim/abm/models/util/cdap.py index 6ed14d492..eab26bc7b 100644 --- a/activitysim/abm/models/util/cdap.py +++ b/activitysim/abm/models/util/cdap.py @@ -313,20 +313,6 @@ def get_cached_spec(state: workflow.State, hhsize): return None -def get_cached_joint_spec(hhsize): - - spec_name = cached_joint_spec_name(hhsize) - - spec = inject.get_injectable(spec_name, None) - if spec is not None: - logger.debug( - "build_cdap_joint_spec returning cached injectable spec %s", spec_name - ) - return spec - - return None - - def get_cached_joint_spec(state: workflow.State, hhsize): spec_name = cached_joint_spec_name(hhsize) @@ -353,18 +339,6 @@ def cache_joint_spec(state: workflow.State, hhsize, spec): state.add_injectable(spec_name, spec) -def cache_joint_spec(hhsize, spec): - spec_name = cached_joint_spec_name(hhsize) - # cache as injectable - inject.add_injectable(spec_name, spec) - - -def cache_joint_spec(hhsize, spec): - spec_name = cached_joint_spec_name(hhsize) - # cache as injectable - inject.add_injectable(spec_name, spec) - - def build_cdap_spec( state: workflow.State, interaction_coefficients, @@ -570,207 +544,6 @@ def build_cdap_spec( return spec -def build_cdap_joint_spec( - joint_tour_coefficients, hhsize, trace_spec=False, trace_label=None, cache=True -): - """ - Build a spec file for computing joint tour utilities of alternative household member for households of specified size. - We generate this spec automatically from a table of rules and coefficients because the - interaction rules are fairly simple and can be expressed compactly whereas - there is a lot of redundancy between the spec files for different household sizes, as well as - in the vectorized expression of the interaction alternatives within the spec file itself - joint_tour_coefficients has five columns: - label - label of the expression - description - description of the expression - dependency - if the expression is dependent on alternative, and which alternative is it dependent on - (e.g. M_px, N_px, H_px) - expression - expression of the utility term - coefficient - The coefficient to apply for the alternative - The generated spec will have the eval expression in the index, and a utility column for each - alternative (e.g. ['HH', 'HM', 'HN', 'MH', 'MM', 'MN', 'NH', 'NM', 'NN', 'MMJ', 'MNJ', 'NMJ', 'NNJ'] for hhsize 2 with joint alts) - Parameters - ---------- - joint_tour_coefficients : pandas.DataFrame - Rules and coefficients for generating joint tour specs for different household sizes - hhsize : int - household size for which the spec should be built. - Returns - ------- - spec: pandas.DataFrame - """ - - t0 = tracing.print_elapsed_time() - - # cdap joint spec is same for all households of MAX_HHSIZE and greater - hhsize = min(hhsize, MAX_HHSIZE) - - if cache: - spec = get_cached_joint_spec(hhsize) - if spec is not None: - return spec - - expression_name = "Expression" - - # generate a list of activity pattern alternatives for this hhsize - # e.g. ['HH', 'HM', 'HN', 'MH', 'MM', 'MN', 'NH', 'NM', 'NN'] for hhsize=2 - alternatives = ["".join(tup) for tup in itertools.product("HMN", repeat=hhsize)] - - joint_alternatives = [ - "".join(tup) + "J" - for tup in itertools.product("HMN", repeat=hhsize) - if tup.count("M") + tup.count("N") >= 2 - ] - alternatives = alternatives + joint_alternatives - - # spec df has expression column plus a column for each alternative - spec = pd.DataFrame(columns=[expression_name] + alternatives) - - # Before processing the interaction_coefficients, we add add rows to the spec to carry - # the alternative utilities previously computed for each individual into all hh alternative - # columns in which the individual assigned that alternative. The Expression column contains - # the name of the choosers column with that individuals utility for the individual alternative - # and the hh alternative columns that should receive that utility are given a value of 1 - # e.g. M_p1 is a column in choosers with the individual utility to person p1 of alternative M - # Expression MM MN MH NM NN NH HM HN HH - # M_p1 1.0 1.0 1.0 0.0 0.0 0.0 0.0 0.0 0.0 - # N_p1 0.0 0.0 0.0 1.0 1.0 1.0 0.0 0.0 0.0 - for pnum in range(1, hhsize + 1): - for activity in ["M", "N", "H"]: - - new_row_index = len(spec) - spec.loc[new_row_index, expression_name] = add_pn(activity, pnum) - - # list of alternative columns where person pnum has expression activity - # e.g. for M_p1 we want the columns where activity M is in position p1 - alternative_columns = [ - alt for alt in alternatives if alt[pnum - 1] == activity - ] - spec.loc[new_row_index, alternative_columns] = 1 - - # for each row in the joint util table - for row in joint_tour_coefficients.itertuples(): - - # if there is no dependencies - if row.dependency is np.nan: - expression = row.Expression - # add a new row to spec - new_row_index = len(spec) - spec.loc[new_row_index, expression_name] = expression - spec.loc[new_row_index, alternatives] = row.coefficient - # if there is dependencies - else: - dependency_name = row.dependency - expression = row.Expression - coefficient = row.coefficient - if dependency_name in ["M_px", "N_px", "H_px"]: - if "_pxprod" in expression: - prod_conds = row.Expression.split("|") - expanded_expressions = [ - tup - for tup in itertools.product( - range(len(prod_conds)), repeat=hhsize - ) - ] - for expression_tup in expanded_expressions: - expression_list = [] - dependency_list = [] - for counter in range(len(expression_tup)): - expression_list.append( - prod_conds[expression_tup[counter]].replace( - "xprod", str(counter + 1) - ) - ) - if expression_tup[counter] == 0: - dependency_list.append( - dependency_name.replace("x", str(counter + 1)) - ) - - expression_value = "&".join(expression_list) - dependency_value = pd.Series( - np.ones(len(alternatives)), index=alternatives - ) - if len(dependency_list) > 0: - for dependency in dependency_list: - # temp = spec.loc[spec[expression_name]==dependency, alternatives].squeeze().fillna(0) - dependency_value *= ( - spec.loc[ - spec[expression_name] == dependency, - alternatives, - ] - .squeeze() - .fillna(0) - ) - - # add a new row to spec - new_row_index = len(spec) - spec.loc[new_row_index] = dependency_value - spec.loc[new_row_index, expression_name] = expression_value - spec.loc[new_row_index, alternatives] = ( - spec.loc[new_row_index, alternatives] * coefficient - ) - - elif "_px" in expression: - for pnum in range(1, hhsize + 1): - dependency_name = row.dependency.replace("x", str(pnum)) - expression = row.Expression.replace("x", str(pnum)) - - # add a new row to spec - new_row_index = len(spec) - spec.loc[new_row_index] = spec.loc[ - spec[expression_name] == dependency_name - ].squeeze() - spec.loc[new_row_index, expression_name] = expression - spec.loc[new_row_index, alternatives] = ( - spec.loc[new_row_index, alternatives] * coefficient - ) - - # drop dependency rows - spec = spec[~spec[expression_name].str.startswith(("M_p", "N_p", "H_p"))] - - # eval expression goes in the index - spec.set_index(expression_name, inplace=True) - - for c in spec.columns: - spec[c] = spec[c].fillna(0) - - simulate.uniquify_spec_index(spec) - - # make non-joint alts 0 - for c in alternatives: - if c.endswith("J"): - continue - else: - spec[c] = 0 - - if trace_spec: - tracing.trace_df( - spec, - "%s.hhsize%d_joint_spec" % (trace_label, hhsize), - transpose=False, - slicer="NONE", - ) - - if trace_spec: - tracing.trace_df( - spec, - "%s.hhsize%d_joint_spec_patched" % (trace_label, hhsize), - transpose=False, - slicer="NONE", - ) - - if cache: - cache_joint_spec(hhsize, spec) - - t0 = tracing.print_elapsed_time("build_cdap_joint_spec hh_size %s" % hhsize, t0) - - return spec - - def build_cdap_joint_spec( state: workflow.State, joint_tour_coefficients,