Skip to content

Commit

Permalink
Ignore errors on writing metadata to lakehouse
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Oct 7, 2023
1 parent 34a162c commit 443121e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 3 deletions.
9 changes: 9 additions & 0 deletions dash/pipe/provider/src/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,20 @@ pub trait Function {

#[derive(Clone, Debug, Default)]
pub struct FunctionContext {
is_disabled_load: bool,
is_disabled_write_metadata: bool,
is_terminating: Arc<AtomicBool>,
}

impl FunctionContext {
pub fn disable_load(&mut self) {
self.is_disabled_load = true;
}

pub(crate) const fn is_disabled_load(&self) -> bool {
self.is_disabled_load
}

pub fn disable_store_metadata(&mut self) {
self.is_disabled_write_metadata = true;
}
Expand Down
25 changes: 22 additions & 3 deletions dash/pipe/provider/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ where

Some(ReadContext {
_job: ReadSession {
function_context: function_context.clone(),
storage: storage.input.clone(),
stream: match &self.queue_group {
Some(queue_group) => {
Expand Down Expand Up @@ -368,6 +369,7 @@ struct ReadSession<Value>
where
Value: Default,
{
function_context: FunctionContext,
storage: Arc<StorageSet>,
stream: Subscriber,
tx: Arc<Sender<PipeMessage<Value>>>,
Expand Down Expand Up @@ -406,7 +408,7 @@ where

match self.read_message_one().await? {
Some(input) => {
if input.payloads.is_empty() {
if self.function_context.is_disabled_load() || input.payloads.is_empty() {
send_one(&self.tx, input.load_payloads_as_empty()).await
} else {
let storage = self.storage.clone();
Expand Down Expand Up @@ -449,6 +451,18 @@ impl WriteContext {
&mut self,
input_payloads: &HashMap<String, PipePayload<()>>,
messages: PipeMessages<Value>,
) where
Value: Send + Sync + Default + Serialize + JsonSchema,
{
if let Err(error) = self.try_write_outputs(input_payloads, messages).await {
error!("{error}");
}
}

async fn try_write_outputs<Value>(
&mut self,
input_payloads: &HashMap<String, PipePayload<()>>,
messages: PipeMessages<Value>,
) -> Result<()>
where
Value: Send + Sync + Default + Serialize + JsonSchema,
Expand All @@ -464,11 +478,16 @@ impl WriteContext {

for output in outputs {
if !self.function_context.is_disabled_store_metadata() {
self.storage
if let Err(error) = self
.storage
.get_default_metadata()
.put_metadata(&[&output])
.await?;
.await
{
warn!("{error}");
}
}

let output = output
.to_json_bytes()
.map_err(|error| anyhow!("failed to parse NATS output: {error}"))?;
Expand Down

0 comments on commit 443121e

Please sign in to comment.