Skip to content

Commit

Permalink
Retry some Trino errors
Browse files Browse the repository at this point in the history
  • Loading branch information
emk committed Feb 29, 2024
1 parent bc8d780 commit 05b9e53
Showing 1 changed file with 53 additions and 25 deletions.
78 changes: 53 additions & 25 deletions src/drivers/trino/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
//! Trino and maybe Presto driver.
use std::{fmt, str::FromStr, sync::Arc};
use std::{fmt, str::FromStr, sync::Arc, time::Duration};

use async_trait::async_trait;
use codespan_reporting::{diagnostic::Diagnostic, files::Files};
use joinery_macros::sql_quote;
use once_cell::sync::Lazy;
use prusto::{error::Error as PrustoError, Client, ClientBuilder, Presto, QueryError, Row};
use regex::Regex;
use tokio::time::sleep;
use tracing::debug;

use crate::{
Expand Down Expand Up @@ -71,6 +72,27 @@ fn rewrite_approx_quantiles(call: &FunctionCall) -> TokenStream {
}
}

/// Quick and dirty retry loop to deal with transient Trino errors. We will
/// probably ultimately want exponential backoff, etc.
macro_rules! retry_trino_error {
($e:expr) => {{
let mut max_tries = 3;
let mut sleep_duration = Duration::from_millis(500);
loop {
match $e {
Ok(val) => break Ok(val),
Err(e) if should_retry(&e) && max_tries > 0 => {
sleep(sleep_duration).await;
max_tries -= 1;
sleep_duration *= 2;
continue;
}
Err(e) => break Err(e),
}
}
}};
}

/// A locator for a Trino database. May or may not also work for Presto.
#[derive(Debug)]
pub struct TrinoLocator {
Expand Down Expand Up @@ -170,10 +192,10 @@ impl Driver for TrinoDriver {
#[tracing::instrument(skip_all)]
async fn execute_native_sql_statement(&mut self, sql: &str) -> Result<()> {
debug!(%sql, "Executing native SQL statement");
self.client
.execute(sql.to_owned())
.await
.map_err(|err| abbreviate_trino_error(sql, err))?;
retry_trino_error! {
self.client.execute(sql.to_owned()).await
}
.map_err(|err| abbreviate_trino_error(sql, err))?;
Ok(())
}

Expand Down Expand Up @@ -205,11 +227,11 @@ impl Driver for TrinoDriver {
#[tracing::instrument(skip(self))]
async fn drop_table_if_exists(&mut self, table_name: &str) -> Result<()> {
let sql = format!("DROP TABLE IF EXISTS {}", AnsiIdent(table_name));
self.client
.execute(sql.clone())
.await
.map_err(|err| abbreviate_trino_error(&sql, err))
.with_context(|| format!("Failed to drop table: {}", table_name))?;
retry_trino_error! {
self.client.execute(sql.clone()).await
}
.map_err(|err| abbreviate_trino_error(&sql, err))
.with_context(|| format!("Failed to drop table: {}", table_name))?;
Ok(())
}

Expand Down Expand Up @@ -243,12 +265,12 @@ impl DriverImpl for TrinoDriver {
TrinoString(&self.schema),
TrinoString(table_name)
);
Ok(self
.client
.get_all::<Col>(sql.clone())
.await
.map_err(|err| abbreviate_trino_error(&sql, err))
.with_context(|| format!("Failed to get columns for table: {}", table_name))?
let dataset = retry_trino_error! {
self.client.get_all::<Col>(sql.clone()).await
}
.map_err(|err| abbreviate_trino_error(&sql, err))
.with_context(|| format!("Failed to get columns for table: {}", table_name))?;
Ok(dataset
.into_vec()
.into_iter()
.map(|c| Column {
Expand All @@ -275,15 +297,12 @@ impl DriverImpl for TrinoDriver {
AnsiIdent(table_name),
cols_sql
);
let rows = self
.client
.get_all::<Row>(sql.clone())
.await
.map_err(|err| abbreviate_trino_error(&sql, err))
.with_context(|| format!("Failed to query table: {}", table_name))?
.into_vec()
.into_iter()
.map(|r| Ok(r.into_json()));
let dataset = retry_trino_error! {
self.client.get_all::<Row>(sql.clone()).await
}
.map_err(|err| abbreviate_trino_error(&sql, err))
.with_context(|| format!("Failed to query table: {}", table_name))?;
let rows = dataset.into_vec().into_iter().map(|r| Ok(r.into_json()));
Ok(Box::new(rows))
}
}
Expand Down Expand Up @@ -324,6 +343,15 @@ impl fmt::Display for TrinoString<'_> {
}
}

/// Should an error be retried?
///
/// Note that the `rusto` crate has internal support for retrying connection
/// and network errors, so we don't need to worry about that. But we do need
/// to look out for `QueryError`s that might need to be retried.
fn should_retry(e: &PrustoError) -> bool {
matches!(e, PrustoError::QueryError(QueryError { error_type, .. }) if error_type == "NO_NODES_AVAILABLE")
}

/// These errors are pages long.
fn abbreviate_trino_error(sql: &str, e: PrustoError) -> Error {
if let PrustoError::QueryError(e) = &e {
Expand Down

0 comments on commit 05b9e53

Please sign in to comment.