Skip to content

Commit

Permalink
Merge pull request #36 from firstbatchxyz/erhant/bugfixes-timeouts-po…
Browse files Browse the repository at this point in the history
…stprocess

fix timeout issue + handle swan post-process error
  • Loading branch information
erhant authored Jan 6, 2025
2 parents ad2bea9 + c050e7f commit e5f3fa6
Show file tree
Hide file tree
Showing 17 changed files with 396 additions and 174 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ RUST_LOG=none,dria_oracle=info
# example: ac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
SECRET_KEY=your-secret-key

# Coordinator address (optional)
COORDINATOR_ADDRESS=

## Arweave configurations
# path to wallet, only required if your BYTE_LIMIT is enough that
# you may do an Arweave upload to store a large value on-chain
Expand Down
3 changes: 1 addition & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "dria-oracle"
description = "Dria Knowledge Network Oracle Node"
version = "0.2.0"
version = "0.2.1"
edition = "2021"
license = "Apache-2.0"
readme = "README.md"
Expand All @@ -19,7 +19,6 @@ tokio = { version = "1.39.2", features = [
"signal",
] }
tokio-util = "0.7.13"
lazy_static = "1.5.0"

# workflows
dkn-workflows = { git = "https://github.com/firstbatchxyz/dkn-compute-node" }
Expand Down
31 changes: 24 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,31 @@ cargo install --git https://github.com/firstbatchxyz/dria-oracle-node

Create an `.env` file by copying `.env.example`. You have to fill the following variables:

