Commit

Update tds_conflation.py
- minor cleanup of .py file
joshuacroff committed Jul 17, 2024
1 parent 5fdc534 commit 61470ad
Showing 1 changed file with 41 additions and 60 deletions.
101 changes: 41 additions & 60 deletions Project-Documentation/TDS-Conflation/tds_conflation.py
@@ -1,16 +1,14 @@
# %% [markdown]
# # Travel Diary Survey Conflation
#
# Notebook for Exploratory Data Analysis (EDA) only. Final code will be refactored into a python utilities module and script to run the conflation process.

# %% [markdown]
# ## How to run this notebook
#
# 1. Install the required package dependencies using the [requirements.txt](requirements/requirements.txt) file.
# 2. Clone the [mappymatch github repository](https://github.com/BayAreaMetro/mappymatch), which has been forked and modified from the original repository.
# 3. Update the MAPPYMATCH_PATH below to point to the location of the cloned repository (see the path-setup sketch after this list).
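
# %% [markdown]
# A sketch of the MAPPYMATCH_PATH setup from step 3, kept commented out; the
# clone location below is a hypothetical placeholder, not the repository's
# actual path on any machine.

# %%
# import sys
#
# MAPPYMATCH_PATH = "/path/to/mappymatch"  # hypothetical clone location
# sys.path.insert(0, MAPPYMATCH_PATH)  # import the fork ahead of any pip-installed copy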

# %%
import os
import sys
import pandas as pd
@@ -28,17 +26,11 @@
from mappymatch.matchers.lcss.lcss import LCSSMatcher
from mappymatch.maps.nx.nx_map import NxMap, NetworkType

from mappymatch.utils.plot import plot_map, plot_matches, plot_path, plot_trace

# %% [markdown]
# ## Define functions

# %%
# create a batch of traces from a dataframe of trip points


def create_batch_traces(df, trip_id_column, xy=True):
    """Create a batch of traces from a dataframe with xy coordinates.
@@ -91,7 +83,7 @@ def create_batch_traces(df, trip_id_column, xy=True):
        batch_traces.append(trace_dict)
    return batch_traces
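
# %% [markdown]
# Toy usage sketch for `create_batch_traces`, kept commented out: the
# coordinate column names below are illustrative assumptions, not the survey
# table's actual schema.

# %%
# toy_df = pd.DataFrame(
#     {
#         "trip_id": [1, 1, 1],  # one trip with three GPS points
#         "lon": [-122.42, -122.41, -122.40],  # hypothetical longitudes
#         "lat": [37.77, 37.78, 37.79],  # hypothetical latitudes
#     }
# )
# toy_traces = create_batch_traces(toy_df, trip_id_column="trip_id", xy=True)
# toy_traces[0]  # one trace dict per trip_id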

# %%

# create a function that takes a list of traces and batch processes them using the LCSS matcher


@@ -198,7 +190,7 @@ def batch_process_traces(traces, geofence_buffer=1000, network_type=NetworkType.

    return matched_traces

# %%

def process_trace(trace_dict, geofence_buffer, network_type):
    """Match a single trace dict to the OSM network using the LCSS matcher.
@@ -271,7 +263,7 @@ def process_trace(trace_dict, geofence_buffer, network_type):

    return trace_dict

# %%

def batch_process_traces_parallel(traces, geofence_buffer=1000, network_type=NetworkType.DRIVE):
    """Batch process traces in parallel using the LCSS matcher.
@@ -284,23 +276,27 @@ def batch_process_traces_parallel(traces, geofence_buffer=1000, network_type=Net
        List: list of trace dicts with match results.
    """
    import concurrent.futures

    matched_traces = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
        # Prepare future tasks
        futures = [
            executor.submit(process_trace, trace, geofence_buffer, network_type) for trace in traces
        ]
        for future in concurrent.futures.as_completed(futures):
            matched_traces.append(future.result())
    return matched_traces

# %%

# create a function that takes a list of dictionaries with matched trace geodataframes, concatenates them, and returns a single geodataframe


def concatenate_matched_gdfs(matched_traces, match_type="matched_gdf"):
    """Concatenate matched trace geodataframes into a single geodataframe.

    Args:
        matched_traces (List): List of dictionaries with matched trace geodataframes.
        match_type (String, optional): Type of match to concatenate. Defaults to "matched_gdf".
            Options are "matched_gdf", "matched_path_gdf", "trace_gdf", "trace_line_gdf".

    Returns:
@@ -320,23 +316,26 @@ def concatenate_matched_gdfs(matched_traces, match_type="matched_gdf"):
    # if values in the matched_gdf are lists, convert to strings
    for col in matched_gdf.columns:
        if matched_gdf[col].apply(lambda x: isinstance(x, list)).any():
            matched_gdf[col] = matched_gdf[col].apply(
                lambda x: "; ".join(x) if isinstance(x, list) else x
            )
    return matched_gdf
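
# %% [markdown]
# The list-to-string conversion above matters because list-valued columns
# generally cannot be serialized to a GeoPackage layer; a tiny, self-contained
# illustration of the same `apply` pattern (the toy column name is arbitrary):

# %%
demo = pd.DataFrame({"road_ids": [["a", "b"], "c"]})
demo["road_ids"] = demo["road_ids"].apply(
    lambda x: "; ".join(x) if isinstance(x, list) else x
)  # yields "a; b" and "c"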

# %%

# define a function to concatenate matched gdfs and write each match type to geopackage


def write_matched_gdfs(match_result, file_path):
    """Write traces matched with the LCSS matcher to a geopackage.

    Args:
        match_result (List): List of dictionaries with matched trace geodataframes.
        file_path (String): path to the geopackage file.
    """
    trace_gdf = concatenate_matched_gdfs(match_result, match_type="trace_gdf")
    trace_line_gdf = concatenate_matched_gdfs(match_result, match_type="trace_line_gdf")
    matched_gdf = concatenate_matched_gdfs(match_result, match_type="matched_gdf")
    matched_path_gdf = concatenate_matched_gdfs(match_result, match_type="matched_path_gdf")

    # convert matched_gdf and matched_path_gdf "road_id" column from RoadId data type to string
    matched_gdf["road_id"] = matched_gdf["road_id"].astype(str)
@@ -346,37 +345,37 @@ def write_matched_gdfs(match_result, file_path):
    trace_gdf.to_file(file_path, layer="trace_gdf", driver="GPKG")
    trace_line_gdf.to_file(file_path, layer="trace_line_gdf", driver="GPKG")
    matched_gdf.to_file(file_path, layer="matched_gdf", driver="GPKG")
    matched_path_gdf.to_file(file_path, layer="matched_path_gdf", driver="GPKG")


# %% [markdown]
# ## Prepare the data

# %%
## Define file names
location_tbl = "location.csv"
trip_tbl = "trip.csv"

## Define Box System Root Directory
box_dir = os.path.join("/Users", user, "Library", "CloudStorage", "Box-Box")

## Define Travel Diary Survey directory on Box for .csv input files
file_dir = os.path.join(
    box_dir,
    "Modeling and Surveys",
    "Surveys",
    "Travel Diary Survey",
    "Biennial Travel Diary Survey",
    "Data",
    "2023",
    "Full Unweighted 2023 Dataset",
)

location_path = os.path.join(file_dir, location_tbl)
trip_path = os.path.join(file_dir, trip_tbl)

# %%
# read the location and trip tables
location_df = pd.read_csv(location_path)
trip_df = pd.read_csv(trip_path)

# %%
# merge trips with locations
trip_locations = pd.merge(
    location_df,
@@ -395,10 +394,8 @@ def write_matched_gdfs(match_result, file_path):
    on="trip_id",
)

# %%
trip_locations.head()

# %%
# filter trip_locations to car trips (mode_type 5, 6, 8, 9, 11) with origins and destinations in the region
car_trips = trip_locations[
    ((trip_locations["mode_type"].isin([5, 6, 8, 9, 11]))) & (trip_locations["o_in_region"] == 1)
@@ -407,10 +404,8 @@

unique_trips = car_trips["trip_id"].unique()

# %%
print("Unique trip count: " + str(unique_trips.shape[0]))

# %%
# # create batch traces from the test list
# test_list = [
#     2304076901001,  # highway
@@ -421,37 +416,23 @@
#     2333407402037,
#     2333413601001,  # issue with the trace - missing geometry
# ]
# select the first 20 trips from the unique trip list as a test sample
test_list = unique_trips[:20]
car_trips_test = car_trips[car_trips["trip_id"].isin(test_list)]
batch_traces_test = create_batch_traces(car_trips_test, trip_id_column="trip_id", xy=True)

# %%
# batch_traces = create_batch_traces(car_trips, trip_id_column="trip_id", xy=True)

# %% [markdown]
# ## Match using the LCSS matching algorithm

# %%
match_result_test = batch_process_traces_parallel(
    batch_traces_test, geofence_buffer=1000, network_type=NetworkType.DRIVE
)

# %%
match_result = batch_process_traces(
    traces=batch_traces_test, geofence_buffer=1000, network_type=NetworkType.DRIVE
)

# %%
# import cProfile
# cProfile.run('batch_process_traces(traces=batch_traces_test, geofence_buffer=1000, network_type=NetworkType.DRIVE)')

# %%
# write the matched gdfs to a geopackage
out_file_path = f"/Users/{user}/Library/CloudStorage/Box-Box/DataViz Projects/Spatial Analysis and Mapping/TDS Conflation/Data"
gpkg_path = os.path.join(out_file_path, "tds_conflation_results.gpkg")
write_matched_gdfs(match_result, gpkg_path)
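
# %% [markdown]
# Sanity-check sketch: read one layer back from the geopackage just written.
# This assumes geopandas is installed alongside the other dependencies; the
# layer name matches one written by `write_matched_gdfs` above.

# %%
import geopandas as gpd

matched_path_check = gpd.read_file(gpkg_path, layer="matched_path_gdf")
matched_path_check.head()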

# %%
# match_result = batch_process_traces(
#     traces=batch_traces, geofence_buffer=1000, network_type=NetworkType.DRIVE
# )

