Skip to content

Commit

Permalink
Add a converter for timestamp schema fields
Browse files Browse the repository at this point in the history
  • Loading branch information
jackkleeman committed Dec 12, 2024
1 parent b8a6ed5 commit 81791f3
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 39 deletions.
88 changes: 54 additions & 34 deletions crates/admin/src/storage_query/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,28 @@ use std::{
use datafusion::{
arrow::{
array::{
Array, ArrayRef, AsArray, BinaryArray, GenericByteArray, RecordBatch, StringArray,
Array, ArrayRef, AsArray, BinaryArray, GenericByteArray, PrimitiveArray, RecordBatch,
StringArray,
},
buffer::{OffsetBuffer, ScalarBuffer},
datatypes::{ByteArrayType, DataType, Field, FieldRef, Schema, SchemaRef},
datatypes::{
ByteArrayType, DataType, Date64Type, Field, FieldRef, Schema, SchemaRef, TimeUnit,
TimestampMillisecondType,
},
error::ArrowError,
},
error::DataFusionError,
execution::{RecordBatchStream, SendableRecordBatchStream},
};
use futures::{Stream, StreamExt};

pub(super) const V1_CONVERTER: JoinConverter<LargeConverter, FullCountConverter> =
JoinConverter::new(LargeConverter, FullCountConverter);
// Converter stack applied to query results for clients on admin API v1.
// Composition: the inner JoinConverter(LargeConverter, FullCountConverter)
// runs first, then TimestampConverter runs over its output, so the timestamp
// rewrite sees the already-converted schema/columns.
// NOTE(review): presumably TimestampConverter must be last so the Date64
// downgrade applies after any column/field rewrites — confirm against
// JoinConverter's before/after ordering.
pub(super) const V1_CONVERTER: JoinConverter<
    JoinConverter<LargeConverter, FullCountConverter>,
    TimestampConverter,
> = JoinConverter::new(
    JoinConverter::new(LargeConverter, FullCountConverter),
    TimestampConverter,
);

pub(super) struct ConvertRecordBatchStream<C> {
converter: C,
Expand Down Expand Up @@ -108,36 +117,6 @@ pub(super) trait Converter: Unpin {
fn convert_fields(&self, fields: Vec<FieldRef>) -> Vec<FieldRef>;
}

/// A converter that forwards schemas and record batches through unchanged.
///
/// Useful for API versions whose wire format already matches the engine
/// output, so no per-batch rewriting is needed.
pub(super) struct NoopConverter;

impl Converter for NoopConverter {
    #[inline]
    fn convert_schema(&self, schema: SchemaRef) -> SchemaRef {
        // Identity: the schema is already in the shape the client expects.
        schema
    }

    #[inline]
    fn convert_record_batch(
        &self,
        _schema: &SchemaRef,
        batch: RecordBatch,
    ) -> Result<RecordBatch, ArrowError> {
        // Identity: hand the batch back untouched.
        Ok(batch)
    }

    fn convert_columns(
        &self,
        _schema: &SchemaRef,
        _columns: Vec<ArrayRef>,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        // This impl overrides the schema/batch entry points above, so the
        // column-level hook is never reached for the no-op case.
        unreachable!()
    }

    fn convert_fields(&self, _fields: Vec<FieldRef>) -> Vec<FieldRef> {
        // Same as convert_columns: never reached for the no-op case.
        unreachable!()
    }
}

pub(super) struct JoinConverter<Before, After> {
before: Before,
after: After,
Expand Down Expand Up @@ -259,3 +238,44 @@ impl Converter for FullCountConverter {
.collect()
}
}

/// Rewrites `Timestamp(Millisecond)` data back to the `Date64` representation
/// used before 1.2, for the specific schema fields the CLI reads
/// (per the original author: the `inv ls` / `inv describe` columns).
pub(super) struct TimestampConverter;

impl Converter for TimestampConverter {
    fn convert_columns(
        &self,
        converted_schema: &SchemaRef,
        mut columns: Vec<ArrayRef>,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        for (i, target) in converted_schema.fields().iter().enumerate() {
            // Only rewrite where the converted schema asks for Date64 but the
            // engine produced millisecond timestamps.
            let wants_date64 = matches!(target.data_type(), DataType::Date64);
            let has_ts_millis = matches!(
                columns[i].data_type(),
                DataType::Timestamp(TimeUnit::Millisecond, _)
            );
            if !(wants_date64 && has_ts_millis) {
                continue;
            }
            let ts = columns[i].as_primitive::<TimestampMillisecondType>();
            // No data copy: both layouts are i64 epoch-based milliseconds, so
            // the value buffer and null buffer are reused as-is.
            let date =
                PrimitiveArray::<Date64Type>::new(ts.values().clone(), ts.nulls().cloned());
            columns[i] = ArrayRef::from(Box::new(date) as Box<dyn Array>);
        }
        Ok(columns)
    }

    fn convert_fields(&self, fields: Vec<FieldRef>) -> Vec<FieldRef> {
        fields
            .into_iter()
            .map(|field| {
                // Fields that were Date64 before 1.2: the first four are shown
                // by `inv ls`, the last by `inv describe`.
                let legacy_name = matches!(
                    field.name().as_str(),
                    "last_start_at"
                        | "next_retry_at"
                        | "modified_at"
                        | "created_at"
                        | "sleep_wakeup_at"
                );
                let ts_millis = matches!(
                    field.data_type(),
                    DataType::Timestamp(TimeUnit::Millisecond, _)
                );
                if legacy_name && ts_millis {
                    FieldRef::new(Field::clone(&field).with_data_type(DataType::Date64))
                } else {
                    field
                }
            })
            .collect()
    }
}
8 changes: 3 additions & 5 deletions crates/admin/src/storage_query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use schemars::JsonSchema;
use serde::Deserialize;
use serde_with::serde_as;

use super::convert::{ConvertRecordBatchStream, NoopConverter, V1_CONVERTER};
use super::convert::{ConvertRecordBatchStream, V1_CONVERTER};
use super::error::StorageQueryError;
use crate::state::QueryServiceState;

Expand Down Expand Up @@ -71,10 +71,8 @@ pub async fn query(
V1_CONVERTER,
record_batch_stream,
)),
AdminApiVersion::Unknown | AdminApiVersion::V2 => Box::pin(ConvertRecordBatchStream::new(
NoopConverter,
record_batch_stream,
)),
// treat 'unknown' as latest, users can specify 1 if they want to maintain old behaviour
AdminApiVersion::Unknown | AdminApiVersion::V2 => record_batch_stream,
};

let (result_stream, content_type) = match headers.get(http::header::ACCEPT) {
Expand Down

0 comments on commit 81791f3

Please sign in to comment.