Skip to content

Commit

Permalink
Add support for triggering nested workflows.
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubadamw committed Oct 20, 2024
1 parent 7a1b6c9 commit 191efc2
Show file tree
Hide file tree
Showing 12 changed files with 270 additions and 69 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ ctrlc2 = { version = "3", features = ["termination", "tokio"] }
derive_builder = "0.20"
envy = "0.4"
futures-util = "0.3"
http = "1"
jsonwebtoken = "9"
prost = "0.13"
prost-types = "0.13"
Expand Down
6 changes: 3 additions & 3 deletions examples/fibonacci.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use hatchet_sdk::{Client, StepBuilder, WorkflowBuilder};
use hatchet_sdk::{Client, Context, StepBuilder, WorkflowBuilder};

fn fibonacci(n: u32) -> u32 {
(1..=n)
Expand Down Expand Up @@ -33,7 +33,7 @@ struct Output {
result: u32,
}

async fn execute(Input { n }: Input) -> anyhow::Result<Output> {
async fn execute(_context: Context, Input { n }: Input) -> anyhow::Result<Output> {
Ok(Output {
result: fibonacci(n),
})
Expand All @@ -51,7 +51,7 @@ async fn main() -> anyhow::Result<()> {
.init();

let client = Client::new()?;
let mut worker = client.worker("example").build();
let mut worker = client.worker("example_fibonacci").build();
worker.register_workflow(
WorkflowBuilder::default()
.name("fibonacci")
Expand Down
66 changes: 66 additions & 0 deletions examples/spawn_workflow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use hatchet_sdk::{Client, Context, StepBuilder, WorkflowBuilder};

async fn execute_hello(
context: Context,
_: serde_json::Value,
) -> anyhow::Result<serde_json::Value> {
context
.trigger_workflow(
"world",
serde_json::json!({
"x": 42
}),
)
.await?;
Ok(serde_json::json!({
"message": "Hello"
}))
}

async fn execute_world(
_context: Context,
_: serde_json::Value,
) -> anyhow::Result<serde_json::Value> {
Ok(serde_json::json!({
"message": "World"
}))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();
tracing_subscriber::fmt()
.with_target(false)
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("hatchet_sdk=debug".parse()?),
)
.init();

let client = Client::new()?;
let mut worker = client.worker("example_spawn_workflow").build();
worker.register_workflow(
WorkflowBuilder::default()
.name("hello")
.step(
StepBuilder::default()
.name("execute")
.function(&execute_hello)
.build()?,
)
.build()?,
);
worker.register_workflow(
WorkflowBuilder::default()
.name("world")
.step(
StepBuilder::default()
.name("execute")
.function(&execute_world)
.build()?,
)
.build()?,
);
worker.start().await?;
Ok(())
}
2 changes: 1 addition & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct Client {
}

impl Client {
pub fn new() -> crate::Result<Self> {
pub fn new() -> crate::InternalResult<Self> {
let environment = envy::prefixed("HATCHET_CLIENT_").from_env::<Environment>()?;
Ok(Self { environment })
}
Expand Down
12 changes: 11 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
#[derive(Debug, thiserror::Error)]
pub enum Error {
pub enum InternalError {
#[error("failed to load configuration from the environment: {0}")]
Environment(#[from] envy::Error),
#[error("worker registration request: {0}")]
CouldNotRegisterWorker(tonic::Status),
#[error("workflow registration request:: {0}")]
CouldNotPutWorkflow(tonic::Status),
#[error("workflow schedule request:: {0}")]
CouldNotTriggerWorkflow(tonic::Status),
#[error("dispatcher listen error: {0}")]
CouldNotListenToDispatcher(tonic::Status),
#[error("step status send error: {0}")]
Expand All @@ -28,4 +30,12 @@ pub enum Error {
CouldNotDecodeActionPayload(serde_json::Error),
}

pub type InternalResult<T> = std::result::Result<T, InternalError>;

#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("internal error: {0}")]
Internal(#[from] InternalError),
}

pub type Result<T> = std::result::Result<T, Error>;
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
mod client;
mod error;
mod step_function;
mod worker;
mod workflow;

pub use error::{Error, Result};
pub(crate) use error::{InternalError, InternalResult};

#[derive(Clone, Copy, Debug, Default, serde::Deserialize)]
#[serde(rename_all = "lowercase")]
Expand All @@ -15,6 +17,7 @@ enum ClientTlStrategy {
}

pub use client::Client;
pub use step_function::Context;
pub use worker::{Worker, WorkerBuilder};
pub use workflow::{Step, StepBuilder, Workflow, WorkflowBuilder};

Expand Down
75 changes: 75 additions & 0 deletions src/step_function.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use futures_util::lock::Mutex;
use tracing::info;

use crate::worker::{grpc, ServiceWithAuthorization};

pub struct Context {
workflow_run_id: String,
workflow_step_run_id: String,
workflow_service_client_and_spawn_index: Mutex<(
grpc::workflow_service_client::WorkflowServiceClient<
tonic::service::interceptor::InterceptedService<
tonic::transport::Channel,
ServiceWithAuthorization,
>,
>,
u16,
)>,
}

impl Context {
pub(crate) fn new(
workflow_run_id: String,
workflow_step_run_id: String,
workflow_service_client: grpc::workflow_service_client::WorkflowServiceClient<
tonic::service::interceptor::InterceptedService<
tonic::transport::Channel,
ServiceWithAuthorization,
>,
>,
) -> Self {
Self {
workflow_run_id,
workflow_service_client_and_spawn_index: Mutex::new((workflow_service_client, 0)),
workflow_step_run_id,
}
}

pub async fn trigger_workflow<I: serde::Serialize>(
&self,
workflow_name: &str,
input: I,
) -> anyhow::Result<()> {
info!("Scheduling another workflow {workflow_name}");
let mut mutex_guard = self.workflow_service_client_and_spawn_index.lock().await;
let (workflow_service_client, spawn_index) = &mut *mutex_guard;
let response = workflow_service_client
.trigger_workflow(grpc::TriggerWorkflowRequest {
name: workflow_name.to_owned(),
input: serde_json::to_string(&input).expect("must succeed"),
parent_id: Some(self.workflow_run_id.clone()),
parent_step_run_id: Some(self.workflow_step_run_id.clone()),
child_index: Some(*spawn_index as i32),
child_key: None,
additional_metadata: None, // FIXME: Add support.
desired_worker_id: None, // FIXME: Add support.
priority: Some(1), // FIXME: Add support.
})
.await
.map_err(crate::InternalError::CouldNotTriggerWorkflow)
.map_err(crate::Error::Internal)?
.into_inner();
info!(
"Scheduled another workflow run ID: {}",
response.workflow_run_id
);
*spawn_index += 1;
Ok(())
}
}

pub(crate) type StepFunction =
dyn Fn(
Context,
serde_json::Value,
) -> futures_util::future::LocalBoxFuture<'static, anyhow::Result<serde_json::Value>>;
6 changes: 3 additions & 3 deletions src/worker/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub(crate) async fn run<F>(
>,
worker_id: &str,
mut interrupt_receiver: tokio::sync::mpsc::Receiver<()>,
) -> crate::Result<()>
) -> crate::InternalResult<()>
where
F: tonic::service::Interceptor + Send + 'static,
{
Expand All @@ -24,7 +24,7 @@ where
worker_id: worker_id.clone(),
})
.await
.map_err(crate::Error::CouldNotSendHeartbeat)?;
.map_err(crate::InternalError::CouldNotSendHeartbeat)?;

tokio::select! {
_ = interval.tick() => {
Expand All @@ -35,7 +35,7 @@ where
}
}
}
crate::Result::Ok(())
crate::InternalResult::Ok(())
})
.await
.expect("must succeed spawing")?;
Expand Down
Loading

0 comments on commit 191efc2

Please sign in to comment.