Adding formats crate and Decoder trait (#37)
ameyc authored Sep 10, 2024
1 parent f1cd05b commit 64b6e82
Showing 11 changed files with 202 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions crates/common/Cargo.toml
@@ -15,3 +15,4 @@ datafusion = { workspace = true }
arrow = { workspace = true }
thiserror = "1.0.63"
pyo3 = { workspace = true, optional = true }
serde_json.workspace = true
4 changes: 3 additions & 1 deletion crates/common/src/error/mod.rs
@@ -3,7 +3,7 @@ use thiserror::Error;

use arrow::error::ArrowError;
use datafusion::error::DataFusionError;

use serde_json::Error as JsonError;
#[cfg(feature = "python")]
mod py_err;

@@ -22,6 +22,8 @@ pub enum DenormalizedError {
KafkaConfig(String),
#[error("Arrow Error")]
Arrow(#[from] ArrowError),
#[error("Json Error")]
Json(#[from] JsonError),
#[error(transparent)]
Other(#[from] anyhow::Error),
}
23 changes: 1 addition & 22 deletions crates/core/src/datasource/kafka/kafka_config.rs
@@ -6,6 +6,7 @@ use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit};

use datafusion::logical_expr::SortExpr;

use crate::formats::StreamEncoding;
use crate::physical_plan::utils::time::TimestampUnit;
use crate::utils::arrow_helpers::infer_arrow_schema_from_json_value;
use denormalized_common::error::{DenormalizedError, Result};
@@ -307,28 +308,6 @@ impl KafkaTopicBuilder {
}
}

/// The data encoding for [`StreamTable`]
#[derive(Debug, Clone, Copy)]
pub enum StreamEncoding {
Avro,
Json,
}

impl FromStr for StreamEncoding {
type Err = DenormalizedError;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"avro" => Ok(Self::Avro),
"json" => Ok(Self::Json),
_ => Err(Self::Err::KafkaConfig(format!(
"Unrecognised StreamEncoding {}",
s
))),
}
}
}

fn get_topic_partition_count(bootstrap_servers: String, topic: String) -> Result<i32> {
let mut client_config = ClientConfig::new();
client_config.set("bootstrap.servers", bootstrap_servers.to_string());
44 changes: 9 additions & 35 deletions crates/core/src/datasource/kafka/kafka_stream_read.rs
@@ -9,22 +9,21 @@ use denormalized_orchestrator::channel_manager::{create_channel, get_sender};
use denormalized_orchestrator::orchestrator::{self, OrchestrationMessage};
use log::{debug, error};
use serde::{Deserialize, Serialize};
use serde_json::Value;
//use tracing::{debug, error};

use crate::config_extensions::denormalized_config::DenormalizedConfig;
use crate::formats::decoders::json::JsonDecoder;
use crate::formats::decoders::Decoder;
use crate::physical_plan::stream_table::PartitionStreamExt;
use crate::physical_plan::utils::time::array_to_timestamp_array;
use crate::state_backend::rocksdb_backend::get_global_rocksdb;
use crate::utils::arrow_helpers::json_records_to_arrow_record_batch;

use arrow::compute::{max, min};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
use datafusion::physical_plan::streaming::PartitionStream;

use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message, Timestamp, TopicPartitionList};
use rdkafka::{ClientConfig, Message, TopicPartitionList};

use super::KafkaReadConfig;

@@ -134,11 +133,11 @@ impl PartitionStream for KafkaStreamRead {
let mut builder = RecordBatchReceiverStreamBuilder::new(self.config.schema.clone(), 1);
let tx = builder.tx();
let canonical_schema = self.config.schema.clone();
let json_schema = self.config.original_schema.clone();
let arrow_schema = self.config.original_schema.clone();
let timestamp_column: String = self.config.timestamp_column.clone();
let timestamp_unit = self.config.timestamp_unit.clone();
let batch_timeout = Duration::from_millis(100);
let mut channel_tag = String::from("");
let mut channel_tag: String = String::from("");
if orchestrator::SHOULD_CHECKPOINT {
let node_id = self.exec_node_id.unwrap();
channel_tag = format!("{}_{}", node_id, partition_tag);
@@ -151,6 +150,7 @@ impl PartitionStream for KafkaStreamRead {
let msg = OrchestrationMessage::RegisterStream(channel_tag.clone());
orchestrator_sender.as_ref().unwrap().send(msg).unwrap();
}
let mut json_decoder: JsonDecoder = JsonDecoder::new(arrow_schema.clone());
loop {
let mut last_offsets = HashMap::new();
if let Some(backend) = &state_backend {
@@ -181,7 +181,6 @@ }
}

let mut offsets_read: HashMap<i32, i64> = HashMap::new();
let mut batch: Vec<serde_json::Value> = Vec::new();
let start_time = datafusion::common::instant::Instant::now();

while start_time.elapsed() < batch_timeout {
@@ -192,30 +191,9 @@
.await
{
Ok(Ok(m)) => {
let timestamp = match m.timestamp() {
Timestamp::NotAvailable => -1_i64,
Timestamp::CreateTime(ts) => ts,
Timestamp::LogAppendTime(ts) => ts,
};
let key = m.key();

let payload = m.payload().expect("Message payload is empty");
let mut deserialized_record: HashMap<String, Value> =
serde_json::from_slice(payload).unwrap();
deserialized_record
.insert("kafka_timestamp".to_string(), Value::from(timestamp));
if let Some(key) = key {
deserialized_record.insert(
"kafka_key".to_string(),
Value::from(String::from_utf8_lossy(key)),
);
} else {
deserialized_record
.insert("kafka_key".to_string(), Value::from(String::from("")));
}
let new_payload = serde_json::to_value(deserialized_record).unwrap();
json_decoder.push_to_buffer(payload.to_owned());
offsets_read.insert(m.partition(), m.offset());
batch.push(new_payload);
}
Ok(Err(err)) => {
error!("Error reading from Kafka {:?}", err);
@@ -228,12 +206,8 @@
}
}

//debug!("Batch size {}", batch.len());

if !batch.is_empty() {
let record_batch: RecordBatch =
json_records_to_arrow_record_batch(batch, json_schema.clone());

if !offsets_read.is_empty() {
let record_batch = json_decoder.to_record_batch().unwrap();
let ts_column = record_batch
.column_by_name(timestamp_column.as_str())
.map(|ts_col| {
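Net effect of this file's change: raw Kafka payloads are buffered into the new JsonDecoder inside the poll loop, and the read loop no longer builds a Vec of serde_json values or calls json_records_to_arrow_record_batch directly; a single to_record_batch() produces the Arrow batch once the batch window closes. A minimal sketch of that flow, assuming the same crate::formats paths imported above (the free-standing helper and its name are illustrative only, not part of this commit):

use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::Schema;
use denormalized_common::DenormalizedError;

use crate::formats::decoders::json::JsonDecoder;
use crate::formats::decoders::Decoder;

// Illustrative helper: in kafka_stream_read.rs the payloads come from m.payload()
// on each message polled before batch_timeout elapses.
fn decode_payloads(
    schema: Arc<Schema>,
    payloads: Vec<Vec<u8>>,
) -> Result<RecordBatch, DenormalizedError> {
    let mut decoder = JsonDecoder::new(schema);
    for payload in payloads {
        // Bytes are only buffered here; JSON parsing is deferred to to_record_batch().
        decoder.push_to_buffer(payload);
    }
    // Drains the buffer and produces one Arrow RecordBatch for the whole micro-batch.
    decoder.to_record_batch()
}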
4 changes: 1 addition & 3 deletions crates/core/src/datasource/kafka/mod.rs
@@ -3,9 +3,7 @@ pub mod kafka_stream_read;
pub mod topic_reader;
pub mod topic_writer;

pub use kafka_config::{
ConnectionOpts, KafkaReadConfig, KafkaTopicBuilder, KafkaWriteConfig, StreamEncoding,
};
pub use kafka_config::{ConnectionOpts, KafkaReadConfig, KafkaTopicBuilder, KafkaWriteConfig};
pub use kafka_stream_read::KafkaStreamRead;
pub use topic_reader::TopicReader;
pub use topic_writer::TopicWriter;
148 changes: 148 additions & 0 deletions crates/core/src/formats/decoders/json.rs
@@ -0,0 +1,148 @@
use std::sync::Arc;

use arrow_schema::Schema;
use serde_json::Value;

use crate::utils::arrow_helpers::json_records_to_arrow_record_batch;

use super::Decoder;

pub struct JsonDecoder {
schema: Arc<Schema>,
cache: Vec<Vec<u8>>,
}

impl JsonDecoder {
pub fn new(schema: Arc<Schema>) -> Self {
JsonDecoder {
schema: schema.clone(),
cache: Vec::new(),
}
}
}

impl Decoder for JsonDecoder {
fn to_record_batch(
&mut self,
) -> Result<arrow_array::RecordBatch, denormalized_common::DenormalizedError> {
let mut combined_json = Vec::new();
combined_json.push(b'[');
for (index, row) in self.cache.iter().enumerate() {
if index > 0 {
combined_json.push(b',');
}
combined_json.extend_from_slice(row.as_slice());
}
combined_json.push(b']');
self.cache.clear();
let result: Vec<Value> = serde_json::from_slice(&combined_json)?;
Ok(json_records_to_arrow_record_batch(
result,
self.schema.clone(),
))
}

fn push_to_buffer(&mut self, bytes: Vec<u8>) {
self.cache.push(bytes);
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow_array::{Int64Array, StringArray};
use arrow_schema::{DataType, Field};
use std::sync::Arc;

fn create_test_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("id", DataType::Int64, false),
Field::new("name", DataType::Utf8, false),
]))
}

fn create_test_json(id: i64, name: &str) -> Vec<u8> {
format!(r#"{{"id": {}, "name": "{}"}}"#, id, name).into_bytes()
}

#[test]
fn test_json_decoder_new() {
let schema = create_test_schema();
let decoder = JsonDecoder::new(schema.clone());
assert_eq!(decoder.schema, schema);
assert!(decoder.cache.is_empty());
}

#[test]
fn test_push_to_buffer() {
let schema = create_test_schema();
let mut decoder = JsonDecoder::new(schema);
let json1 = create_test_json(1, "Alice");
let json2 = create_test_json(2, "Bob");

decoder.push_to_buffer(json1.clone());
decoder.push_to_buffer(json2.clone());

assert_eq!(decoder.cache, vec![json1, json2]);
}

#[test]
fn test_to_record_batch() -> Result<(), denormalized_common::DenormalizedError> {
let schema = create_test_schema();
let mut decoder = JsonDecoder::new(schema.clone());

decoder.push_to_buffer(create_test_json(1, "Alice"));
decoder.push_to_buffer(create_test_json(2, "Bob"));
decoder.push_to_buffer(create_test_json(3, "Charlie"));

let record_batch = decoder.to_record_batch()?;

assert_eq!(record_batch.num_columns(), 2);
assert_eq!(record_batch.num_rows(), 3);

let id_array = record_batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
let name_array = record_batch
.column(1)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

assert_eq!(id_array.value(0), 1);
assert_eq!(id_array.value(1), 2);
assert_eq!(id_array.value(2), 3);

assert_eq!(name_array.value(0), "Alice");
assert_eq!(name_array.value(1), "Bob");
assert_eq!(name_array.value(2), "Charlie");

Ok(())
}

#[test]
fn test_to_record_batch_empty() -> Result<(), denormalized_common::DenormalizedError> {
let schema = create_test_schema();
let mut decoder = JsonDecoder::new(schema.clone());

let record_batch = decoder.to_record_batch()?;

assert_eq!(record_batch.num_columns(), 2);
assert_eq!(record_batch.num_rows(), 0);

Ok(())
}

#[test]
fn test_to_record_batch_invalid_json() {
let schema = create_test_schema();
let mut decoder = JsonDecoder::new(schema.clone());

decoder.push_to_buffer(b"{invalid_json}".to_vec());

let result = decoder.to_record_batch();
assert!(result.is_err());
}
}
10 changes: 10 additions & 0 deletions crates/core/src/formats/decoders/mod.rs
@@ -0,0 +1,10 @@
use arrow_array::RecordBatch;
use denormalized_common::DenormalizedError;

pub trait Decoder {
fn push_to_buffer(&mut self, bytes: Vec<u8>);

fn to_record_batch(&mut self) -> Result<RecordBatch, DenormalizedError>;
}

pub mod json;
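The Decoder trait is the extension point for additional StreamEncodings (for example a future Avro decoder). A hypothetical second implementation, shown only to illustrate the contract and not part of this commit:

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use denormalized_common::DenormalizedError;

use crate::formats::decoders::Decoder;

// Illustrative no-op decoder: buffers bytes and always yields an empty batch.
// A real Avro decoder would parse the buffered rows against an Avro schema instead.
pub struct NoopDecoder {
    schema: SchemaRef,
    cache: Vec<Vec<u8>>,
}

impl Decoder for NoopDecoder {
    fn push_to_buffer(&mut self, bytes: Vec<u8>) {
        self.cache.push(bytes);
    }

    fn to_record_batch(&mut self) -> Result<RecordBatch, DenormalizedError> {
        self.cache.clear();
        Ok(RecordBatch::new_empty(self.schema.clone()))
    }
}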
26 changes: 26 additions & 0 deletions crates/core/src/formats/mod.rs
@@ -0,0 +1,26 @@
use std::str::FromStr;

use denormalized_common::DenormalizedError;

#[derive(Debug, Clone, Copy)]
pub enum StreamEncoding {
Avro,
Json,
}

impl FromStr for StreamEncoding {
type Err = DenormalizedError;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
match s.to_ascii_lowercase().as_str() {
"avro" => Ok(Self::Avro),
"json" => Ok(Self::Json),
_ => Err(Self::Err::KafkaConfig(format!(
"Unrecognised StreamEncoding {}",
s
))),
}
}
}

pub mod decoders;
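StreamEncoding moves here from kafka_config.rs essentially unchanged, so parsing still goes through FromStr. A small usage sketch (the function name is illustrative only):

use std::str::FromStr;

use crate::formats::StreamEncoding;

fn parse_encoding_examples() {
    // Parsing is case-insensitive; unknown names produce a DenormalizedError::KafkaConfig error.
    assert!(matches!(StreamEncoding::from_str("JSON"), Ok(StreamEncoding::Json)));
    assert!(matches!(StreamEncoding::from_str("avro"), Ok(StreamEncoding::Avro)));
    assert!(StreamEncoding::from_str("protobuf").is_err());
}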
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
@@ -3,6 +3,7 @@ pub mod config_extensions;
pub mod context;
pub mod datasource;
pub mod datastream;
pub mod formats;
pub mod logical_plan;
pub mod physical_optimizer;
pub mod physical_plan;
2 changes: 1 addition & 1 deletion crates/core/src/utils/arrow_helpers.rs
@@ -42,7 +42,7 @@ pub fn json_records_to_arrow_record_batch(
if records.is_empty() {
return RecordBatch::new_empty(schema);
}
let string_stream: Vec<String> = records.iter().map(|r| r.to_string()).collect();
let string_stream: Vec<String> = records.iter().map(|r: &JValue| r.to_string()).collect();
let cursor: Cursor<String> = Cursor::new(string_stream.join("\n"));

let mut reader = ReaderBuilder::new(schema).build(cursor).unwrap();
