Skip to content

Commit

Permalink
Progress.
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubadamw committed Oct 19, 2024
1 parent c547568 commit bad6933
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 150 deletions.
306 changes: 190 additions & 116 deletions Cargo.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "sync"] }
tonic = { version = "0.12", features = ["tls"] }
tonic = { version = "0.12", features = ["tls", "tls-native-roots"] }
tracing = "0.1"

[build-dependencies]
tonic-build = "0.12"
tempfile = "3"

[dev-dependencies]
dotenv = "0.15"
7 changes: 6 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@ fn prepend_package_name_and_build(
writeln!(temp_file, "package {package_name};")?;

let target_path = temp_file.into_temp_path();
tonic_build::compile_protos(target_path)?;
tonic_build::configure()
.disable_package_emission()
.compile_protos(
&[&target_path],
&[target_path.parent().expect("must succeed")],
)?;
Ok(())
}

Expand Down
34 changes: 34 additions & 0 deletions examples/fibonacci.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use hatchet_sdk::{Client, StepBuilder, WorkflowBuilder};

pub async fn execute(n: std::num::NonZero<usize>) -> anyhow::Result<usize> {
let mut sum = 0;
let mut last = 0;
let mut current = 1;
for _i in 1..=n.get() {
sum = last + current;
last = current;
current = sum;
}
Ok(sum)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
dotenv::dotenv().ok();

let client = Client::new()?;
let mut worker = client.worker("example").build();
worker.register_workflow(
WorkflowBuilder::default()
.name("fibonacci")
.step(
StepBuilder::default()
.name("compute")
.function(&execute)
.build()?,
)
.build()?,
);
worker.start().await?;
Ok(())
}
2 changes: 1 addition & 1 deletion hatchet
Submodule hatchet updated 421 files
3 changes: 1 addition & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use crate::worker::WorkerBuilder;
pub(crate) struct Environment {
pub(crate) token: SecretString,
pub(crate) host_port: Option<String>,
#[serde(default)]
pub(crate) listener_v2_timeout: u64,
pub(crate) listener_v2_timeout: Option<u64>,
#[serde(default)]
pub(crate) tls_strategy: crate::ClientTlStrategy,
pub(crate) tls_cert_file: Option<String>,
Expand Down
20 changes: 15 additions & 5 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@
pub enum Error {
#[error("failed to load configuration from the environment: {0}")]
Environment(#[from] envy::Error),
#[error("transport error: {0}")]
TonicTransport(#[from] tonic::transport::Error),
#[error("status: {0}")]
TonicStatus(#[from] tonic::Status),
#[error("worker registration request: {0}")]
CouldNotRegisterWorker(tonic::Status),
#[error("workflow registration request:: {0}")]
CouldNotPutWorkflow(tonic::Status),
#[error("dispatcher listen error: {0}")]
CouldNotListenToDispatcher(tonic::Status),
#[error("step status send error: {0}")]
CouldNotSendStepStatus(tonic::Status),
#[error("heartbeat error: {0}")]
CouldNotSendHeartbeat(tonic::Status),
#[error("dispatcher connection error: {0}")]
CouldNotConnectToDispatcher(tonic::transport::Error),
#[error("workflow service connection error: {0:?}")]
CouldNotConnectToWorkflowService(tonic::transport::Error),
#[error("could not read file under `{1}`: {0}")]
CouldNotReadFile(std::io::Error, String),
#[error("environment variables {0} and {1} cannot be set simultaneously")]
CantSetBothEnvironmentVariables(&'static str, &'static str),
#[error("could not subscribe to actions after {0} retries")]
CouldNotSubscribeToActions(usize),
#[error("could not decode the provided token to retrieve the host/port pair")]
#[error("could not decode the provided token to retrieve the host/port pair: {0}")]
CouldNotDecodeToken(jsonwebtoken::errors::Error),
#[error("could not decode action payload: {0}")]
CouldNotDecodeActionPayload(serde_json::Error),
Expand Down
3 changes: 2 additions & 1 deletion src/worker/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ where
heartbeat_at: Some(std::time::SystemTime::now().into()),
worker_id: worker_id.clone(),
})
.await?;
.await
.map_err(crate::Error::CouldNotSendHeartbeat)?;

tokio::select! {
_ = interval.tick() => {
Expand Down
24 changes: 18 additions & 6 deletions src/worker/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ where
StepActionEventType::StepEventTypeStarted,
Default::default(),
))
.await?
.await
.map_err(crate::Error::CouldNotSendStepStatus)?
.into_inner();

let input = serde_json::from_str(&action.action_payload)
Expand Down Expand Up @@ -97,7 +98,8 @@ where

dispatcher
.send_step_action_event(action_event)
.await?
.await
.map_err(crate::Error::CouldNotSendStepStatus)?
.into_inner();

Ok(())
Expand All @@ -110,7 +112,7 @@ pub(crate) async fn run<F>(
namespace: &str,
worker_id: &str,
workflows: Vec<Workflow>,
listener_v2_timeout: u64,
listener_v2_timeout: Option<u64>,
mut interrupt_receiver: tokio::sync::mpsc::Receiver<()>,
_heartbeat_interrupt_sender: tokio::sync::mpsc::Sender<()>,
) -> crate::Result<()>
Expand Down Expand Up @@ -141,15 +143,25 @@ where
}
.into_request();
request.set_timeout(DEFAULT_ACTION_TIMEOUT);
dispatcher.listen(request).await?.into_inner()
dispatcher
.listen(request)
.await
.map_err(crate::Error::CouldNotListenToDispatcher)?
.into_inner()
}
ListenStrategy::V2 => {
let mut request = WorkerListenRequest {
worker_id: worker_id.to_owned(),
}
.into_request();
request.set_timeout(std::time::Duration::from_millis(listener_v2_timeout));
dispatcher.listen_v2(request).await?.into_inner()
if let Some(listener_v2_timeout) = listener_v2_timeout {
request.set_timeout(std::time::Duration::from_millis(listener_v2_timeout));
}
dispatcher
.listen_v2(request)
.await
.map_err(crate::Error::CouldNotListenToDispatcher)?
.into_inner()
}
};

