Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #1196 - Update multisim to use concurrency #1197

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pdr_backend/ppss/multisim_ss.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ def approach(self) -> str:
def sweep_params(self) -> list:
return self.d["sweep_params"]

@property
def max_workers(self) -> int:
return self.d.get("max_workers", 1)
# --------------------------------
# derivative properties
@property
Expand Down
26 changes: 14 additions & 12 deletions pdr_backend/sim/multisim_engine.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import asyncio
import copy
import concurrent.futures
import csv
import logging
import os
import threading
import uuid
from typing import List, Union

Expand All @@ -19,7 +20,7 @@
from pdr_backend.util.time_types import UnixTimeMs

logger = logging.getLogger("multisim_engine")
lock = asyncio.Lock()
lock = threading.Lock()


class MultisimEngine:
Expand Down Expand Up @@ -49,19 +50,20 @@ def run(self):
ss = self.ss
logger.info("Multisim engine: start. # runs = %s", ss.n_runs)
self.initialize_csv_with_header()
asyncio.run(self.run_async(ss.n_runs))
self.run_multithreaded(ss.n_runs)

@enforce_types
async def run_async(self, n_runs):
tasks = []

for run_i in range(n_runs):
tasks.append(self.run_one(run_i))

await asyncio.gather(*tasks)
def run_multithreaded(self, n_runs):
with concurrent.futures.ProcessPoolExecutor(max_workers=self.ss.max_workers) as executor:
futures = [executor.submit(self.run_one, run_i) for run_i in range(n_runs)]
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except Exception as e:
logger.error(f"Run {future} generated an exception: {e}")

@enforce_types
async def run_one(self, run_i: int):
def run_one(self, run_i: int):
point_i = self.ss.point_i(run_i)
logger.info("Multisim run_i=%s: start. Vals=%s", run_i, point_i)
ppss = self.ppss_from_point(point_i)
Expand All @@ -71,7 +73,7 @@ async def run_one(self, run_i: int):
sim_engine.run()
run_metrics = list(sim_engine.st.recent_metrics().values())

async with lock:
with lock:
self.update_csv(run_i, run_metrics, point_i)
logger.info("Multisim run_i=%s: done", run_i)

Expand Down
1 change: 1 addition & 0 deletions ppss.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ sim_ss: # sim only

multisim_ss:
approach: SimpleSweep # SimpleSweep | FastSweep (future) | ..
max_workers: 1
sweep_params:
- trader_ss.buy_amt: 1000 USD, 2000 USD
- predictoor_ss.aimodel_ss.max_n_train: 500, 1000, 1500
Expand Down