Add more parallelization
UsualSpec committed Jul 12, 2024
1 parent 0574729 commit 5dd6a13
Showing 1 changed file with 113 additions and 113 deletions.
server/plotter/pages/dut.py: 226 changes (113 additions, 113 deletions)
@@ -273,123 +273,123 @@ def showMsmtButton(start, end, mmtids, mmtBin):
)
def showMsmtPlot(binTime, startDate, endDate, msmt_ids, n_clicks):
if n_clicks > 0:
with createDb() as pgcon:
# Using psycopg2 may give a warning for not being supported (as of May 2024 pandas doesn't officially support psycopg2) even though it works: https://github.com/pandas-dev/pandas/issues/45660
allRecentMsmts = pd.read_sql_query(sql="SELECT measurement_value_out AS measurement_value, measurement_timestamp_out::timestamp AS measurement_timestamp FROM getMmtAggregateOfDut (%(mmtids)s, %(bintime)s, %(startts)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=pgcon)
# Using psycopg2 may give a warning for not being supported (as of May 2024 pandas doesn't officially support psycopg2) even though it works: https://github.com/pandas-dev/pandas/issues/45660

def buildTimeseriesPlot():
# compose the graph together
# first add the aggregated measurements
allRecentMsmts = pd.read_sql_query(sql="SELECT measurement_value_out AS measurement_value, measurement_timestamp_out::timestamp AS measurement_timestamp FROM getMmtAggregateOfDut (%(mmtids)s, %(bintime)s, %(startts)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb())
allRecentMsmts["measurement_value"] = allRecentMsmts["measurement_value"].div(1000) # convert mW into W
plt = go.Figure(layout_title_text="Aggregated measurements")
plt.add_trace(go.Scatter(x=allRecentMsmts["measurement_timestamp"], y=allRecentMsmts["measurement_value"], name="Summed measurement"))
if True: # add the split measurements
def getMsmtVals(msmt_id):
msmtDf = pd.read_sql_query(sql="EXECUTE getMmtPtsBinned (%(bintime)s, %(srvmsmtid)s, %(startts)s,%(startts)s, %(endts)s)", params={"bintime": binTime, "srvmsmtid": msmt_id, "startts": startDate, "endts": endDate}, con=createDb())
msmtDf["measurement_value"] = msmtDf["measurement_value"].div(1000) # convert mW to W
return go.Scatter(x=msmtDf["measurement_timestamp"], y=msmtDf["measurement_value"], name="Measurement id " + str(msmt_id))
with ThreadPoolExecutor(max_workers=3) as binExec:
allMsmtsForThisId = list(binExec.map(getMsmtVals, msmt_ids))
plt.add_traces(allMsmtsForThisId)

# Add x axis and y axis titles
plt.update_layout(xaxis_title="Time", yaxis_title="Measured power [W]")
return plt, allRecentMsmts

def buildHourlyPlot():
# copy the dataframe
hourlyMsmts = pd.read_sql_query(sql="SELECT measurement_value, measurement_time FROM getMsmtsByHour (%(mmtids)s, %(bintime)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb())
# and update the date to only show the minutes
hourlyMsmts['measurement_value'] = hourlyMsmts['measurement_value'].div(1000)
# naively map the time to one day close to the start of the Unix epoch. This seems like a hack. Alternatively follow something like:
# seconds_per_day = 24*60*60
# bin_size = 5*60 # 5-minute bins
# time_s = ((pd.to_datetime(df.index).astype(int) / 10**9)%seconds_per_day/bin_size).astype(int)
# df['bin-time'] = pd.to_datetime(time_s*bin_size, unit='s')

hourlyMsmts['measurement_time'] = hourlyMsmts['measurement_time'].map(lambda t: datetime(1970, 1, 5) + (datetime.combine(date.min, t) - datetime.min))
hourlyMsmts.sort_values(by="measurement_time", inplace=True)
# calculate median
medianHourlyMsmts = hourlyMsmts.groupby(by=["measurement_time"], as_index=False).median()
# now calculate the hourly plot

# create opacity slider: https://stackoverflow.com/a/74317122
opacitySlider = {
'active': 50,
'currentvalue': {'prefix': 'Opacity: '},
'steps': [{
'value': step / 100,
'label': f'{step}%',
'visible': True,
'execute': True,
'method': 'restyle',
'args': [{'opacity': step / 100}, [0]]
} for step in range(101)]
}

hourlyPlot = go.Figure(layout_title_text="Daily power")
hourlyPlot.add_trace(
go.Scatter(x=hourlyMsmts["measurement_time"], y=hourlyMsmts["measurement_value"], mode='markers', name="Averaged datapoints")
)
hourlyPlot.add_trace(
go.Scatter(x=medianHourlyMsmts["measurement_time"], y=medianHourlyMsmts["measurement_value"], name="Median")
)
hourlyPlot.update_layout(xaxis_title="Time of day", yaxis_title="Measured power [W]", sliders=[opacitySlider])
# TODO: This feels wrong: this won't work for higher sampling rates (https://gitlab.ethz.ch/nsg/students/projects/2024/ba-2024_auto-power/-/issues/23)
hourlyPlot.update_xaxes(tickformat="%H:%M")
hourlyPlot.update_traces(marker=dict(opacity=0.5,), selector=dict(mode='markers'))
return hourlyPlot
def buildDailyPlot():
# daily plot
dailyMsmts = pd.read_sql_query(sql="SELECT measurement_value, measurement_dow, measurement_time, measurement_timestamp FROM getMsmtsByDow (%(mmtids)s, %(bintime)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb())

dailyMsmts = dailyMsmts.apply(
(lambda row:
pd.Series(
[row["measurement_value"] / 1000, (datetime(1970, 1, 4 + int(row["measurement_dow"])) + (datetime.combine(date.min, row["measurement_time"]) - datetime.min))],
index=["measurement_value", "measurement_reduced_ts"]
)
), axis="columns")
dailyMsmts.sort_values(by="measurement_reduced_ts", inplace=True)
# calculate median
medianDailyMsmts = dailyMsmts.groupby("measurement_reduced_ts", as_index=False).median()
dailyPlot = go.Figure(layout_title_text="Weekly power")
dailyPlot.add_trace(
go.Scatter(x=dailyMsmts["measurement_reduced_ts"], y=dailyMsmts["measurement_value"], mode='markers', name="Averaged datapoints")
)
dailyPlot.add_trace(
go.Scatter(x=medianDailyMsmts["measurement_reduced_ts"], y=medianDailyMsmts["measurement_value"], name="Median")
)


# create opacity slider: https://stackoverflow.com/a/74317122
opacitySlider = {
'active': 50,
'currentvalue': {'prefix': 'Opacity: '},
'steps': [{
'value': step / 100,
'label': f'{step}%',
'visible': True,
'execute': True,
'method': 'restyle',
'args': [{'opacity': step / 100}, [0]]
} for step in range(101)]
}

dailyPlot.update_layout(xaxis_title="Weekday", yaxis_title="Measured power [W]", sliders=[opacitySlider])
dailyPlot.update_xaxes(tickformat="%A, %H:%M")
dailyPlot.update_traces(marker=dict(opacity=0.5,), selector=dict(mode='markers'))
return dailyPlot

with ThreadPoolExecutor(max_workers=3) as executor:
# Parallelize building plots to speed up
tsPlotFuture = executor.submit(buildTimeseriesPlot)
hourlyPlotFuture = executor.submit(buildHourlyPlot)
dailyPlotFuture = executor.submit(buildDailyPlot)
timeseriesPlot, allRecentMsmts = tsPlotFuture.result()
if allRecentMsmts.empty:
return "Didn't get any data. Please check the timeframe and id.", [], True, 0, "", "", {"component_name": "dutPowergraph", "is_loading": False}
else:
# compose the graph together
# first add the aggregated measurements
def buildTimeseriesPlot():
plt = go.Figure(layout_title_text="Aggregated measurements")
plt.add_trace(go.Scatter(x=allRecentMsmts["measurement_timestamp"], y=allRecentMsmts["measurement_value"], name="Summed measurement"))
if True: # add the split measurements
def getMsmtVals(msmt_id):
msmtDf = pd.read_sql_query(sql="EXECUTE getMmtPtsBinned (%(bintime)s, %(srvmsmtid)s, %(startts)s,%(startts)s, %(endts)s)", params={"bintime": binTime, "srvmsmtid": msmt_id, "startts": startDate, "endts": endDate}, con=createDb())
msmtDf["measurement_value"] = msmtDf["measurement_value"].div(1000) # convert mW to W
return go.Scatter(x=msmtDf["measurement_timestamp"], y=msmtDf["measurement_value"], name="Measurement id " + str(msmt_id))
with ThreadPoolExecutor(max_workers=3) as binExec:
allMsmtsForThisId = list(binExec.map(getMsmtVals, msmt_ids))
plt.add_traces(allMsmtsForThisId)

# Add x axis and y axis titles
plt.update_layout(xaxis_title="Time", yaxis_title="Measured power [W]")
return plt

def buildHourlyPlot():
# copy the dataframe
hourlyMsmts = pd.read_sql_query(sql="SELECT measurement_value, measurement_time FROM getMsmtsByHour (%(mmtids)s, %(bintime)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb())
# and update the date to only show the minutes
hourlyMsmts['measurement_value'] = hourlyMsmts['measurement_value'].div(1000)
# naively map the time to one day close to the start of the Unix epoch. This seems like a hack. Alternatively follow something like:
# seconds_per_day = 24*60*60
# bin_size = 5*60 # 5-minute bins
# time_s = ((pd.to_datetime(df.index).astype(int) / 10**9)%seconds_per_day/bin_size).astype(int)
# df['bin-time'] = pd.to_datetime(time_s*bin_size, unit='s')

hourlyMsmts['measurement_time'] = hourlyMsmts['measurement_time'].map(lambda t: datetime(1970, 1, 5) + (datetime.combine(date.min, t) - datetime.min))
hourlyMsmts.sort_values(by="measurement_time", inplace=True)
# calculate median
medianHourlyMsmts = hourlyMsmts.groupby(by=["measurement_time"], as_index=False).median()
# now calculate the hourly plot

# create opacity slider: https://stackoverflow.com/a/74317122
opacitySlider = {
'active': 50,
'currentvalue': {'prefix': 'Opacity: '},
'steps': [{
'value': step / 100,
'label': f'{step}%',
'visible': True,
'execute': True,
'method': 'restyle',
'args': [{'opacity': step / 100}, [0]]
} for step in range(101)]
}

hourlyPlot = go.Figure(layout_title_text="Daily power")
hourlyPlot.add_trace(
go.Scatter(x=hourlyMsmts["measurement_time"], y=hourlyMsmts["measurement_value"], mode='markers', name="Averaged datapoints")
)
hourlyPlot.add_trace(
go.Scatter(x=medianHourlyMsmts["measurement_time"], y=medianHourlyMsmts["measurement_value"], name="Median")
)
hourlyPlot.update_layout(xaxis_title="Time of day", yaxis_title="Measured power [W]", sliders=[opacitySlider])
# TODO: This feels wrong: this won't work for higher sampling rates (https://gitlab.ethz.ch/nsg/students/projects/2024/ba-2024_auto-power/-/issues/23)
hourlyPlot.update_xaxes(tickformat="%H:%M")
hourlyPlot.update_traces(marker=dict(opacity=0.5,), selector=dict(mode='markers'))
return hourlyPlot
def buildDailyPlot():
# daily plot
dailyMsmts = pd.read_sql_query(sql="SELECT measurement_value, measurement_dow, measurement_time, measurement_timestamp FROM getMsmtsByDow (%(mmtids)s, %(bintime)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb())

dailyMsmts = dailyMsmts.apply(
(lambda row:
pd.Series(
[row["measurement_value"] / 1000, (datetime(1970, 1, 4 + int(row["measurement_dow"])) + (datetime.combine(date.min, row["measurement_time"]) - datetime.min))],
index=["measurement_value", "measurement_reduced_ts"]
)
), axis="columns")
dailyMsmts.sort_values(by="measurement_reduced_ts", inplace=True)
# calculate median
medianDailyMsmts = dailyMsmts.groupby("measurement_reduced_ts", as_index=False).median()
dailyPlot = go.Figure(layout_title_text="Weekly power")
dailyPlot.add_trace(
go.Scatter(x=dailyMsmts["measurement_reduced_ts"], y=dailyMsmts["measurement_value"], mode='markers', name="Averaged datapoints")
)
dailyPlot.add_trace(
go.Scatter(x=medianDailyMsmts["measurement_reduced_ts"], y=medianDailyMsmts["measurement_value"], name="Median")
)


# create opacity slider: https://stackoverflow.com/a/74317122
opacitySlider = {
'active': 50,
'currentvalue': {'prefix': 'Opacity: '},
'steps': [{
'value': step / 100,
'label': f'{step}%',
'visible': True,
'execute': True,
'method': 'restyle',
'args': [{'opacity': step / 100}, [0]]
} for step in range(101)]
}

dailyPlot.update_layout(xaxis_title="Weekday", yaxis_title="Measured power [W]", sliders=[opacitySlider])
dailyPlot.update_xaxes(tickformat="%A, %H:%M")
dailyPlot.update_traces(marker=dict(opacity=0.5,), selector=dict(mode='markers'))
return dailyPlot

with ThreadPoolExecutor(max_workers=3) as executor:
# Parallelize building plots to speed up
tsPlotFuture = executor.submit(buildTimeseriesPlot)
hourlyPlotFuture = executor.submit(buildHourlyPlot)
dailyPlotFuture = executor.submit(buildDailyPlot)

return dcc.Graph(figure=tsPlotFuture.result()), allRecentMsmts.to_dict('records'), False, 0, dcc.Graph(figure=hourlyPlotFuture.result()), dcc.Graph(figure=dailyPlotFuture.result()), {"component_name": "dutPowergraph", "is_loading": False} # 0 is for n clicks reset
return dcc.Graph(figure=timeseriesPlot), allRecentMsmts.to_dict('records'), False, 0, dcc.Graph(figure=hourlyPlotFuture.result()), dcc.Graph(figure=dailyPlotFuture.result()), {"component_name": "dutPowergraph", "is_loading": False} # 0 is for n clicks reset
else:
return "Please select an id and time frame to show the graphs", [], True, 0, "", "", {"component_name": "dutPowergraph", "is_loading": False}

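For context, the parallelization added here moves the database query for each plot into its own builder function (each opening its own connection via createDb()) and submits buildTimeseriesPlot, buildHourlyPlot, and buildDailyPlot to a ThreadPoolExecutor so the three figures are produced concurrently; the timeseries builder additionally returns the aggregated dataframe so the empty-result check can run before the other figures are used. A minimal, self-contained sketch of that pattern, with placeholder builders instead of the real SQL and Plotly code:

from concurrent.futures import ThreadPoolExecutor

def build_timeseries_plot():
    # Stand-in for buildTimeseriesPlot: fetch the data, build a figure,
    # and hand both back so the caller can check for an empty result.
    data = [1, 2, 3]  # placeholder for the aggregated measurements
    figure = "timeseries-figure"
    return figure, data

def build_hourly_plot():
    return "hourly-figure"  # stand-in for the "Daily power" figure

def build_daily_plot():
    return "daily-figure"  # stand-in for the "Weekly power" figure

with ThreadPoolExecutor(max_workers=3) as executor:
    ts_future = executor.submit(build_timeseries_plot)
    hourly_future = executor.submit(build_hourly_plot)
    daily_future = executor.submit(build_daily_plot)

# As in the new version of dut.py, the results are consumed after all three
# futures have been submitted, so the builders overlap in time.
timeseries_plot, measurements = ts_future.result()
if not measurements:
    print("No data for the selected time frame")
else:
    print(timeseries_plot, hourly_future.result(), daily_future.result())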
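On the psycopg2 note repeated in the comments above: pandas officially supports only SQLAlchemy connectables (and sqlite3) in read_sql_query, so passing a raw psycopg2 connection works but emits a UserWarning (see the linked pandas issue #45660). If silencing that warning is ever desired, one option is to wrap the database DSN in a SQLAlchemy engine; the connection string below is a placeholder, not the project's actual configuration:

from sqlalchemy import create_engine
import pandas as pd

# Hypothetical DSN; the real host, database, and credentials come from the
# project's own configuration, not from this example.
engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/powerdb")

with engine.connect() as conn:
    # Stand-in query; the real code calls getMmtAggregateOfDut and friends.
    df = pd.read_sql_query(sql="SELECT 1 AS measurement_value", con=conn)

print(df)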