Skip to content

Commit

Permalink
Merge pull request #4404 from systeminit/remove_pinga_timeout
Browse files (browse the repository at this point in the history)
fix: remove pinga heartbeat
  • Loading branch information
sprutton1 authored Aug 22, 2024
2 parents 868ad97 + 97dc9b8 commit 19bf00e
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 92 deletions.
6 changes: 0 additions & 6 deletions bin/lang-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ async function main() {
// We don't have the executionId yet, so this field will be empty
let errorFn = makeConsole(executionId).error;

const interval = setInterval(() => {
console.log(JSON.stringify({ protocol: "heartbeat" }));
}, 5000);

try {
const requestJson = fs.readFileSync(STDIN_FD, "utf8");
debug({ request: requestJson });
Expand All @@ -98,8 +94,6 @@ async function main() {
onError(errorFn, err as Error, executionId);
}

clearInterval(interval);

// NOTE(nick): hey friends, Nick here. I won't implicate @jobelenus in this comment because I have a solid chance of
// going off the rails, but I do need to give him attribution here. Thank you for pairing with me to find this.
// Alright so here's the deal: whether I am employed at SI or not (or even contributing to SI or not), I need you to
Expand Down
2 changes: 0 additions & 2 deletions lib/cyclone-server/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,6 @@ where
.stdout
.map(|ls_result| match ls_result {
Ok(ls_msg) => match ls_msg {
LangServerMessage::Heartbeat => Ok(Message::Heartbeat),
LangServerMessage::Output(mut output) => {
Self::filter_output(&mut output, &self.sensitive_strings)?;
Ok(Message::OutputStream(output.into()))
Expand Down Expand Up @@ -427,7 +426,6 @@ where
#[derive(Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(tag = "protocol", rename_all = "camelCase")]
pub enum LangServerMessage<Success> {
Heartbeat,
Output(LangServerOutput),
Result(LangServerResult<Success>),
}
Expand Down
94 changes: 35 additions & 59 deletions lib/veritech-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use tokio_util::sync::CancellationToken;
use veritech_core::{
nats_action_run_subject, nats_cancel_execution_subject, nats_reconciliation_subject,
nats_resolver_function_subject, nats_schema_variant_definition_subject,
nats_validation_subject, reply_mailbox_for_keep_alive, reply_mailbox_for_output,
reply_mailbox_for_result, FINAL_MESSAGE_HEADER_KEY,
nats_validation_subject, reply_mailbox_for_output, reply_mailbox_for_result,
FINAL_MESSAGE_HEADER_KEY,
};

pub use cyclone_core::{
Expand All @@ -31,8 +31,6 @@ pub enum ClientError {
JSONSerialize(#[source] serde_json::Error),
#[error("nats error")]
Nats(#[from] si_data_nats::NatsError),
#[error("no keep alive from cyclone")]
NoKeepAlive,
#[error("no function result from cyclone; bug!")]
NoResult,
#[error("unable to publish message: {0:?}")]
Expand Down Expand Up @@ -178,18 +176,6 @@ impl Client {
.start(&self.nats)
.await?;

// Construct a subscriber stream for keep-alive messages
let keep_alive_subscriber_subject = reply_mailbox_for_keep_alive(&reply_mailbox_root);
trace!(
messaging.destination = &keep_alive_subscriber_subject.as_str(),
"subscribing for keep-alive messages"
);
let mut keep_alive_subscriber: Subscriber<()> =
Subscriber::create(keep_alive_subscriber_subject)
.final_message_header_key(FINAL_MESSAGE_HEADER_KEY)
.start(&self.nats)
.await?;

let shutdown_token = CancellationToken::new();
let span = Span::current();

Expand Down Expand Up @@ -225,54 +211,44 @@ impl Client {

let span = Span::current();

loop {
tokio::select! {
_ = keep_alive_subscriber.next() => {
debug!("Heartbeat from veritech");
continue;
}
// Abort if no keep-alive for too long
_ = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
return Err(ClientError::NoKeepAlive);
}
// Wait for one message on the result reply mailbox
result = result_subscriber.try_next() => {
shutdown_token.cancel();
tokio::select! {
// Wait for one message on the result reply mailbox
result = result_subscriber.try_next() => {
shutdown_token.cancel();

root_subscriber.unsubscribe_after(0).await?;
result_subscriber.unsubscribe_after(0).await?;
match result? {
Some(result) => {
span.follows_from(result.process_span);
return Ok(result.payload);
}
None => return Err(ClientError::NoResult),
root_subscriber.unsubscribe_after(0).await?;
result_subscriber.unsubscribe_after(0).await?;
match result? {
Some(result) => {
span.follows_from(result.process_span);
Ok(result.payload)
}
None => Err(ClientError::NoResult),
}
maybe_msg = root_subscriber.next() => {
shutdown_token.cancel();
}
maybe_msg = root_subscriber.next() => {
shutdown_token.cancel();

match &maybe_msg {
Some(msg) => {
propagation::associate_current_span_from_headers(msg.headers());
error!(
subject = reply_mailbox_root,
msg = ?msg,
"received an unexpected message or error on reply subject prefix"
)
}
None => {
error!(
subject = reply_mailbox_root,
"reply subject prefix subscriber unexpectedly closed"
)
}
};
match &maybe_msg {
Some(msg) => {
propagation::associate_current_span_from_headers(msg.headers());
error!(
subject = reply_mailbox_root,
msg = ?msg,
"received an unexpected message or error on reply subject prefix"
)
}
None => {
error!(
subject = reply_mailbox_root,
"reply subject prefix subscriber unexpectedly closed"
)
}
};

// In all cases, we're considering a message on this subscriber to be fatal and
// will return with an error
return Err(ClientError::PublishingFailed(maybe_msg.ok_or(ClientError::RootConnectionClosed)?));
}
// In all cases, we're considering a message on this subscriber to be fatal and
// will return with an error
Err(ClientError::PublishingFailed(maybe_msg.ok_or(ClientError::RootConnectionClosed)?))
}
}
}
Expand Down
22 changes: 1 addition & 21 deletions lib/veritech-server/src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@ use si_data_nats::{NatsClient, Subject};
use si_pool_noodle::{FunctionResult, OutputStream};
use telemetry_nats::propagation;
use thiserror::Error;
use veritech_core::{
reply_mailbox_for_keep_alive, reply_mailbox_for_output, reply_mailbox_for_result,
FINAL_MESSAGE_HEADER_KEY,
};
use veritech_core::{reply_mailbox_for_output, reply_mailbox_for_result, FINAL_MESSAGE_HEADER_KEY};

#[remain::sorted]
#[derive(Error, Debug)]
Expand All @@ -22,7 +19,6 @@ type Result<T> = std::result::Result<T, PublisherError>;
#[derive(Debug)]
pub struct Publisher<'a> {
nats: &'a NatsClient,
reply_mailbox_keep_alive: Subject,
reply_mailbox_output: Subject,
reply_mailbox_result: Subject,
}
Expand All @@ -33,7 +29,6 @@ impl<'a> Publisher<'a> {
nats,
reply_mailbox_output: reply_mailbox_for_output(reply_mailbox).into(),
reply_mailbox_result: reply_mailbox_for_result(reply_mailbox).into(),
reply_mailbox_keep_alive: reply_mailbox_for_keep_alive(reply_mailbox).into(),
}
}

Expand Down Expand Up @@ -75,19 +70,4 @@ impl<'a> Publisher<'a> {
.await
.map_err(|err| PublisherError::NatsPublish(err, self.reply_mailbox_result.to_string()))
}

pub async fn publish_keep_alive(&self) -> Result<()> {
let nats_msg = serde_json::to_string(&()).map_err(PublisherError::JSONSerialize)?;

self.nats
.publish_with_headers(
self.reply_mailbox_keep_alive.clone(),
propagation::empty_injected_headers(),
nats_msg.into(),
)
.await
.map_err(|err| {
PublisherError::NatsPublish(err, self.reply_mailbox_keep_alive.to_string())
})
}
}
4 changes: 0 additions & 4 deletions lib/veritech-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,6 @@ async fn resolver_function_request(
}
Ok(ProgressMessage::Heartbeat) => {
trace!("received heartbeat message");
publisher.publish_keep_alive().await.map_err(|err| {
metric!(counter.function_run.resolver = -1);
span.record_err(err)
})?
}
Err(err) => {
warn!(error = ?err, "next progress message was an error, bailing out");
Expand Down

0 comments on commit 19bf00e

Please sign in to comment.