- Get an RPC URL from a provider such as Alchemy or Infura, and set it as `RPC_URL`.
- Provide an Ethereum wallet secret koy to `SECRET_KEY`, make sure it has funds to pay for gas and tokens.
- Get an RPC URL from a provider such as [Alchemy](https://www.alchemy.com/) or [Infura](https://www.infura.io/), and set it as `RPC_URL`.
- Provide an Ethereum wallet secret key to `SECRET_KEY`, make sure it has funds to pay for gas and tokens.

Optionally, you can save gas costs using Arweave:
> [!NOTE]
>
> The contract addresses are determined with respect to the chain connected via RPC URL, but you can override it via `COORDINATOR_ADDRESS` environment variable.
> In any case, you should not need to do this.
### Arweave

You can save gas costs using [Arweave](https://arweave.org/):

- Provide an Arweave wallet so that you can use Arweave for large results. Alternatively, dont provide a wallet but instead set `ARWEAVE_BYTE_LIMIT` to a very large value. TODO: this should be done automatically if wallet does not exist
- Provide an Arweave wallet via `ARWEAVE_WALLET_PATH` variable so that you can use Arweave for large results. You can create one [here](https://arweave.app/).
- You can set `ARWEAVE_BYTE_LIMIT` to determine the byte length threshold, beyond which values are uploaded to Arweave. It defaults to 1024, so any data less than that many bytes will be written as-is.

If you omit Arweave, it will only use the client for downloading things from Arweave, but will never upload.

### LLM Providers

As for the LLM providers:

- If you are using Ollama, make sure it is running and the host & port are correct.
- If you are using OpenAI, make sure you provide the `OPENAI_API_KEY`.
- If you are using Gemini, make sure you provide the `GEMINI_API_KEY`.
- If you are using OpenRouter, make sure you provide the `OPENROUTER_API_KEY`.
- If you are using OpenAI, provide the `OPENAI_API_KEY`.
- If you are using Gemini, provide the `GEMINI_API_KEY`.
- If you are using OpenRouter, provide the `OPENROUTER_API_KEY`.

## Usage

Expand All @@ -52,6 +64,11 @@ The CLI provides several methods to interact with the oracle contracts.
- [Viewing Tasks](#viewing-tasks)
- [Balance & Rewards](#balance--rewards)

> [!TIP]
>
> By default logs will be `info` level, but you can add a `DEBUG=1` env variable and it will use `debug` level instead.
> You can set `RUST_LOG` variable yourself as well.
### Registration

To serve oracle requests, you **MUST** first register as your desired oracle type, i.e. `generator` or `validator`. These are handled by the registration commands `register` and `unregister` which accepts multiple arguments to register at once. You can then see your registrations with `registrations` command.
Expand Down
38 changes: 26 additions & 12 deletions misc/arweave.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,46 @@
/**
* A helper script to print the content of an Arweave transaction, where transaction id is hex-encoded.
* This means that the input is a 64-char hexadecimal.
* A helper script to print the content of an Arweave transaction.
*
* Usage:
*
* bun run ./misc/arweave.js 0x30613233613135613236663864663332366165306137663863633636343437336238373463353966333964623436366665316337313531393634623734393231
* ```sh
* # calldata as-is
* bun run ./misc/arweave.js 0x7b2261727765617665223a224d49555775656361634b417a62755442335a6a57613463784e6461774d71435a704550694f71675a625a63227d
*
* Tip:
* # as an object (with escaped quotes)
* bun run ./misc/arweave.js "{\"arweave\":\"MIUWuecacKAzbuTB3ZjWa4cxNdawMqCZpEPiOqgZbZc\"}"
*
* Can be piped to `pbcopy` on macOS to copy the output to clipboard.
* # base64 txid
* bun run ./misc/arweave.js MIUWuecacKAzbuTB3ZjWa4cxNdawMqCZpEPiOqgZbZc
* ```
*
* Can be piped to `pbcopy` on macOS to copy the output to clipboard.
*/

// parse input
let input = process.argv[2];
if (!input) {
console.error("No input provided.");
return;
}

// get rid of 0x
let arweaveTxId;
if (input.startsWith("0x")) {
input = input.slice(2);
// if it starts with 0x, we assume its all hex
arweaveTxId = JSON.parse(
Buffer.from(input.slice(2), "hex").toString()
).arweave;
} else if (input.startsWith("{")) {
// if it starts with {, we assume its a JSON string
console.log("input", input);
arweaveTxId = JSON.parse(input).arweave;
} else {
// otherwise, we assume its a base64 txid
arweaveTxId = input;
}
const inputDecoded = Buffer.from(input, "hex").toString();
const obj = JSON.parse(inputDecoded);

// construct the URL
// download the actual response from Arweave
const url = `https://arweave.net/${obj.arweave}`;
console.log(url);
const url = `https://arweave.net/${arweaveTxId}`;
const res = await fetch(url);

console.log(await res.text());
88 changes: 69 additions & 19 deletions src/cli/commands/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::{
use alloy::{
eips::BlockNumberOrTag,
primitives::{utils::format_ether, U256},
rpc::types::Log,
};
use dkn_workflows::{DriaWorkflowsConfig, Model, ModelProvider};
use eyre::{eyre, Context, Result};
Expand Down Expand Up @@ -45,6 +44,7 @@ impl DriaOracle {
}
}
}

log::info!(
"Running as: {}",
kinds
Expand Down Expand Up @@ -122,8 +122,12 @@ impl DriaOracle {
next = event_stream.next() => {
match next {
Some(Ok((event, log))) => {
self.handle_event_log(event, log, &kinds, &model_config)
.await
log::debug!(
"Handling task {} (tx: {})",
event.taskId,
log.transaction_hash.unwrap_or_default()
);
self.handle_event_log(event, &kinds, &model_config).await
}
Some(Err(e)) => log::error!("Could not handle event: {}", e),
None => {
Expand All @@ -141,19 +145,16 @@ impl DriaOracle {
async fn handle_event_log(
&self,
event: StatusUpdate,
log: Log,
kinds: &[OracleKind],
model_config: &DriaWorkflowsConfig,
workflows: &DriaWorkflowsConfig,
) {
let task_id = event.taskId;
log::debug!(
"Handling task {} (tx: {})",
task_id,
log.transaction_hash.unwrap_or_default()
);
let Ok(status) = TaskStatus::try_from(event.statusAfter) else {
log::error!("Could not parse task status: {}", event.statusAfter);
return;
};

// handle request
match handle_request(self, kinds, model_config, event).await {
match handle_request(self, kinds, workflows, status, event.taskId, event.protocol).await {
Ok(Some(receipt)) => {
log::info!(
"Task {} processed successfully. (tx: {})",
Expand All @@ -171,7 +172,7 @@ impl DriaOracle {
async fn handle_previous_tasks(
&self,
from_block: BlockNumberOrTag,
model_config: &DriaWorkflowsConfig,
workflows: &DriaWorkflowsConfig,
kinds: &[OracleKind],
) -> Result<()> {
log::info!(
Expand All @@ -183,19 +184,30 @@ impl DriaOracle {
.await?;

for (event, log) in prev_tasks {
let status_before = TaskStatus::try_from(event.statusBefore)?;
let status_after = TaskStatus::try_from(event.statusAfter)?;
let task_id = event.taskId;
log::info!(
"Previous task: {} ({} -> {})",
task_id,
TaskStatus::try_from(event.statusBefore).unwrap_or_default(),
TaskStatus::try_from(event.statusAfter).unwrap_or_default()
status_before,
status_after
);
log::debug!(
"Handling task {} (tx: {})",
task_id,
log.transaction_hash.unwrap_or_default()
);
match handle_request(self, kinds, model_config, event).await {
match handle_request(
self,
kinds,
workflows,
status_after,
event.taskId,
event.protocol,
)
.await
{
Ok(Some(receipt)) => {
log::info!(
"Task {} processed successfully. (tx: {})",
Expand All @@ -212,6 +224,7 @@ impl DriaOracle {

Ok(())
}

pub(in crate::cli) async fn view_task_events(
&self,
from_block: impl Into<BlockNumberOrTag> + Clone,
Expand All @@ -233,12 +246,13 @@ impl DriaOracle {

let task_events = self.get_tasks_in_range(from_block, to_block).await?;

for (event, _) in task_events {
for (event, log) in task_events {
log::info!(
"Task: {} ({} -> {})",
"Task {} changed from {} to {} at block {}",
event.taskId,
TaskStatus::try_from(event.statusBefore).unwrap_or_default(),
TaskStatus::try_from(event.statusAfter).unwrap_or_default()
TaskStatus::try_from(event.statusAfter).unwrap_or_default(),
log.block_number.unwrap_or_default()
);
}

Expand Down Expand Up @@ -290,6 +304,42 @@ impl DriaOracle {
Ok(())
}

pub(in crate::cli) async fn process_task(
&self,
workflows: &DriaWorkflowsConfig,
kinds: &[OracleKind],
task_id: U256,
) -> Result<()> {
log::info!("Processing task {}.", task_id);
let request = self.get_task_request(task_id).await?;

log::info!(
"Request Information:\nRequester: {}\nStatus: {}\nInput: {}\nModels: {}",
request.requester,
TaskStatus::try_from(request.status)?,
bytes_to_string(&request.input)?,
bytes_to_string(&request.models)?
);

// TODO: !!!
let status = TaskStatus::try_from(request.status)?;
match handle_request(self, kinds, workflows, status, task_id, request.protocol).await {
Ok(Some(receipt)) => {
log::info!(
"Task {} processed successfully. (tx: {})",
task_id,
receipt.transaction_hash
)
}
Ok(None) => {
log::info!("Task {} ignored.", task_id)
}
Err(e) => log::error!("Could not process task: {:?}", e),
}

Ok(())
}

pub async fn request_task(
&self,
input: &str,
Expand Down
14 changes: 13 additions & 1 deletion src/cli/commands/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,19 @@ pub enum Commands {
models: Vec<Model>,
},
/// View status of a given task.
View { task_id: U256 },
View {
#[arg(help = "Task id.", required = true)]
task_id: U256,
},
/// Process a single task.
Process {
#[arg(help = "Task id.", required = true)]
task_id: U256,
#[arg(help = "The oracle kinds to handle the task as.", required = false)]
kinds: Vec<OracleKind>,
#[arg(short, long = "model", help = "The models to use for this task.", required = true, value_parser = parse_model)]
models: Vec<Model>,
},
/// View tasks between specific blocks.
Tasks {
#[arg(long, help = "Starting block number, defaults to 'earliest'.", value_parser = parse_block_number_or_tag)]
Expand Down
17 changes: 16 additions & 1 deletion src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
mod commands;
use std::time::Duration;

use commands::Commands;

mod parsers;
use dkn_workflows::DriaWorkflowsConfig;
use parsers::*;

use crate::{DriaOracle, DriaOracleConfig};
Expand All @@ -25,6 +28,9 @@ struct Cli {
/// Ethereum wallet's secret (private) key.
#[arg(short, long, env = "SECRET_KEY", value_parser = parse_secret_key)]
secret_key: B256,

#[arg(short, long, env = "TX_TIMEOUT_SECS", default_value = "30")]
tx_timeout: Option<u64>,
}

/// Main CLI entry point.
Expand All @@ -39,7 +45,8 @@ pub async fn cli() -> Result<()> {

// create node
let config = DriaOracleConfig::new(&secret_key, rpc_url)
.wrap_err("could not create oracle configuration")?;
.wrap_err("could not create oracle configuration")?
.with_tx_timeout(Duration::from_secs(30)); // timeout is 30secs by default
let node = DriaOracle::new(config)
.await
.wrap_err("could not create oracle node")?;
Expand Down Expand Up @@ -89,6 +96,14 @@ pub async fn cli() -> Result<()> {
}
}
Commands::View { task_id } => node.view_task(task_id).await?,
Commands::Process {
task_id,
kinds,
models,
} => {
node.process_task(&DriaWorkflowsConfig::new(models), &kinds, task_id)
.await?
}
Commands::Tasks { from, to } => {
node.view_task_events(
from.unwrap_or(BlockNumberOrTag::Earliest),
Expand Down
2 changes: 1 addition & 1 deletion src/compute/generation/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ mod tests {
// cargo test --package dria-oracle --lib --all-features -- compute::generation::execute::tests::test_raw_workflow --exact --show-output --ignored
dotenvy::dotenv().unwrap();

let contract_result = hex_literal::hex!("7b2261727765617665223a2239397a4252676c4c663443696b35676c57444f667542463736456e417a4a6344303431545a614c6d6f6934227d");
let contract_result = hex_literal::hex!("7b2261727765617665223a224d49555775656361634b417a62755442335a6a57613463784e6461774d71435a704550694f71675a625a63227d");
let request = GenerationRequest::try_parse_bytes(&contract_result.into())
.await
.unwrap();
Expand Down
Loading

0 comments on commit e5f3fa6

Please sign in to comment.