From 060c34dad88580d367a5495d8d66e18bfff4a19f Mon Sep 17 00:00:00 2001 From: Peter Jung Date: Wed, 20 Nov 2024 09:12:54 +0100 Subject: [PATCH] Add some docs and remove dead code (#559) --- .../deploy/agent.py | 95 ++++++++++++------- .../deploy/test_deploy.py | 2 +- 2 files changed, 62 insertions(+), 35 deletions(-) diff --git a/prediction_market_agent_tooling/deploy/agent.py b/prediction_market_agent_tooling/deploy/agent.py index e8132fbc..0d814e66 100644 --- a/prediction_market_agent_tooling/deploy/agent.py +++ b/prediction_market_agent_tooling/deploy/agent.py @@ -8,8 +8,7 @@ from enum import Enum from functools import cached_property -from pydantic import BeforeValidator, computed_field -from typing_extensions import Annotated +from pydantic import computed_field from prediction_market_agent_tooling.config import APIKeys from prediction_market_agent_tooling.deploy.betting_strategy import ( @@ -69,27 +68,6 @@ from prediction_market_agent_tooling.tools.utils import DatetimeUTC, utcnow MAX_AVAILABLE_MARKETS = 20 -TRADER_TAG = "trader" - - -def to_boolean_outcome(value: str | bool) -> bool: - if isinstance(value, bool): - return value - - elif isinstance(value, str): - value = value.lower().strip() - - if value in {"true", "yes", "y", "1"}: - return True - - elif value in {"false", "no", "n", "0"}: - return False - - else: - raise ValueError(f"Expected a boolean string, but got {value}") - - else: - raise ValueError(f"Expected a boolean or a string, but got {value}") def initialize_langfuse(enable_langfuse: bool) -> None: @@ -107,15 +85,21 @@ def initialize_langfuse(enable_langfuse: bool) -> None: langfuse_context.configure(enabled=enable_langfuse) -Decision = Annotated[bool, BeforeValidator(to_boolean_outcome)] - - class AnsweredEnum(str, Enum): ANSWERED = "answered" NOT_ANSWERED = "not_answered" +class AgentTagEnum(str, Enum): + PREDICTOR = "predictor" + TRADER = "trader" + + class DeployableAgent: + """ + Subclass this class to create agent with standardized interface. + """ + def __init__( self, enable_langfuse: bool = APIKeys().default_enable_langfuse, @@ -176,20 +160,25 @@ def __init_subclass__(cls, **kwargs: t.Any) -> None: ) def load(self) -> None: - pass + """ + Implement this method to load arbitrary instances needed across the whole run of the agent. + + Do not customize __init__ method. + """ def deploy_local( self, market_type: MarketType, sleep_time: float, - timeout: float, + run_time: float | None, ) -> None: + """ + Run the agent in the forever cycle every `sleep_time` seconds, until the `run_time` is met. + """ start_time = time.time() - while True: + while run_time is None or time.time() - start_time < run_time: self.run(market_type=market_type) time.sleep(sleep_time) - if time.time() - start_time > timeout: - break def deploy_gcp( self, @@ -205,6 +194,9 @@ def deploy_gcp( start_time: DatetimeUTC | None = None, timeout: int = 180, ) -> None: + """ + Deploy the agent as GCP Function. + """ path_to_agent_file = os.path.relpath(inspect.getfile(self.__class__)) entrypoint_function_name = "main" @@ -271,6 +263,9 @@ def {entrypoint_function_name}(request) -> str: schedule_deployed_gcp_function(fname, cron_schedule=cron_schedule) def run(self, market_type: MarketType) -> None: + """ + Run single iteration of the agent. + """ raise NotImplementedError("This method must be implemented by the subclass.") def get_gcloud_fname(self, market_type: MarketType) -> str: @@ -278,6 +273,14 @@ def get_gcloud_fname(self, market_type: MarketType) -> str: class DeployablePredictionAgent(DeployableAgent): + """ + Subclass this class to create your own prediction market agent. + + The agent will process markets and make predictions. + """ + + AGENT_TAG: AgentTagEnum = AgentTagEnum.PREDICTOR + bet_on_n_markets_per_run: int = 1 # Agent behaviour when fetching markets @@ -328,7 +331,7 @@ def update_langfuse_trace_by_processed_market( ) -> None: self.langfuse_update_current_trace( tags=[ - TRADER_TAG, + self.AGENT_TAG, ( AnsweredEnum.ANSWERED if processed_market is not None @@ -386,6 +389,9 @@ def get_markets( self, market_type: MarketType, ) -> t.Sequence[AgentMarket]: + """ + Override this method to customize what markets will fetch for processing. + """ cls = market_type.market_class # Fetch the soonest closing markets to choose from available_markets = cls.get_binary_markets( @@ -399,6 +405,9 @@ def get_markets( def before_process_market( self, market_type: MarketType, market: AgentMarket ) -> None: + """ + Executed before processing of each market. + """ api_keys = APIKeys() if market_type.is_blockchain_market: @@ -442,6 +451,9 @@ def after_process_market( market: AgentMarket, processed_market: ProcessedMarket | None, ) -> None: + """ + Executed after processing of each market. + """ keys = APIKeys() if self.store_prediction: market.store_prediction( @@ -454,7 +466,7 @@ def after_process_market( def before_process_markets(self, market_type: MarketType) -> None: """ - Executes actions that occur before bets are placed. + Executed before market processing loop starts. """ api_keys = APIKeys() self.check_min_required_balance_to_operate(market_type) @@ -485,7 +497,9 @@ def process_markets(self, market_type: MarketType) -> None: logger.info("All markets processed.") def after_process_markets(self, market_type: MarketType) -> None: - "Executes actions that occur after bets are placed." + """ + Executed after market processing loop ends. + """ def run(self, market_type: MarketType) -> None: if market_type not in self.supported_markets: @@ -498,6 +512,14 @@ def run(self, market_type: MarketType) -> None: class DeployableTraderAgent(DeployablePredictionAgent): + """ + Subclass this class to create your own prediction market trading agent. + + The agent will process markets, make predictions and place trades (bets) based off these predictions. + """ + + AGENT_TAG: AgentTagEnum = AgentTagEnum.TRADER + # These markets require place of bet, not just predictions. supported_markets: t.Sequence[MarketType] = [ MarketType.OMEN, @@ -537,6 +559,11 @@ def check_min_required_balance_to_trade(self, market: AgentMarket) -> None: ) def get_betting_strategy(self, market: AgentMarket) -> BettingStrategy: + """ + Override this method to customize betting strategy of your agent. + + Given the market and prediction, agent uses this method to calculate optimal outcome and bet size. + """ user_id = market.get_user_id(api_keys=APIKeys()) total_amount = market.get_tiny_bet_amount().amount diff --git a/tests_integration_with_local_chain/deploy/test_deploy.py b/tests_integration_with_local_chain/deploy/test_deploy.py index 3fa7edeb..51ab4cb5 100644 --- a/tests_integration_with_local_chain/deploy/test_deploy.py +++ b/tests_integration_with_local_chain/deploy/test_deploy.py @@ -12,5 +12,5 @@ def test_local_deployment(local_web3: Web3) -> None: DeployableCoinFlipAgent(enable_langfuse=False).deploy_local( sleep_time=0.001, market_type=MarketType.OMEN, - timeout=0.01, + run_time=0.01, )