From 27d49cde254d3fd7fc4f7809d8e41d7e4ebcd49e Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Fri, 25 Oct 2024 16:28:57 +0530
Subject: [PATCH 1/8] Adding config option for checkpointing

---
 README.md                                     |  7 ++--
 crates/core/src/context.rs                    | 24 +++++++----
 .../src/datasource/kafka/kafka_stream_read.rs | 27 +++++++-----
 crates/core/src/datastream.rs                 | 19 ++++++---
 .../continuous/grouped_window_agg_stream.rs   | 41 ++++++++++++-------
 .../continuous/streaming_window.rs            | 25 +++++++----
 crates/orchestrator/src/orchestrator.rs       |  2 -
 examples/examples/simple_aggregation.rs       |  6 ++-
 8 files changed, 99 insertions(+), 52 deletions(-)

diff --git a/README.md b/README.md
index eaf2b0d..3ba7348 100644
--- a/README.md
+++ b/README.md
@@ -96,11 +96,12 @@ Details about developing the python bindings can be found in [py-denormalized/RE
 
 ### Checkpointing
 
-We use SlateDB for state backend. Initialize your Job Context to a path to local directory -
+We use SlateDB as the state backend. Initialize your Job Context with a custom config and a path where the SlateDB backend will store state:
 
 ```
-    let ctx = Context::new()?
-        .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
+    let config = Context::default_config().set_bool("denormalized_config.checkpoint", true);
+    let ctx = Context::with_config(config)?
+        .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg/job1"))
         .await;
 ```
 
diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs
index 3419720..f3b078d 100644
--- a/crates/core/src/context.rs
+++ b/crates/core/src/context.rs
@@ -6,6 +6,7 @@ use datafusion::execution::{
     session_state::SessionStateBuilder,
 };
 
+use crate::config_extensions::denormalized_config::DenormalizedConfig;
 use crate::datasource::kafka::TopicReader;
 use crate::datastream::DataStream;
 use crate::physical_optimizer::EnsureHashPartititionOnGroupByForStreamingAggregates;
@@ -17,12 +18,13 @@ use denormalized_common::error::{DenormalizedError, Result};
 
 #[derive(Clone)]
 pub struct Context {
-    pub session_conext: Arc<SessionContext>,
+    pub session_context: Arc<SessionContext>,
 }
 
 impl Context {
-    pub fn new() -> Result<Self> {
-        let config = SessionConfig::new()
+    pub fn default_config() -> SessionConfig {
+        let ext_config = DenormalizedConfig::default();
+        let mut config = SessionConfig::new()
             .set(
                 "datafusion.execution.batch_size",
                 &datafusion::common::ScalarValue::UInt64(Some(32)),
@@ -34,8 +36,16 @@ impl Context {
                 &datafusion::common::ScalarValue::Boolean(Some(false)),
             );
 
-        let runtime = Arc::new(RuntimeEnv::default());
+        let _ = config.options_mut().extensions.insert(ext_config);
+        config
+    }
 
+    pub fn new() -> Result<Self> {
+        Context::with_config(Context::default_config())
+    }
+
+    pub fn with_config(config: SessionConfig) -> Result<Self> {
+        let runtime = Arc::new(RuntimeEnv::default());
         let state = SessionStateBuilder::new()
             .with_default_features()
             .with_config(config)
@@ -48,7 +58,7 @@ impl Context {
             .build();
 
         Ok(Self {
-            session_conext: Arc::new(SessionContext::new_with_state(state)),
+            session_context: Arc::new(SessionContext::new_with_state(state)),
         })
     }
 
@@ -56,7 +66,7 @@ impl Context {
         let topic_name = topic.0.topic.clone();
         self.register_table(topic_name.clone(), Arc::new(topic))
            .await?;
-        let df = self.session_conext.table(topic_name.as_str()).await?;
+        let df = self.session_context.table(topic_name.as_str()).await?;
         let ds = DataStream::new(Arc::new(df), Arc::new(self.clone()));
         Ok(ds)
     }
@@ -66,7 +76,7 @@ impl Context {
         name: String,
         table: Arc<dyn TableProvider>,
     ) -> Result<(), DenormalizedError> {
-        self.session_conext
+        self.session_context
             .register_table(name.as_str(), table.clone())?;
 
         Ok(())
diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs
index 26dc488..25bec46 100644
--- a/crates/core/src/datasource/kafka/kafka_stream_read.rs
+++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs
@@ -7,7 +7,7 @@ use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, Str
 use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
 use crossbeam::channel;
 use denormalized_orchestrator::channel_manager::{create_channel, get_sender, take_receiver};
-use denormalized_orchestrator::orchestrator::{self, OrchestrationMessage};
+use denormalized_orchestrator::orchestrator::{OrchestrationMessage};
 use futures::executor::block_on;
 use log::{debug, error};
 use serde::{Deserialize, Serialize};
@@ -83,13 +83,13 @@ impl PartitionStream for KafkaStreamRead {
     }
 
     fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-        let _config_options = ctx
+        let config_options = ctx
             .session_config()
             .options()
             .extensions
             .get::<DenormalizedConfig>();
 
-        let mut should_checkpoint = false; //config_options.map_or(false, |c| c.checkpoint);
+        let should_checkpoint = config_options.map_or(false, |c| c.checkpoint);
 
         let node_id = self.exec_node_id.unwrap();
         let partition_tag = self
@@ -101,13 +101,16 @@ impl PartitionStream for KafkaStreamRead {
         let channel_tag = format!("{}_{}", node_id, partition_tag);
         let mut serialized_state: Option<Vec<u8>> = None;
-        let state_backend = get_global_slatedb().unwrap();
+        let mut state_backend = None;
         let mut starting_offsets: HashMap<i32, i64> = HashMap::new();
-        if orchestrator::SHOULD_CHECKPOINT {
+
+        if should_checkpoint {
             create_channel(channel_tag.as_str(), 10);
+            let backend = get_global_slatedb().unwrap();
             debug!("checking for last checkpointed offsets");
-            serialized_state = block_on(state_backend.clone().get(channel_tag.as_bytes().to_vec()));
+            serialized_state = block_on(backend.get(channel_tag.as_bytes().to_vec()));
+            state_backend = Some(backend);
         }
 
         if let Some(serialized_state) = serialized_state {
@@ -151,25 +154,26 @@ impl PartitionStream for KafkaStreamRead {
         builder.spawn(async move {
             let mut epoch = 0;
             let mut receiver: Option<channel::Receiver<OrchestrationMessage>> = None;
-            if orchestrator::SHOULD_CHECKPOINT {
+            if should_checkpoint {
                 let orchestrator_sender = get_sender("orchestrator");
                 let msg: OrchestrationMessage =
                     OrchestrationMessage::RegisterStream(channel_tag.clone());
                 orchestrator_sender.as_ref().unwrap().send(msg).unwrap();
                 receiver = take_receiver(channel_tag.as_str());
             }
+            let mut checkpoint_batch = false;
 
             loop {
                 //let mut checkpoint_barrier: Option<i64> = None;
                 let mut _checkpoint_barrier: Option<i64> = None;
-                if orchestrator::SHOULD_CHECKPOINT {
+                if should_checkpoint {
                     let r = receiver.as_ref().unwrap();
                     for message in r.try_iter() {
                         debug!("received checkpoint barrier for {:?}", message);
                         if let OrchestrationMessage::CheckpointBarrier(epoch_ts) = message {
                             epoch = epoch_ts;
-                            should_checkpoint = true;
+                            checkpoint_batch = true;
                         }
                     }
                 }
@@ -245,7 +249,7 @@ impl PartitionStream for KafkaStreamRead {
                     let tx_result = tx.send(Ok(timestamped_record_batch)).await;
                     match tx_result {
                         Ok(_) => {
-                            if should_checkpoint {
+                            if checkpoint_batch {
                                 debug!("about to checkpoint offsets");
                                 let off = BatchReadMetadata {
                                     epoch,
                                     min: min_timestamp,
                                     max: max_timestamp,
                                     offsets_read: offsets_read.clone(),
                                 };
                                 let _ = state_backend
                                     .as_ref()
+                                    .unwrap()
                                     .put(channel_tag.as_bytes().to_vec(), off.to_bytes().unwrap());
                                 debug!("checkpointed offsets {:?}", off);
-                                should_checkpoint = false;
+                                checkpoint_batch = false;
                             }
                         }
                         Err(err) => error!("result err {:?}. shutdown signal detected.", err),
diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs
index 8b83ec4..3598e6b 100644
--- a/crates/core/src/datastream.rs
+++ b/crates/core/src/datastream.rs
@@ -1,7 +1,6 @@
 use datafusion::common::runtime::SpawnedTask;
 use datafusion::logical_expr::LogicalPlan;
 use datafusion::physical_plan::ExecutionPlanProperties;
-use denormalized_orchestrator::orchestrator;
 use futures::StreamExt;
 use log::debug;
 use log::info;
@@ -18,6 +17,7 @@ use datafusion::logical_expr::{
 };
 use datafusion::physical_plan::display::DisplayableExecutionPlan;
 
+use crate::config_extensions::denormalized_config::DenormalizedConfig;
 use crate::context::Context;
 use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
 use crate::logical_plan::StreamingLogicalPlanBuilder;
@@ -231,7 +231,12 @@ impl DataStream {
 
         let mut maybe_orchestrator_handle = None;
 
-        if orchestrator::SHOULD_CHECKPOINT {
+        let config = self.context.session_context.copied_config();
+        let config_options = config.options().extensions.get::<DenormalizedConfig>();
+
+        let should_checkpoint = config_options.map_or(false, |c| c.checkpoint);
+
+        if should_checkpoint {
             let mut orchestrator = Orchestrator::default();
             let cloned_shutdown_rx = self.shutdown_rx.clone();
             let orchestrator_handle =
@@ -277,10 +282,12 @@ impl DataStream {
 
         log::info!("Stream processing stopped. Cleaning up...");
 
-        let state_backend = get_global_slatedb();
-        if let Ok(db) = state_backend {
-            log::info!("Closing the state backend (slatedb)...");
-            db.close().await.unwrap();
+        if should_checkpoint {
+            let state_backend = get_global_slatedb();
+            if let Ok(db) = state_backend {
+                log::info!("Closing the state backend (slatedb)...");
+                db.close().await.unwrap();
+            }
         }
 
         // Join the orchestrator handle if it exists, ensuring it is joined and awaited
diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs
index 4114584..fb2f144 100644
--- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs
+++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs
@@ -38,13 +38,14 @@ use datafusion::{
 };
 
 use denormalized_orchestrator::{
     channel_manager::take_receiver,
-    orchestrator::{self, OrchestrationMessage},
+    orchestrator::{OrchestrationMessage},
 };
 use futures::{executor::block_on, Stream, StreamExt};
 use log::debug;
 use serde::{Deserialize, Serialize};
 
 use crate::{
+    config_extensions::denormalized_config::DenormalizedConfig,
     physical_plan::utils::time::RecordBatchWatermark,
     state_backend::slatedb::{get_global_slatedb, SlateDBWrapper},
     utils::serialization::ArrayContainer,
@@ -73,11 +74,11 @@ pub struct GroupedWindowAggStream {
     group_by: PhysicalGroupBy,
     group_schema: Arc<Schema>,
     context: Arc<TaskContext>,
-    epoch: i64,
+    checkpoint: bool,
     partition: usize,
     channel_tag: String,
     receiver: Option<Receiver<OrchestrationMessage>>,
-    state_backend: Arc<SlateDBWrapper>,
+    state_backend: Option<Arc<SlateDBWrapper>>,
 }
 
 #[derive(Serialize, Deserialize)]
@@ -147,11 +148,23 @@ impl GroupedWindowAggStream {
             .and_then(|tag| take_receiver(tag.as_str()));
         let channel_tag: String = channel_tag.unwrap_or(String::from(""));
 
-        let state_backend = get_global_slatedb().unwrap();
-        let serialized_state = block_on(state_backend.get(channel_tag.as_bytes().to_vec()));
+        let config_options = context
+            .session_config()
+            .options()
+            .extensions
+            .get::<DenormalizedConfig>();
+
+        let checkpoint = config_options.map_or(false, |c| c.checkpoint);
+
+        let mut serialized_state: Option<Vec<u8>> = None;
+        let mut state_backend = None;
+
+        if checkpoint {
+            let backend = get_global_slatedb().unwrap();
+            serialized_state = block_on(backend.get(channel_tag.as_bytes().to_vec()));
+            state_backend = Some(backend);
+        }
 
-        //let window_frames: BTreeMap<SystemTime, GroupedAggWindowFrame> = BTreeMap::new();
         let mut stream = Self {
             schema: agg_schema,
             input,
@@ -166,7 +179,7 @@ impl GroupedWindowAggStream {
             group_by,
             group_schema,
             context,
-            epoch: 0,
+            checkpoint,
             partition,
             channel_tag: channel_tag,
             receiver,
@@ -340,19 +353,19 @@ impl GroupedWindowAggStream {
                 return Poll::Pending;
             }
         };
-        self.epoch += 1;
 
-        if orchestrator::SHOULD_CHECKPOINT {
+        let mut checkpoint_batch = false;
+
+        if self.checkpoint {
             let r = self.receiver.as_ref().unwrap();
-            let mut epoch: u128 = 0;
             for message in r.try_iter() {
                 debug!("received checkpoint barrier for {:?}", message);
-                if let OrchestrationMessage::CheckpointBarrier(epoch_ts) = message {
-                    epoch = epoch_ts;
+                if let OrchestrationMessage::CheckpointBarrier(_epoch_ts) = message {
+                    checkpoint_batch = true;
                 }
             }
 
-            if epoch != 0 {
+            if checkpoint_batch {
                 // Prepare data for checkpointing
 
                 // Clone or extract necessary data
@@ -400,7 +413,7 @@ impl GroupedWindowAggStream {
                 let key = self.channel_tag.as_bytes().to_vec();
 
                 // Clone or use `Arc` for `state_backend`
-                let state_backend = self.state_backend.clone();
+                let state_backend = self.state_backend.clone().unwrap();
 
                 state_backend.put(key, serialized_checkpoint);
             }
diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs
index e70261b..9db92f9 100644
--- a/crates/core/src/physical_plan/continuous/streaming_window.rs
+++ b/crates/core/src/physical_plan/continuous/streaming_window.rs
@@ -40,16 +40,19 @@ use datafusion::{
 };
 use denormalized_orchestrator::{
     channel_manager::{create_channel, get_sender},
-    orchestrator::{self, OrchestrationMessage},
+    orchestrator::{OrchestrationMessage},
 };
 use futures::{Stream, StreamExt};
 use tracing::debug;
 
-use crate::physical_plan::{
-    continuous::grouped_window_agg_stream::GroupedWindowAggStream,
-    utils::{
-        accumulators::{create_accumulators, AccumulatorItem},
-        time::{system_time_from_epoch, RecordBatchWatermark},
+use crate::{
+    config_extensions::denormalized_config::DenormalizedConfig,
+    physical_plan::{
+        continuous::grouped_window_agg_stream::GroupedWindowAggStream,
+        utils::{
+            accumulators::{create_accumulators, AccumulatorItem},
+            time::{system_time_from_epoch, RecordBatchWatermark},
+        },
     },
 };
 
@@ -427,7 +430,15 @@ impl ExecutionPlan for StreamingWindowExec {
             .node_id()
             .expect("expected node id to be set.");
 
-        let channel_tag = if orchestrator::SHOULD_CHECKPOINT {
+        let config_options = context
+            .session_config()
+            .options()
+            .extensions
+            .get::<DenormalizedConfig>();
+
+        let checkpoint = config_options.map_or(false, |c| c.checkpoint);
+
+        let channel_tag = if checkpoint {
             let tag = format!("{}_{}", node_id, partition);
             create_channel(tag.as_str(), 10);
             let orchestrator_sender = get_sender("orchestrator");
diff --git a/crates/orchestrator/src/orchestrator.rs b/crates/orchestrator/src/orchestrator.rs
index 8529848..ae9c1f0 100644
--- a/crates/orchestrator/src/orchestrator.rs
+++ b/crates/orchestrator/src/orchestrator.rs
@@ -20,8 +20,6 @@ pub struct Orchestrator {
     senders: HashMap<String, Sender<OrchestrationMessage>>,
 }
 
-pub const SHOULD_CHECKPOINT: bool = true; // THIS WILL BE MOVED INTO CONFIG
-
 /**
  * 1. Keep track of checkpoint per source.
  * 2. Tell each downstream which checkpoints it needs to know.
diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 60c3221..f77c6c4 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -19,9 +19,11 @@ async fn main() -> Result<()> { let bootstrap_servers = String::from("localhost:9092"); - let ctx = Context::new()? - .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1")) + let config = Context::default_config().set_bool("denormalized_config.checkpoint", true); + let ctx = Context::with_config(config)? + .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg/job1")) .await; + let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers); // Connect to source topic From a9afe5b72aab4f86f9980529c2d9cb64a28465bc Mon Sep 17 00:00:00 2001 From: Matt Green Date: Tue, 29 Oct 2024 16:24:15 -0700 Subject: [PATCH 2/8] Add maturin build step for ci (#52) --- .github/workflows/python.yml | 181 ++++++++++++++++++ Cargo.lock | 141 +++----------- Cargo.toml | 6 +- crates/core/Cargo.toml | 7 +- crates/core/src/state_backend/mod.rs | 2 +- .../python/examples/stream_aggregate.py | 5 +- py-denormalized/src/lib.rs | 2 +- 7 files changed, 224 insertions(+), 120 deletions(-) create mode 100644 .github/workflows/python.yml diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml new file mode 100644 index 0000000..898cba6 --- /dev/null +++ b/.github/workflows/python.yml @@ -0,0 +1,181 @@ +# This file is autogenerated by maturin v1.7.4 +# To update, run +# +# maturin generate-ci -m py-denormalized/Cargo.toml github +# +name: Python + +on: + push: + branches: + - main + - master + tags: + - "*" + pull_request: + workflow_dispatch: + +permissions: + contents: read + +jobs: + linux: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: ubuntu-latest + target: x86_64 + - runner: ubuntu-latest + target: aarch64 + - runner: ubuntu-latest + target: s390x + - runner: ubuntu-latest + target: ppc64le + steps: + # - name: Run sccache-cache + # uses: mozilla-actions/sccache-action@v0.0.6 + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --zig --out dist --manifest-path py-denormalized/Cargo.toml + # sccache: 'true' + manylinux: 2_28 + # - name: Upload wheels + # uses: actions/upload-artifact@v4 + # with: + # name: wheels-linux-${{ matrix.platform.target }} + # path: dist + + musllinux: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: ubuntu-latest + target: x86_64 + - runner: ubuntu-latest + target: aarch64 + # - runner: ubuntu-latest + # target: armv7 + steps: + # - name: Run sccache-cache + # uses: mozilla-actions/sccache-action@v0.0.6 + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --zig --out dist --manifest-path py-denormalized/Cargo.toml + # sccache: 'true' + manylinux: musllinux_1_2 + rust-toolchain: 1.81.0 + + # - name: Upload wheels + # uses: actions/upload-artifact@v4 + # with: + # name: wheels-musllinux-${{ matrix.platform.target }} + # path: dist + + # windows: + # runs-on: ${{ matrix.platform.runner }} + # strategy: + # matrix: + # platform: + # - runner: windows-latest + # target: x64 + # - runner: windows-latest + # target: x86 + 
# steps: + # - uses: actions/checkout@v4 + # - uses: actions/setup-python@v5 + # with: + # python-version: 3.x + # architecture: ${{ matrix.platform.target }} + # - name: Build wheels + # uses: PyO3/maturin-action@v1 + # with: + # target: ${{ matrix.platform.target }} + # args: --release --out dist --manifest-path py-denormalized/Cargo.toml + # # sccache: 'true' + # # - name: Upload wheels + # # uses: actions/upload-artifact@v4 + # # with: + # # name: wheels-windows-${{ matrix.platform.target }} + # # path: dist + + macos: + runs-on: ${{ matrix.platform.runner }} + strategy: + matrix: + platform: + - runner: macos-12 + target: x86_64 + - runner: macos-14 + target: aarch64 + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-python@v5 + with: + python-version: 3.x + - name: Build wheels + uses: PyO3/maturin-action@v1 + with: + target: ${{ matrix.platform.target }} + args: --release --out dist --manifest-path py-denormalized/Cargo.toml + # sccache: 'true' + # - name: Upload wheels + # uses: actions/upload-artifact@v4 + # with: + # name: wheels-macos-${{ matrix.platform.target }} + # path: dist + + sdist: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - name: Build sdist + uses: PyO3/maturin-action@v1 + with: + command: sdist + args: --out dist --manifest-path py-denormalized/Cargo.toml + # - name: Upload sdist + # uses: actions/upload-artifact@v4 + # with: + # name: wheels-sdist + # path: dist + + # release: + # name: Release + # runs-on: ubuntu-latest + # if: ${{ startsWith(github.ref, 'refs/tags/') || github.event_name == 'workflow_dispatch' }} + # needs: [linux, musllinux, windows, macos, sdist] + # permissions: + # # Use to sign the release artifacts + # id-token: write + # # Used to upload release artifacts + # contents: write + # # Used to generate artifact attestation + # attestations: write + # steps: + # - uses: actions/download-artifact@v4 + # - name: Generate artifact attestation + # uses: actions/attest-build-provenance@v1 + # with: + # subject-path: 'wheels-*/*' + # - name: Publish to PyPI + # if: "startsWith(github.ref, 'refs/tags/')" + # uses: PyO3/maturin-action@v1 + # env: + # MATURIN_PYPI_TOKEN: ${{ secrets.PYPI_API_TOKEN }} + # with: + # command: upload + # args: --non-interactive --skip-existing wheels-*/* diff --git a/Cargo.lock b/Cargo.lock index a62eb7c..ab6bf26 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -612,26 +612,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bindgen" -version = "0.69.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "271383c67ccabffb7381723dea0672a673f292304fcb45c01cc648c7a8d58088" -dependencies = [ - "bitflags 2.6.0", - "cexpr", - "clang-sys", - "itertools 0.12.1", - "lazy_static", - "lazycell", - "proc-macro2", - "quote", - "regex", - "rustc-hash 1.1.0", - "shlex", - "syn 2.0.79", -] - [[package]] name = "bitflags" version = "1.3.2" @@ -752,15 +732,6 @@ dependencies = [ "shlex", ] -[[package]] -name = "cexpr" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6fac387a98bb7c37292057cffc56d62ecb629900026402633ae9160df93a8766" -dependencies = [ - "nom", -] - [[package]] name = "cfg-if" version = "1.0.0" @@ -803,17 +774,6 @@ dependencies = [ "phf_codegen", ] -[[package]] -name = "clang-sys" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b023947811758c97c59bf9d1c188fd619ad4718dcaa767947df1cadb14f39f4" -dependencies = [ - "glob", - "libc", - "libloading", -] - [[package]] name = "clap" 
version = "4.5.20" @@ -854,6 +814,15 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" +[[package]] +name = "cmake" +version = "0.1.51" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb1e43aa7fd152b1f968787f7dbcdeb306d1867ff373c69955211876c053f91a" +dependencies = [ + "cc", +] + [[package]] name = "cmsketch" version = "0.2.1" @@ -1203,7 +1172,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap 2.6.0", - "itertools 0.13.0", + "itertools", "log", "num-traits", "num_cpus", @@ -1338,7 +1307,7 @@ dependencies = [ "datafusion-expr", "hashbrown 0.14.5", "hex", - "itertools 0.13.0", + "itertools", "log", "md-5", "rand", @@ -1397,7 +1366,7 @@ dependencies = [ "datafusion-functions", "datafusion-functions-aggregate", "datafusion-physical-expr-common", - "itertools 0.13.0", + "itertools", "log", "paste", "rand", @@ -1427,7 +1396,7 @@ dependencies = [ "datafusion-physical-expr", "hashbrown 0.14.5", "indexmap 2.6.0", - "itertools 0.13.0", + "itertools", "log", "paste", "regex-syntax 0.8.5", @@ -1457,7 +1426,7 @@ dependencies = [ "hashbrown 0.14.5", "hex", "indexmap 2.6.0", - "itertools 0.13.0", + "itertools", "log", "paste", "petgraph", @@ -1487,7 +1456,7 @@ dependencies = [ "datafusion-execution", "datafusion-physical-expr", "datafusion-physical-plan", - "itertools 0.13.0", + "itertools", ] [[package]] @@ -1515,7 +1484,7 @@ dependencies = [ "half", "hashbrown 0.14.5", "indexmap 2.6.0", - "itertools 0.13.0", + "itertools", "log", "once_cell", "parking_lot", @@ -1591,6 +1560,7 @@ dependencies = [ "bincode", "bytes", "chrono", + "cmake", "crossbeam", "datafusion", "delegate", @@ -1600,11 +1570,10 @@ dependencies = [ "futures", "half", "hashbrown 0.14.5", - "itertools 0.13.0", + "itertools", "log", "object_store", "rdkafka", - "rocksdb", "serde", "serde_json", "slatedb", @@ -1896,7 +1865,7 @@ dependencies = [ "fastrace", "futures", "hashbrown 0.14.5", - "itertools 0.13.0", + "itertools", "madsim-tokio", "metrics", "parking_lot", @@ -1911,7 +1880,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "01dbfd6763227809019a1dc01c98b6a78949c63d3a204d0a6f8e0325f077ab5c" dependencies = [ "foyer-common", - "itertools 0.13.0", + "itertools", ] [[package]] @@ -1928,7 +1897,7 @@ dependencies = [ "foyer-intrusive", "futures", "hashbrown 0.14.5", - "itertools 0.13.0", + "itertools", "madsim-tokio", "parking_lot", "pin-project", @@ -1959,7 +1928,7 @@ dependencies = [ "foyer-memory", "fs4", "futures", - "itertools 0.13.0", + "itertools", "libc", "lz4", "madsim-tokio", @@ -2414,15 +2383,6 @@ version = "1.70.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf" -[[package]] -name = "itertools" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.13.0" @@ -2462,12 +2422,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" -[[package]] -name = "lazycell" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" - [[package]] name = "lexical-core" version 
= "1.0.2" @@ -2562,16 +2516,6 @@ dependencies = [ "rle-decode-fast", ] -[[package]] -name = "libloading" -version = "0.8.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" -dependencies = [ - "cfg-if", - "windows-targets", -] - [[package]] name = "libm" version = "0.2.8" @@ -2588,22 +2532,6 @@ dependencies = [ "libc", ] -[[package]] -name = "librocksdb-sys" -version = "0.16.0+8.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce3d60bc059831dc1c83903fb45c103f75db65c5a7bf22272764d9cc683e348c" -dependencies = [ - "bindgen", - "bzip2-sys", - "cc", - "glob", - "libc", - "libz-sys", - "lz4-sys", - "zstd-sys", -] - [[package]] name = "libz-sys" version = "1.1.20" @@ -3019,7 +2947,7 @@ dependencies = [ "futures", "humantime", "hyper", - "itertools 0.13.0", + "itertools", "md-5", "parking_lot", "percent-encoding", @@ -3315,7 +3243,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e9552f850d5f0964a4e4d0bf306459ac29323ddfbae05e35a7c0d35cb0803cc5" dependencies = [ "anyhow", - "itertools 0.13.0", + "itertools", "proc-macro2", "quote", "syn 2.0.79", @@ -3434,7 +3362,7 @@ dependencies = [ "pin-project-lite", "quinn-proto", "quinn-udp", - "rustc-hash 2.0.0", + "rustc-hash", "rustls", "socket2", "thiserror", @@ -3451,7 +3379,7 @@ dependencies = [ "bytes", "rand", "ring", - "rustc-hash 2.0.0", + "rustc-hash", "rustls", "slab", "thiserror", @@ -3562,6 +3490,7 @@ version = "4.7.0+2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615" dependencies = [ + "cmake", "libc", "libz-sys", "num_enum", @@ -3693,16 +3622,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422" -[[package]] -name = "rocksdb" -version = "0.22.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bd13e55d6d7b8cd0ea569161127567cd587676c99f4472f779a0279aa60a7a7" -dependencies = [ - "libc", - "librocksdb-sys", -] - [[package]] name = "rtrb" version = "0.3.1" @@ -3715,12 +3634,6 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" -[[package]] -name = "rustc-hash" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" - [[package]] name = "rustc-hash" version = "2.0.0" diff --git a/Cargo.toml b/Cargo.toml index 4b6da32..f3900f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,10 @@ [workspace] resolver = "2" -members = ["crates/*", "py-denormalized", "examples"] +members = [ + "crates/*", + "examples", + "py-denormalized", +] [workspace.package] authors = [ diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 9bdbd30..ce4d5c6 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -21,7 +21,7 @@ arrow-ord = { workspace = true } apache-avro = { workspace = true } base64 = { workspace = true } -rdkafka = { workspace = true } +rdkafka = { workspace = true, features = ["cmake-build"] } futures = { workspace = true } async-trait = { workspace = true } tracing = { workspace = true } @@ -31,7 +31,7 @@ log = { workspace = true } chrono = { workspace = true } itertools = { workspace = true } serde.workspace = true 
-rocksdb = "0.22.0" +# rocksdb = "0.22.0" bincode = "1.3.3" half = "2.4.1" delegate = "0.12.0" @@ -42,3 +42,6 @@ crossbeam = "0.8.4" slatedb = { workspace = true } # "0.2.0" object_store = "0.11.0" bytes = "1.7.2" + +[build-dependencies] +cmake = "0.1" diff --git a/crates/core/src/state_backend/mod.rs b/crates/core/src/state_backend/mod.rs index e64865d..90aba83 100644 --- a/crates/core/src/state_backend/mod.rs +++ b/crates/core/src/state_backend/mod.rs @@ -1,2 +1,2 @@ -pub mod rocksdb_backend; +// pub mod rocksdb_backend; pub mod slatedb; diff --git a/py-denormalized/python/examples/stream_aggregate.py b/py-denormalized/python/examples/stream_aggregate.py index 7c1c548..99c769d 100644 --- a/py-denormalized/python/examples/stream_aggregate.py +++ b/py-denormalized/python/examples/stream_aggregate.py @@ -3,6 +3,7 @@ import json import signal import sys +import pprint as pp from denormalized import Context from denormalized.datafusion import col @@ -27,7 +28,7 @@ def signal_handler(sig, frame): def print_batch(rb): - print(rb) + pp.pprint(rb.to_pydict()) ctx = Context() @@ -41,6 +42,8 @@ def print_batch(rb): f.min(col("reading")).alias("min"), f.max(col("reading")).alias("max"), f.avg(col("reading")).alias("average"), + f.median(col("reading")).alias("median"), + f.stddev(col("reading")).alias("stddev"), ], 1000, None, diff --git a/py-denormalized/src/lib.rs b/py-denormalized/src/lib.rs index fd92024..a040639 100644 --- a/py-denormalized/src/lib.rs +++ b/py-denormalized/src/lib.rs @@ -12,7 +12,7 @@ pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// A Python module implemented in Rust. #[pymodule] -fn _internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { +fn _py_denormalized_internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; From 245f9b216f9ddb5d1f7f349bb52e1a71dc4ef0fd Mon Sep 17 00:00:00 2001 From: Matt Green Date: Wed, 30 Oct 2024 11:26:26 -0700 Subject: [PATCH 3/8] fix: correct python module name --- py-denormalized/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-denormalized/src/lib.rs b/py-denormalized/src/lib.rs index a040639..ddb4734 100644 --- a/py-denormalized/src/lib.rs +++ b/py-denormalized/src/lib.rs @@ -11,7 +11,7 @@ pub mod utils; pub(crate) struct TokioRuntime(tokio::runtime::Runtime); /// A Python module implemented in Rust. 
-#[pymodule] +#[pymodule(name="_internal")] fn _py_denormalized_internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; From ee5dbc1864ef7169cdf7cf984727c22fa7b5d392 Mon Sep 17 00:00:00 2001 From: Amey Chaugule Date: Thu, 7 Nov 2024 10:50:03 -0800 Subject: [PATCH 4/8] Fixing the streaming join example (#54) * Fixing the streaming join example * format * add drop_columns * update python internal package name --------- Co-authored-by: Matt Green --- crates/core/src/datastream.rs | 9 ++++ examples/examples/stream_join.rs | 41 ++++++++++++------- py-denormalized/pyproject.toml | 2 +- .../python/denormalized/context.py | 3 +- .../python/denormalized/data_stream.py | 2 +- .../denormalized/datafusion/__init__.py | 2 +- .../python/denormalized/datafusion/catalog.py | 2 +- .../python/denormalized/datafusion/common.py | 2 +- .../python/denormalized/datafusion/context.py | 12 +++--- .../denormalized/datafusion/dataframe.py | 4 +- .../python/denormalized/datafusion/expr.py | 6 +-- .../denormalized/datafusion/functions.py | 2 +- .../denormalized/datafusion/object_store.py | 2 +- .../denormalized/datafusion/record_batch.py | 2 +- .../python/denormalized/datafusion/udf.py | 2 +- .../python/denormalized/feast_data_stream.py | 2 +- py-denormalized/python/denormalized/utils.py | 2 +- py-denormalized/src/lib.rs | 2 +- 18 files changed, 59 insertions(+), 40 deletions(-) diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index 3598e6b..ea86b72 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -113,6 +113,15 @@ impl DataStream { }) } + pub fn drop_columns(self, columns: &[&str]) -> Result { + Ok(Self { + df: Arc::new(self.df.as_ref().clone().drop_columns(columns)?), + context: self.context.clone(), + shutdown_tx: self.shutdown_tx.clone(), + shutdown_rx: self.shutdown_rx.clone(), + }) + } + // Join two streams using the specified expression pub fn join_on( self, diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs index ac7dae4..5c6c672 100644 --- a/examples/examples/stream_join.rs +++ b/examples/examples/stream_join.rs @@ -17,7 +17,10 @@ async fn main() -> Result<()> { let bootstrap_servers = String::from("localhost:9092"); - let ctx = Context::new()?; + let ctx = Context::new()? + .with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1")) + .await; + let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone()); let source_topic_builder = topic_builder @@ -29,7 +32,7 @@ async fn main() -> Result<()> { .clone() .with_topic(String::from("temperature")) .build_reader(ConnectionOpts::from([ - ("auto.offset.reset".to_string(), "earliest".to_string()), + ("auto.offset.reset".to_string(), "latest".to_string()), ("group.id".to_string(), "sample_pipeline".to_string()), ])) .await?; @@ -40,30 +43,38 @@ async fn main() -> Result<()> { .clone() .with_topic(String::from("humidity")) .build_reader(ConnectionOpts::from([ - ("auto.offset.reset".to_string(), "earliest".to_string()), + ("auto.offset.reset".to_string(), "latest".to_string()), ("group.id".to_string(), "sample_pipeline".to_string()), ])) .await?, ) - .await?; + .await? + .with_column("humidity_sensor", col("sensor_name"))? + .drop_columns(&["sensor_name"])? + .window( + vec![col("humidity_sensor")], + vec![avg(col("reading")).alias("avg_humidity")], + Duration::from_millis(1_000), + None, + )? + .with_column("humidity_window_start_time", col("window_start_time"))? 
+ .with_column("humidity_window_end_time", col("window_end_time"))? + .drop_columns(&["window_start_time", "window_end_time"])?; let joined_ds = ctx .from_topic(temperature_topic) .await? + .window( + vec![col("sensor_name")], + vec![avg(col("reading")).alias("avg_temperature")], + Duration::from_millis(1_000), + None, + )? .join( humidity_ds, JoinType::Inner, - &["sensor_name"], - &["sensor_name"], - None, - )? - .window( - vec![], - vec![ - avg(col("temperature.reading")).alias("avg_temperature"), - avg(col("humidity.reading")).alias("avg_humidity"), - ], - Duration::from_millis(1_000), + &["sensor_name", "window_start_time"], + &["humidity_sensor", "humidity_window_start_time"], None, )?; diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml index 171eae2..c6c5baf 100644 --- a/py-denormalized/pyproject.toml +++ b/py-denormalized/pyproject.toml @@ -20,7 +20,7 @@ feast = ["feast"] [tool.maturin] python-source = "python" features = ["pyo3/extension-module"] -module-name = "denormalized._internal" +module-name = "denormalized._d_internal" [tool.rye] dev-dependencies = ["pip>=24.2", "ipython>=8.26.0", "pytest>=8.3.2"] diff --git a/py-denormalized/python/denormalized/context.py b/py-denormalized/python/denormalized/context.py index 462ce21..1b3c5df 100644 --- a/py-denormalized/python/denormalized/context.py +++ b/py-denormalized/python/denormalized/context.py @@ -1,4 +1,4 @@ -from denormalized._internal import PyContext +from denormalized._d_internal import PyContext from .data_stream import DataStream class Context: @@ -20,4 +20,3 @@ def from_topic(self, topic: str, sample_json: str, bootstrap_servers: str) -> Da ds = DataStream(py_ds) return ds - diff --git a/py-denormalized/python/denormalized/data_stream.py b/py-denormalized/python/denormalized/data_stream.py index 418c520..a2a5805 100644 --- a/py-denormalized/python/denormalized/data_stream.py +++ b/py-denormalized/python/denormalized/data_stream.py @@ -1,5 +1,5 @@ import pyarrow as pa -from denormalized._internal import PyDataStream +from denormalized._d_internal import PyDataStream from denormalized.datafusion import Expr from denormalized.utils import to_internal_expr, to_internal_exprs diff --git a/py-denormalized/python/denormalized/datafusion/__init__.py b/py-denormalized/python/denormalized/datafusion/__init__.py index 7419ad7..715a94d 100644 --- a/py-denormalized/python/denormalized/datafusion/__init__.py +++ b/py-denormalized/python/denormalized/datafusion/__init__.py @@ -36,7 +36,7 @@ from .catalog import Catalog, Database, Table # The following imports are okay to remain as opaque to the user. 
-from denormalized._internal import Config, LogicalPlan, ExecutionPlan, runtime +from denormalized._d_internal import Config, LogicalPlan, ExecutionPlan, runtime from .record_batch import RecordBatchStream, RecordBatch diff --git a/py-denormalized/python/denormalized/datafusion/catalog.py b/py-denormalized/python/denormalized/datafusion/catalog.py index d8c9092..faa5058 100644 --- a/py-denormalized/python/denormalized/datafusion/catalog.py +++ b/py-denormalized/python/denormalized/datafusion/catalog.py @@ -19,7 +19,7 @@ from __future__ import annotations -import denormalized._internal as df_internal +import denormalized._d_internal as df_internal from typing import TYPE_CHECKING diff --git a/py-denormalized/python/denormalized/datafusion/common.py b/py-denormalized/python/denormalized/datafusion/common.py index 73ed7c4..082b979 100644 --- a/py-denormalized/python/denormalized/datafusion/common.py +++ b/py-denormalized/python/denormalized/datafusion/common.py @@ -16,7 +16,7 @@ # under the License. """Common data types used throughout the DataFusion project.""" -from denormalized._internal import common as common_internal +from denormalized._d_internal import common as common_internal from enum import Enum # TODO these should all have proper wrapper classes diff --git a/py-denormalized/python/denormalized/datafusion/context.py b/py-denormalized/python/denormalized/datafusion/context.py index 19c0760..f3eed51 100644 --- a/py-denormalized/python/denormalized/datafusion/context.py +++ b/py-denormalized/python/denormalized/datafusion/context.py @@ -19,13 +19,13 @@ from __future__ import annotations -from denormalized._internal import SessionConfig as SessionConfigInternal -from denormalized._internal import RuntimeConfig as RuntimeConfigInternal -from denormalized._internal import SQLOptions as SQLOptionsInternal -from denormalized._internal import SessionContext as SessionContextInternal -from denormalized._internal import LogicalPlan, ExecutionPlan +from denormalized._d_internal import SessionConfig as SessionConfigInternal +from denormalized._d_internal import RuntimeConfig as RuntimeConfigInternal +from denormalized._d_internal import SQLOptions as SQLOptionsInternal +from denormalized._d_internal import SessionContext as SessionContextInternal +from denormalized._d_internal import LogicalPlan, ExecutionPlan -from denormalized._internal import AggregateUDF +from denormalized._d_internal import AggregateUDF from denormalized.datafusion.catalog import Catalog, Table from denormalized.datafusion.dataframe import DataFrame from denormalized.datafusion.expr import Expr, SortExpr, sort_list_to_raw_sort_list diff --git a/py-denormalized/python/denormalized/datafusion/dataframe.py b/py-denormalized/python/denormalized/datafusion/dataframe.py index 4a50545..63b41e2 100644 --- a/py-denormalized/python/denormalized/datafusion/dataframe.py +++ b/py-denormalized/python/denormalized/datafusion/dataframe.py @@ -32,9 +32,9 @@ import pathlib from typing import Callable -from denormalized._internal import DataFrame as DataFrameInternal +from denormalized._d_internal import DataFrame as DataFrameInternal from denormalized.datafusion.expr import Expr, SortExpr, sort_or_default -from denormalized._internal import ( +from denormalized._d_internal import ( LogicalPlan, ExecutionPlan, ) diff --git a/py-denormalized/python/denormalized/datafusion/expr.py b/py-denormalized/python/denormalized/datafusion/expr.py index a858a66..69f2505 100644 --- a/py-denormalized/python/denormalized/datafusion/expr.py +++ 
b/py-denormalized/python/denormalized/datafusion/expr.py @@ -28,9 +28,9 @@ from denormalized.datafusion.common import DataTypeMap, NullTreatment, RexType from typing_extensions import deprecated -from denormalized._internal import LogicalPlan -from denormalized._internal import expr as expr_internal -from denormalized._internal import functions as functions_internal +from denormalized._d_internal import LogicalPlan +from denormalized._d_internal import expr as expr_internal +from denormalized._d_internal import functions as functions_internal # The following are imported from the internal representation. We may choose to # give these all proper wrappers, or to simply leave as is. These were added diff --git a/py-denormalized/python/denormalized/datafusion/functions.py b/py-denormalized/python/denormalized/datafusion/functions.py index 291c578..d564672 100644 --- a/py-denormalized/python/denormalized/datafusion/functions.py +++ b/py-denormalized/python/denormalized/datafusion/functions.py @@ -18,7 +18,7 @@ from __future__ import annotations -from denormalized._internal import functions as f +from denormalized._d_internal import functions as f from denormalized.datafusion.expr import ( CaseBuilder, Expr, diff --git a/py-denormalized/python/denormalized/datafusion/object_store.py b/py-denormalized/python/denormalized/datafusion/object_store.py index 3a3371e..54610c0 100644 --- a/py-denormalized/python/denormalized/datafusion/object_store.py +++ b/py-denormalized/python/denormalized/datafusion/object_store.py @@ -16,7 +16,7 @@ # under the License. """Object store functionality.""" -from denormalized._internal import object_store +from denormalized._d_internal import object_store AmazonS3 = object_store.AmazonS3 GoogleCloud = object_store.GoogleCloud diff --git a/py-denormalized/python/denormalized/datafusion/record_batch.py b/py-denormalized/python/denormalized/datafusion/record_batch.py index e0e436e..7f7b7ef 100644 --- a/py-denormalized/python/denormalized/datafusion/record_batch.py +++ b/py-denormalized/python/denormalized/datafusion/record_batch.py @@ -27,7 +27,7 @@ if TYPE_CHECKING: import pyarrow - import denormalized._internal as df_internal + import denormalized._d_internal as df_internal import typing_extensions diff --git a/py-denormalized/python/denormalized/datafusion/udf.py b/py-denormalized/python/denormalized/datafusion/udf.py index c1d45f9..c7f4d26 100644 --- a/py-denormalized/python/denormalized/datafusion/udf.py +++ b/py-denormalized/python/denormalized/datafusion/udf.py @@ -19,7 +19,7 @@ from __future__ import annotations -import denormalized._internal as df_internal +import denormalized._d_internal as df_internal from datafusion.expr import Expr from typing import Callable, TYPE_CHECKING, TypeVar from abc import ABCMeta, abstractmethod diff --git a/py-denormalized/python/denormalized/feast_data_stream.py b/py-denormalized/python/denormalized/feast_data_stream.py index 252c2bb..e289b8c 100644 --- a/py-denormalized/python/denormalized/feast_data_stream.py +++ b/py-denormalized/python/denormalized/feast_data_stream.py @@ -2,7 +2,7 @@ from typing import Any, TypeVar, Union, cast, get_type_hints import pyarrow as pa -from denormalized._internal import PyDataStream +from denormalized._d_internal import PyDataStream from denormalized.datafusion import Expr from feast import FeatureStore, Field from feast.data_source import PushMode diff --git a/py-denormalized/python/denormalized/utils.py b/py-denormalized/python/denormalized/utils.py index 13a5dbf..adb8e10 100644 --- 
a/py-denormalized/python/denormalized/utils.py
+++ b/py-denormalized/python/denormalized/utils.py
@@ -1,4 +1,4 @@
-from denormalized._internal import expr as internal_exprs
+from denormalized._d_internal import expr as internal_exprs
 from denormalized.datafusion import Expr
 
diff --git a/py-denormalized/src/lib.rs b/py-denormalized/src/lib.rs
index ddb4734..230171b 100644
--- a/py-denormalized/src/lib.rs
+++ b/py-denormalized/src/lib.rs
@@ -11,7 +11,7 @@ pub mod utils;
 pub(crate) struct TokioRuntime(tokio::runtime::Runtime);
 
 /// A Python module implemented in Rust.
-#[pymodule(name="_internal")]
+#[pymodule(name = "_d_internal")]
 fn _py_denormalized_internal(py: Python, m: Bound<'_, PyModule>) -> PyResult<()> {
     m.add_class::<datastream::PyDataStream>()?;
     m.add_class::<context::PyContext>()?;

From c1ab725ceedeae818523d2dc0cf9d04863dee227 Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Thu, 7 Nov 2024 13:33:09 -0800
Subject: [PATCH 5/8] merge with main

---
 examples/examples/simple_aggregation.rs | 14 ++++++--------
 examples/examples/stream_join.rs        | 13 +++++++------
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index d00df7f..9341529 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -18,11 +18,9 @@ async fn main() -> Result<()> {
         .init();
 
     let bootstrap_servers = String::from("localhost:9092");
-    let config = Context::default_config().set_bool("denormalized_config.checkpoint", true);
-    let ctx = Context::with_config(config)?
-        .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg/job1"))
-        .await;
-
+
+    let config = Context::default_config().set_bool("denormalized_config.checkpoint", false);
+
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers);
 
     // Connect to source topic
@@ -37,9 +35,9 @@ async fn main() -> Result<()> {
         ]))
         .await?;
 
-    let _ctx = Context::new()?
-        .with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
-        .await
+    let _ctx = Context::with_config(config)?
+        //.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
+        //.await
         .from_topic(source_topic)
         .await?
         .window(
diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs
index 5c6c672..2edc25e 100644
--- a/examples/examples/stream_join.rs
+++ b/examples/examples/stream_join.rs
@@ -17,10 +17,7 @@ async fn main() -> Result<()> {
 
     let bootstrap_servers = String::from("localhost:9092");
 
-    let ctx = Context::new()?
-        .with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1"))
-        .await;
-
+    let ctx = Context::new()?;
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
 
     let source_topic_builder = topic_builder
@@ -73,8 +70,12 @@ async fn main() -> Result<()> {
         .join(
             humidity_ds,
             JoinType::Inner,
-            &["sensor_name", "window_start_time"],
-            &["humidity_sensor", "humidity_window_start_time"],
+            &["sensor_name", "window_start_time", "window_end_time"],
+            &[
+                "humidity_sensor",
+                "humidity_window_start_time",
+                "humidity_window_end_time",
+            ],
             None,
         )?;

From 8dee4fc463c99afe754c9ed56917f451404e2381 Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Thu, 7 Nov 2024 13:36:30 -0800
Subject: [PATCH 6/8] Adding config option for checkpointing

---
 examples/examples/simple_aggregation.rs | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index 9341529..494f9bc 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -19,8 +19,6 @@ async fn main() -> Result<()> {
 
     let bootstrap_servers = String::from("localhost:9092");
 
-    let config = Context::default_config().set_bool("denormalized_config.checkpoint", false);
-
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers);
 
     // Connect to source topic
@@ -35,7 +33,7 @@ async fn main() -> Result<()> {
         ]))
         .await?;
 
-    let _ctx = Context::with_config(config)?
+    let _ctx = Context::new()?
         //.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
         //.await
         .from_topic(source_topic)

From ec28adb602bed0988d4f52e108a4a0184033be6c Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Thu, 7 Nov 2024 13:37:20 -0800
Subject: [PATCH 7/8] merge with main

---
 examples/examples/simple_aggregation.rs | 4 +++-
 examples/examples/stream_join.rs        | 5 +----
 2 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs
index 494f9bc..9341529 100644
--- a/examples/examples/simple_aggregation.rs
+++ b/examples/examples/simple_aggregation.rs
@@ -19,6 +19,8 @@ async fn main() -> Result<()> {
 
     let bootstrap_servers = String::from("localhost:9092");
 
+    let config = Context::default_config().set_bool("denormalized_config.checkpoint", false);
+
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers);
 
     // Connect to source topic
@@ -33,7 +35,7 @@ async fn main() -> Result<()> {
         ]))
         .await?;
 
-    let _ctx = Context::new()?
+    let _ctx = Context::with_config(config)?
         //.with_slatedb_backend(String::from("/tmp/checkpoints/simple-agg-checkpoint-1"))
         //.await
         .from_topic(source_topic)
diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs
index 00444a4..2edc25e 100644
--- a/examples/examples/stream_join.rs
+++ b/examples/examples/stream_join.rs
@@ -17,10 +17,7 @@ async fn main() -> Result<()> {
 
     let bootstrap_servers = String::from("localhost:9092");
 
-    let ctx = Context::new()?
-        .with_slatedb_backend(String::from("/tmp/checkpoints/stream-join-checkpoint-1"))
-        .await;
-
+    let ctx = Context::new()?;
     let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
 
     let source_topic_builder = topic_builder

From d20e0a09c3ba1b9b5e350d8888af4d545c516420 Mon Sep 17 00:00:00 2001
From: Amey Chaugule
Date: Thu, 7 Nov 2024 13:53:39 -0800
Subject: [PATCH 8/8] Cargo fmt

---
 crates/core/src/datasource/kafka/kafka_stream_read.rs         | 2 +-
 .../physical_plan/continuous/grouped_window_agg_stream.rs     | 3 +--
 crates/core/src/physical_plan/continuous/streaming_window.rs  | 2 +-
 3 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs
index 80f0f3d..caed655 100644
--- a/crates/core/src/datasource/kafka/kafka_stream_read.rs
+++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs
@@ -7,7 +7,7 @@ use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, StringArray, Str
 use arrow_schema::{DataType, Field, SchemaRef, TimeUnit};
 use crossbeam::channel;
 use denormalized_orchestrator::channel_manager::{create_channel, get_sender, take_receiver};
-use denormalized_orchestrator::orchestrator::{OrchestrationMessage};
+use denormalized_orchestrator::orchestrator::OrchestrationMessage;
 use futures::executor::block_on;
 use log::{debug, error};
 use serde::{Deserialize, Serialize};
diff --git a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs
index 5f455db..0388a23 100644
--- a/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs
+++ b/crates/core/src/physical_plan/continuous/grouped_window_agg_stream.rs
@@ -37,8 +37,7 @@ use datafusion::{
 };
 
 use denormalized_orchestrator::{
-    channel_manager::take_receiver,
-    orchestrator::{OrchestrationMessage},
+    channel_manager::take_receiver, orchestrator::OrchestrationMessage,
 };
 use futures::{executor::block_on, Stream, StreamExt};
 use log::debug;
diff --git a/crates/core/src/physical_plan/continuous/streaming_window.rs b/crates/core/src/physical_plan/continuous/streaming_window.rs
index 9db92f9..c6ea378 100644
--- a/crates/core/src/physical_plan/continuous/streaming_window.rs
+++ b/crates/core/src/physical_plan/continuous/streaming_window.rs
@@ -40,7 +40,7 @@ use datafusion::{
 };
 use denormalized_orchestrator::{
     channel_manager::{create_channel, get_sender},
-    orchestrator::{OrchestrationMessage},
+    orchestrator::OrchestrationMessage,
 };
 use futures::{Stream, StreamExt};
 use tracing::debug;
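Note for readers following the series: the patches register and read a `DenormalizedConfig` session-config extension throughout (inserted in `Context::default_config`, read back via `extensions.get::<DenormalizedConfig>()`), but its definition in `crates/core/src/config_extensions/denormalized_config.rs` is never shown in these diffs. Below is a minimal sketch of what such an extension plausibly looks like, assuming DataFusion's `extensions_options!` macro; the `checkpoint` field and the `denormalized_config` prefix are inferred from the `set_bool("denormalized_config.checkpoint", ...)` calls above, and exact import paths may differ by DataFusion version.

```rust
// Sketch only: inferred from how the diffs register and read the extension;
// not the actual source of crates/core/src/config_extensions/denormalized_config.rs.
use datafusion::common::extensions_options;
use datafusion::config::ConfigExtension;

extensions_options! {
    /// Denormalized-specific options, read back by operators via
    /// `config.options().extensions.get::<DenormalizedConfig>()`.
    pub struct DenormalizedConfig {
        /// When true, sources and window operators checkpoint state to SlateDB.
        pub checkpoint: bool, default = false
    }
}

// The PREFIX is what lets `set_bool("denormalized_config.checkpoint", true)`
// resolve to the `checkpoint` field of this extension.
impl ConfigExtension for DenormalizedConfig {
    const PREFIX: &'static str = "denormalized_config";
}
```

With an extension like this registered, `Context::default_config().set_bool("denormalized_config.checkpoint", true)` is all a job needs to opt into the checkpointing paths added in this series.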