Adding Kafka Timestamp support when no ts column is provided #63

Merged · 6 commits · Nov 25, 2024
2 changes: 1 addition & 1 deletion .github/workflows/python.yml
```diff
@@ -119,7 +119,7 @@ jobs:
     strategy:
       matrix:
         platform:
-          - runner: macos-12
+          - runner: macos-14
            target: x86_64
          - runner: macos-14
            target: aarch64
```
16 changes: 4 additions & 12 deletions crates/core/src/datasource/kafka/kafka_config.rs
```diff
@@ -37,8 +37,8 @@ pub struct KafkaReadConfig {
     pub encoding: StreamEncoding,
     pub order: Vec<Vec<SortExpr>>,
     pub partition_count: i32,
-    pub timestamp_column: String,
-    pub timestamp_unit: TimestampUnit,
+    pub timestamp_column: Option<String>,
+    pub timestamp_unit: Option<TimestampUnit>,
 
     pub kafka_connection_opts: ConnectionOpts,
 }
@@ -232,17 +232,9 @@ impl KafkaTopicBuilder {
             .as_ref()
             .ok_or_else(|| DenormalizedError::KafkaConfig("encoding required".to_string()))?;
 
-        let timestamp_column = self
-            .timestamp_column
-            .as_ref()
-            .ok_or_else(|| DenormalizedError::KafkaConfig("timestamp_column required".to_string()))?
-            .clone();
+        let timestamp_column = self.timestamp_column.clone();
 
-        let timestamp_unit = self
-            .timestamp_unit
-            .as_ref()
-            .ok_or_else(|| DenormalizedError::KafkaConfig("timestamp_unit required".to_string()))?
-            .clone();
+        let timestamp_unit = self.timestamp_unit.clone();
 
         let mut kafka_connection_opts = ConnectionOpts::new();
        for (key, value) in opts.into_iter() {
```
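With `timestamp_column` and `timestamp_unit` now optional, the builder no longer rejects topics that have no event-time column. A minimal sketch of the two call patterns, using only the builder methods visible in this PR; the import paths, topic name, broker address, and sample JSON are assumptions, and the final build step is elided:

```rust
// Sketch only: paths and values here are assumptions, not code from this PR.
use denormalized::common::error::Result;
use denormalized::datasource::kafka::KafkaTopicBuilder;
use denormalized::prelude::TimestampUnit;

fn configure_topics() -> Result<()> {
    let sample_json = r#"{"id": 1, "occurred_at": 1732500000000}"#;

    // Explicit event-time column decoded from the message payload.
    let mut with_column = KafkaTopicBuilder::new("localhost:9092".to_string());
    with_column
        .with_timestamp("occurred_at".to_string(), TimestampUnit::Int64Millis)
        .with_encoding("json")?
        .with_topic("example_topic".to_string())
        .infer_schema_from_json(sample_json)?;

    // No timestamp column configured: KafkaReadConfig stores None and the
    // stream falls back to the Kafka message timestamp (see the next file).
    let mut kafka_timestamped = KafkaTopicBuilder::new("localhost:9092".to_string());
    kafka_timestamped
        .with_encoding("json")?
        .with_topic("example_topic".to_string())
        .infer_schema_from_json(sample_json)?;

    Ok(())
}
```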
41 changes: 32 additions & 9 deletions crates/core/src/datasource/kafka/kafka_stream_read.rs
```diff
@@ -3,7 +3,9 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use arrow::datatypes::TimestampMillisecondType;
-use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, StructArray};
+use arrow_array::{
+    Array, ArrayRef, Int64Array, PrimitiveArray, RecordBatch, StringArray, StructArray,
+};
 use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
 use crossbeam::channel;
 use denormalized_orchestrator::channel_manager::{create_channel, get_sender, take_receiver};
@@ -146,8 +148,17 @@ impl PartitionStream for KafkaStreamRead {
         let mut builder = RecordBatchReceiverStreamBuilder::new(self.config.schema.clone(), 1);
         let tx = builder.tx();
         let canonical_schema = self.config.schema.clone();
-        let timestamp_column: String = self.config.timestamp_column.clone();
-        let timestamp_unit = self.config.timestamp_unit.clone();
+        let use_kafka_timestamps = self.config.timestamp_column.is_none();
+        let timestamp_column = self
+            .config
+            .timestamp_column
+            .clone()
+            .unwrap_or(String::from(""));
+        let timestamp_unit = self
+            .config
+            .timestamp_unit
+            .clone()
+            .unwrap_or(crate::prelude::TimestampUnit::Int64Millis);
         let batch_timeout: Duration = Duration::from_millis(100);
         let mut decoder = self.config.build_decoder();
 
@@ -180,10 +191,13 @@
 
             let mut offsets_read: HashMap<i32, i64> = HashMap::new();
 
+            let mut timestamps: Vec<i64> = Vec::new();
             loop {
                 match tokio::time::timeout(batch_timeout, consumer.recv()).await {
                     Ok(Ok(m)) => {
                         let payload = m.payload().expect("Message payload is empty");
+                        let ts = m.timestamp().to_millis().unwrap_or(-1);
+                        timestamps.push(ts);
                         decoder.push_to_buffer(payload.to_owned());
                         offsets_read
                             .entry(m.partition())
@@ -207,12 +221,21 @@
 
             if !offsets_read.is_empty() {
                 let record_batch = decoder.to_record_batch().unwrap();
-                let ts_column = record_batch
-                    .column_by_name(timestamp_column.as_str())
-                    .map(|ts_col| {
-                        Arc::new(array_to_timestamp_array(ts_col, timestamp_unit.clone()))
-                    })
-                    .unwrap();
+
+                let kafka_ts_array = Int64Array::from_iter_values(timestamps);
+                let ts_column_array: Arc<dyn Array> = Arc::new(kafka_ts_array);
+                let mut ts_column = Arc::new(array_to_timestamp_array(
+                    ts_column_array.as_ref(),
+                    timestamp_unit.clone(),
+                ));
+
+                // Timestamp column was provided. TODO: This code is a hack for now we need a little cleanup here.
+                if !use_kafka_timestamps {
+                    let arr = record_batch
+                        .column_by_name(timestamp_column.as_str())
+                        .unwrap();
+                    ts_column = Arc::new(array_to_timestamp_array(arr, timestamp_unit.clone()));
+                };
 
                 let binary_vec = Vec::from_iter(
                     std::iter::repeat(String::from("no_barrier")).take(ts_column.len()),
```
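The hack flagged by the TODO above boils down to choosing one of two timestamp sources per decoded batch. A possible cleanup, shown here only as a sketch, factors that choice into a helper; `array_to_timestamp_array` and `TimestampUnit` are the crate-internal items already used in the diff, while the function itself and its signature are hypothetical:

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int64Array, RecordBatch};

// Hypothetical helper: selects the event-time source for a decoded batch.
fn resolve_timestamp_column(
    record_batch: &RecordBatch,
    timestamp_column: Option<&str>,
    timestamp_unit: TimestampUnit, // crate-internal enum used in the diff above
    kafka_timestamps: Vec<i64>,    // per-message timestamps collected while polling
) -> ArrayRef {
    match timestamp_column {
        // A column was configured: convert it from the decoded payload.
        Some(name) => {
            let col = record_batch
                .column_by_name(name)
                .expect("configured timestamp column missing from decoded batch");
            Arc::new(array_to_timestamp_array(col.as_ref(), timestamp_unit))
        }
        // No column configured: fall back to the Kafka message timestamps
        // (milliseconds since epoch, -1 when the broker supplies none).
        None => {
            let kafka_ts: ArrayRef = Arc::new(Int64Array::from_iter_values(kafka_timestamps));
            Arc::new(array_to_timestamp_array(kafka_ts.as_ref(), timestamp_unit))
        }
    }
}
```

The block above would then shrink to a single call that picks the right source per batch, instead of building the Kafka-timestamp array and conditionally overwriting it.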
62 changes: 34 additions & 28 deletions crates/core/src/physical_plan/continuous/streaming_window.rs
```diff
@@ -736,7 +736,6 @@ impl WindowAggStream {
         let mut watermark_lock: std::sync::MutexGuard<Option<SystemTime>> =
             self.latest_watermark.lock().unwrap();
 
-        debug!("latest watermark currently is {:?}", *watermark_lock);
         if let Some(current_watermark) = *watermark_lock {
             if current_watermark <= watermark.min_timestamp {
                 *watermark_lock = Some(watermark.min_timestamp)
@@ -987,34 +986,41 @@ impl FullWindowAggStream {
             if self.seen_windows.contains(&start_time)
                 && !self.cached_frames.contains_key(&start_time)
             {
-                panic!("we are reopening a window already seen.")
+                debug!(
+                    "received late data for window with start time {:?}. dropping it.",
+                    start_time
+                );
+                Ok(RecordBatch::new_empty(Arc::new(
+                    add_window_columns_to_schema(self.schema.clone()),
+                )))
+            } else {
+                let frame = self.cached_frames.entry(start_time).or_insert({
+                    FullWindowAggFrame::new(
+                        start_time,
+                        batch_end_time.unwrap(),
+                        &self.exec_aggregate_expressions,
+                        self.aggregate_expressions.clone(),
+                        self.filter_expressions.clone(),
+                        self.schema.clone(),
+                        self.baseline_metrics.clone(),
+                    )
+                });
+
+                self.seen_windows.insert(start_time);
+
+                //last two columns are timestamp columns, so remove them before pushing them onto a frame.
+                let col_size = rb.num_columns();
+                rb.remove_column(col_size - 1);
+                rb.remove_column(col_size - 2);
+
+                frame.aggregate_batch(rb);
+
+                self.watermark = self
+                    .watermark
+                    .map_or(Some(start_time), |w| Some(w.max(start_time)));
+
+                self.finalize_windows()
             }
-            let frame = self.cached_frames.entry(start_time).or_insert({
-                FullWindowAggFrame::new(
-                    start_time,
-                    batch_end_time.unwrap(),
-                    &self.exec_aggregate_expressions,
-                    self.aggregate_expressions.clone(),
-                    self.filter_expressions.clone(),
-                    self.schema.clone(),
-                    self.baseline_metrics.clone(),
-                )
-            });
-
-            self.seen_windows.insert(start_time);
-
-            //last two columns are timestamp columns, so remove them before pushing them onto a frame.
-            let col_size = rb.num_columns();
-            rb.remove_column(col_size - 1);
-            rb.remove_column(col_size - 2);
-
-            frame.aggregate_batch(rb);
-
-            self.watermark = self
-                .watermark
-                .map_or(Some(start_time), |w| Some(w.max(start_time)));
-
-            self.finalize_windows()
         } else {
             Ok(RecordBatch::new_empty(Arc::new(
                 add_window_columns_to_schema(self.schema.clone()),
```
1 change: 1 addition & 0 deletions py-denormalized/pyproject.toml
```diff
@@ -11,6 +11,7 @@ description = "Embeddable stream processing engine"
 dependencies = [
     "pyarrow>=17.0.0",
     "datafusion>=40.1.0",
+    "pip>=24.3.1",
 ]
 
 [project.optional-dependencies]
```
39 changes: 16 additions & 23 deletions py-denormalized/python/denormalized/context.py
```diff
@@ -5,30 +5,27 @@
 class Context:
     """A context manager for handling data stream operations.
 
-    This class provides an interface for creating and managing data streams,
-    particularly for working with Kafka topics and stream processing.
-
-    Attributes:
-        ctx: Internal PyContext instance managing Rust-side operations
+    This class provides functionality to create and manage data streams
+    from various sources like Kafka topics.
     """
 
     def __init__(self) -> None:
-        """Initialize a new Context instance."""
+        """Initializes a new Context instance with PyContext."""
         self.ctx = PyContext()
 
     def __repr__(self):
-        """Return a string representation of the Context object.
+        """Returns the string representation of the PyContext object.
 
         Returns:
-            str: A detailed string representation of the context
+            str: String representation of the underlying PyContext.
         """
         return self.ctx.__repr__()
 
     def __str__(self):
-        """Return a readable string description of the Context object.
+        """Returns the string representation of the PyContext object.
 
         Returns:
-            str: A human-readable string description
+            str: String representation of the underlying PyContext.
         """
         return self.ctx.__str__()
 
@@ -37,31 +34,27 @@ def from_topic(
         topic: str,
         sample_json: str,
         bootstrap_servers: str,
-        timestamp_column: str,
+        timestamp_column: str | None = None,
         group_id: str = "default_group",
     ) -> DataStream:
-        """Create a new DataStream from a Kafka topic.
+        """Creates a new DataStream from a Kafka topic.
 
         Args:
-            topic: Name of the Kafka topic to consume from
-            sample_json: Sample JSON string representing the expected message format
-            bootstrap_servers: Comma-separated list of Kafka broker addresses
-            timestamp_column: Column name containing event timestamps
-            group_id: Kafka consumer group ID (defaults to "default_group")
+            topic: The name of the Kafka topic to consume from.
+            sample_json: A sample JSON string representing the expected message format.
+            bootstrap_servers: Comma-separated list of Kafka broker addresses.
+            timestamp_column: Optional column name containing message timestamps.
+            group_id: Kafka consumer group ID, defaults to "default_group".
 
         Returns:
-            DataStream: A new DataStream instance configured for the specified topic
-
-        Raises:
-            ValueError: If the topic name is empty or invalid
-            ConnectionError: If unable to connect to Kafka brokers
+            DataStream: A new DataStream instance connected to the specified topic.
         """
         py_ds = self.ctx.from_topic(
             topic,
             sample_json,
             bootstrap_servers,
-            timestamp_column,
             group_id,
+            timestamp_column,
         )
         ds = DataStream(py_ds)
         return ds
```
14 changes: 12 additions & 2 deletions py-denormalized/src/context.rs
```diff
@@ -71,23 +71,33 @@ impl PyContext {
         Ok("PyContext".to_string())
     }
 
+    #[pyo3(signature = (
+        topic,
+        sample_json,
+        bootstrap_servers,
+        group_id,
+        timestamp_column = None
+    ))]
     pub fn from_topic(
         &self,
         py: Python,
         topic: String,
         sample_json: String,
         bootstrap_servers: String,
-        timestamp_column: String,
         group_id: String,
+        timestamp_column: Option<String>,
     ) -> PyResult<PyDataStream> {
         let context = self.context.clone();
         let rt = &get_tokio_runtime(py).0;
         let fut: JoinHandle<denormalized::common::error::Result<DataStream>> =
             rt.spawn(async move {
                 let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
 
+                if let Some(ts_col) = timestamp_column {
+                    topic_builder.with_timestamp(ts_col, TimestampUnit::Int64Millis);
+                }
+
                 let source_topic = topic_builder
-                    .with_timestamp(timestamp_column, TimestampUnit::Int64Millis)
                     .with_encoding("json")?
                     .with_topic(topic)
                     .infer_schema_from_json(sample_json.as_str())?
```
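On the pyo3 side, the `#[pyo3(signature = ...)]` attribute is what gives `timestamp_column` a `None` default, so Python callers can simply omit the argument. A standalone illustration of the same pattern follows; the function and module names are hypothetical, not this crate's code, and a recent pyo3 with the Bound API is assumed:

```rust
use pyo3::prelude::*;

/// Minimal illustration of an optional, defaulted argument in pyo3,
/// mirroring the `timestamp_column = None` default added above.
#[pyfunction]
#[pyo3(signature = (topic, timestamp_column = None))]
fn describe_source(topic: String, timestamp_column: Option<String>) -> String {
    match timestamp_column {
        Some(col) => format!("{topic}: event time from column '{col}'"),
        None => format!("{topic}: event time from the Kafka message timestamp"),
    }
}

// Hypothetical module registration so the function is importable from Python.
#[pymodule]
fn example(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(describe_source, m)?)?;
    Ok(())
}
```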
2 changes: 2 additions & 0 deletions py-denormalized/uv.lock

Some generated files are not rendered by default.