Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add some docs and remove dead code #559

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 61 additions & 34 deletions prediction_market_agent_tooling/deploy/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
from enum import Enum
from functools import cached_property

from pydantic import BeforeValidator, computed_field
from typing_extensions import Annotated
from pydantic import computed_field

from prediction_market_agent_tooling.config import APIKeys
from prediction_market_agent_tooling.deploy.betting_strategy import (
Expand Down Expand Up @@ -69,27 +68,6 @@
from prediction_market_agent_tooling.tools.utils import DatetimeUTC, utcnow

MAX_AVAILABLE_MARKETS = 20
TRADER_TAG = "trader"


def to_boolean_outcome(value: str | bool) -> bool:
if isinstance(value, bool):
return value

elif isinstance(value, str):
value = value.lower().strip()

if value in {"true", "yes", "y", "1"}:
return True

elif value in {"false", "no", "n", "0"}:
return False

else:
raise ValueError(f"Expected a boolean string, but got {value}")

else:
raise ValueError(f"Expected a boolean or a string, but got {value}")


def initialize_langfuse(enable_langfuse: bool) -> None:
Expand All @@ -107,15 +85,21 @@ def initialize_langfuse(enable_langfuse: bool) -> None:
langfuse_context.configure(enabled=enable_langfuse)


Decision = Annotated[bool, BeforeValidator(to_boolean_outcome)]


class AnsweredEnum(str, Enum):
ANSWERED = "answered"
NOT_ANSWERED = "not_answered"


class AgentTagEnum(str, Enum):
PREDICTOR = "predictor"
TRADER = "trader"


class DeployableAgent:
"""
Subclass this class to create agent with standardized interface.
"""

def __init__(
self,
enable_langfuse: bool = APIKeys().default_enable_langfuse,
Expand Down Expand Up @@ -176,20 +160,25 @@ def __init_subclass__(cls, **kwargs: t.Any) -> None:
)

def load(self) -> None:
pass
"""
Implement this method to load arbitrary instances needed across the whole run of the agent.

Do not customize __init__ method.
"""

def deploy_local(
self,
market_type: MarketType,
sleep_time: float,
timeout: float,
run_time: float | None,
) -> None:
"""
Run the agent in the forever cycle every `sleep_time` seconds, until the `run_time` is met.
"""
start_time = time.time()
while True:
while run_time is None or time.time() - start_time < run_time:
self.run(market_type=market_type)
time.sleep(sleep_time)
if time.time() - start_time > timeout:
break

def deploy_gcp(
self,
Expand All @@ -205,6 +194,9 @@ def deploy_gcp(
start_time: DatetimeUTC | None = None,
timeout: int = 180,
) -> None:
"""
Deploy the agent as GCP Function.
"""
path_to_agent_file = os.path.relpath(inspect.getfile(self.__class__))

entrypoint_function_name = "main"
Expand Down Expand Up @@ -271,13 +263,24 @@ def {entrypoint_function_name}(request) -> str:
schedule_deployed_gcp_function(fname, cron_schedule=cron_schedule)

def run(self, market_type: MarketType) -> None:
"""
Run single iteration of the agent.
"""
raise NotImplementedError("This method must be implemented by the subclass.")

def get_gcloud_fname(self, market_type: MarketType) -> str:
return f"{self.__class__.__name__.lower()}-{market_type}-{utcnow().strftime('%Y-%m-%d--%H-%M-%S')}"


class DeployablePredictionAgent(DeployableAgent):
"""
Subclass this class to create your own prediction market agent.

The agent will process markets and make predictions.
"""

AGENT_TAG: AgentTagEnum = AgentTagEnum.PREDICTOR

bet_on_n_markets_per_run: int = 1

# Agent behaviour when fetching markets
Expand Down Expand Up @@ -328,7 +331,7 @@ def update_langfuse_trace_by_processed_market(
) -> None:
self.langfuse_update_current_trace(
tags=[
TRADER_TAG,
self.AGENT_TAG,
(
AnsweredEnum.ANSWERED
if processed_market is not None
Expand Down Expand Up @@ -386,6 +389,9 @@ def get_markets(
self,
market_type: MarketType,
) -> t.Sequence[AgentMarket]:
"""
Override this method to customize what markets will fetch for processing.
"""
cls = market_type.market_class
# Fetch the soonest closing markets to choose from
available_markets = cls.get_binary_markets(
Expand All @@ -399,6 +405,9 @@ def get_markets(
def before_process_market(
self, market_type: MarketType, market: AgentMarket
) -> None:
"""
Executed before processing of each market.
"""
api_keys = APIKeys()

if market_type.is_blockchain_market:
Expand Down Expand Up @@ -442,6 +451,9 @@ def after_process_market(
market: AgentMarket,
processed_market: ProcessedMarket | None,
) -> None:
"""
Executed after processing of each market.
"""
keys = APIKeys()
if self.store_prediction:
market.store_prediction(
Expand All @@ -454,7 +466,7 @@ def after_process_market(

def before_process_markets(self, market_type: MarketType) -> None:
"""
Executes actions that occur before bets are placed.
Executed before market processing loop starts.
"""
api_keys = APIKeys()
self.check_min_required_balance_to_operate(market_type)
Expand Down Expand Up @@ -485,7 +497,9 @@ def process_markets(self, market_type: MarketType) -> None:
logger.info("All markets processed.")

def after_process_markets(self, market_type: MarketType) -> None:
"Executes actions that occur after bets are placed."
"""
Executed after market processing loop ends.
"""

def run(self, market_type: MarketType) -> None:
if market_type not in self.supported_markets:
Expand All @@ -498,6 +512,14 @@ def run(self, market_type: MarketType) -> None:


class DeployableTraderAgent(DeployablePredictionAgent):
"""
Subclass this class to create your own prediction market trading agent.

The agent will process markets, make predictions and place trades (bets) based off these predictions.
"""

AGENT_TAG: AgentTagEnum = AgentTagEnum.TRADER

# These markets require place of bet, not just predictions.
supported_markets: t.Sequence[MarketType] = [
MarketType.OMEN,
Expand Down Expand Up @@ -537,6 +559,11 @@ def check_min_required_balance_to_trade(self, market: AgentMarket) -> None:
)

def get_betting_strategy(self, market: AgentMarket) -> BettingStrategy:
"""
Override this method to customize betting strategy of your agent.

Given the market and prediction, agent uses this method to calculate optimal outcome and bet size.
"""
user_id = market.get_user_id(api_keys=APIKeys())

total_amount = market.get_tiny_bet_amount().amount
Expand Down
2 changes: 1 addition & 1 deletion tests_integration_with_local_chain/deploy/test_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ def test_local_deployment(local_web3: Web3) -> None:
DeployableCoinFlipAgent(enable_langfuse=False).deploy_local(
sleep_time=0.001,
market_type=MarketType.OMEN,
timeout=0.01,
run_time=0.01,
)
Loading