Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a mechanism for converting SQL schemas based on negotiated version #2417

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions cli/src/clients/admin_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use crate::clients::AdminClientInterface;
use super::errors::ApiError;

/// Min/max supported admin API versions
pub const MIN_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V1;
pub const MAX_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V1;
pub const MIN_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V2;
pub const MAX_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V2;

#[derive(Error, Debug)]
#[error(transparent)]
Expand Down Expand Up @@ -186,10 +186,13 @@ impl AdminClient {
.request(method, path)
.timeout(self.request_timeout);

match self.bearer_token.as_deref() {
let request_builder = match self.bearer_token.as_deref() {
Some(token) => request_builder.bearer_auth(token),
None => request_builder,
}
};

let (api_version_header, api_version) = self.admin_api_version.into();
request_builder.header(api_version_header, api_version)
}

/// Prepare a request builder that encodes the body as JSON.
Expand Down
76 changes: 61 additions & 15 deletions cli/src/clients/datafusion_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::str::FromStr;

use anyhow::Result;
use arrow::array::{Array, ArrayAccessor, AsArray, StringArray};
use arrow::datatypes::{ArrowTemporalType, Date64Type};
use arrow::datatypes::ArrowTemporalType;
use arrow::record_batch::RecordBatch;
use arrow_convert::{ArrowDeserialize, ArrowField};
use bytes::Bytes;
Expand Down Expand Up @@ -111,10 +111,17 @@ fn value_as_u64_opt(batch: &RecordBatch, col: usize, row: usize) -> Option<u64>
}

fn value_as_dt_opt(batch: &RecordBatch, col: usize, row: usize) -> Option<chrono::DateTime<Local>> {
batch
.column(col)
.as_primitive::<arrow::datatypes::Date64Type>()
.value_as_local_datetime_opt(row)
let col = batch.column(col);
match col.data_type() {
arrow::datatypes::DataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, _) => col
.as_primitive::<arrow::datatypes::TimestampMillisecondType>()
.value_as_local_datetime_opt(row),
jackkleeman marked this conversation as resolved.
Show resolved Hide resolved
// older versions of restate used Date64 instead of TimestampMillisecond
arrow::datatypes::DataType::Date64 => col
.as_primitive::<arrow::datatypes::Date64Type>()
.value_as_local_datetime_opt(row),
_ => panic!("Column is not a timestamp"),
}
}

#[derive(ValueEnum, Copy, Clone, Eq, Hash, PartialEq, Debug, Default)]
Expand Down Expand Up @@ -515,11 +522,7 @@ pub async fn get_service_status(
.column(2)
.as_primitive::<arrow::datatypes::Int64Type>()
.value(i);
let oldest_at = batch
.column(3)
.as_primitive::<arrow::datatypes::Date64Type>()
.value_as_local_datetime_opt(i)
.unwrap();
let oldest_at = value_as_dt_opt(&batch, 3, i).unwrap();

let oldest_invocation = batch.column(4).as_string::<i32>().value_string(i);

Expand Down Expand Up @@ -744,23 +747,66 @@ impl From<RestateDateTime> for DateTime<Local> {
}
}

pub static TIMEZONE_UTC: std::sync::LazyLock<std::sync::Arc<str>> =
std::sync::LazyLock::new(|| std::sync::Arc::from("+00:00"));

impl arrow_convert::field::ArrowField for RestateDateTime {
type Type = Self;

#[inline]
fn data_type() -> arrow::datatypes::DataType {
arrow::datatypes::DataType::Date64
arrow::datatypes::DataType::Timestamp(
arrow::datatypes::TimeUnit::Millisecond,
Some(TIMEZONE_UTC.clone()),
)
}
}

impl arrow_convert::deserialize::ArrowDeserialize for RestateDateTime {
type ArrayType = arrow::array::Date64Array;
type ArrayType = TimestampMillisecondArray;

#[inline]
fn arrow_deserialize(v: Option<i64>) -> Option<Self> {
v.and_then(arrow::temporal_conversions::as_datetime::<Date64Type>)
.map(|naive| Local.from_utc_datetime(&naive))
.map(RestateDateTime)
let timestamp = arrow::temporal_conversions::as_datetime::<
arrow::datatypes::TimestampMillisecondType,
>(v?)?;
Some(RestateDateTime(Local.from_utc_datetime(&timestamp)))
}
}

// This newtype is necessary to implement ArrowArray, which is implemented for TimestampNanosecond but not TimestampMillisecond for some reason
jackkleeman marked this conversation as resolved.
Show resolved Hide resolved
#[repr(transparent)]
struct TimestampMillisecondArray(arrow::array::TimestampMillisecondArray);

