Skip to content

Commit

Permalink
Merge branch 'main' into peter/cchaper
Browse files Browse the repository at this point in the history
  • Loading branch information
kongzii authored Nov 21, 2024
2 parents f37dfd4 + 060c34d commit 5c051c2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 35 deletions.
95 changes: 61 additions & 34 deletions prediction_market_agent_tooling/deploy/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from enum import Enum
from functools import cached_property

from pydantic import BeforeValidator, computed_field
from typing_extensions import Annotated
from pydantic import computed_field

from prediction_market_agent_tooling.config import APIKeys
from prediction_market_agent_tooling.deploy.betting_strategy import (
Expand Down Expand Up @@ -69,27 +68,6 @@
from prediction_market_agent_tooling.tools.utils import DatetimeUTC, utcnow

MAX_AVAILABLE_MARKETS = 20
TRADER_TAG = "trader"


def to_boolean_outcome(value: str | bool) -> bool:
if isinstance(value, bool):
return value

elif isinstance(value, str):
value = value.lower().strip()

if value in {"true", "yes", "y", "1"}:
return True

elif value in {"false", "no", "n", "0"}:
return False

else:
raise ValueError(f"Expected a boolean string, but got {value}")

else:
raise ValueError(f"Expected a boolean or a string, but got {value}")


def initialize_langfuse(enable_langfuse: bool) -> None:
Expand All @@ -107,15 +85,21 @@ def initialize_langfuse(enable_langfuse: bool) -> None:
langfuse_context.configure(enabled=enable_langfuse)


Decision = Annotated[bool, BeforeValidator(to_boolean_outcome)]


class AnsweredEnum(str, Enum):
ANSWERED = "answered"
NOT_ANSWERED = "not_answered"


class AgentTagEnum(str, Enum):
PREDICTOR = "predictor"
TRADER = "trader"


class DeployableAgent:
"""
Subclass this class to create agent with standardized interface.
"""

def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
Expand Down Expand Up @@ -176,20 +160,25 @@ def __init_subclass__(cls, **kwargs: t.Any) -> None:
)

def load(self) -> None:
pass
"""
Implement this method to load arbitrary instances needed across the whole run of the agent.
Do not customize __init__ method.
"""

def deploy_local(
self,
market_type: MarketType,
sleep_time: float,
timeout: float,
run_time: float | None,
) -> None:
"""
Run the agent in the forever cycle every `sleep_time` seconds, until the `run_time` is met.
"""
start_time = time.time()
while True:
while run_time is None or time.time() - start_time < run_time:
self.run(market_type=market_type)
time.sleep(sleep_time)
if time.time() - start_time > timeout:
break

def deploy_gcp(
self,
Expand All @@ -205,6 +194,9 @@ def deploy_gcp(
start_time: DatetimeUTC | None = None,
timeout: int = 180,
) -> None:
"""
Deploy the agent as GCP Function.
"""
path_to_agent_file = os.path.relpath(inspect.getfile(self.__class__))

entrypoint_function_name = "main"
Expand Down Expand Up @@ -271,13 +263,24 @@ def {entrypoint_function_name}(request) -> str:
schedule_deployed_gcp_function(fname, cron_schedule=cron_schedule)

def run(self, market_type: MarketType) -> None:
"""
Run single iteration of the agent.
"""
raise NotImplementedError("This method must be implemented by the subclass.")

def get_gcloud_fname(self, market_type: MarketType) -> str:
return f"{self.__class__.__name__.lower()}-{market_type}-{utcnow().strftime('%Y-%m-%d--%H-%M-%S')}"


class DeployablePredictionAgent(DeployableAgent):
"""
Subclass this class to create your own prediction market agent.
The agent will process markets and make predictions.
"""

AGENT_TAG: AgentTagEnum = AgentTagEnum.PREDICTOR

bet_on_n_markets_per_run: int = 1

# Agent behaviour when fetching markets
Expand Down Expand Up @@ -328,7 +331,7 @@ def update_langfuse_trace_by_processed_market(
) -> None:
self.langfuse_update_current_trace(
tags=[
TRADER_TAG,
self.AGENT_TAG,
(
AnsweredEnum.ANSWERED
if processed_market is not None
Expand Down Expand Up @@ -386,6 +389,9 @@ def get_markets(
self,
market_type: MarketType,
) -> t.Sequence[AgentMarket]:
"""
Override this method to customize what markets will fetch for processing.
"""
cls = market_type.market_class
# Fetch the soonest closing markets to choose from
available_markets = cls.get_binary_markets(
Expand All @@ -399,6 +405,9 @@ def get_markets(
def before_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
"""
Executed before processing of each market.
"""
api_keys = APIKeys()

if market_type.is_blockchain_market:
Expand Down Expand Up @@ -442,6 +451,9 @@ def after_process_market(
market: AgentMarket,
processed_market: ProcessedMarket | None,
) -> None:
"""
Executed after processing of each market.
"""
keys = APIKeys()
if self.store_predictions:
market.store_prediction(
Expand All @@ -454,7 +466,7 @@ def after_process_market(

def before_process_markets(self, market_type: MarketType) -> None:
"""
Executes actions that occur before bets are placed.
Executed before market processing loop starts.
"""
api_keys = APIKeys()
self.check_min_required_balance_to_operate(market_type)
Expand Down Expand Up @@ -485,7 +497,9 @@ def process_markets(self, market_type: MarketType) -> None:
logger.info("All markets processed.")

def after_process_markets(self, market_type: MarketType) -> None:
"Executes actions that occur after bets are placed."
"""
Executed after market processing loop ends.
"""

def run(self, market_type: MarketType) -> None:
if market_type not in self.supported_markets:
Expand All @@ -498,6 +512,14 @@ def run(self, market_type: MarketType) -> None:


class DeployableTraderAgent(DeployablePredictionAgent):
"""
Subclass this class to create your own prediction market trading agent.
The agent will process markets, make predictions and place trades (bets) based off these predictions.
"""

AGENT_TAG: AgentTagEnum = AgentTagEnum.TRADER

# These markets require place of bet, not just predictions.
supported_markets: t.Sequence[MarketType] = [
MarketType.OMEN,
Expand Down Expand Up @@ -537,6 +559,11 @@ def check_min_required_balance_to_trade(self, market: AgentMarket) -> None:
)

def get_betting_strategy(self, market: AgentMarket) -> BettingStrategy:
"""
Override this method to customize betting strategy of your agent.
Given the market and prediction, agent uses this method to calculate optimal outcome and bet size.
"""
user_id = market.get_user_id(api_keys=APIKeys())

total_amount = market.get_tiny_bet_amount().amount
Expand Down
2 changes: 1 addition & 1 deletion tests_integration_with_local_chain/deploy/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_local_deployment(local_web3: Web3) -> None:
DeployableCoinFlipAgent(enable_langfuse=False).deploy_local(
sleep_time=0.001,
market_type=MarketType.OMEN,
timeout=0.01,
run_time=0.01,
)

0 comments on commit 5c051c2

Please sign in to comment.