From 5dd6a1371dc863cd7fde60de27392f1fbfbc4ae1 Mon Sep 17 00:00:00 2001 From: UsualSpec <98665326+UsualSpec@users.noreply.github.com> Date: Fri, 12 Jul 2024 21:58:02 +0200 Subject: [PATCH] Add more parallelization --- server/plotter/pages/dut.py | 226 ++++++++++++++++++------------------ 1 file changed, 113 insertions(+), 113 deletions(-) diff --git a/server/plotter/pages/dut.py b/server/plotter/pages/dut.py index 09dd414..2652dfa 100644 --- a/server/plotter/pages/dut.py +++ b/server/plotter/pages/dut.py @@ -273,123 +273,123 @@ def showMsmtButton(start, end, mmtids, mmtBin): ) def showMsmtPlot(binTime, startDate, endDate, msmt_ids, n_clicks): if n_clicks > 0: - with createDb() as pgcon: - # Using psycopg2 may give a warning for not being supported (as of May 2024 pandas doesn't officially support psycopg2) even though it works: https://github.com/pandas-dev/pandas/issues/45660 - allRecentMsmts = pd.read_sql_query(sql="SELECT measurement_value_out AS measurement_value, measurement_timestamp_out::timestamp AS measurement_timestamp FROM getMmtAggregateOfDut (%(mmtids)s, %(bintime)s, %(startts)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=pgcon) + # Using psycopg2 may give a warning for not being supported (as of May 2024 pandas doesn't officially support psycopg2) even though it works: https://github.com/pandas-dev/pandas/issues/45660 + + def buildTimeseriesPlot(): + # compose the graph together + # first add the aggregated measurements + allRecentMsmts = pd.read_sql_query(sql="SELECT measurement_value_out AS measurement_value, measurement_timestamp_out::timestamp AS measurement_timestamp FROM getMmtAggregateOfDut (%(mmtids)s, %(bintime)s, %(startts)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb()) allRecentMsmts["measurement_value"] = allRecentMsmts["measurement_value"].div(1000) # convert mW into W + plt = go.Figure(layout_title_text="Aggregated measurements") + plt.add_trace(go.Scatter(x=allRecentMsmts["measurement_timestamp"], y=allRecentMsmts["measurement_value"], name="Summed measurement")) + if True: # add the split measurements + def getMsmtVals(msmt_id): + msmtDf = pd.read_sql_query(sql="EXECUTE getMmtPtsBinned (%(bintime)s, %(srvmsmtid)s, %(startts)s,%(startts)s, %(endts)s)", params={"bintime": binTime, "srvmsmtid": msmt_id, "startts": startDate, "endts": endDate}, con=createDb()) + msmtDf["measurement_value"] = msmtDf["measurement_value"].div(1000) # convert mW to W + return go.Scatter(x=msmtDf["measurement_timestamp"], y=msmtDf["measurement_value"], name="Measurement id " + str(msmt_id)) + with ThreadPoolExecutor(max_workers=3) as binExec: + allMsmtsForThisId = list(binExec.map(getMsmtVals, msmt_ids)) + plt.add_traces(allMsmtsForThisId) + + # Add x axis and y axis titles + plt.update_layout(xaxis_title="Time", yaxis_title="Measured power [W]") + return plt, allRecentMsmts + + def buildHourlyPlot(): + # copy the dataframe + hourlyMsmts = pd.read_sql_query(sql="SELECT measurement_value, measurement_time FROM getMsmtsByHour (%(mmtids)s, %(bintime)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb()) + # and update the date to only show the minutes + hourlyMsmts['measurement_value'] = hourlyMsmts['measurement_value'].div(1000) + # naively map the calculate the time to one day close to the start of the unix epoch. This seems like a hack. Alternatively follow something like: + # seconds_per_day = 24*60*60 + # bin_size = 5*60 # 5-minute bins + # time_s = ((pd.to_datetime(df.index).astype(int) / 10**9)%seconds_per_day/bin_size).astype(int) + # df['bin-time'] = pd.to_datetime(time_s*bin_size, unit='s') + + hourlyMsmts['measurement_time'] = hourlyMsmts['measurement_time'].map(lambda t: datetime(1970, 1, 5) + (datetime.combine(date.min, t) - datetime.min)) + hourlyMsmts.sort_values(by="measurement_time", inplace=True) + # calculate median + medianHourlyMsmts = hourlyMsmts.groupby(by=["measurement_time"], as_index=False).median() + # now calculate the hourly plot + + # create opacity slider: https://stackoverflow.com/a/74317122 + opacitySlider = { + 'active': 50, + 'currentvalue': {'prefix': 'Opacity: '}, + 'steps': [{ + 'value': step / 100, + 'label': f'{step}%', + 'visible': True, + 'execute': True, + 'method': 'restyle', + 'args': [{'opacity': step / 100}, [0]] + } for step in range(101)] + } + + hourlyPlot = go.Figure(layout_title_text="Daily power") + hourlyPlot.add_trace( + go.Scatter(x=hourlyMsmts["measurement_time"], y=hourlyMsmts["measurement_value"], mode='markers', name="Averaged datapoints") + ) + hourlyPlot.add_trace( + go.Scatter(x=medianHourlyMsmts["measurement_time"], y=medianHourlyMsmts["measurement_value"], name="Median") + ) + hourlyPlot.update_layout(xaxis_title="Time of day", yaxis_title="Measured power [W]", sliders=[opacitySlider]) + # TODO: This feels wrong: this won't work for higher sampling rates (https://gitlab.ethz.ch/nsg/students/projects/2024/ba-2024_auto-power/-/issues/23) + hourlyPlot.update_xaxes(tickformat="%H:%M") + hourlyPlot.update_traces(marker=dict(opacity=0.5,), selector=dict(mode='markers')) + return hourlyPlot + def buildDailyPlot(): + # daily plot + dailyMsmts = pd.read_sql_query(sql="SELECT measurement_value, measurement_dow, measurement_time, measurement_timestamp FROM getMsmtsByDow (%(mmtids)s, %(bintime)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb()) + + dailyMsmts = dailyMsmts.apply( + (lambda row: + pd.Series( + [row["measurement_value"] / 1000, (datetime(1970, 1, 4 + int(row["measurement_dow"])) + (datetime.combine(date.min, row["measurement_time"]) - datetime.min))], + index=["measurement_value", "measurement_reduced_ts"] + ) + ), axis="columns") + dailyMsmts.sort_values(by="measurement_reduced_ts", inplace=True) + # calculate median + medianDailyMsmts = dailyMsmts.groupby("measurement_reduced_ts", as_index=False).median() + dailyPlot = go.Figure(layout_title_text="Weekly power") + dailyPlot.add_trace( + go.Scatter(x=dailyMsmts["measurement_reduced_ts"], y=dailyMsmts["measurement_value"], mode='markers', name="Averaged datapoints") + ) + dailyPlot.add_trace( + go.Scatter(x=medianDailyMsmts["measurement_reduced_ts"], y=medianDailyMsmts["measurement_value"], name="Median") + ) + + + # create opacity slider: https://stackoverflow.com/a/74317122 + opacitySlider = { + 'active': 50, + 'currentvalue': {'prefix': 'Opacity: '}, + 'steps': [{ + 'value': step / 100, + 'label': f'{step}%', + 'visible': True, + 'execute': True, + 'method': 'restyle', + 'args': [{'opacity': step / 100}, [0]] + } for step in range(101)] + } + + dailyPlot.update_layout(xaxis_title="Weekday", yaxis_title="Measured power [W]", sliders=[opacitySlider]) + dailyPlot.update_xaxes(tickformat="%A, %H:%M") + dailyPlot.update_traces(marker=dict(opacity=0.5,), selector=dict(mode='markers')) + return dailyPlot + + with ThreadPoolExecutor(max_workers=3) as executor: + # Parallelize building plots to speed up + tsPlotFuture = executor.submit(buildTimeseriesPlot) + hourlyPlotFuture = executor.submit(buildHourlyPlot) + dailyPlotFuture = executor.submit(buildDailyPlot) + timeseriesPlot, allRecentMsmts = tsPlotFuture.result() if allRecentMsmts.empty: return "Didn't get any data. Please check the timeframe and id.", [], True, 0, "", "", {"component_name": "dutPowergraph", "is_loading": False} - else: - # compose the graph together - # first add the aggregated measurements - def buildTimeseriesPlot(): - plt = go.Figure(layout_title_text="Aggregated measurements") - plt.add_trace(go.Scatter(x=allRecentMsmts["measurement_timestamp"], y=allRecentMsmts["measurement_value"], name="Summed measurement")) - if True: # add the split measurements - def getMsmtVals(msmt_id): - msmtDf = pd.read_sql_query(sql="EXECUTE getMmtPtsBinned (%(bintime)s, %(srvmsmtid)s, %(startts)s,%(startts)s, %(endts)s)", params={"bintime": binTime, "srvmsmtid": msmt_id, "startts": startDate, "endts": endDate}, con=createDb()) - msmtDf["measurement_value"] = msmtDf["measurement_value"].div(1000) # convert mW to W - return go.Scatter(x=msmtDf["measurement_timestamp"], y=msmtDf["measurement_value"], name="Measurement id " + str(msmt_id)) - with ThreadPoolExecutor(max_workers=3) as binExec: - allMsmtsForThisId = list(binExec.map(getMsmtVals, msmt_ids)) - plt.add_traces(allMsmtsForThisId) - - # Add x axis and y axis titles - plt.update_layout(xaxis_title="Time", yaxis_title="Measured power [W]") - return plt - - def buildHourlyPlot(): - # copy the dataframe - hourlyMsmts = pd.read_sql_query(sql="SELECT measurement_value, measurement_time FROM getMsmtsByHour (%(mmtids)s, %(bintime)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb()) - # and update the date to only show the minutes - hourlyMsmts['measurement_value'] = hourlyMsmts['measurement_value'].div(1000) - # naively map the calculate the time to one day close to the start of the unix epoch. This seems like a hack. Alternatively follow something like: - # seconds_per_day = 24*60*60 - # bin_size = 5*60 # 5-minute bins - # time_s = ((pd.to_datetime(df.index).astype(int) / 10**9)%seconds_per_day/bin_size).astype(int) - # df['bin-time'] = pd.to_datetime(time_s*bin_size, unit='s') - - hourlyMsmts['measurement_time'] = hourlyMsmts['measurement_time'].map(lambda t: datetime(1970, 1, 5) + (datetime.combine(date.min, t) - datetime.min)) - hourlyMsmts.sort_values(by="measurement_time", inplace=True) - # calculate median - medianHourlyMsmts = hourlyMsmts.groupby(by=["measurement_time"], as_index=False).median() - # now calculate the hourly plot - - # create opacity slider: https://stackoverflow.com/a/74317122 - opacitySlider = { - 'active': 50, - 'currentvalue': {'prefix': 'Opacity: '}, - 'steps': [{ - 'value': step / 100, - 'label': f'{step}%', - 'visible': True, - 'execute': True, - 'method': 'restyle', - 'args': [{'opacity': step / 100}, [0]] - } for step in range(101)] - } - - hourlyPlot = go.Figure(layout_title_text="Daily power") - hourlyPlot.add_trace( - go.Scatter(x=hourlyMsmts["measurement_time"], y=hourlyMsmts["measurement_value"], mode='markers', name="Averaged datapoints") - ) - hourlyPlot.add_trace( - go.Scatter(x=medianHourlyMsmts["measurement_time"], y=medianHourlyMsmts["measurement_value"], name="Median") - ) - hourlyPlot.update_layout(xaxis_title="Time of day", yaxis_title="Measured power [W]", sliders=[opacitySlider]) - # TODO: This feels wrong: this won't work for higher sampling rates (https://gitlab.ethz.ch/nsg/students/projects/2024/ba-2024_auto-power/-/issues/23) - hourlyPlot.update_xaxes(tickformat="%H:%M") - hourlyPlot.update_traces(marker=dict(opacity=0.5,), selector=dict(mode='markers')) - return hourlyPlot - def buildDailyPlot(): - # daily plot - dailyMsmts = pd.read_sql_query(sql="SELECT measurement_value, measurement_dow, measurement_time, measurement_timestamp FROM getMsmtsByDow (%(mmtids)s, %(bintime)s, %(startts)s, %(endts)s)", params={"bintime": binTime, "mmtids": msmt_ids, "startts": startDate, "endts": endDate}, con=createDb()) - - dailyMsmts = dailyMsmts.apply( - (lambda row: - pd.Series( - [row["measurement_value"] / 1000, (datetime(1970, 1, 4 + int(row["measurement_dow"])) + (datetime.combine(date.min, row["measurement_time"]) - datetime.min))], - index=["measurement_value", "measurement_reduced_ts"] - ) - ), axis="columns") - dailyMsmts.sort_values(by="measurement_reduced_ts", inplace=True) - # calculate median - medianDailyMsmts = dailyMsmts.groupby("measurement_reduced_ts", as_index=False).median() - dailyPlot = go.Figure(layout_title_text="Weekly power") - dailyPlot.add_trace( - go.Scatter(x=dailyMsmts["measurement_reduced_ts"], y=dailyMsmts["measurement_value"], mode='markers', name="Averaged datapoints") - ) - dailyPlot.add_trace( - go.Scatter(x=medianDailyMsmts["measurement_reduced_ts"], y=medianDailyMsmts["measurement_value"], name="Median") - ) - - - # create opacity slider: https://stackoverflow.com/a/74317122 - opacitySlider = { - 'active': 50, - 'currentvalue': {'prefix': 'Opacity: '}, - 'steps': [{ - 'value': step / 100, - 'label': f'{step}%', - 'visible': True, - 'execute': True, - 'method': 'restyle', - 'args': [{'opacity': step / 100}, [0]] - } for step in range(101)] - } - - dailyPlot.update_layout(xaxis_title="Weekday", yaxis_title="Measured power [W]", sliders=[opacitySlider]) - dailyPlot.update_xaxes(tickformat="%A, %H:%M") - dailyPlot.update_traces(marker=dict(opacity=0.5,), selector=dict(mode='markers')) - return dailyPlot - - with ThreadPoolExecutor(max_workers=3) as executor: - # Parallelize building plots to speed up - tsPlotFuture = executor.submit(buildTimeseriesPlot) - hourlyPlotFuture = executor.submit(buildHourlyPlot) - dailyPlotFuture = executor.submit(buildDailyPlot) - return dcc.Graph(figure=tsPlotFuture.result()), allRecentMsmts.to_dict('records'), False, 0, dcc.Graph(figure=hourlyPlotFuture.result()), dcc.Graph(figure=dailyPlotFuture.result()), {"component_name": "dutPowergraph", "is_loading": False} # 0 is for n clicks reset + return dcc.Graph(figure=timeseriesPlot), allRecentMsmts.to_dict('records'), False, 0, dcc.Graph(figure=hourlyPlotFuture.result()), dcc.Graph(figure=dailyPlotFuture.result()), {"component_name": "dutPowergraph", "is_loading": False} # 0 is for n clicks reset else: return "Please select an id and time frame to show the graphs", [], True, 0, "", "", {"component_name": "dutPowergraph", "is_loading": False}