diff --git a/docs/commands/cluster.rst b/docs/commands/cluster.rst new file mode 100644 index 00000000..1dafd291 --- /dev/null +++ b/docs/commands/cluster.rst @@ -0,0 +1,21 @@ +Cloud Platform +************** + +.. click:: silverback._cli:login + :prog: silverback login + :nested: none + +.. click:: silverback._cli:cluster + :prog: silverback cluster + :nested: full + :commands: workspaces, new, list, info, health + +.. click:: silverback._cli:vars + :prog: silverback cluster vars + :nested: full + :commands: new, list, info, update, remove + +.. click:: silverback._cli:bots + :prog: silverback cluster bots + :nested: full + :commands: new, list, info, update, remove, health, start, stop, logs, errors diff --git a/docs/commands/run.rst b/docs/commands/run.rst index 75a12f64..fbceda35 100644 --- a/docs/commands/run.rst +++ b/docs/commands/run.rst @@ -1,6 +1,10 @@ -run -*** +Local Development +***************** .. click:: silverback._cli:run - :prog: run + :prog: silverback run + :nested: none + +.. click:: silverback._cli:worker + :prog: silverback worker :nested: none diff --git a/docs/index.md b/docs/index.md index d6177851..1a045cbb 100644 --- a/docs/index.md +++ b/docs/index.md @@ -7,6 +7,7 @@ userguides/quickstart userguides/development + userguides/platform ``` ```{eval-rst} @@ -15,6 +16,7 @@ :maxdepth: 1 commands/run.rst + commands/cluster.rst ``` ```{eval-rst} diff --git a/docs/userguides/development.md b/docs/userguides/development.md index a62514d2..4b8f9c63 100644 --- a/docs/userguides/development.md +++ b/docs/userguides/development.md @@ -1,4 +1,4 @@ -# Developing a Silverback Application +# Developing Applications In this guide, we are going to show you more details on how to build an application with Silverback. @@ -182,12 +182,12 @@ export SILVERBACK_RESULT_BACKEND_URI="redis://127.0.0.1:6379" silverback worker -w 2 "example:app" ``` -This will run one client and 2 workers and all queue data will be go through Redis. 
+The client will send tasks to the 2 worker subprocesses, and all task queue and results data will go through Redis. ## Testing your Application TODO: Add backtesting mode w/ `silverback test` -## Deploying to the Silverback Platform +## Deploying your Application -TODO: Add packaging and deployment to the Silverback platform, once available. +Check out the [Platform Deployment Userguide](./platform.html) for more information on how to deploy your application to the [Silverback Platform](https://silverback.apeworx.io). diff --git a/docs/userguides/platform.md b/docs/userguides/platform.md new file mode 100644 index 00000000..d6171cb0 --- /dev/null +++ b/docs/userguides/platform.md @@ -0,0 +1,179 @@ +# Deploying Applications + +In this guide, we are going to show you more details on how to deploy your application to the [Silverback Platform](https://silverback.apeworx.io). + +## Creating a Cluster + +The Silverback Platform runs your Applications (or "Bots") on dedicated managed application Clusters. +These Clusters will take care to orchestrate infrastructure, monitor, run your triggers, and collect metrics for your applications. +Each Cluster is bespoke for an individual or organization, and isolates your applications from others on different infrastructure. + +Before we deploy our Application, we have to create a Cluster. +If you haven't yet, please sign up for Silverback at [https://silverback.apeworx.io](https://silverback.apeworx.io). + +Once you have signed up, you can actually create (and pay for) your Clusters from the Silverback CLI utility by first +logging in to the Platform using [`silverback login`][silverback-login], +and then using [`silverback cluster new`][silverback-cluster-new] to follow the steps necessary to deploy it. + +```{note} +The Platform UI will let you create and manage Clusters using a graphical experience, which may be preferred. 
+The CLI experience is for those working locally who don't want to visit the website, or are locally developing their applications. +``` + +## Connecting to your Cluster + +To connect to a cluster, you can use commands from the [`silverback cluster`][silverback-cluster] subcommand group. +For instance, to list all your available bots on your cluster, use [`silverback cluster bots list`][silverback-cluster-bots-list]. +To obtain general information about your cluster, just use [`silverback cluster info`][silverback-cluster-info], +or [`silverback cluster health`][silverback-cluster-health] to see the current status of your Cluster. + +If you have no bots, we will first have to containerize our Applications and upload them to a container registry that our Cluster is configured to access. + +```{note} +Building a container for your application can be an advanced topic, so we have included the `silverback build` subcommand to help assist in generating Dockerfiles. +``` + +## Building your Bot + +TODO: Add build process and describe `silverback build --autogen` and `silverback build --upgrade` + +TODO: Add how to debug containers using `silverback run` w/ `taskiq-redis` broker + +## Adding Environment Variables + +Once you have created your bot application container image, you might know of some environment variables the image requires to run properly. +Thanks to its flexible plugin system, ape plugins may also require specific environment variables to load as well. +Silverback Clusters include an environment variable management system for exactly this purpose, +which you can manage using the [`silverback cluster vars`][silverback-cluster-vars] subcommand. + +The environment variable management system makes use of a concept called "Variable Groups" which are distinct collections of environment variables meant to be used together. 
+These variable groups will help in managing the runtime environment of your Applications by allowing you to segregate different variables depending on each bot's needs. + +To create an environment group, use the [`silverback cluster vars new`][silverback-cluster-vars-new] command and give it a name and a set of related variables. +For instance, it may make sense to make a group of variables for your favorite Ape plugins or services, such as RPC Providers, Blockchain Data Indexers, Etherscan, etc. +You might have a database connection that you want all your bots to access. + +```{warning} +All environment variables in Silverback Clusters are private, meaning they cannot be viewed after they are uploaded. +However, your Bots will have full access to their values from within their runtime environment, so be careful that you fully understand what you are sharing with your bots. + +Also, understand your build dependencies within your container and make sure you are not using any vulnerable or malicious packages. + +**NEVER** upload your private key in a plaintext format! + +Use _Ape Account Plugins_ such as [`ape-aws`](https://github.com/ApeWorX/ape-aws) to safely manage access to your hosted keys. +``` + +```{note} +The Etherscan plugin _will not function_ without an API key in the cloud environment. +This will likely create errors running your applications if you use Ape's `Contract` class. +``` + +To list your Variable Groups, use [`silverback cluster vars list`][silverback-cluster-vars-list]. +To see information about a specific Variable Group, including the Environment Variables it includes, use [`silverback cluster vars info`][silverback-cluster-vars-info]. +To remove a variable group, use [`silverback cluster vars remove`][silverback-cluster-vars-remove]. + +```{note} +You can only remove a Variable Group if it is not referenced by any existing Bot. 
+``` + +Once you have created all the Variable Group(s) that you need to operate your Bot, you can reference these groups by name when adding your Bot to the cluster. + +## Deploying your Bot + +You are finally ready to deploy your bot on the Cluster and get it running! + +To deploy your Bot, use the [`silverback cluster bots new`][silverback-cluster-bots-new] command and give your bot a name, +container image, network to run on, an account alias (if you want to sign transactions w/ `app.signer`), +and any environment Variable Group(s) the bot needs. +If everything validates successfully, the Cluster will begin orchestrating your deployment for you. + +You should monitor the deployment and startup of your bot to make sure it enters the RUNNING state successfully. +You can do this using the [`silverback cluster bots health`][silverback-cluster-bots-health] command. + +```{note} +It usually takes a minute or so for your bot to transition from PROVISIONING to STARTUP to the RUNNING state. +If there are any difficulties in downloading your container image, provisioning your desired infrastructure, or if your application encounters an error during the STARTUP phase, +the Bot will not enter into the RUNNING state and will be shut down gracefully into the STOPPED state. + +Once in the STOPPED state, you can make any adjustments to the environment Variable Group(s) or other runtime parameters in the Bot config; +or, you can make code changes and deploy a new image for the Bot to use. +Once ready, you can use the `silverback cluster bots start` command to re-start your Bot. +``` + +If at any time you want to view the configuration of your bot, you can do so using the [`silverback cluster bots info`][silverback-cluster-bots-info] command. +You can also update metadata or configuration of your bot using the [`silverback cluster bots update`][silverback-cluster-bots-update] command. 
Lastly, if you want to shutdown and delete your bot, you can do so using the [`silverback cluster bots remove`][silverback-cluster-bots-remove] command. + +```{note} +Configuration updates do not redeploy your Bots automatically, you must manually stop and restart your bots for changes to take effect. +``` + +```{warning} +Removing a Bot will immediately trigger a SHUTDOWN if the Bot is not already STOPPED. +``` + +## Monitoring your Bot + +Once your bot is successfully running in the RUNNING state, you can monitor your bot with a series of commands +under the [`silverback cluster bots`][silverback-cluster-bots] subcommand group. +We already saw how you can use the [`silverback cluster bots list`][silverback-cluster-bots-list] command to see all bots managed by your Cluster (running or not). + +To see runtime health information about a specific bot, again use the [`silverback cluster bots health`][silverback-cluster-bots-health] command. +You can view the logs that a specific bot is generating using the [`silverback cluster bots logs`][silverback-cluster-bots-logs] command. +Lastly, you can view unacknowledged errors that your bot has experienced while in the RUNNING state +using the [`silverback cluster bots errors`][silverback-cluster-bots-errors] command. + +```{warning} +Once in the RUNNING state, your Bot will not stop running unless it experiences a certain amount of errors in quick succession. +Any task execution that experiences an error will abort execution (and therefore not produce any metrics) but the Bot **will not** shutdown. + +All errors encountered during task execution are reported to the Cluster for later review by any users with appropriate access. +Tasks do not retry (by default), but updates to `app.state` are maintained up until the point an error occurs. 
+ +It is important to keep track of these errors and ensure that none of them are in fact critical to the operation of your Application, +and to take corrective or preventative action if it is determined that it should be treated as a more critical failure condition. +``` + +```{note} +Your Bots can also be monitored from the Platform UI at [https://silverback.apeworx.io](https://silverback.apeworx.io). +``` + +## Controlling your Bot + +As we already saw, once a Bot is configured in a Cluster, we can control it using commands from the [`silverback cluster bots`][silverback-cluster-bots] subcommand group. +For example, we can attempt to start a Bot that is not currently running (after making configuration or code changes) +using the [`silverback cluster bots start`][silverback-cluster-bots-start] command. +We can also stop a bot using [`silverback cluster bots stop`][silverback-cluster-bots-stop] that is currently in the RUNNING state if we desire. + +```{note} +Controlling your bots can be done from the Platform UI at [https://silverback.apeworx.io](https://silverback.apeworx.io), if you have the right permissions to do so. 
+``` + +TODO: Updating runtime parameters + +## Viewing Measured Metrics + +TODO: Downloading metrics from your Bot + +[silverback-cluster]: ../commands/cluster.html#silverback-cluster +[silverback-cluster-bots]: ../commands/cluster.html#silverback-cluster-bots +[silverback-cluster-bots-errors]: ../commands/cluster.html#silverback-cluster-bots-errors +[silverback-cluster-bots-health]: ../commands/cluster.html#silverback-cluster-bots-health +[silverback-cluster-bots-info]: ../commands/cluster.html#silverback-cluster-bots-info +[silverback-cluster-bots-list]: ../commands/cluster.html#silverback-cluster-bots-list +[silverback-cluster-bots-logs]: ../commands/cluster.html#silverback-cluster-bots-logs +[silverback-cluster-bots-new]: ../commands/cluster.html#silverback-cluster-bots-new +[silverback-cluster-bots-remove]: ../commands/cluster.html#silverback-cluster-bots-remove +[silverback-cluster-bots-start]: ../commands/cluster.html#silverback-cluster-bots-start +[silverback-cluster-bots-stop]: ../commands/cluster.html#silverback-cluster-bots-stop +[silverback-cluster-bots-update]: ../commands/cluster.html#silverback-cluster-bots-update +[silverback-cluster-health]: ../commands/cluster.html#silverback-cluster-health +[silverback-cluster-info]: ../commands/cluster.html#silverback-cluster-info +[silverback-cluster-new]: ../commands/cluster.html#silverback-cluster-new +[silverback-cluster-vars]: ../commands/cluster.html#silverback-cluster-vars +[silverback-cluster-vars-info]: ../commands/cluster.html#silverback-cluster-vars-info +[silverback-cluster-vars-list]: ../commands/cluster.html#silverback-cluster-vars-list +[silverback-cluster-vars-new]: ../commands/cluster.html#silverback-cluster-vars-new +[silverback-cluster-vars-remove]: ../commands/cluster.html#silverback-cluster-vars-remove +[silverback-login]: ../commands/cluster.html#silverback-login diff --git a/setup.py b/setup.py index 76f40d5f..3a9be5cc 100644 --- a/setup.py +++ b/setup.py @@ -74,6 +74,8 @@ "packaging", # 
Use same version as eth-ape "pydantic_settings", # Use same version as eth-ape "taskiq[metrics]>=0.11.3,<0.12", + "tomlkit>=0.12,<1", # For reading/writing global platform profile + "fief-client[cli]>=0.19,<1", # for platform auth/cluster login ], entry_points={ "console_scripts": ["silverback=silverback._cli:cli"], diff --git a/silverback/_cli.py b/silverback/_cli.py index 17781839..b234eaea 100644 --- a/silverback/_cli.py +++ b/silverback/_cli.py @@ -1,49 +1,43 @@ import asyncio import os -from concurrent.futures import ThreadPoolExecutor import click +import yaml # type: ignore[import-untyped] from ape.cli import ( AccountAliasPromptChoice, ConnectedProviderCommand, ape_cli_context, network_option, - verbosity_option, ) from ape.exceptions import Abort -from taskiq import AsyncBroker -from taskiq.cli.worker.run import shutdown_broker -from taskiq.receiver import Receiver +from fief_client.integrations.cli import FiefAuth +from silverback._click_ext import ( + SectionedHelpGroup, + auth_required, + cls_import_callback, + cluster_client, + display_login_message, + platform_client, +) from silverback._importer import import_from_string +from silverback.cluster.client import ClusterClient, PlatformClient +from silverback.cluster.types import ClusterTier from silverback.runner import PollingRunner, WebsocketRunner +from silverback.worker import run_worker -@click.group() +@click.group(cls=SectionedHelpGroup) def cli(): - """Work with Silverback applications in local context (using Ape).""" - - -def _runner_callback(ctx, param, val): - if not val: - return None - - elif runner := import_from_string(val): - return runner - - raise ValueError(f"Failed to import runner '{val}'.") + """ + Silverback: Build Python apps that react to on-chain events - -def _recorder_callback(ctx, param, val): - if not val: - return None - - elif recorder := import_from_string(val): - return recorder() - - raise ValueError(f"Failed to import recorder '{val}'.") + To learn more about our 
cloud offering, please check out https://silverback.apeworx.io + """ +# TODO: Make `silverback.settings.Settings` (to remove having to set envvars) +# TODO: Use `envvar=...` to be able to set the value of options from correct envvar def _account_callback(ctx, param, val): if val: val = val.alias.replace("dev_", "TEST::") @@ -52,6 +46,8 @@ def _account_callback(ctx, param, val): return val +# TODO: Make `silverback.settings.Settings` (to remove having to set envvars) +# TODO: Use `envvar=...` to be able to set the value of options from correct envvar def _network_callback(ctx, param, val): # NOTE: Make sure both of these have the same setting if env_network_choice := os.environ.get("SILVERBACK_NETWORK_CHOICE"): @@ -69,29 +65,8 @@ def _network_callback(ctx, param, val): return val -async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): - try: - tasks = [] - with ThreadPoolExecutor(max_workers=worker_count) as pool: - for _ in range(worker_count): - receiver = Receiver( - broker=broker, - executor=pool, - validate_params=True, - max_async_tasks=1, - max_prefetch=0, - ) - broker.is_worker_process = True - tasks.append(receiver.listen()) - - await asyncio.gather(*tasks) - finally: - await shutdown_broker(broker, shutdown_timeout) - - -@cli.command(cls=ConnectedProviderCommand, help="Run Silverback application client") +@cli.command(cls=ConnectedProviderCommand, section="Local Commands") @ape_cli_context() -@verbosity_option() @network_option( default=os.environ.get("SILVERBACK_NETWORK_CHOICE", "auto"), callback=_network_callback, @@ -100,17 +75,22 @@ async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): @click.option( "--runner", "runner_class", + metavar="CLASS_REF", help="An import str in format ':'", - callback=_runner_callback, + callback=cls_import_callback, ) @click.option( "--recorder", + "recorder_class", + metavar="CLASS_REF", help="An import string in format ':'", - callback=_recorder_callback, + 
callback=cls_import_callback, ) @click.option("-x", "--max-exceptions", type=int, default=3) @click.argument("path") -def run(cli_ctx, account, runner_class, recorder, max_exceptions, path): +def run(cli_ctx, account, runner_class, recorder_class, max_exceptions, path): + """Run Silverback application""" + if not runner_class: # NOTE: Automatically select runner class if cli_ctx.provider.ws_uri: @@ -123,13 +103,16 @@ def run(cli_ctx, account, runner_class, recorder, max_exceptions, path): ) app = import_from_string(path) - runner = runner_class(app, recorder=recorder, max_exceptions=max_exceptions) + runner = runner_class( + app, + recorder=recorder_class() if recorder_class else None, + max_exceptions=max_exceptions, + ) asyncio.run(runner.run()) -@cli.command(cls=ConnectedProviderCommand, help="Run Silverback application task workers") +@cli.command(cls=ConnectedProviderCommand, section="Local Commands") @ape_cli_context() -@verbosity_option() @network_option( default=os.environ.get("SILVERBACK_NETWORK_CHOICE", "auto"), callback=_network_callback, @@ -140,5 +123,523 @@ def run(cli_ctx, account, runner_class, recorder, max_exceptions, path): @click.option("-s", "--shutdown_timeout", type=int, default=90) @click.argument("path") def worker(cli_ctx, account, workers, max_exceptions, shutdown_timeout, path): + """Run Silverback task workers (advanced)""" + app = import_from_string(path) asyncio.run(run_worker(app.broker, worker_count=workers, shutdown_timeout=shutdown_timeout)) + + +@cli.command(section="Cloud Commands (https://silverback.apeworx.io)") +@auth_required +def login(auth: FiefAuth): + """Login to ApeWorX Authorization Service (https://account.apeworx.io)""" + + auth.authorize() + display_login_message(auth, auth.client.base_url) + + +@cli.group(cls=SectionedHelpGroup, section="Cloud Commands (https://silverback.apeworx.io)") +def cluster(): + """Manage a Silverback hosted application cluster + + For clusters on the Silverback Platform, please provide a 
name for the cluster to access under + your platform account via `-c WORKSPACE/NAME`""" + + +@cluster.command(section="Platform Commands (https://silverback.apeworx.io)") +@platform_client +def workspaces(platform: PlatformClient): + """List available workspaces for your account""" + + if workspace_names := list(platform.workspaces): + click.echo(yaml.safe_dump(workspace_names)) + + else: + click.secho( + "No workspaces available for this account. " + "Go to https://silverback.apeworx.io to sign up and create a new workspace", + bold=True, + fg="red", + ) + + +@cluster.command(name="list", section="Platform Commands (https://silverback.apeworx.io)") +@click.argument("workspace") +@platform_client +def list_clusters(platform: PlatformClient, workspace: str): + """List available clusters in a WORKSPACE""" + + if not (workspace_client := platform.workspaces.get(workspace)): + raise click.BadOptionUsage("workspace", f"Unknown workspace '{workspace}'") + + if cluster_names := list(workspace_client.clusters): + click.echo(yaml.safe_dump(cluster_names)) + + else: + click.secho("No clusters for this account", bold=True, fg="red") + + +@cluster.command(name="new", section="Platform Commands (https://silverback.apeworx.io)") +@click.option( + "-n", + "--name", + "cluster_name", + help="Name for new cluster (Defaults to random)", +) +@click.option( + "-s", + "--slug", + "cluster_slug", + help="Slug for new cluster (Defaults to `name.lower()`)", +) +@click.option( + "-t", + "--tier", + default=ClusterTier.PERSONAL.name, + metavar="NAME", + help="Named set of options to use for cluster as a base (Defaults to Personal)", +) +@click.option( + "-c", + "--config", + "config_updates", + type=(str, str), + multiple=True, + help="Config options to set for cluster (overrides value of -t/--tier)", +) +@click.argument("workspace") +@platform_client +def new_cluster( + platform: PlatformClient, + workspace: str, + cluster_name: str | None, + cluster_slug: str | None, + tier: str, + 
config_updates: list[tuple[str, str]], +): + """Create a new cluster in WORKSPACE""" + + if not (workspace_client := platform.workspaces.get(workspace)): + raise click.BadOptionUsage("workspace", f"Unknown workspace '{workspace}'") + + if not hasattr(ClusterTier, tier.upper()): + raise click.BadOptionUsage("tier", f"Invalid choice: {tier}") + + configuration = getattr(ClusterTier, tier.upper()).configuration() + + for k, v in config_updates: + setattr(configuration, k, int(v) if v.isnumeric() else v) + + if cluster_name: + click.echo(f"name: {cluster_name}") + click.echo(f"slug: {cluster_slug or cluster_name.lower().replace(' ', '-')}") + + elif cluster_slug: + click.echo(f"slug: {cluster_slug}") + + click.echo(yaml.safe_dump(dict(configuration=configuration.settings_display_dict()))) + + if not click.confirm("Do you want to make a new cluster with this configuration?"): + return + + cluster = workspace_client.create_cluster( + cluster_name=cluster_name, + cluster_slug=cluster_slug, + configuration=configuration, + ) + click.echo(f"{click.style('SUCCESS', fg='green')}: Created '{cluster.name}'") + # TODO: Pay for cluster via new stream + + +# `silverback cluster pay WORKSPACE/NAME --account ALIAS --time "10 days"` +# TODO: Create a signature scheme for ClusterInfo +# (ClusterInfo configuration as plaintext, .id as nonce?) 
+# TODO: Test payment w/ Signature validation of extra data + + +@cluster.command(name="info") +@cluster_client +def cluster_info(cluster: ClusterClient): + """Get Configuration information about a CLUSTER""" + + # NOTE: This actually doesn't query the cluster's routes, which are protected + click.echo(f"Cluster Version: v{cluster.version}") + + if config := cluster.state.configuration: + click.echo(yaml.safe_dump(config.settings_display_dict())) + + else: + click.secho("No Cluster Configuration detected", fg="yellow", bold=True) + + +@cluster.command(name="health") +@cluster_client +def cluster_health(cluster: ClusterClient): + """Get Health information about a CLUSTER""" + + click.echo(yaml.safe_dump(cluster.health.model_dump())) + + +@cluster.group(cls=SectionedHelpGroup) +def vars(): + """Manage groups of environment variables in a CLUSTER""" + + +def parse_envvars(ctx, name, value: list[str]) -> dict[str, str]: + def parse_envar(item: str): + if not ("=" in item and len(item.split("=")) == 2): + raise click.UsageError(f"Value '{item}' must be in form `NAME=VAL`") + + return item.split("=") + + return dict(parse_envar(item) for item in value) + + +@vars.command(name="new") +@click.option( + "-e", + "--env", + "variables", + multiple=True, + type=str, + metavar="NAME=VAL", + callback=parse_envvars, + help="Environment variable key and value to add (Multiple allowed)", +) +@click.argument("name") +@cluster_client +def new_vargroup(cluster: ClusterClient, variables: dict, name: str): + """Create a new group of environment variables in a CLUSTER""" + + if len(variables) == 0: + raise click.UsageError("Must supply at least one var via `-e`") + + vg = cluster.new_variable_group(name=name, variables=variables) + click.echo(yaml.safe_dump(vg.model_dump(exclude={"id"}))) # NOTE: Skip machine `.id` + + +@vars.command(name="list") +@cluster_client +def list_vargroups(cluster: ClusterClient): + """List latest revisions of all variable groups in a CLUSTER""" + + if 
group_names := list(cluster.variable_groups): + click.echo(yaml.safe_dump(group_names)) + + else: + click.secho("No Variable Groups present in this cluster", bold=True, fg="red") + + +@vars.command(name="info") +@click.argument("name") +@cluster_client +def vargroup_info(cluster: ClusterClient, name: str): + """Show latest revision of a variable GROUP in a CLUSTER""" + + if not (vg := cluster.variable_groups.get(name)): + raise click.UsageError(f"Unknown Variable Group '{name}'") + + click.echo(yaml.safe_dump(vg.model_dump(exclude={"id", "name"}))) + + +@vars.command(name="update") +@click.option("--new-name", "new_name") # NOTE: No `-n` to match `bots update` +@click.option( + "-e", + "--env", + "updated_vars", + multiple=True, + type=str, + metavar="NAME=VAL", + callback=parse_envvars, + help="Environment variable key and value to add/update (Multiple allowed)", +) +@click.option( + "-d", + "--del", + "deleted_vars", + multiple=True, + type=str, + metavar="NAME", + help="Environment variable name to delete (Multiple allowed)", +) +@click.argument("name") +@cluster_client +def update_vargroup( + cluster: ClusterClient, + name: str, + new_name: str, + updated_vars: dict[str, str], + deleted_vars: tuple[str], +): + """Update a variable GROUP in CLUSTER + + NOTE: Changing the values of variables in GROUP by create a new revision, since variable groups + are immutable. 
New revisions do not automatically update bot configuration.""" + + if not (vg := cluster.variable_groups.get(name)): + raise click.UsageError(f"Unknown Variable Group '{name}'") + + if dup := "', '".join(set(updated_vars) & set(deleted_vars)): + raise click.UsageError(f"Cannot update and delete vars at the same time: '{dup}'") + + if missing := "', '".join(set(deleted_vars) - set(vg.variables)): + raise click.UsageError(f"Cannot delete vars not in group: '{missing}'") + + click.echo( + yaml.safe_dump( + vg.update( + name=new_name, + # NOTE: Do not update variables if no updates are provided + variables=dict(**updated_vars, **{v: None for v in deleted_vars}) or None, + ).model_dump( + exclude={"id"} + ) # NOTE: Skip machine `.id` + ) + ) + + +@vars.command(name="remove") +@click.argument("name") +@cluster_client +def remove_vargroup(cluster: ClusterClient, name: str): + """ + Remove a variable GROUP from a CLUSTER + + NOTE: Cannot delete if any bots reference any revision of GROUP + """ + if not (vg := cluster.variable_groups.get(name)): + raise click.UsageError(f"Unknown Variable Group '{name}'") + + vg.remove() # NOTE: No confirmation because can only delete if no references exist + click.secho(f"Variable Group '{vg.name}' removed.", fg="green", bold=True) + + +@cluster.group(cls=SectionedHelpGroup) +def bots(): + """Manage bots in a CLUSTER""" + + +@bots.command(name="new", section="Configuration Commands") +@click.option("-i", "--image", required=True) +@click.option("-n", "--network", required=True) +@click.option("-a", "--account") +@click.option("-g", "--group", "vargroups", multiple=True) +@click.argument("name") +@cluster_client +def new_bot( + cluster: ClusterClient, + image: str, + network: str, + account: str | None, + vargroups: list[str], + name: str, +): + """Create a new bot in a CLUSTER with the given configuration""" + + if name in cluster.bots: + raise click.UsageError(f"Cannot use name '{name}' to create bot") + + environment = 
[cluster.variable_groups[vg_name].get_revision("latest") for vg_name in vargroups] + + click.echo(f"Name: {name}") + click.echo(f"Image: {image}") + click.echo(f"Network: {network}") + if environment: + click.echo("Environment:") + click.echo(yaml.safe_dump([var for vg in environment for var in vg.variables])) + + if not click.confirm("Do you want to create and start running this bot?"): + return + + bot = cluster.new_bot(name, image, network, account=account, environment=environment) + click.secho(f"Bot '{bot.name}' ({bot.id}) deploying...", fg="green", bold=True) + + +@bots.command(name="list", section="Configuration Commands") +@cluster_client +def list_bots(cluster: ClusterClient): + """List all bots in a CLUSTER (Regardless of status)""" + + if bot_names := list(cluster.bots): + click.echo(yaml.safe_dump(bot_names)) + + else: + click.secho("No bots in this cluster", bold=True, fg="red") + + +@bots.command(name="info", section="Configuration Commands") +@click.argument("bot_name", metavar="BOT") +@cluster_client +def bot_info(cluster: ClusterClient, bot_name: str): + """Get configuration information of a BOT in a CLUSTER""" + + if not (bot := cluster.bots.get(bot_name)): + raise click.UsageError(f"Unknown bot '{bot_name}'.") + + # NOTE: Skip machine `.id`, and we already know it is `.name` + click.echo(yaml.safe_dump(bot.model_dump(exclude={"id", "name", "environment"}))) + if bot.environment: + click.echo("environment:") + click.echo(yaml.safe_dump([var.name for var in bot.environment])) + + +@bots.command(name="update", section="Configuration Commands") +@click.option("--new-name", "new_name") # NOTE: No shorthand, because conflicts w/ `--network` +@click.option("-i", "--image") +@click.option("-n", "--network") +@click.option("-a", "--account") +@click.option("-g", "--group", "vargroups", multiple=True) +@click.argument("name", metavar="BOT") +@cluster_client +def update_bot( + cluster: ClusterClient, + new_name: str | None, + image: str | None, + network: 
str | None, + account: str | None, + vargroups: list[str], + name: str, +): + """Update configuration of BOT in CLUSTER + + NOTE: Some configuration updates will trigger a redeploy""" + + if new_name in cluster.bots: + raise click.UsageError(f"Cannot use name '{new_name}' to update bot '{name}'") + + if not (bot := cluster.bots.get(name)): + raise click.UsageError(f"Unknown bot '{name}'.") + + if new_name: + click.echo(f"Name:\n old: {name}\n new: {new_name}") + + if network: + click.echo(f"Network:\n old: {bot.network}\n new: {network}") + + redeploy_required = False + if image: + redeploy_required = True + click.echo(f"Image:\n old: {bot.image}\n new: {image}") + + environment = [cluster.variable_groups[vg_name].get_revision("latest") for vg_name in vargroups] + + set_environment = True + + if len(environment) == 0 and bot.environment: + set_environment = click.confirm("Do you want to clear all environment variables?") + + elif environment != bot.environment: + click.echo("old-environment:") + click.echo(yaml.safe_dump([var.name for var in bot.environment])) + click.echo("new-environment:") + click.echo(yaml.safe_dump([var for vg in environment for var in vg.variables])) + + redeploy_required |= set_environment + + if not click.confirm( + f"Do you want to update '{name}'?" + if not redeploy_required + else f"Do you want to update and redeploy '{name}'?" 
+ ): + return + + bot = bot.update( + name=new_name, + image=image, + network=network, + account=account, + environment=environment if set_environment else None, + ) + + # NOTE: Skip machine `.id` + click.echo(yaml.safe_dump(bot.model_dump(exclude={"id", "environment"}))) + if bot.environment: + click.echo("environment:") + click.echo(yaml.safe_dump([var.name for var in bot.environment])) + + +@bots.command(name="remove", section="Configuration Commands") +@click.argument("name", metavar="BOT") +@cluster_client +def remove_bot(cluster: ClusterClient, name: str): + """Remove BOT from CLUSTER (Shutdown if running)""" + + if not (bot := cluster.bots.get(name)): + raise click.UsageError(f"Unknown bot '{name}'.") + + elif not click.confirm(f"Do you want to shutdown and delete '{name}'?"): + return + + bot.remove() + click.secho(f"Bot '{bot.name}' removed.", fg="green", bold=True) + + +@bots.command(name="health", section="Bot Operation Commands") +@click.argument("bot_name", metavar="BOT") +@cluster_client +def bot_health(cluster: ClusterClient, bot_name: str): + """Show current health of BOT in a CLUSTER""" + + if not (bot := cluster.bots.get(bot_name)): + raise click.UsageError(f"Unknown bot '{bot_name}'.") + + click.echo(yaml.safe_dump(bot.health.model_dump(exclude={"bot_id"}))) + + +@bots.command(name="start", section="Bot Operation Commands") +@click.argument("name", metavar="BOT") +@cluster_client +def start_bot(cluster: ClusterClient, name: str): + """Start BOT running in CLUSTER (if stopped or terminated)""" + + if not (bot := cluster.bots.get(name)): + raise click.UsageError(f"Unknown bot '{name}'.") + + elif not click.confirm(f"Do you want to start running '{name}'?"): + return + + bot.start() + click.secho(f"Bot '{bot.name}' starting...", fg="green", bold=True) + + +@bots.command(name="stop", section="Bot Operation Commands") +@click.argument("name", metavar="BOT") +@cluster_client +def stop_bot(cluster: ClusterClient, name: str): + """Stop BOT from running 
in CLUSTER (if running)""" + + if not (bot := cluster.bots.get(name)): + raise click.UsageError(f"Unknown bot '{name}'.") + + elif not click.confirm(f"Do you want to stop '{name}' from running?"): + return + + bot.stop() + click.secho(f"Bot '{bot.name}' stopping...", fg="green", bold=True) + + +@bots.command(name="logs", section="Bot Operation Commands") +@click.argument("name", metavar="BOT") +@cluster_client +def show_bot_logs(cluster: ClusterClient, name: str): + """Show runtime logs for BOT in CLUSTER""" + + if not (bot := cluster.bots.get(name)): + raise click.UsageError(f"Unknown bot '{name}'.") + + for log in bot.logs: + click.echo(log) + + +@bots.command(name="errors", section="Bot Operation Commands") +@click.argument("name", metavar="BOT") +@cluster_client +def show_bot_errors(cluster: ClusterClient, name: str): + """Show unacknowledged errors for BOT in CLUSTER""" + + if not (bot := cluster.bots.get(name)): + raise click.UsageError(f"Unknown bot '{name}'.") + + for log in bot.errors: + click.echo(log) diff --git a/silverback/_click_ext.py b/silverback/_click_ext.py new file mode 100644 index 00000000..1810ec9d --- /dev/null +++ b/silverback/_click_ext.py @@ -0,0 +1,244 @@ +from functools import update_wrapper +from pathlib import Path + +import click +from fief_client import Fief +from fief_client.integrations.cli import FiefAuth, FiefAuthNotAuthenticatedError + +from silverback._importer import import_from_string +from silverback.cluster.client import ClusterClient, PlatformClient +from silverback.cluster.settings import ( + PROFILE_PATH, + BaseProfile, + ClusterProfile, + PlatformProfile, + ProfileSettings, +) + +# NOTE: only load once +settings = ProfileSettings.from_config_file() + + +def cls_import_callback(ctx, param, cls_name): + if cls_name is None: + return None # User explicitly provided None + + elif cls := import_from_string(cls_name): + return cls + + # If class not found, `import_from_string` returns `None`, so raise + raise 
click.BadParameter(message=f"Failed to import {param} class: '{cls_name}'.") + + +class OrderedCommands(click.Group): + # NOTE: Override so we get the list ordered by definition order + def list_commands(self, ctx: click.Context) -> list[str]: + return list(self.commands) + + +class SectionedHelpGroup(OrderedCommands): + """Section commands into help groups""" + + sections: dict[str | None, list[click.Command | click.Group]] + + def __init__(self, *args, section=None, **kwargs): + self.section = section or "Commands" + self.sections = kwargs.pop("sections", {}) + commands = {} + + for section, command_list in self.sections.items(): + for cmd in command_list: + cmd.section = section + commands[cmd.name] = cmd + + super().__init__(*args, commands=commands, **kwargs) + + def command(self, *args, **kwargs): + section = kwargs.pop("section", "Commands") + decorator = super().command(*args, **kwargs) + + def new_decorator(f): + cmd = decorator(f) + cmd.section = section + self.sections.setdefault(section, []).append(cmd) + return cmd + + return new_decorator + + def format_commands(self, ctx, formatter): + for section, cmds in self.sections.items(): + rows = [] + for subcommand in self.list_commands(ctx): + cmd = self.get_command(ctx, subcommand) + + if cmd is None or cmd.section != section: + continue + + rows.append((subcommand, cmd.get_short_help_str(formatter.width) or "")) + + if rows: + with formatter.section(section): + formatter.write_dl(rows) + + +def display_login_message(auth: FiefAuth, host: str): + userinfo = auth.current_user() + user_id = userinfo["sub"] + username = userinfo["fields"].get("username") + click.echo( + f"{click.style('INFO', fg='blue')}: " + f"Logged in to '{click.style(host, bold=True)}' as " + f"'{click.style(username if username else user_id, bold=True)}'" + ) + + +def profile_option(f): + expose_value = "profile" in f.__annotations__ + + def get_profile(ctx: click.Context, param, value) -> BaseProfile: + if not (profile := 
settings.profile.get(value)): + raise click.BadOptionUsage(option_name=param, message=f"Unknown profile '{value}'.") + + # Add it to context in case we need it elsewhere + ctx.obj = ctx.obj or {} + ctx.obj["profile"] = profile + return profile + + opt = click.option( + "-p", + "--profile", + "profile", + metavar="PROFILE", + default=settings.default_profile, + callback=get_profile, + expose_value=expose_value, + is_eager=True, # NOTE: Required to ensure that `profile` is always set, even if not provied + help="The authentication profile to use (Advanced)", + ) + return opt(f) + + +def auth_required(f): + expose_value = "auth" in f.__annotations__ + + @profile_option + @click.pass_context + def add_auth(ctx: click.Context, *args, **kwargs): + ctx.obj = ctx.obj or {} + profile: BaseProfile | None = ctx.obj.get("profile") + + if isinstance(profile, PlatformProfile): + auth_info = settings.auth[profile.auth] + fief = Fief(auth_info.host, auth_info.client_id) + ctx.obj["auth"] = FiefAuth(fief, str(PROFILE_PATH.parent / f"{profile.auth}.json")) + + if expose_value: + kwargs["auth"] = ctx.obj["auth"] + + return ctx.invoke(f, *args, **kwargs) + + return update_wrapper(add_auth, f) + + +def platform_client(f): + expose_value = "platform" in f.__annotations__ + + @auth_required + @click.pass_context + def get_platform_client(ctx: click.Context, *args, **kwargs): + ctx.obj = ctx.obj or {} + if not isinstance(profile := ctx.obj.get("profile"), PlatformProfile): + if not expose_value: + return ctx.invoke(f, *args, **kwargs) + + raise click.UsageError("This command only works with the Silverback Platform") + + # NOTE: `auth` should be set if `profile` is set and is `PlatformProfile` + auth: FiefAuth = ctx.obj["auth"] + + try: + display_login_message(auth, profile.host) + except FiefAuthNotAuthenticatedError as e: + raise click.UsageError("Not authenticated, please use `silverback login` first.") from e + + ctx.obj["platform"] = PlatformClient( + base_url=profile.host, + 
cookies=dict(session=auth.access_token_info()["access_token"]), + ) + + if expose_value: + kwargs["platform"] = ctx.obj["platform"] + + return ctx.invoke(f, *args, **kwargs) + + return update_wrapper(get_platform_client, f) + + +def cluster_client(f): + + def inject_cluster(ctx, param, value: str | None): + ctx.obj = ctx.obj or {} + if not (profile := ctx.obj.get("profile")): + raise AssertionError("Shouldn't happen, fix cli") + + elif isinstance(profile, ClusterProfile): + return value # Ignore processing this for cluster clients + + elif value is None or "/" not in value: + if not profile.default_workspace: + raise click.UsageError( + "Must provide `-c CLUSTER`, or set `profile..default-workspace` " + f"in your `~/{PROFILE_PATH.relative_to(Path.home())}`" + ) + + if value is None and profile.default_workspace not in profile.default_cluster: + raise click.UsageError( + "Must provide `-c CLUSTER`, or set " + "`profile..default-cluster.` " + f"in your `~/{PROFILE_PATH.relative_to(Path.home())}`" + ) + + parts = [ + profile.default_workspace, + # NOTE: `value` works as cluster selector, if set + value or profile.default_cluster[profile.default_workspace], + ] + + elif len(parts := value.split("/")) > 2: + raise click.BadParameter( + param=param, + message="CLUSTER should be in format `WORKSPACE/NAME`", + ) + + ctx.obj["cluster_path"] = parts + return parts + + @click.option( + "-c", + "--cluster", + "cluster_path", + metavar="WORKSPACE/NAME", + expose_value=False, # We don't actually need this exposed + callback=inject_cluster, + help="NAME of the cluster in WORKSPACE you wish to access", + ) + @platform_client + @click.pass_context + def get_cluster_client(ctx: click.Context, *args, **kwargs): + ctx.obj = ctx.obj or {} + if isinstance(profile := ctx.obj.get("profile"), ClusterProfile): + kwargs["cluster"] = ClusterClient( + base_url=profile.host, + headers={"X-API-Key": profile.api_key}, + ) + + elif isinstance(profile, PlatformProfile): + platform: PlatformClient = 
ctx.obj["platform"] + kwargs["cluster"] = platform.get_cluster_client(*ctx.obj["cluster_path"]) + + else: + raise AssertionError("Profile not set, something wrong") + + return ctx.invoke(f, *args, **kwargs) + + return update_wrapper(get_cluster_client, f) diff --git a/silverback/cluster/__init__.py b/silverback/cluster/__init__.py new file mode 100644 index 00000000..c54c74d2 --- /dev/null +++ b/silverback/cluster/__init__.py @@ -0,0 +1 @@ +# NOTE: Don't import anything here from `.client` diff --git a/silverback/cluster/client.py b/silverback/cluster/client.py new file mode 100644 index 00000000..fd06ae8e --- /dev/null +++ b/silverback/cluster/client.py @@ -0,0 +1,354 @@ +from functools import cache +from typing import ClassVar, Literal + +import httpx + +from silverback.version import version + +from .types import ( + BotHealth, + BotInfo, + ClusterConfiguration, + ClusterHealth, + ClusterInfo, + ClusterState, + VariableGroupInfo, + WorkspaceInfo, +) + +DEFAULT_HEADERS = {"User-Agent": f"Silverback SDK/{version}"} + + +def handle_error_with_response(response: httpx.Response): + if 400 <= response.status_code < 500: + message = response.text + + try: + message = response.json() + except Exception: + pass + + if isinstance(message, dict): + if detail := message.get("detail"): + if isinstance(detail, list): + + def render_error(error: dict): + location = ".".join(error["loc"]) + return f"- {location}: '{error['msg']}'" + + message = "Multiple validation errors found:\n" + "\n".join( + map(render_error, detail) + ) + + else: + message = detail + + else: + message = response.text + + raise RuntimeError(message) + + response.raise_for_status() + + assert response.status_code < 300, "Should follow redirects, so not sure what the issue is" + + +class VariableGroup(VariableGroupInfo): + # NOTE: Client used only for this SDK + # NOTE: DI happens in `ClusterClient.__init__` + cluster: ClassVar["ClusterClient"] + + def __hash__(self) -> int: + return int(self.id) + + def 
update( + self, name: str | None = None, variables: dict[str, str | None] | None = None + ) -> "VariableGroup": + if name is not None: + # Update metadata + response = self.cluster.put(f"/variables/{self.id}", json=dict(name=name)) + handle_error_with_response(response) + + if variables is not None: + # Create a new revision + response = self.cluster.post(f"/variables/{self.id}", json=dict(variables=variables)) + handle_error_with_response(response) + return VariableGroup.model_validate(response.json()) + + return self + + def get_revision(self, revision: int | Literal["latest"] = "latest") -> VariableGroupInfo: + # TODO: Add `/latest` revision route + if revision == "latest": + revision = "" # type: ignore[assignment] + + response = self.cluster.get(f"/variables/{self.id}/{revision}") + handle_error_with_response(response) + return VariableGroupInfo.model_validate(response.json()) + + def remove(self): + response = self.cluster.delete(f"/variables/{self.id}") + handle_error_with_response(response) + + +class Bot(BotInfo): + # NOTE: Client used only for this SDK + # NOTE: DI happens in `ClusterClient.__init__` + cluster: ClassVar["ClusterClient"] + + def update( + self, + name: str | None = None, + image: str | None = None, + network: str | None = None, + account: str | None = None, + environment: list[VariableGroupInfo] | None = None, + ) -> "Bot": + form: dict = dict( + name=name, + account=account, + image=image, + network=network, + ) + + if environment: + form["environment"] = [ + dict(id=str(env.id), revision=env.revision) for env in environment + ] + + response = self.cluster.put(f"/bots/{self.id}", json=form) + handle_error_with_response(response) + return Bot.model_validate(response.json()) + + @property + def health(self) -> BotHealth: + response = self.cluster.get("/health") # TODO: Migrate this endpoint + # response = self.cluster.get(f"/bots/{self.id}/health") + handle_error_with_response(response) + raw_health = next(bot for bot in 
response.json()["bots"] if bot["bot_id"] == str(self.id)) + return BotHealth.model_validate(raw_health) # response.json()) TODO: Migrate this endpoint + + def stop(self): + response = self.cluster.post(f"/bots/{self.id}/stop") + handle_error_with_response(response) + + def start(self): + # response = self.cluster.post(f"/bots/{self.id}/start") TODO: Add `/start` + # NOTE: Currently, a noop PUT request will trigger a start + response = self.cluster.put(f"/bots/{self.id}", json=dict(name=self.name)) + handle_error_with_response(response) + + @property + def errors(self) -> list[str]: + response = self.cluster.get(f"/bots/{self.id}/errors") + handle_error_with_response(response) + return response.json() + + @property + def logs(self) -> list[str]: + response = self.cluster.get(f"/bots/{self.id}/logs") + handle_error_with_response(response) + return response.json() + + def remove(self): + response = self.cluster.delete(f"/bots/{self.id}") + handle_error_with_response(response) + + +class ClusterClient(httpx.Client): + def __init__(self, *args, **kwargs): + kwargs["headers"] = {**kwargs.get("headers", {}), **DEFAULT_HEADERS} + if "follow_redirects" not in kwargs: + kwargs["follow_redirects"] = True + + super().__init__(*args, **kwargs) + + # DI for other client classes + VariableGroup.cluster = self # Connect to cluster client + Bot.cluster = self # Connect to cluster client + + def send(self, request, *args, **kwargs): + try: + return super().send(request, *args, **kwargs) + + except httpx.ConnectError as e: + raise ValueError(f"{e} '{request.url}'") from e + + @property + @cache + def openapi_schema(self) -> dict: + response = self.get("/openapi.json") + handle_error_with_response(response) + return response.json() + + @property + def version(self) -> str: + # NOTE: Does not call routes + return self.openapi_schema["info"]["version"] + + @property + def state(self) -> ClusterState: + response = self.get("/") + handle_error_with_response(response) + return 
ClusterState.model_validate(response.json()) + + @property + def health(self) -> ClusterHealth: + response = self.get("/health") + handle_error_with_response(response) + return ClusterHealth.model_validate(response.json()) + + @property + def variable_groups(self) -> dict[str, VariableGroup]: + response = self.get("/variables") + handle_error_with_response(response) + return {vg.name: vg for vg in map(VariableGroup.model_validate, response.json())} + + def new_variable_group(self, name: str, variables: dict[str, str]) -> VariableGroup: + response = self.post("/variables", json=dict(name=name, variables=variables)) + handle_error_with_response(response) + return VariableGroup.model_validate(response.json()) + + @property + def bots(self) -> dict[str, Bot]: + response = self.get("/bots") + handle_error_with_response(response) + return {bot.name: bot for bot in map(Bot.model_validate, response.json())} + + def new_bot( + self, + name: str, + image: str, + network: str, + account: str | None = None, + environment: list[VariableGroupInfo] | None = None, + ) -> Bot: + form: dict = dict( + name=name, + image=image, + network=network, + account=account, + ) + + if environment is not None: + form["environment"] = [ + dict(id=str(env.id), revision=env.revision) for env in environment + ] + + response = self.post("/bots", json=form) + handle_error_with_response(response) + return Bot.model_validate(response.json()) + + +class Workspace(WorkspaceInfo): + # NOTE: Client used only for this SDK + # NOTE: DI happens in `PlatformClient.client` + client: ClassVar[httpx.Client] + + @property + @cache + def owner(self) -> str: + response = self.client.get(f"/users/{self.owner_id}") + handle_error_with_response(response) + return response.json().get("username") + + def build_display_fields(self) -> dict[str, str]: + return dict( + # `.id` is internal + name=self.name, + # `.slug` is index + # `.owner_id` is UUID, use for client lookup instead + owner=self.owner, + ) + + def 
__hash__(self) -> int: + return int(self.id) + + def get_cluster_client(self, cluster_name: str) -> ClusterClient: + if not (cluster := self.clusters.get(cluster_name)): + raise ValueError(f"Unknown cluster '{cluster_name}' in workspace '{self.name}'.") + + return ClusterClient( + base_url=f"{self.client.base_url}/c/{self.slug}/{cluster.slug}", + cookies=self.client.cookies, # NOTE: pass along platform cookies for proxy auth + ) + + @property + @cache + def clusters(self) -> dict[str, ClusterInfo]: + response = self.client.get("/clusters", params=dict(org=str(self.id))) + handle_error_with_response(response) + clusters = response.json() + # TODO: Support paging + return {cluster.slug: cluster for cluster in map(ClusterInfo.model_validate, clusters)} + + def create_cluster( + self, + cluster_slug: str | None = None, + cluster_name: str | None = None, + configuration: ClusterConfiguration = ClusterConfiguration(), + ) -> ClusterInfo: + response = self.client.post( + "/clusters/", + params=dict(org=str(self.id)), + json=dict( + name=cluster_name, + slug=cluster_slug, + configuration=configuration.model_dump(), + ), + ) + + handle_error_with_response(response) + new_cluster = ClusterInfo.model_validate_json(response.text) + self.clusters.update({new_cluster.slug: new_cluster}) # NOTE: Update cache + return new_cluster + + +class PlatformClient(httpx.Client): + def __init__(self, *args, **kwargs): + if "follow_redirects" not in kwargs: + kwargs["follow_redirects"] = True + + kwargs["headers"] = {**kwargs.get("headers", {}), **DEFAULT_HEADERS} + super().__init__(*args, **kwargs) + + # DI for other client classes + Workspace.client = self # Connect to platform client + + def send(self, request, *args, **kwargs): + try: + return super().send(request, *args, **kwargs) + + except httpx.ConnectError as e: + raise ValueError(f"{e} '{request.url}'") from e + + def get_cluster_client(self, workspace_name: str, cluster_name: str) -> ClusterClient: + if not (workspace := 
self.workspaces.get(workspace_name)): + raise ValueError(f"Unknown workspace '{workspace_name}'.") + + return workspace.get_cluster_client(cluster_name) + + @property + @cache + def workspaces(self) -> dict[str, Workspace]: + response = self.get("/organizations") + handle_error_with_response(response) + workspaces = response.json() + # TODO: Support paging + return { + workspace.slug: workspace for workspace in map(Workspace.model_validate, workspaces) + } + + def create_workspace( + self, + workspace_slug: str = "", + workspace_name: str = "", + ) -> Workspace: + response = self.post( + "/organizations", + json=dict(slug=workspace_slug, name=workspace_name), + ) + handle_error_with_response(response) + new_workspace = Workspace.model_validate_json(response.text) + self.workspaces.update({new_workspace.slug: new_workspace}) # NOTE: Update cache + return new_workspace diff --git a/silverback/cluster/settings.py b/silverback/cluster/settings.py new file mode 100644 index 00000000..f1b202aa --- /dev/null +++ b/silverback/cluster/settings.py @@ -0,0 +1,76 @@ +from pathlib import Path + +import tomlkit +from pydantic import BaseModel, Field, ValidationError, model_validator +from typing_extensions import Self + +PROFILE_PATH = Path.home() / ".silverback" / "profile.toml" +DEFAULT_PROFILE = "default" + + +class AuthenticationConfig(BaseModel): + """Authentication host configuration information (~/.silverback/profile.toml)""" + + host: str = "https://account.apeworx.io" + client_id: str = Field(default="lcylrp34lnggGO-E-KKlMJgvAI4Q2Jhf6U2G6CB5uMg", alias="client-id") + + +class BaseProfile(BaseModel): + """Profile information (~/.silverback/profile.toml)""" + + host: str + + +class ClusterProfile(BaseProfile): + api_key: str = Field(alias="api-key") # direct access to a cluster + + +class PlatformProfile(BaseProfile): + auth: str # key of `AuthenticationConfig` in authentication section + default_workspace: str = Field(alias="default-workspace", default="") + 
default_cluster: dict[str, str] = Field(alias="default-cluster", default_factory=dict) + + +class ProfileSettings(BaseModel): + """Configuration settings for working with Bot Clusters and the Silverback Platform""" + + auth: dict[str, AuthenticationConfig] + profile: dict[str, PlatformProfile | ClusterProfile] + default_profile: str = Field(default=DEFAULT_PROFILE, alias="default-profile") + + @model_validator(mode="after") + def ensure_auth_exists_for_profile(self) -> Self: + for profile_name, profile in self.profile.items(): + if isinstance(profile, PlatformProfile) and profile.auth not in self.auth: + auth_names = "', '".join(self.auth) + raise ValidationError( + f"Key `profile.'{profile_name}'.auth` must be one of '{auth_names}'." + ) + + return self + + @classmethod + def from_config_file(cls) -> Self: + # TODO: Figure out why `BaseSettings` doesn't work well (probably uses tomlkit) + settings_dict: dict # NOTE: So mypy knows it's not redefined + + if PROFILE_PATH.exists(): + # NOTE: cast to dict because tomlkit has a bug in it that mutates dicts + settings_dict = dict(tomlkit.loads(PROFILE_PATH.read_text())) + + else: # Write the defaults to disk for next time + settings_dict = dict( + auth={ + DEFAULT_PROFILE: AuthenticationConfig().model_dump(), + }, + profile={ + DEFAULT_PROFILE: PlatformProfile( + auth=DEFAULT_PROFILE, + host="https://silverback.apeworx.io", + ).model_dump() + }, + ) + PROFILE_PATH.parent.mkdir(exist_ok=True) + PROFILE_PATH.write_text(tomlkit.dumps(settings_dict)) + + return cls.model_validate(settings_dict) diff --git a/silverback/cluster/types.py b/silverback/cluster/types.py new file mode 100644 index 00000000..428cd371 --- /dev/null +++ b/silverback/cluster/types.py @@ -0,0 +1,308 @@ +import enum +import math +import uuid +from datetime import datetime +from typing import Annotated + +from pydantic import BaseModel, Field, computed_field, field_validator + + +class WorkspaceInfo(BaseModel): + id: uuid.UUID + owner_id: uuid.UUID + 
name: str + slug: str + + +class ClusterConfiguration(BaseModel): + """Configuration of the cluster (represented as 16 byte value)""" + + # NOTE: This configuration must be encode-able to a uint64 value for db storage + # and on-chain processing through ApePay + + # NOTE: All defaults should be the minimal end of the scale, + # so that `__or__` works right + + # Version byte (Byte 0) + # NOTE: Just in-case we change this after release + version: int = 1 + + # Bot Worker Configuration (Bytes 1-2) + cpu: Annotated[int, Field(ge=0, le=6)] = 0 # defaults to 0.25 vCPU + """Allocated vCPUs per bot: + - 0.25 vCPU (0) + - 0.50 vCPU (1) + - 1.00 vCPU (2) + - 2.00 vCPU (3) + - 4.00 vCPU (4) + - 8.00 vCPU (5) + - 16.0 vCPU (6)""" + + memory: Annotated[int, Field(ge=0, le=120)] = 0 # defaults to 512 MiB + """Total memory per bot (in GB, 0 means '512 MiB')""" + + # NOTE: Configure # of workers based on cpu & memory settings + + # Runner configuration (Bytes 3-5) + networks: Annotated[int, Field(ge=1, le=20)] = 1 + """Maximum number of concurrent network runners""" + + bots: Annotated[int, Field(ge=1, le=250)] = 1 + """Maximum number of concurrent bots running""" + + triggers: Annotated[int, Field(ge=50, le=1000, multiple_of=5)] = 50 + """Maximum number of task triggers across all running bots""" + + # Recorder configuration (Byte 6) + storage: Annotated[int, Field(ge=0, le=250)] = 0 # 512 GB + """Total task results and metrics parquet storage (in TB, 0 means '512 GB')""" + + # Cluster general configuration (Byte 7) + secrets: Annotated[int, Field(ge=10, le=100)] = 10 + """Total managed secrets""" + + @field_validator("cpu", mode="before") + def parse_cpu_value(cls, value: str | int) -> int: + if not isinstance(value, str): + return value + + return round(math.log2(float(value.split(" ")[0]) * 1024 / 256)) + + @field_validator("memory", mode="before") + def parse_memory_value(cls, value: str | int) -> int: + if not isinstance(value, str): + return value + + mem, units = 
value.split(" ") + if units.lower() == "mib": + assert mem == "512" + return 0 + + assert units.lower() == "gb" + return int(mem) + + @field_validator("storage", mode="before") + def parse_storage_value(cls, value: str | int) -> int: + if not isinstance(value, str): + return value + + storage, units = value.split(" ") + if units.lower() == "gb": + assert storage == "512" + return 0 + + assert units.lower() == "tb" + return int(storage) + + def settings_display_dict(self) -> dict: + return dict( + version=self.version, + bots=dict( + cpu=f"{256 * 2**self.cpu / 1024} vCPU", + memory=f"{self.memory} GB" if self.memory > 0 else "512 MiB", + ), + general=dict( + bots=self.bots, + secrets=self.secrets, + ), + runner=dict( + networks=self.networks, + triggers=self.triggers, + ), + recorder=dict( + storage=f"{self.storage} TB" if self.storage > 0 else "512 GB", + ), + ) + + @staticmethod + def _decode_byte(value: int, byte: int) -> int: + # NOTE: All configuration settings must be uint8 integer values when encoded + return (value >> (8 * byte)) & (2**8 - 1) # NOTE: max uint8 + + @classmethod + def decode(cls, value: int) -> "ClusterConfiguration": + """Decode the configuration from 8 byte integer value""" + if isinstance(value, ClusterConfiguration): + return value # TODO: Something weird with SQLModel + + # NOTE: Do not change the order of these, these are not forwards compatible + return cls( + version=cls._decode_byte(value, 0), + cpu=cls._decode_byte(value, 1), + memory=cls._decode_byte(value, 2), + networks=cls._decode_byte(value, 3), + bots=cls._decode_byte(value, 4), + triggers=5 * cls._decode_byte(value, 5), + storage=cls._decode_byte(value, 6), + secrets=cls._decode_byte(value, 7), + ) + + @staticmethod + def _encode_byte(value: int, byte: int) -> int: + return value << (8 * byte) + + def encode(self) -> int: + """Encode configuration as 8 byte integer value""" + # NOTE: Do not change the order of these, these are not forwards compatible + return ( + 
self._encode_byte(self.version, 0) + + self._encode_byte(self.cpu, 1) + + self._encode_byte(self.memory, 2) + + self._encode_byte(self.networks, 3) + + self._encode_byte(self.bots, 4) + + self._encode_byte(self.triggers // 5, 5) + + self._encode_byte(self.storage, 6) + + self._encode_byte(self.secrets, 7) + ) + + +class ClusterTier(enum.IntEnum): + """Suggestions for different tier configurations""" + + PERSONAL = ClusterConfiguration( + cpu="0.25 vCPU", + memory="512 MiB", + networks=3, + bots=5, + triggers=50, + storage="512 GB", + secrets=10, + ).encode() + PROFESSIONAL = ClusterConfiguration( + cpu="1 vCPU", + memory="2 GB", + networks=10, + bots=20, + triggers=400, + storage="5 TB", + secrets=25, + ).encode() + + def configuration(self) -> ClusterConfiguration: + return ClusterConfiguration.decode(int(self)) + + +class ResourceStatus(enum.IntEnum): + """ + Generic enum that represents that status of any associated resource or service. + + ```{note} + Calling `str(...)` on this will produce a human-readable status for display. 
    ```
    """

    CREATED = 0
    """Resource record created, but not provisioning yet (likely awaiting payment)"""

    # NOTE: `1` is reserved

    PROVISIONING = 2
    """Resource is provisioning infrastructure (on payment received)"""

    STARTUP = 3
    """Resource is being put into the RUNNING state"""

    RUNNING = 4
    """Resource is in good health (Resource itself should be reporting status now)"""

    # NOTE: `5` is reserved

    SHUTDOWN = 6
    """Resource is being put into the STOPPED state"""

    STOPPED = 7
    """Resource has stopped (due to errors, user action, or resource constraints)"""

    DEPROVISIONING = 8
    """User removal action or payment expiration event triggered"""

    REMOVED = 9
    """Infrastructure de-provisioning complete (Cannot change from this state)"""

    def __str__(self) -> str:
        # Human-readable form, e.g. `RUNNING` -> "Running"
        return self.name.capitalize()


class ClusterInfo(BaseModel):
    # NOTE: Raw API object (gets exported)
    id: uuid.UUID  # NOTE: Keep this private, used as a temporary secret key for payment
    version: str | None  # NOTE: Unprovisioned clusters have no known version yet
    configuration: ClusterConfiguration | None = None  # NOTE: self-hosted clusters have no config

    name: str  # User-friendly display name
    slug: str  # Shorthand name, for CLI and URI usage

    created: datetime  # When the resource was first created
    status: ResourceStatus  # Current lifecycle state (see `ResourceStatus` above)
    last_updated: datetime  # Last time the resource was changed (upgrade, provisioning, etc.)
+ + +class ClusterState(BaseModel): + """ + Cluster Build Information and Configuration, direct from cluster control service + """ + + version: str = Field(alias="cluster_version") # TODO: Rename in cluster + configuration: ClusterConfiguration | None = None # TODO: Add to cluster + # TODO: Add other useful summary fields for frontend use + + +class ServiceHealth(BaseModel): + healthy: bool + + +class ClusterHealth(BaseModel): + ars: ServiceHealth = Field(exclude=True) # TODO: Replace w/ cluster + ccs: ServiceHealth = Field(exclude=True) # TODO: Replace w/ cluster + bots: dict[str, ServiceHealth] = {} + + @field_validator("bots", mode="before") # TODO: Fix so this is default + def convert_bot_health(cls, bots): + return {b["instance_id"]: ServiceHealth.model_validate(b) for b in bots} + + @computed_field + def cluster(self) -> ServiceHealth: + return ServiceHealth(healthy=self.ars.healthy and self.ccs.healthy) + + +class VariableGroupInfo(BaseModel): + id: uuid.UUID + name: str + revision: int + variables: list[str] + created: datetime + + +class EnvironmentVariable(BaseModel): + name: str + group_id: uuid.UUID + group_revision: int + + +class BotTaskStatus(BaseModel): + last_status: str + exit_code: int | None + reason: str | None + started_at: datetime | None + stop_code: str | None + stopped_at: datetime | None + stopped_reason: str | None + + +class BotHealth(BaseModel): + bot_id: uuid.UUID + task_status: BotTaskStatus | None + healthy: bool + + +class BotInfo(BaseModel): + id: uuid.UUID # TODO: Change `.instance_id` field to `id: UUID` + name: str + created: datetime + + image: str + network: str + account: str | None + revision: int + + environment: list[EnvironmentVariable] = [] diff --git a/silverback/worker.py b/silverback/worker.py new file mode 100644 index 00000000..ba48ba60 --- /dev/null +++ b/silverback/worker.py @@ -0,0 +1,26 @@ +import asyncio +from concurrent.futures import ThreadPoolExecutor + +from taskiq import AsyncBroker +from 
taskiq.cli.worker.run import shutdown_broker +from taskiq.receiver import Receiver + + +async def run_worker(broker: AsyncBroker, worker_count=2, shutdown_timeout=90): + try: + tasks = [] + with ThreadPoolExecutor(max_workers=worker_count) as pool: + for _ in range(worker_count): + receiver = Receiver( + broker=broker, + executor=pool, + validate_params=True, + max_async_tasks=1, + max_prefetch=0, + ) + broker.is_worker_process = True + tasks.append(receiver.listen()) + + await asyncio.gather(*tasks) + finally: + await shutdown_broker(broker, shutdown_timeout)