diff --git a/Dockerfile b/Dockerfile index d718c673..b37afd5e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,11 +16,14 @@ RUN pip wheel . --wheel-dir=/wheels # Install from wheels FROM ghcr.io/apeworx/ape:${BASE_APE_IMAGE_TAG:-latest} USER root -COPY --from=builder /wheels /wheels +COPY --from=builder /wheels/*.whl /wheels RUN pip install --upgrade pip \ - && pip install silverback \ + && pip install \ + --no-cache-dir --find-links=/wheels \ 'taskiq-sqs>=0.0.11' \ - --no-cache-dir --find-links=/wheels + 'taskiq-redis>=1.0.2,<2' \ + silverback + USER harambe ENTRYPOINT ["silverback"] diff --git a/README.md b/README.md index ab0b8c3e..1bf479cd 100644 --- a/README.md +++ b/README.md @@ -1,11 +1,11 @@ # Quick Start Silverback lets you create and deploy your own Python bots that respond to on-chain events. -The Silverback library leverages the [Ape](https://docs.apeworx.io/ape/stable/userguides/quickstart) development framework as well as it's ecosystem of plugins and packages to enable you to develop simple-yet-sophisticated automated applications that can listen and respond to live chain data. +The Silverback library leverages the [Ape](https://docs.apeworx.io/ape/stable/userguides/quickstart) development framework as well as it's ecosystem of plugins and packages to enable you to develop simple-yet-sophisticated automated bots that can listen and respond to live chain data. -Silverback applications are excellent for use cases that involve continuously monitoring and responding to on-chain events, such as newly confirmed blocks or contract event logs. +Silverback bots are excellent for use cases that involve continuously monitoring and responding to on-chain events, such as newly confirmed blocks or contract event logs. -Some examples of these types of applications: +Some examples of these types of bots: - Monitoring new pool creations, and depositing liquidity - Measuring trading activity of popular pools @@ -13,7 +13,7 @@ Some examples of these types of applications: ## Documentation -Please read the [development userguide](https://docs.apeworx.io/silverback/stable/userguides/development.html) for more information on how to develop an application. +Please read the [development userguide](https://docs.apeworx.io/silverback/stable/userguides/development.html) for more information on how to develop a bot. ## Dependencies @@ -72,11 +72,11 @@ Silverback will automatically register files in this folder as separate bots tha ```{note} It is also suggested that you treat this as a scripts folder, and do not include an __init__.py -If you have a complicated project, follow the previous example to ensure you run the application correctly. +If you have a complicated project, follow the previous example to ensure you run the bot correctly. ``` ```{note} -A final suggestion would be to name your `SilverbackApp` object `bot`. Silverback automatically searches +A final suggestion would be to name your `SilverbackBot` object `bot`. Silverback automatically searches for this object name when running. If you do not do so, once again, ensure you replace `example` with `example:` the previous example. ``` @@ -139,7 +139,7 @@ Traceback (most recent call last): ape_alchemy.exceptions.MissingProjectKeyError: Must set one of $WEB3_ALCHEMY_PROJECT_ID, $WEB3_ALCHEMY_API_KEY, $WEB3_ETHEREUM_MAINNET_ALCHEMY_PROJECT_ID, $WEB3_ETHEREUM_MAINNET_ALCHEMY_API_KEY. ``` -Go to [Alchemy](https://alchemy.com), create an account, then create an application in their dashboard, and copy the API Key. +Go to [Alchemy](https://alchemy.com), create an account, then create an bot in their dashboard, and copy the API Key. Another requirement for the command from `Docker Usage` to run the given example is that it uses [ape-tokens](https://github.com/ApeWorX/ape-tokens) plugin to look up token interfaces by symbol. In order for this to work, you should have installed and configured that plugin using a token list that includes both YFI and USDC on Ethereum mainnet. diff --git a/docs/userguides/development.md b/docs/userguides/development.md index 0a1431cd..9e33c6d5 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -1,6 +1,6 @@ -# Developing Applications +# Developing Bots -In this guide, we are going to show you more details on how to build an application with Silverback. +In this guide, we are going to show you more details on how to build an bot with Silverback. ## Prerequisites @@ -15,7 +15,7 @@ There are 3 suggested ways to structure your project. In the root directory of y 2. Create a `bots/` folder. Then develop bots in this folder as separate scripts (Do not include a __init__.py file). -3. Create a `bot/` folder with a `__init__.py` file that will include the instantiation of your `SilverbackApp()` object. +3. Create a `bot/` folder with a `__init__.py` file that will include the instantiation of your `SilverbackBot()` object. The `silverback` cli automatically searches for python scripts to run as bots in specific locations relative to the root of your project. It will also be able to detect the scripts inside your `bots/` directory and let you run those by name (in case you have multiple bots in your project). @@ -23,7 +23,7 @@ It will also be able to detect the scripts inside your `bots/` directory and let If `silverback` finds a module named `bot` in the root directory of the project, then it will use that by default. ```{note} -It is suggested that you create the instance of your `SilverbackApp()` object by naming the variable `bot`, since `silverback` will autodetect that variable name when loading your script file. +It is suggested that you create the instance of your `SilverbackBot()` object by naming the variable `bot`, since `silverback` will autodetect that variable name when loading your script file. ``` Another way you can structure your bot is to create a `bot` folder and define a runner inside of that folder as `__init__.py`. @@ -43,7 +43,7 @@ If your bot's module name is `example.py` (for example), you can run it like thi silverback run example --network your:network:of:choice ``` -If the variable that you call the `SilverbackApp()` object is something other than `bot`, you can specific that by adding `:{variable-name}`: +If the variable that you call the `SilverbackBot()` object is something other than `bot`, you can specific that by adding `:{variable-name}`: ```bash silverback run example:my_bot --network your:network:of:choice @@ -52,7 +52,7 @@ silverback run example:my_bot --network your:network:of:choice We will automatically detect all scripts under the `bots/` folder automatically, but if your bot resides in a location other than `bots/` then you can use this to run it: ```bash -silverback run folder.example:app --network your:network:of:choice +silverback run folder.example:bot --network your:network:of:choice ``` Note that with a `bot/__init__.py` setup, silverback will also autodetect it, and you can run it with: @@ -69,21 +69,21 @@ For the most streamlined experience, develop your bots as scripts, and avoid rel If you follow these suggestions, your Silverback deployments will be easy to use and require almost no thought. ``` -## Creating an Application +## Creating a Bot -Creating a Silverback Application is easy, to do so initialize the `silverback.SilverbackApp` class: +Creating a Silverback Bot is easy, to do so initialize the `silverback.SilverbackBot` class: ```py -from silverback import SilverbackApp +from silverback import SilverbackBot -bot = SilverbackApp() +bot = SilverbackBot() ``` -The SilverbackApp class handles state and configuration. +The SilverbackBot class handles state and configuration. Through this class, we can hook up event handlers to be executed each time we encounter a new block or each time a specific event is emitted. -Initializing the app creates a network connection using the Ape configuration of your local project, making it easy to add a Silverback bot to your project in order to perform automation of necessary on-chain interactions required. +Initializing the bot creates a network connection using the Ape configuration of your local project, making it easy to add a Silverback bot to your project in order to perform automation of necessary on-chain interactions required. -However, by default an app has no configured event handlers, so it won't be very useful. +However, by default an bot has no configured event handlers, so it won't be very useful. This is where adding event handlers is useful via the `bot.on_` method. This method lets us specify which event will trigger the execution of our handler as well as which handler to execute. @@ -161,9 +161,9 @@ def block_handler(block, context: Annotated[Context, TaskiqDepends()]): ... ``` -### Application Events +### Bot Events -You can also add an application startup and shutdown handler that will be **executed once upon every application startup**. This may be useful for things like processing historical events since the application was shutdown or other one-time actions to perform at startup. +You can also add an bot startup and shutdown handler that will be **executed once upon every bot startup**. This may be useful for things like processing historical events since the bot was shutdown or other one-time actions to perform at startup. ```py @bot.on_startup() @@ -180,7 +180,46 @@ def handle_on_shutdown(): ... ``` -*Changed in 0.2.0*: The behavior of the `@bot.on_startup()` decorator and handler signature have changed. It is now executed only once upon application startup and worker events have moved on `@bot.on_worker_startup()`. +*Changed in 0.2.0*: The behavior of the `@bot.on_startup()` decorator and handler signature have changed. It is now executed only once upon bot startup and worker events have moved on `@bot.on_worker_startup()`. + +## Bot State + +Sometimes it is very useful to have access to values in a shared state across your workers. +For example you might have a value or complex reference type that you wish to update during one of your tasks, and read during another. +Silverback provides `bot.state` to help with these use cases. + +For example, you might want to pre-populate a large dataframe into state on startup, keeping that dataframe in sync with the chain through event logs, +and then use that data to determine a signal under which you want trigger transactions to commit back to the chain. +Such an bot might look like this: + +```py +@bot.on_startup() +def create_table(startup_state): + df = contract.MyEvent.query(..., start_block=startup_state.last_block_processed) + ... # Do some further processing on df + bot.state.table = df + + +@bot.on_(contract.MyEvent) +def update_table(log): + bot.state.table = ... # Update using stuff from `log` + + +@bot.on_(chain.blocks) +def use_table(blk): + if bot.state.table[...].mean() > bot.state.table[...].sum(): + # Trigger your bot to send a transaction from `bot.signer` + contract.myMethod(..., sender=bot.signer) + ... +``` + +```{warning} +You can use `bot.state` to store any python variable type, however note that the item is not networked nor threadsafe so it is not recommended to have multiple tasks write to the same value in state at the same time. +``` + +```{note} +Bot startup and bot runtime event triggers (e.g. block or event container) are handled distinctly and can be trusted not to execute at the same time. +``` ### Signing Transactions @@ -192,10 +231,10 @@ While not recommended, you can use keyfile accounts for automated signing. See [this guide](https://docs.apeworx.io/ape/stable/userguides/accounts.html#automation) to learn more about how to do that. ``` -## Running your Application +## Running your Bot Once you have programmed your bot, it's really useful to be able to run it locally and validate that it does what you expect it to do. -To run your bot locally, we have included a really useful cli command [`run`](../commands/run) that takes care of connecting to the proper network, configuring signers (using your local Ape accounts), and starting up the application client and in-memory task queue workers. +To run your bot locally, we have included a really useful cli command [`run`](../commands/run) that takes care of connecting to the proper network, configuring signers (using your local Ape accounts), and starting up the bot client and in-memory task queue workers. ```sh # Run your bot on the Ethereum Sepolia testnet, with your own signer: @@ -206,20 +245,20 @@ $ silverback run my_bot --network :sepolia --account acct-name `my_bot:bot` is not required for silverback run if you follow the suggested folder structure at the start of this page, you can just call it via `my_bot`. ``` -It's important to note that signers are optional, if not configured in the application then `bot.signer` will be `None`. -You can use this in your application to enable a "test execution" mode, something like this: +It's important to note that signers are optional, if not configured in the bot then `bot.signer` will be `None`. +You can use this in your bot to enable a "test execution" mode, something like this: ```py # Compute some metric that might lead to creating a transaction if bot.signer: - # Execute a transaction via `sender=app.signer` + # Execute a transaction via `sender=bot.signer` else: # Log what the transaction *would* have done, had a signer been enabled ``` ```{warning} -If you configure your application to use a signer, and that signer signs anything given to it, remember that you can lose substational amounts of funds if you deploy this to a production network. -Always test your applications throughly before deploying, and always use a dedicated key for production signing with your application in a remote setting. +If you configure your bot to use a signer, and that signer signs anything given to it, remember that you can lose substational amounts of funds if you deploy this to a production network. +Always test your bots throughly before deploying, and always use a dedicated key for production signing with your bot in a remote setting. ``` ```{note} @@ -230,7 +269,7 @@ Use segregated keys and limit your risk by controlling the amount of funds that ### Distributed Execution Using only the `silverback run ...` command in a default configuration executes everything in one process and the job queue is completely in-memory with a shared state. -In some high volume environments, you may want to deploy your Silverback application in a distributed configuration using multiple processes to handle the messages at a higher rate. +In some high volume environments, you may want to deploy your Silverback bot in a distributed configuration using multiple processes to handle the messages at a higher rate. The primary components are the client and workers. The client handles Silverback events (blocks and contract event logs) and creates jobs for the workers to process in an asynchronous manner. @@ -265,10 +304,10 @@ silverback worker -w 2 The client will send tasks to the 2 worker subprocesses, and all task queue and results data will be go through Redis. -## Testing your Application +## Testing your Bot TODO: Add backtesting mode w/ `silverback test` -## Deploying your Application +## Deploying your Bot -Check out the [Platform Deployment Userguide](./platform.html) for more information on how to deploy your application to the [Silverback Platform](https://silverback.apeworx.io). +Check out the [Platform Deployment Userguide](./platform.html) for more information on how to deploy your bot to the [Silverback Platform](https://silverback.apeworx.io). diff --git a/docs/userguides/platform.md b/docs/userguides/platform.md index 922fb5c8..d538739b 100644 --- a/docs/userguides/platform.md +++ b/docs/userguides/platform.md @@ -1,14 +1,14 @@ -# Deploying Applications +# Deploying Bots In this guide, we are going to show you more details on how to deploy your application to the [Silverback Platform](https://silverback.apeworx.io). ## Creating a Cluster -The Silverback Platform runs your Applications (or "Bots") on dedicated managed application Clusters. +The Silverback Platform runs your Bots on dedicated managed application Clusters. These Clusters will take care to orchestrate infrastructure, monitor, run your triggers, and collect metrics for your applications. Each Cluster is bespoke for an individual or organization, and isolates your applications from others on different infrastructure. -Before we deploy our Application, we have to create a Cluster. +Before we deploy our Bot, we have to create a Cluster. If you haven't yet, please sign up for Silverback at [https://silverback.apeworx.io](https://silverback.apeworx.io). Once you have signed up, you can actually create (and pay for) your Clusters from the Silverback CLI utility by first @@ -44,7 +44,7 @@ For instance, to list all your available bots on your cluster, use [`silverback To obtain general information about your cluster, just use [`silverback cluster info`][silverback-cluster-info], or [`silverback cluster health`][silverback-cluster-health] to see the current status of your Cluster. -If you have no bots, we will first have to containerize our Applications and upload them to a container registry that our Cluster is configured to access. +If you have no bots, we will first have to containerize our Bots and upload them to a container registry that our Cluster is configured to access. ```{note} Building a container for your application can be an advanced topic, we have included the `silverback build` subcommand to help assist in generating Dockerfiles. @@ -108,7 +108,7 @@ Silverback Clusters include an environment variable management system for exactl which you can manage using [`silverback cluster vars`][silverback-cluster-vars] subcommand. The environment variable management system makes use of a concept called "Variable Groups" which are distinct collections of environment variables meant to be used together. -These variable groups will help in managing the runtime environment of your Applications by allowing you to segregate different variables depending on each bot's needs. +These variable groups will help in managing the runtime environment of your Bots by allowing you to segregate different variables depending on each bot's needs. To create an environment group, use the [`silverback cluster vars new`][silverback-cluster-vars-new] command and give it a name and a set of related variables. For instance, it may make sense to make a group of variables for your favorite Ape plugins or services, such as RPC Providers, Blockchain Data Indexers, Etherscan, etc. @@ -199,7 +199,7 @@ Any task execution that experiences an error will abort execution (and therefore All errors encountered during task exeuction are reported to the Cluster for later review by any users with appriopiate access. Tasks do not retry (by default), but updates to `app.state` are maintained up until the point an error occurs. -It is important to keep track of these errors and ensure that none of them are in fact critical to the operation of your Application, +It is important to keep track of these errors and ensure that none of them are in fact critical to the operation of your Bot, and to take corrective or preventative action if it is determined that it should be treated as a more critical failure condition. ``` diff --git a/example.py b/example.py index 23039a7b..70737d54 100644 --- a/example.py +++ b/example.py @@ -1,3 +1,4 @@ +import asyncio from typing import Annotated from ape import chain @@ -6,40 +7,56 @@ from ape_tokens import tokens # type: ignore[import] from taskiq import Context, TaskiqDepends, TaskiqState -from silverback import AppState, CircuitBreaker, SilverbackApp +from silverback import CircuitBreaker, SilverbackBot, StateSnapshot -# Do this first to initialize your app -app = SilverbackApp() +# Do this first to initialize your bot +bot = SilverbackBot() -# NOTE: Don't do any networking until after initializing app +# Cannot call `bot.state` outside of an bot function handler +# bot.state.something # NOTE: raises AttributeError + +# NOTE: Don't do any networking until after initializing bot USDC = tokens["USDC"] YFI = tokens["YFI"] -@app.on_startup() -def app_startup(startup_state: AppState): - # NOTE: This is called just as the app is put into "run" state, - # and handled by the first available worker - # raise Exception # NOTE: Any exception raised on startup aborts immediately +@bot.on_startup() +def bot_startup(startup_state: StateSnapshot): + # This is called just as the bot is put into "run" state, + # and handled by the first available worker + + # Any exception raised on startup aborts immediately: + # raise Exception # NOTE: raises StartupFailure + + # This is a great place to set `bot.state` values + bot.state.logs_processed = 0 + # NOTE: Can put anything here, any python object works + return {"block_number": startup_state.last_block_seen} # Can handle some resource initialization for each worker, like LLMs or database connections class MyDB: def execute(self, query: str): - pass + pass # Handle query somehow... -@app.on_worker_startup() -def worker_startup(state: TaskiqState): # NOTE: You need the type hint here +@bot.on_worker_startup() +# NOTE: This event is triggered internally, do not use unless you know what you're doing +def worker_startup(worker_state: TaskiqState): # NOTE: You need the type hint to load worker state + # NOTE: Worker state is per-worker, not shared with other workers # NOTE: Can put anything here, any python object works - state.db = MyDB() - state.block_count = 0 - # raise Exception # NOTE: Any exception raised on worker startup aborts immediately + worker_state.db = MyDB() + + # Any exception raised on worker startup aborts immediately: + # raise Exception # NOTE: raises StartupFailure + + # Cannot call `bot.state` because it is not set up yet on worker startup functions + # bot.state.something # NOTE: raises AttributeError # This is how we trigger off of new blocks -@app.on_(chain.blocks) +@bot.on_(chain.blocks) # NOTE: The type hint for block is `BlockAPI`, but we parse it using `EcosystemAPI` # NOTE: If you need something from worker state, you have to use taskiq context def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): @@ -49,36 +66,51 @@ def exec_block(block: BlockAPI, context: Annotated[Context, TaskiqDepends()]): # This is how we trigger off of events # Set new_block_timeout to adjust the expected block time. -@app.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25) +@bot.on_(USDC.Transfer, start_block=19784367, new_block_timeout=25) # NOTE: Typing isn't required, it will still be an Ape `ContractLog` type def exec_event1(log): if log.log_index % 7 == 3: # If you raise any exception, Silverback will track the failure and keep running - # NOTE: By default, if you have 3 tasks fail in a row, the app will shutdown itself + # NOTE: By default, if you have 3 tasks fail in a row, the bot will shutdown itself raise ValueError("I don't like the number 3.") + # You can update state whenever you want + bot.state.logs_processed += 1 + return {"amount": log.amount} -@app.on_(YFI.Approval) +@bot.on_(YFI.Approval) # Any handler function can be async too async def exec_event2(log: ContractLog): - if log.log_index % 7 == 6: - # If you ever want the app to immediately shutdown under some scenario, raise this exception - raise CircuitBreaker("Oopsie!") - + # All `bot.state` values are updated across all workers at the same time + bot.state.logs_processed += 1 + # Do any other long running tasks... + await asyncio.sleep(5) return log.amount +@bot.on_(chain.blocks) +# NOTE: You can have multiple handlers for any trigger we support +def check_logs(log): + if bot.state.logs_processed > 20: + # If you ever want the bot to immediately shutdown under some scenario, raise this exception + raise CircuitBreaker("Oopsie!") + + # A final job to execute on Silverback shutdown -@app.on_shutdown() -def app_shutdown(): - # raise Exception # NOTE: Any exception raised on shutdown is ignored +@bot.on_shutdown() +def bot_shutdown(): + # NOTE: Any exception raised on worker shutdown is ignored: + # raise Exception return {"some_metric": 123} # Just in case you need to release some resources or something inside each worker -@app.on_worker_shutdown() +@bot.on_worker_shutdown() def worker_shutdown(state: TaskiqState): # NOTE: You need the type hint here + # This is a good time to release resources state.db = None - # raise Exception # NOTE: Any exception raised on worker shutdown is ignored + + # NOTE: Any exception raised on worker shutdown is ignored: + # raise Exception diff --git a/silverback/__init__.py b/silverback/__init__.py index 43b3c961..b2ec0a03 100644 --- a/silverback/__init__.py +++ b/silverback/__init__.py @@ -1,10 +1,10 @@ -from .application import SilverbackApp from .exceptions import CircuitBreaker, SilverbackException -from .state import AppState +from .main import SilverbackBot +from .state import StateSnapshot __all__ = [ - "AppState", + "StateSnapshot", "CircuitBreaker", - "SilverbackApp", + "SilverbackBot", "SilverbackException", ] diff --git a/silverback/_cli.py b/silverback/_cli.py index fe193e47..fc2f30b1 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -51,7 +51,7 @@ @click.group(cls=SectionedHelpGroup) def cli(): """ - Silverback: Build Python apps that react to on-chain events + Silverback: Build Python bots that react to on-chain events To learn more about our cloud offering, please check out https://silverback.apeworx.io """ @@ -110,7 +110,7 @@ def _network_callback(ctx, param, val): @click.option("-x", "--max-exceptions", type=int, default=3) @click.argument("bot", required=False, callback=bot_path_callback) def run(cli_ctx, account, runner_class, recorder_class, max_exceptions, bot): - """Run Silverback application""" + """Run Silverback bot""" if not runner_class: # NOTE: Automatically select runner class @@ -120,7 +120,7 @@ def run(cli_ctx, account, runner_class, recorder_class, max_exceptions, bot): runner_class = PollingRunner else: raise click.BadOptionUsage( - option_name="network", message="Network choice cannot support running app" + option_name="network", message="Network choice cannot support running bot" ) runner = runner_class( @@ -213,7 +213,7 @@ def login(auth: FiefAuth): @cli.group(cls=SectionedHelpGroup, section="Cloud Commands (https://silverback.apeworx.io)") def cluster(): - """Manage a Silverback hosted application cluster + """Manage a Silverback hosted bot cluster For clusters on the Silverback Platform, please provide a name for the cluster to access under your platform account via `-c WORKSPACE/NAME`""" diff --git a/silverback/exceptions.py b/silverback/exceptions.py index 371dff83..554f5a88 100644 --- a/silverback/exceptions.py +++ b/silverback/exceptions.py @@ -48,11 +48,11 @@ def __init__(self): class Halt(SilverbackException): def __init__(self): - super().__init__("App halted, must restart manually") + super().__init__("Bot halted, must restart manually") class CircuitBreaker(Halt): - """Custom exception (created by user) that will trigger an application shutdown.""" + """Custom exception (created by user) that will trigger an bot shutdown.""" def __init__(self, message: str): super(SilverbackException, self).__init__(message) diff --git a/silverback/application.py b/silverback/main.py similarity index 67% rename from silverback/application.py rename to silverback/main.py index 2691a98a..015ec1a5 100644 --- a/silverback/application.py +++ b/silverback/main.py @@ -1,4 +1,5 @@ import atexit +from collections import defaultdict from datetime import timedelta from typing import Any, Callable @@ -13,15 +14,16 @@ from .exceptions import ContainerTypeMismatchError, InvalidContainerTypeError from .settings import Settings +from .state import StateSnapshot from .types import SilverbackID, TaskType class SystemConfig(BaseModel): # NOTE: Do not change this datatype unless major breaking - # NOTE: Useful for determining if Runner can handle this app + # NOTE: Useful for determining if Runner can handle this bot sdk_version: str - # NOTE: Useful for specifying what task types can be specified by app + # NOTE: Useful for specifying what task types can be specified by bot task_types: list[str] @@ -33,22 +35,69 @@ class TaskData(BaseModel): # NOTE: Any other items here must have a default value -class SilverbackApp(ManagerAccessMixin): +class SharedState(defaultdict): """ - The application singleton. Must be initialized prior to use. + Class containing the bot shared state that all workers can read from and write to. + + ```{warning} + This is not networked in any way, nor is it multi-process safe, but will be + accessible across multiple thread workers within a single process. + ``` + + Usage example:: + + @bot.on_(...) + def do_something_with_state(value): + # Read from state using `getattr` + ... = bot.state.something + + # Set state using `setattr` + bot.state.something = ... + + # Read from state using `getitem` + ... = bot.state["something"] + + # Set state using setitem + bot.state["something"] = ... + """ + + # TODO: This class does not have thread-safe access control, but should remain safe due to + # it being a memory mapping, and writes are strictly controlled to be handled only by + # one worker at a time. There may be issues with using this in production however. + + def __init__(self): + # Any unknown key returns None + super().__init__(lambda: None) + + def __getattr__(self, attr): + try: + return super().__getattr__(attr) + except AttributeError: + return super().__getitem__(attr) + + def __setattr__(self, attr, val): + try: + super().__setattr__(attr, val) + except AttributeError: + super().__setitem__(attr, val) + + +class SilverbackBot(ManagerAccessMixin): + """ + The bot singleton. Must be initialized prior to use. Usage example:: - from silverback import SilverbackApp + from silverback import SilverbackBot - app = SilverbackApp() + bot = SilverbackBot() - ... # Connection has been initialized, can call broker methods e.g. `app.on_(...)` + ... # Connection has been initialized, can call broker methods e.g. `bot.on_(...)` """ def __init__(self, settings: Settings | None = None): """ - Create app + Create bot Args: settings (~:class:`silverback.settings.Settings` | None): Settings override. @@ -62,7 +111,7 @@ def __init__(self, settings: Settings | None = None): provider = provider_context.__enter__() self.identifier = SilverbackID( - name=settings.APP_NAME, + name=settings.BOT_NAME, network=provider.network.name, ecosystem=provider.network.ecosystem.name, ) @@ -74,7 +123,7 @@ def __init__(self, settings: Settings | None = None): settings.NEW_BLOCK_TIMEOUT = int(timedelta(days=1).total_seconds()) settings_str = "\n ".join(f'{key}="{val}"' for key, val in settings.dict().items() if val) - logger.info(f"Loading Silverback App with settings:\n {settings_str}") + logger.info(f"Loading Silverback Bot with settings:\n {settings_str}") self.broker = settings.get_broker() self.tasks: dict[TaskType, list[TaskData]] = { @@ -97,7 +146,7 @@ def __init__(self, settings: Settings | None = None): network_choice = f"{self.identifier.ecosystem}:{self.identifier.network}" logger.success( - f'Loaded Silverback App:\n NETWORK="{network_choice}"' + f'Loaded Silverback Bot:\n NETWORK="{network_choice}"' f"{signer_str}{new_block_timeout_str}" ) @@ -112,6 +161,12 @@ def __init__(self, settings: Settings | None = None): self._get_user_all_taskdata = self.__register_system_task( TaskType.SYSTEM_USER_ALL_TASKDATA, self.__get_user_all_taskdata_handler ) + self._load_snapshot = self.__register_system_task( + TaskType.SYSTEM_LOAD_SNAPSHOT, self.__load_snapshot_handler + ) + self._create_snapshot = self.__register_system_task( + TaskType.SYSTEM_CREATE_SNAPSHOT, self.__create_snapshot_handler + ) def __register_system_task( self, task_type: TaskType, task_handler: Callable @@ -142,6 +197,34 @@ def __get_user_taskdata_handler(self, task_type: TaskType) -> list[TaskData]: def __get_user_all_taskdata_handler(self) -> list[TaskData]: return [v for k, l in self.tasks.items() if str(k).startswith("user:") for v in l] + async def __load_snapshot_handler(self, startup_state: StateSnapshot): + # NOTE: *DO NOT USE* in Runner, as it will not be updated by the bot + self.state = SharedState() + # NOTE: attribute does not exist before this task is executed, + # ensuring no one uses it during worker startup + + self.state["system:last_block_seen"] = startup_state.last_block_seen + self.state["system:last_block_processed"] = startup_state.last_block_processed + # TODO: Load user custom state (should not start with `system:`) + + async def __create_snapshot_handler( + self, + last_block_seen: int | None = None, + last_block_processed: int | None = None, + ): + # Task that updates state checkpoints before/after every non-system runtime task/at shutdown + if last_block_seen is not None: + self.state["system:last_block_seen"] = last_block_seen + + if last_block_processed is not None: + self.state["system:last_block_processed"] = last_block_processed + + return StateSnapshot( + # TODO: Migrate these to parameters (remove explicitly from state) + last_block_seen=self.state.get("system:last_block_seen", -1), + last_block_processed=self.state.get("system:last_block_processed", -1), + ) + def broker_task_decorator( self, task_type: TaskType, @@ -150,6 +233,11 @@ def broker_task_decorator( """ Dynamically create a new broker task that handles tasks of ``task_type``. + ```{warning} + Dynamically creating a task does not ensure that the runner will be aware of the task + in order to trigger it. Use at your own risk. + ``` + Args: task_type: :class:`~silverback.types.TaskType`: The type of task to create. container: (BlockContainer | ContractEvent): The event source to watch. @@ -206,47 +294,61 @@ def add_taskiq_task(handler: Callable) -> AsyncTaskiqDecoratedTask: def on_startup(self) -> Callable: """ - Code to execute on one worker upon startup / restart after an error. + Code that will be exected by one worker after worker startup, but before the + bot is put into the "run" state by the Runner. Usage example:: - @app.on_startup() - def do_something_on_startup(startup_state): + @bot.on_startup() + def do_something_on_startup(startup_state: StateSnapshot): ... # Reprocess missed events or blocks """ return self.broker_task_decorator(TaskType.STARTUP) def on_shutdown(self) -> Callable: """ - Code to execute on one worker at shutdown. + Code that will be exected by one worker before worker shutdown, after the + Runner has decided to put the bot into the "shutdown" state. Usage example:: - @app.on_shutdown() + @bot.on_shutdown() def do_something_on_shutdown(): - ... # Record final state of app + ... # Record final state of bot """ return self.broker_task_decorator(TaskType.SHUTDOWN) + # TODO: Abstract away worker startup into dependency system def on_worker_startup(self) -> Callable: """ - Code to execute on every worker at startup / restart after an error. + Code to execute on every worker immediately after broker startup. + + ```{note} + This is a great place to load heavy dependencies for the workers, + such as database connections, ML models, etc. + ``` Usage example:: - @app.on_startup() + @bot.on_worker_startup() def do_something_on_startup(state): ... # Can provision resources, or add things to `state`. """ return self.broker.on_event(TaskiqEvents.WORKER_STARTUP) + # TODO: Abstract away worker shutdown into dependency system def on_worker_shutdown(self) -> Callable: """ - Code to execute on every worker at shutdown. + Code to execute on every worker immediately before broker shutdown. + + ```{note} + This is where you should also release any resources you have loaded during + worker startup. + ``` Usage example:: - @app.on_shutdown() + @bot.on_worker_shutdown() def do_something_on_shutdown(state): ... # Update some external service, perhaps using information from `state`. """ @@ -255,22 +357,23 @@ def do_something_on_shutdown(state): def on_( self, container: BlockContainer | ContractEvent, + # TODO: possibly remove these new_block_timeout: int | None = None, start_block: int | None = None, ): """ - Create task to handle events created by `container`. + Create task to handle events created by the `container` trigger. Args: container: (BlockContainer | ContractEvent): The event source to watch. new_block_timeout: (int | None): Override for block timeout that is acceptable. - Defaults to whatever the app's settings are for default polling timeout are. + Defaults to whatever the bot's settings are for default polling timeout are. start_block (int | None): block number to start processing events from. Defaults to whatever the latest block is. Raises: :class:`~silverback.exceptions.InvalidContainerTypeError`: - If the type of `container` is not configurable for the app. + If the type of `container` is not configurable for the bot. """ if isinstance(container, BlockContainer): if new_block_timeout is not None: @@ -307,5 +410,5 @@ def on_( return self.broker_task_decorator(TaskType.EVENT_LOG, container=container) # TODO: Support account transaction polling - # TODO: Support mempool polling + # TODO: Support mempool polling? raise InvalidContainerTypeError(container) diff --git a/silverback/middlewares.py b/silverback/middlewares.py index cd19cc7a..ef97206c 100644 --- a/silverback/middlewares.py +++ b/silverback/middlewares.py @@ -85,7 +85,12 @@ def pre_execute(self, message: TaskiqMessage) -> TaskiqMessage: message.labels["transaction_hash"] = log.transaction_hash message.labels["log_index"] = str(log.log_index) - logger.info(f"{self._create_label(message)} - Started") + msg = f"{self._create_label(message)} - Started" + if message.task_name.startswith("system:"): + logger.debug(msg) + else: + logger.info(msg) + return message def post_execute(self, message: TaskiqMessage, result: TaskiqResult): diff --git a/silverback/runner.py b/silverback/runner.py index 1bd4eda0..46987fa4 100644 --- a/silverback/runner.py +++ b/silverback/runner.py @@ -11,10 +11,10 @@ from taskiq import AsyncTaskiqTask from taskiq.kicker import AsyncKicker -from .application import SilverbackApp, SystemConfig, TaskData from .exceptions import Halt, NoTasksAvailableError, NoWebsocketAvailableError, StartupFailure +from .main import SilverbackBot, SystemConfig, TaskData from .recorder import BaseRecorder, TaskResult -from .state import AppDatastore, AppState +from .state import Datastore, StateSnapshot from .subscriptions import SubscriptionType, Web3SubscriptionsManager from .types import TaskType from .utils import ( @@ -28,17 +28,19 @@ class BaseRunner(ABC): def __init__( self, - # TODO: Make fully stateless by replacing `app` with `broker` and `identifier` - app: SilverbackApp, + # TODO: Make fully stateless by replacing `bot` with `broker` and `identifier` + bot: SilverbackBot, *args, max_exceptions: int = 3, recorder: BaseRecorder | None = None, **kwargs, ): - self.app = app + self.bot = bot + + # TODO: Make datastore optional and settings-driven + # TODO: Allow configuring datastore class + self.datastore = Datastore() self.recorder = recorder - self.state = None - self.datastore = AppDatastore() self.max_exceptions = max_exceptions self.exceptions = 0 @@ -47,7 +49,7 @@ def __init__( def _create_task_kicker(self, task_data: TaskData) -> AsyncKicker: return AsyncKicker( - task_name=task_data.name, broker=self.app.broker, labels=task_data.labels + task_name=task_data.name, broker=self.bot.broker, labels=task_data.labels ) def _create_system_task_kicker(self, task_type: TaskType) -> AsyncKicker: @@ -76,26 +78,14 @@ async def _checkpoint( last_block_processed: int | None = None, ): """Set latest checkpoint block number""" - assert self.state, f"{self.__class__.__name__}.run() not triggered." - - logger.debug( - ( - f"Checkpoint block [seen={self.state.last_block_seen}, " - f"procssed={self.state.last_block_processed}]" - ) - ) + if not self._snapshotting_supported: + return # Can't support this feature - if last_block_seen: - self.state.last_block_seen = last_block_seen - if last_block_processed: - self.state.last_block_processed = last_block_processed - - if self.recorder: - try: - await self.datastore.set_state(self.state) - - except Exception as err: - logger.error(f"Error setting state: {err}") + task = await self.bot._create_snapshot.kiq(last_block_seen, last_block_processed) + if (result := await task.wait_result()).is_err: + logger.error(f"Error saving snapshot: {result.error}") + else: + await self.datastore.save(result.return_value) @abstractmethod async def _block_task(self, task_data: TaskData): @@ -106,12 +96,12 @@ async def _block_task(self, task_data: TaskData): @abstractmethod async def _event_task(self, task_data: TaskData): """ - handle an event handler task for the given contract event + Handle an event handler task for the given contract event """ async def run(self): """ - Run the task broker client for the assembled ``SilverbackApp`` application. + Run the task broker client for the assembled ``SilverbackBot`` bot. Will listen for events against the connected provider (using `ManagerAccessMixin` context), and process them by kicking events over to the configured broker. @@ -123,7 +113,7 @@ async def run(self): If there are no configured tasks to execute. """ # Initialize broker (run worker startup events) - await self.app.broker.startup() + await self.bot.broker.startup() # Obtain system configuration for worker result = await run_taskiq_task_wait_result( @@ -148,19 +138,47 @@ async def run(self): f", available task types:\n- {system_tasks_str}" ) + # NOTE: Bypass snapshotting if unsupported + self._snapshotting_supported = TaskType.SYSTEM_CREATE_SNAPSHOT in system_tasks + + # Load the snapshot (if available) + # NOTE: Add some additional handling to see if this feature is available in bot + if TaskType.SYSTEM_LOAD_SNAPSHOT not in system_tasks: + logger.warning( + "Silverback no longer supports runner-based snapshotting, " + "please upgrade your bot SDK version to latest to use snapshots." + ) + startup_state = StateSnapshot( + last_block_seen=-1, + last_block_processed=-1, + ) # Use empty snapshot + + elif not (startup_state := await self.datastore.init(bot_id=self.bot.identifier)): + logger.warning("No state snapshot detected, using empty snapshot") + startup_state = StateSnapshot( + # TODO: Migrate these to parameters (remove explicitly from state) + last_block_seen=-1, + last_block_processed=-1, + ) # Use empty snapshot + + logger.debug(f"Startup state: {startup_state}") + # NOTE: State snapshot is immediately out of date after init + + # Send startup state to bot + if ( + result := await run_taskiq_task_wait_result( + self._create_system_task_kicker(TaskType.SYSTEM_LOAD_SNAPSHOT), startup_state + ) + ).is_err: + raise StartupFailure(result.error) + # NOTE: Do this for other system tasks because they may not be in older SDK versions # `if TaskType. not in system_tasks: raise StartupFailure(...)` # or handle accordingly by having default logic if it is not available - # Initialize recorder (if available) and fetch state if app has been run previously + # Initialize recorder (if available) if self.recorder: - await self.recorder.init(app_id=self.app.identifier) - - if startup_state := (await self.datastore.init(app_id=self.app.identifier)): - self.state = startup_state - - else: # use empty state - self.state = AppState(last_block_seen=-1, last_block_processed=-1) + await self.recorder.init(bot_id=self.bot.identifier) # Execute Silverback startup task before we init the rest startup_taskdata_result = await run_taskiq_task_wait_result( @@ -176,7 +194,7 @@ async def run(self): ) startup_task_results = await run_taskiq_task_group_wait_results( - (task_handler for task_handler in startup_task_handlers), self.state + (task_handler for task_handler in startup_task_handlers), startup_state ) if any(result.is_err for result in startup_task_results): @@ -242,13 +260,13 @@ async def run(self): # NOTE: All listener tasks are shut down now - # Execute Silverback shutdown task(s) before shutting down the broker and app + # Execute Silverback shutdown task(s) before shutting down the broker and bot shutdown_taskdata_result = await run_taskiq_task_wait_result( self._create_system_task_kicker(TaskType.SYSTEM_USER_TASKDATA), TaskType.SHUTDOWN ) if shutdown_taskdata_result.is_err: - raise StartupFailure(shutdown_taskdata_result.error) + logger.error(f"Error when collecting shutdown tasks:\n{shutdown_taskdata_result.error}") else: shutdown_task_handlers = map( @@ -271,16 +289,20 @@ async def run(self): # NOTE: No need to handle results otherwise - await self.app.broker.shutdown() + if self._snapshotting_supported: + # Do one last checkpoint to save a snapshot of final state + await self._checkpoint() + + await self.bot.broker.shutdown() # Release broker class WebsocketRunner(BaseRunner, ManagerAccessMixin): """ - Run a single app against a live network using a basic in-memory queue and websockets. + Run a single bot against a live network using a basic in-memory queue and websockets. """ - def __init__(self, app: SilverbackApp, *args, **kwargs): - super().__init__(app, *args, **kwargs) + def __init__(self, bot: SilverbackBot, *args, **kwargs): + super().__init__(bot, *args, **kwargs) # Check for websocket support if not (ws_uri := self.chain_manager.provider.ws_uri): @@ -335,14 +357,14 @@ async def run(self): class PollingRunner(BaseRunner, ManagerAccessMixin): """ - Run a single app against a live network using a basic in-memory queue. + Run a single bot against a live network using a basic in-memory queue. """ # TODO: Move block_timeout settings to Ape core config # TODO: Merge polling/websocket subscriptions downstream in Ape core - def __init__(self, app: SilverbackApp, *args, **kwargs): - super().__init__(app, *args, **kwargs) + def __init__(self, bot: SilverbackBot, *args, **kwargs): + super().__init__(bot, *args, **kwargs) logger.warning( "The polling runner makes a significant amount of requests. " "Do not use in production over long time periods unless you know what you're doing." @@ -351,13 +373,13 @@ def __init__(self, app: SilverbackApp, *args, **kwargs): async def _block_task(self, task_data: TaskData): new_block_task_kicker = self._create_task_kicker(task_data) - if block_settings := self.app.poll_settings.get("_blocks_"): + if block_settings := self.bot.poll_settings.get("_blocks_"): new_block_timeout = block_settings.get("new_block_timeout") else: new_block_timeout = None new_block_timeout = ( - new_block_timeout if new_block_timeout is not None else self.app.new_block_timeout + new_block_timeout if new_block_timeout is not None else self.bot.new_block_timeout ) async for block in async_wrap_iter( chain.blocks.poll_blocks( @@ -379,13 +401,13 @@ async def _event_task(self, task_data: TaskData): event_abi = EventABI.from_signature(event_signature) event_log_task_kicker = self._create_task_kicker(task_data) - if address_settings := self.app.poll_settings.get(contract_address): + if address_settings := self.bot.poll_settings.get(contract_address): new_block_timeout = address_settings.get("new_block_timeout") else: new_block_timeout = None new_block_timeout = ( - new_block_timeout if new_block_timeout is not None else self.app.new_block_timeout + new_block_timeout if new_block_timeout is not None else self.bot.new_block_timeout ) async for event in async_wrap_iter( self.provider.poll_logs( diff --git a/silverback/settings.py b/silverback/settings.py index f4c5366c..7b756484 100644 --- a/silverback/settings.py +++ b/silverback/settings.py @@ -19,14 +19,14 @@ class Settings(BaseSettings, ManagerAccessMixin): """ - Settings for the Silverback app. + Settings for the Silverback bot. Can override these settings from a default state, typically for advanced testing or deployment purposes. Defaults to a working in-memory broker. """ # A unique identifier for this silverback instance - APP_NAME: str = "bot" + BOT_NAME: str = "bot" BROKER_CLASS: str = "taskiq:InMemoryBroker" BROKER_URI: str = "" # To be deprecated in 0.6 @@ -122,7 +122,7 @@ def get_signer(self) -> AccountAPI | None: acct_idx = int(alias.replace("TEST::", "")) return self.account_manager.test_accounts[acct_idx] - # NOTE: Will only have a signer if assigned one here (or in app) + # NOTE: Will only have a signer if assigned one here (or in bot) signer = self.account_manager.load(alias) # NOTE: Set autosign if it's a keyfile account (for local testing) diff --git a/silverback/state.py b/silverback/state.py index 36e059a2..22591ffe 100644 --- a/silverback/state.py +++ b/silverback/state.py @@ -5,7 +5,7 @@ from .types import SilverbackID, UTCTimestamp, utc_now -class AppState(BaseModel): +class StateSnapshot(BaseModel): # Last block number seen by runner last_block_seen: int @@ -17,46 +17,39 @@ class AppState(BaseModel): last_updated: UTCTimestamp = Field(default_factory=utc_now) -class AppDatastore: +class Datastore: """ - Very basic implementation used to store application state and handler result data by + Very basic implementation used to store bot state and handler result data by storing/retreiving state from a JSON-encoded file. - The file structure that this Recorder uses leverages the value of `SILVERBACK_APP_NAME` + The file structure that this Recorder uses leverages the value of `SILVERBACK_BOT_NAME` as well as the configured network to determine the location where files get saved: ./.silverback-sessions/ - / + / / state.json # always write here Note that this format can be read by basic means (even in a JS frontend): - You may also want to give your app a unique name so the data does not get overwritten, - if you are using multiple apps from the same directory: + You may also want to give your bot a unique name so the data does not get overwritten, + if you are using multiple bots from the same directory: - - `SILVERBACK_APP_NAME`: Any alphabetical string valid as a folder name + - `SILVERBACK_BOT_NAME`: Any alphabetical string valid as a folder name """ - async def init(self, app_id: SilverbackID) -> AppState | None: + async def init(self, bot_id: SilverbackID) -> StateSnapshot | None: data_folder = ( - Path.cwd() / ".silverback-sessions" / app_id.name / app_id.ecosystem / app_id.network + Path.cwd() / ".silverback-sessions" / bot_id.name / bot_id.ecosystem / bot_id.network ) data_folder.mkdir(parents=True, exist_ok=True) - self.state_backup_file = data_folder / "state.json" return ( - AppState.parse_file(self.state_backup_file) if self.state_backup_file.exists() else None + StateSnapshot.parse_file(self.state_backup_file) + if self.state_backup_file.exists() + else None ) - async def set_state(self, state: AppState): - if self.state_backup_file.exists(): - old_state = AppState.parse_file(self.state_backup_file) - if old_state.last_block_seen > state.last_block_seen: - state.last_block_seen = old_state.last_block_seen - if old_state.last_block_processed > state.last_block_processed: - state.last_block_processed = old_state.last_block_processed - - state.last_updated = utc_now() - self.state_backup_file.write_text(state.model_dump_json()) + async def save(self, snapshot: StateSnapshot): + self.state_backup_file.write_text(snapshot.model_dump_json()) diff --git a/silverback/types.py b/silverback/types.py index 1d5ccadb..68be6721 100644 --- a/silverback/types.py +++ b/silverback/types.py @@ -16,6 +16,8 @@ class TaskType(str, Enum): SYSTEM_CONFIG = "system:config" SYSTEM_USER_TASKDATA = "system:user-taskdata" SYSTEM_USER_ALL_TASKDATA = "system:user-all-taskdata" + SYSTEM_LOAD_SNAPSHOT = "system:load-snapshot" + SYSTEM_CREATE_SNAPSHOT = "system:create-snapshot" # User-accessible Tasks STARTUP = "user:startup"