impl TimestampMillisecondArray {
fn from_ref(v: &arrow::array::TimestampMillisecondArray) -> &Self {
// SAFETY: transmuting a single-element newtype struct with repr(transparent) is safe
unsafe { std::mem::transmute(v) }
}
}

impl arrow_convert::deserialize::ArrowArray for TimestampMillisecondArray {
type BaseArrayType = arrow::array::TimestampMillisecondArray;
#[inline]
fn iter_from_array_ref(
b: &dyn Array,
) -> <Self as arrow_convert::deserialize::ArrowArrayIterable>::Iter<'_> {
let b = b.as_any().downcast_ref::<Self::BaseArrayType>().unwrap();
<Self as arrow_convert::deserialize::ArrowArrayIterable>::iter(
TimestampMillisecondArray::from_ref(b),
)
}
}

impl arrow_convert::deserialize::ArrowArrayIterable for TimestampMillisecondArray {
type Item<'a> = Option<
<arrow::datatypes::TimestampMillisecondType as arrow::array::ArrowPrimitiveType>::Native,
>;

type Iter<'a> = arrow::array::PrimitiveIter<'a, arrow::datatypes::TimestampMillisecondType>;

fn iter(&self) -> Self::Iter<'_> {
IntoIterator::into_iter(&self.0)
}
}

Expand Down
33 changes: 33 additions & 0 deletions crates/admin-rest-model/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,46 @@ use std::ops::RangeInclusive;
pub enum AdminApiVersion {
Unknown = 0,
V1 = 1,
V2 = 2,
}

impl From<AdminApiVersion> for (http::HeaderName, http::HeaderValue) {
fn from(value: AdminApiVersion) -> Self {
(
AdminApiVersion::HEADER_NAME,
http::HeaderValue::from(value.as_repr()),
Comment on lines +32 to +33
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did we come to a conclusion how to encode the different versions (header vs. path)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we have not

)
}
}

impl AdminApiVersion {
const HEADER_NAME: http::HeaderName =
http::HeaderName::from_static("x-restate-admin-api-version");

pub fn as_repr(&self) -> u16 {
*self as u16
}

pub fn from_headers(headers: &http::HeaderMap) -> Self {
let is_cli = matches!(headers.get(http::header::USER_AGENT), Some(value) if value.as_bytes().starts_with(b"restate-cli"));

match headers.get(Self::HEADER_NAME) {
Some(value) => match value.to_str() {
Ok(value) => match value.parse::<u16>() {
Ok(value) => match Self::try_from(value) {
Ok(value) => value,
Err(_) => Self::Unknown,
},
Err(_) => Self::Unknown,
},
Err(_) => Self::Unknown,
},
// CLI didn't used to send the version header, but if we know its the CLI, then we can treat that as V1
None if is_cli => Self::V1,
None => Self::Unknown,
}
Comment on lines +49 to +63
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I am not a huge fan of this kind of special casing because it can be confusing.

Is it standard practice to consider a request w/o a version header as the latest? While this is very convenient, it seems also like the best way to run into breaking changes because initially users don't have to think about versions until they upgrade.

Would it be an option to always consider a missing version header as v1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it is not standard practice. i think if its a header, we need to make sure that the naive curl case without the header does something sensible. if its a url path, we could perhaps deprecate the unversioned one, but given that we are versioning the whole api, it could mean fairly regularly breaking old scripts/clients which use a versioned url, when we remove support for an old api version. i don't really have a great answer here!

Copy link
Contributor

@tillrohrmann tillrohrmann Dec 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the versioning approach via headers makes it a bit more hidden and I agree that the basic curl case should still work. At the same time, I would like to avoid a very obvious foot gun if possible.

Regarding deprecating a versioned url, I wouldn't worry too much about it. This is something that we can announce and then slowly phase out to give users time to adjust. This won't be so easily possible for the API that always serves the latest version (unless it is announced in advance).

}

pub fn choose_max_supported_version(
client_versions: RangeInclusive<AdminApiVersion>,
server_versions: RangeInclusive<u16>,
Expand Down
2 changes: 1 addition & 1 deletion crates/admin/src/rest_api/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use restate_admin_rest_model::version::{AdminApiVersion, VersionInformation};

/// Min/max supported admin api versions by the server
pub const MIN_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V1;
pub const MAX_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V1;
pub const MAX_ADMIN_API_VERSION: AdminApiVersion = AdminApiVersion::V2;

/// Version information endpoint
#[openapi(
Expand Down
Loading
Loading