Expand Down
60 changes: 44 additions & 16 deletions src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ use crate::{client::Environment, ClientTlStrategy, Workflow};
#[derive(derive_builder::Builder)]
#[builder(pattern = "owned", build_fn(private, name = "build_private"))]
pub struct Worker<'a> {
#[builder(default, setter(into))]
name: String,
#[builder(default, setter(into))]
max_runs: Option<i32>,
#[builder(vis = "pub(crate)")]
environment: &'a super::client::Environment,
Expand Down Expand Up @@ -47,18 +49,26 @@ struct TokenClaims {
}

fn construct_endpoint_url<'a>(
tls_strategy: ClientTlStrategy,
host_port_in_environment: Option<&'a str>,
token: &SecretString,
) -> crate::Result<String> {
use secrecy::ExposeSecret;

let protocol = match tls_strategy {
ClientTlStrategy::None => "http",
ClientTlStrategy::Tls | ClientTlStrategy::MTls => "https",
};

Ok(format!(
"https://{}",
"{protocol}://{}",
host_port_in_environment
.map(|value| crate::Result::Ok(std::borrow::Cow::Borrowed(value)))
.unwrap_or_else(|| {
let key = jsonwebtoken::DecodingKey::from_secret(&[]);
let mut validation = jsonwebtoken::Validation::new(jsonwebtoken::Algorithm::HS256);
let mut validation = jsonwebtoken::Validation::new(jsonwebtoken::Algorithm::ES256);
validation.insecure_disable_signature_validation();
validation.validate_aud = false;
let data: jsonwebtoken::TokenData<TokenClaims> =
jsonwebtoken::decode(token.expose_secret(), &key, &validation)
.map_err(crate::Error::CouldNotDecodeToken)?;
Expand All @@ -75,11 +85,14 @@ async fn construct_endpoint(
host_port: Option<&str>,
token: &SecretString,
) -> crate::Result<tonic::transport::Endpoint> {
let mut endpoint = tonic::transport::Endpoint::new(construct_endpoint_url(host_port, token)?)?;
let mut endpoint =
tonic::transport::Endpoint::new(construct_endpoint_url(tls_strategy, host_port, token)?)
.expect("endpoint must be valid");
endpoint = match tls_strategy {
crate::ClientTlStrategy::None => todo!(),
crate::ClientTlStrategy::None => endpoint,
crate::ClientTlStrategy::Tls | crate::ClientTlStrategy::MTls => {
let mut tls_config = tonic::transport::channel::ClientTlsConfig::new();
let mut tls_config =
tonic::transport::channel::ClientTlsConfig::new().with_enabled_roots();
if let Some(domain_name) = tls_server_name {
tls_config = tls_config.domain_name(domain_name);
};
Expand All @@ -104,7 +117,9 @@ async fn construct_endpoint(
tls_config =
tls_config.ca_certificate(Certificate::from_pem(extra_root_certificate));
}
endpoint.tls_config(tls_config)?
endpoint
.tls_config(tls_config)
.expect("TLS config must be valid")
}
};
Ok(endpoint)
Expand Down Expand Up @@ -155,6 +170,8 @@ impl<'a> Worker<'a> {
)
.await?;

println!("{}", endpoint.uri());

let authorization_header: tonic::metadata::MetadataValue<tonic::metadata::Ascii> =
format!("Bearer {token}", token = token.expose_secret())
.parse()
Expand All @@ -163,7 +180,10 @@ impl<'a> Worker<'a> {

let mut workflow_service_client =
grpc::workflow_service_client::WorkflowServiceClient::with_interceptor(
endpoint.connect().await?,
endpoint
.connect()
.await
.map_err(crate::Error::CouldNotConnectToWorkflowService)?,
move |mut request: tonic::Request<()>| {
request
.metadata_mut()
Expand Down Expand Up @@ -199,7 +219,7 @@ impl<'a> Worker<'a> {
"{namespaced_workflow_name}:{name}",
name = step.name
),
timeout: step.timeout.as_secs().to_string(),
timeout: format!("{}s", step.timeout.as_secs()),
inputs: "{}".to_owned(),
parents: step.parents.clone(),
user_data: Default::default(),
Expand All @@ -211,20 +231,25 @@ impl<'a> Worker<'a> {
}],
name: namespaced_workflow_name,
concurrency: None, // FIXME: Implement.
schedule_timeout: Some(workflow.schedule_timeout.as_secs().to_string()),
cron_input: None, // FIXME: Implement.
on_failure_job: None, // FIXME: Implement.
sticky: None, // FIXME: Implement.
schedule_timeout: Some(format!("{}s", workflow.schedule_timeout.as_secs())),
cron_input: None, // FIXME: Implement.
on_failure_job: None, // FIXME: Implement.
sticky: None, // FIXME: Implement.
default_priority: None, // FIXME: Implement.
kind: None,
}),
})
.await?;
.await
.map_err(crate::Error::CouldNotPutWorkflow)?;
}

// FIXME: Account for all the settings from `self.environment`.
let mut dispatcher = {
grpc::dispatcher_client::DispatcherClient::with_interceptor(
endpoint.connect().await?,
endpoint
.connect()
.await
.map_err(crate::Error::CouldNotConnectToDispatcher)?,
move |mut request: tonic::Request<()>| {
request
.metadata_mut()
Expand All @@ -246,8 +271,11 @@ impl<'a> Worker<'a> {
request
};

let WorkerRegisterResponse { worker_id, .. } =
dispatcher.register(request).await?.into_inner();
let WorkerRegisterResponse { worker_id, .. } = dispatcher
.register(request)
.await
.map_err(crate::Error::CouldNotRegisterWorker)?
.into_inner();

futures_util::try_join! {
heartbeat::run(dispatcher.clone(), &worker_id, heartbeat_interrupt_receiver),
Expand Down
2 changes: 1 addition & 1 deletion src/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl StepBuilder {
pub struct Workflow {
#[builder(setter(into))]
pub(crate) name: String,
#[builder(setter(into))]
#[builder(default, setter(into))]
pub(crate) description: String,
#[builder(default, setter(into))]
pub(crate) version: String,
Expand Down

0 comments on commit bad6933

Please sign in to comment.