From 652525f68df987c495c7dee7a4c9735d6f0e3fb2 Mon Sep 17 00:00:00 2001 From: trizin <25263018+trizin@users.noreply.github.com> Date: Wed, 12 Jun 2024 13:19:06 +0300 Subject: [PATCH 1/4] Add max_workers property --- pdr_backend/ppss/multisim_ss.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pdr_backend/ppss/multisim_ss.py b/pdr_backend/ppss/multisim_ss.py index ed4f2be20..00df5a91f 100644 --- a/pdr_backend/ppss/multisim_ss.py +++ b/pdr_backend/ppss/multisim_ss.py @@ -36,6 +36,9 @@ def approach(self) -> str: def sweep_params(self) -> list: return self.d["sweep_params"] + @property + def max_workers(self) -> int: + return self.d.get("max_workers", 1) # -------------------------------- # derivative properties @property From 25a612caad07c632806443317ef416eab925bec6 Mon Sep 17 00:00:00 2001 From: trizin <25263018+trizin@users.noreply.github.com> Date: Wed, 12 Jun 2024 13:19:25 +0300 Subject: [PATCH 2/4] Multithreading --- pdr_backend/sim/multisim_engine.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/pdr_backend/sim/multisim_engine.py b/pdr_backend/sim/multisim_engine.py index 265c0f679..7ca295108 100644 --- a/pdr_backend/sim/multisim_engine.py +++ b/pdr_backend/sim/multisim_engine.py @@ -1,8 +1,9 @@ -import asyncio import copy +import concurrent.futures import csv import logging import os +import threading import uuid from typing import List, Union @@ -19,7 +20,7 @@ from pdr_backend.util.time_types import UnixTimeMs logger = logging.getLogger("multisim_engine") -lock = asyncio.Lock() +lock = threading.Lock() class MultisimEngine: @@ -52,13 +53,14 @@ def run(self): asyncio.run(self.run_async(ss.n_runs)) @enforce_types - async def run_async(self, n_runs): - tasks = [] - - for run_i in range(n_runs): - tasks.append(self.run_one(run_i)) - - await asyncio.gather(*tasks) + def run_multithreaded(self, n_runs): + with concurrent.futures.ProcessPoolExecutor(max_workers=self.ss.max_workers) as executor: + futures = [executor.submit(self.run_one, run_i) for run_i in range(n_runs)] + for future in concurrent.futures.as_completed(futures): + try: + future.result() + except Exception as e: + logger.error(f"Run {future} generated an exception: {e}") @enforce_types async def run_one(self, run_i: int): @@ -71,7 +73,7 @@ async def run_one(self, run_i: int): sim_engine.run() run_metrics = list(sim_engine.st.recent_metrics().values()) - async with lock: + with lock: self.update_csv(run_i, run_metrics, point_i) logger.info("Multisim run_i=%s: done", run_i) From ba426f653cd7aa67da679338579251a71796a8b9 Mon Sep 17 00:00:00 2001 From: trizin <25263018+trizin@users.noreply.github.com> Date: Wed, 12 Jun 2024 13:22:06 +0300 Subject: [PATCH 3/4] Add max workers property to yaml --- ppss.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/ppss.yaml b/ppss.yaml index 7dc0f51d9..189e380ea 100644 --- a/ppss.yaml +++ b/ppss.yaml @@ -63,6 +63,7 @@ sim_ss: # sim only multisim_ss: approach: SimpleSweep # SimpleSweep | FastSweep (future) | .. + max_workers: 1 sweep_params: - trader_ss.buy_amt: 1000 USD, 2000 USD - predictoor_ss.aimodel_ss.max_n_train: 500, 1000, 1500 From e2e4f6940b3882a98f43e08e1e130f58a16759c6 Mon Sep 17 00:00:00 2001 From: trizin <25263018+trizin@users.noreply.github.com> Date: Thu, 13 Jun 2024 15:30:24 +0300 Subject: [PATCH 4/4] Fixes --- pdr_backend/sim/multisim_engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pdr_backend/sim/multisim_engine.py b/pdr_backend/sim/multisim_engine.py index 7ca295108..7a3bcd42c 100644 --- a/pdr_backend/sim/multisim_engine.py +++ b/pdr_backend/sim/multisim_engine.py @@ -50,7 +50,7 @@ def run(self): ss = self.ss logger.info("Multisim engine: start. # runs = %s", ss.n_runs) self.initialize_csv_with_header() - asyncio.run(self.run_async(ss.n_runs)) + self.run_multithreaded(ss.n_runs) @enforce_types def run_multithreaded(self, n_runs): @@ -63,7 +63,7 @@ def run_multithreaded(self, n_runs): logger.error(f"Run {future} generated an exception: {e}") @enforce_types - async def run_one(self, run_i: int): + def run_one(self, run_i: int): point_i = self.ss.point_i(run_i) logger.info("Multisim run_i=%s: start. Vals=%s", run_i, point_i) ppss = self.ppss_from_point(point_i)