diff --git a/Cargo.lock b/Cargo.lock index 9e3655f..7a1ce9e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1079,12 +1079,6 @@ dependencies = [ "bincode", "chrono", "datafusion", - "datafusion-common", - "datafusion-execution", - "datafusion-expr", - "datafusion-physical-expr", - "datafusion-physical-optimizer", - "datafusion-physical-plan", "futures", "half", "itertools 0.13.0", @@ -1104,11 +1098,6 @@ dependencies = [ "arrow", "arrow-schema", "datafusion", - "datafusion-common", - "datafusion-expr", - "datafusion-functions", - "datafusion-functions-aggregate", - "datafusion-physical-expr", "df-streams-core", "futures", "rand", @@ -1117,6 +1106,7 @@ dependencies = [ "serde_json", "tempfile", "tokio", + "tokio-postgres", "tracing", "tracing-log", "tracing-subscriber", @@ -1161,6 +1151,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fastrand" version = "2.1.0" @@ -1369,6 +1365,15 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest", +] + [[package]] name = "humantime" version = "2.1.0" @@ -1711,6 +1716,18 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4569e456d394deccd22ce1c1913e6ea0e54519f577285001215d33557431afe4" +dependencies = [ + "hermit-abi", + "libc", + "wasi", + "windows-sys", +] + [[package]] name = "nom" version = "7.1.3" @@ -1905,7 +1922,7 @@ checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.5.3", "smallvec", "windows-targets", ] @@ -2033,6 +2050,37 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231b230927b5e4ad203db57bbcbee2802f6bce620b1e4a9024a07d94e2907ec" +[[package]] +name = "postgres-protocol" +version = "0.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acda0ebdebc28befa84bee35e651e4c5f09073d668c7aed4cf7e23c3cda84b23" +dependencies = [ + "base64", + "byteorder", + "bytes", + "fallible-iterator", + "hmac", + "md-5", + "memchr", + "rand", + "sha2", + "stringprep", +] + +[[package]] +name = "postgres-types" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02048d9e032fb3cc3413bbf7b83a15d84a5d419778e2628751896d856498eee9" +dependencies = [ + "bytes", + "fallible-iterator", + "postgres-protocol", + "serde", + "serde_json", +] + [[package]] name = "ppv-lite86" version = "0.2.18" @@ -2136,6 +2184,15 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "redox_syscall" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.5.3" @@ -2376,6 +2433,16 @@ version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b6b67fb9a61334225b5b790716f609cd58395f895b3fe8b328786812a40bc3b" +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "sqlparser" version = "0.49.0" @@ -2403,6 +2470,17 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strum" version = "0.25.0" @@ -2557,9 +2635,13 @@ checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" dependencies = [ "backtrace", "bytes", + "libc", + "mio", "parking_lot", "pin-project-lite", + "socket2", "tokio-macros", + "windows-sys", ] [[package]] @@ -2573,6 +2655,32 @@ dependencies = [ "syn 2.0.72", ] +[[package]] +name = "tokio-postgres" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03adcf0147e203b6032c0b2d30be1415ba03bc348901f3ff1cc0df6a733e60c3" +dependencies = [ + "async-trait", + "byteorder", + "bytes", + "fallible-iterator", + "futures-channel", + "futures-util", + "log", + "parking_lot", + "percent-encoding", + "phf", + "pin-project-lite", + "postgres-protocol", + "postgres-types", + "rand", + "socket2", + "tokio", + "tokio-util", + "whoami", +] + [[package]] name = "tokio-util" version = "0.7.11" @@ -2717,6 +2825,12 @@ dependencies = [ "tinyvec", ] +[[package]] +name = "unicode-properties" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4259d9d4425d9f0661581b804cb85fe66a4c631cadd8f490d1c13a35d5d9291" + [[package]] name = "unicode-segmentation" version = "1.11.0" @@ -2784,6 +2898,12 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.92" @@ -2848,6 +2968,17 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "whoami" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44ab49fad634e88f55bf8f9bb3abd2f27d7204172a112c7c9987e01c1c94ea9" +dependencies = [ + "redox_syscall 0.4.1", + "wasite", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 34cd93d..38ca59c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,18 +17,7 @@ description = "Embeddable stream processing engine" [workspace.dependencies] df-streams-core = { path = "crates/core" } - -datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-common = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-expr = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-functions = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-functions-aggregate = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-optimizer = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-physical-expr = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-physical-plan = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-physical-optimizer = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-sql = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } -datafusion-execution = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } +datafusion = "40.0" arrow = { version = "52.0.0", features = ["prettyprint"] } arrow-array = { version = "52.0.0", default-features = false, features = [ @@ -59,3 +48,6 @@ serde_json = "1" base64 = "0.22.1" chrono = { version = "0.4.38", default-features = false } itertools = "0.13" + +[patch.crates-io] +datafusion = { git = "https://github.com/probably-nothing-labs/arrow-datafusion" } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 0fd1203..f031124 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -5,12 +5,6 @@ edition = { workspace = true } [dependencies] datafusion = { workspace = true } -datafusion-expr = { workspace = true } -datafusion-common = { workspace = true } -datafusion-execution = { workspace = true } -datafusion-physical-expr = { workspace = true } -datafusion-physical-plan = { workspace = true } -datafusion-physical-optimizer = { workspace = true } arrow = { workspace = true } arrow-schema = { workspace = true } diff --git a/crates/core/src/accumulators/serializable_accumulator.rs b/crates/core/src/accumulators/serializable_accumulator.rs index 6b43415..0c34626 100644 --- a/crates/core/src/accumulators/serializable_accumulator.rs +++ b/crates/core/src/accumulators/serializable_accumulator.rs @@ -1,7 +1,7 @@ use arrow::array::{Array, ArrayRef}; +use datafusion::common::{Result, ScalarValue}; use datafusion::functions_aggregate::array_agg::ArrayAggAccumulator; -use datafusion_common::{Result, ScalarValue}; -use datafusion_expr::Accumulator; +use datafusion::logical_expr::Accumulator; use serde::{Deserialize, Serialize}; use super::serialize::SerializableScalarValue; @@ -42,7 +42,7 @@ impl SerializableAccumulator for ArrayAggAccumulator { let datatype = if let Some(ScalarValue::List(list)) = state.first() { list.data_type().clone() } else { - return Err(datafusion_common::DataFusionError::Internal( + return Err(datafusion::common::DataFusionError::Internal( "Invalid state for ArrayAggAccumulator".to_string(), )); }; diff --git a/crates/core/src/accumulators/serialize.rs b/crates/core/src/accumulators/serialize.rs index 253d008..92332d2 100644 --- a/crates/core/src/accumulators/serialize.rs +++ b/crates/core/src/accumulators/serialize.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use arrow_array::GenericListArray; use base64::{engine::general_purpose::STANDARD, Engine as _}; -use datafusion_common::ScalarValue; +use datafusion::common::ScalarValue; use arrow::{ buffer::{OffsetBuffer, ScalarBuffer}, @@ -515,7 +515,7 @@ pub fn json_to_scalar(json: &Value) -> Result Result { let config = SessionConfig::new().set( "datafusion.execution.batch_size", - datafusion_common::ScalarValue::UInt64(Some(32)), + datafusion::common::ScalarValue::UInt64(Some(32)), ); let runtime = Arc::new(RuntimeEnv::default()); diff --git a/crates/core/src/dataframe.rs b/crates/core/src/dataframe.rs deleted file mode 100644 index 473d53e..0000000 --- a/crates/core/src/dataframe.rs +++ /dev/null @@ -1,75 +0,0 @@ -#![allow(missing_docs)] - -use async_trait::async_trait; -use std::time::Duration; - -pub use datafusion::dataframe::DataFrame; -use datafusion_common::{DataFusionError, Result}; -use datafusion_execution::SendableRecordBatchStream; -use datafusion_expr::logical_plan::LogicalPlanBuilder; -use crate::logical_plan::StreamingLogicalPlanBuilder; -use datafusion_expr::Expr; - -use futures::StreamExt; - -#[async_trait] -pub trait StreamingDataframe { - type Error; - - fn streaming_window( - self, - group_expr: Vec, - aggr_expr: Vec, - window_length: Duration, - slide: Option, - ) -> Result - where - Self: Sized; - - async fn print_stream(self) -> Result<(), Self::Error> - where - Self: Sized; -} - -#[async_trait] -impl StreamingDataframe for DataFrame { - type Error = DataFusionError; - - fn streaming_window( - self, - group_expr: Vec, - aggr_expr: Vec, - window_length: Duration, - slide: Option, - ) -> Result { - let (session_state, plan) = self.into_parts(); - - let plan = LogicalPlanBuilder::from(plan) - .streaming_window(group_expr, aggr_expr, window_length, slide)? - .build()?; - Ok(DataFrame::new(session_state, plan)) - } - - async fn print_stream(self) -> Result<(), Self::Error> { - let mut stream: SendableRecordBatchStream = self.execute_stream().await?; - loop { - match stream.next().await.transpose() { - Ok(Some(batch)) => { - if batch.num_rows() > 0 { - println!( - "{}", - arrow::util::pretty::pretty_format_batches(&[batch]).unwrap() - ); - } - } - Ok(None) => { - log::warn!("No RecordBatch in stream"); - } - Err(err) => { - log::error!("Error reading stream: {:?}", err); - return Err(err); - } - } - } - } -} diff --git a/crates/core/src/datasource/kafka/kafka_config.rs b/crates/core/src/datasource/kafka/kafka_config.rs index 34d7377..bd9f728 100644 --- a/crates/core/src/datasource/kafka/kafka_config.rs +++ b/crates/core/src/datasource/kafka/kafka_config.rs @@ -4,8 +4,8 @@ use std::{sync::Arc, time::Duration}; use arrow_schema::{DataType, Field, Fields, Schema, SchemaRef, TimeUnit}; -use datafusion_common::{plan_err, DataFusionError, Result}; -use datafusion_expr::Expr; +use datafusion::common::{plan_err, DataFusionError, Result}; +use datafusion::logical_expr::Expr; use crate::physical_plan::utils::time::TimestampUnit; use crate::utils::arrow_helpers::infer_arrow_schema_from_json_value; diff --git a/crates/core/src/datasource/kafka/kafka_stream_read.rs b/crates/core/src/datasource/kafka/kafka_stream_read.rs index 48d9cd8..9b82bd3 100644 --- a/crates/core/src/datasource/kafka/kafka_stream_read.rs +++ b/crates/core/src/datasource/kafka/kafka_stream_read.rs @@ -17,9 +17,9 @@ use crate::state_backend::rocksdb_backend::get_global_rocksdb; use crate::utils::arrow_helpers::json_records_to_arrow_record_batch; use arrow::compute::{max, min}; -use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; -use datafusion_physical_plan::streaming::PartitionStream; +use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder; +use datafusion::physical_plan::streaming::PartitionStream; use rdkafka::consumer::{Consumer, StreamConsumer}; use rdkafka::{ClientConfig, Message, Timestamp, TopicPartitionList}; diff --git a/crates/core/src/datasource/kafka/topic_reader.rs b/crates/core/src/datasource/kafka/topic_reader.rs index 4a2f54c..884dd14 100644 --- a/crates/core/src/datasource/kafka/topic_reader.rs +++ b/crates/core/src/datasource/kafka/topic_reader.rs @@ -3,11 +3,11 @@ use std::{any::Any, sync::Arc}; use arrow_schema::{Schema, SchemaRef, SortOptions}; use datafusion::catalog::Session; +use datafusion::common::{not_impl_err, plan_err, Result}; use datafusion::datasource::TableProvider; -use datafusion_common::{not_impl_err, plan_err, Result}; -use datafusion_expr::{Expr, TableType}; -use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr}; -use datafusion_physical_plan::{streaming::StreamingTableExec, ExecutionPlan}; +use datafusion::logical_expr::{Expr, TableType}; +use datafusion::physical_expr::{expressions, LexOrdering, PhysicalSortExpr}; +use datafusion::physical_plan::{streaming::StreamingTableExec, ExecutionPlan}; use super::{KafkaReadConfig, KafkaStreamRead}; diff --git a/crates/core/src/datasource/kafka/topic_writer.rs b/crates/core/src/datasource/kafka/topic_writer.rs index 70a8f6c..77d57cf 100644 --- a/crates/core/src/datasource/kafka/topic_writer.rs +++ b/crates/core/src/datasource/kafka/topic_writer.rs @@ -7,15 +7,15 @@ use std::{any::Any, sync::Arc}; use arrow_schema::SchemaRef; use datafusion::catalog::Session; +use datafusion::common::{not_impl_err, Result}; use datafusion::datasource::TableProvider; +use datafusion::execution::TaskContext; +use datafusion::logical_expr::{Expr, TableType}; use datafusion::physical_plan::{ insert::{DataSink, DataSinkExec}, - DisplayAs, DisplayFormatType, SendableRecordBatchStream, + metrics::MetricsSet, + DisplayAs, DisplayFormatType, ExecutionPlan, SendableRecordBatchStream, }; -use datafusion_common::{not_impl_err, Result}; -use datafusion_execution::TaskContext; -use datafusion_expr::{Expr, TableType}; -use datafusion_physical_plan::{metrics::MetricsSet, ExecutionPlan}; use rdkafka::producer::FutureProducer; use rdkafka::producer::FutureRecord; diff --git a/crates/core/src/datastream.rs b/crates/core/src/datastream.rs index c50f25d..067acdf 100644 --- a/crates/core/src/datastream.rs +++ b/crates/core/src/datastream.rs @@ -1,12 +1,11 @@ use futures::StreamExt; use std::{sync::Arc, time::Duration}; +use datafusion::common::{DataFusionError, Result}; pub use datafusion::dataframe::DataFrame; use datafusion::dataframe::DataFrameWriteOptions; -use datafusion_common::{DataFusionError, Result}; -use datafusion_execution::SendableRecordBatchStream; -use datafusion_expr::logical_plan::LogicalPlanBuilder; -use datafusion_expr::Expr; +use datafusion::execution::SendableRecordBatchStream; +use datafusion::logical_expr::{logical_plan::LogicalPlanBuilder, Expr}; use crate::context::Context; use crate::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder}; @@ -15,7 +14,7 @@ use crate::physical_plan::utils::time::TimestampUnit; #[derive(Clone)] pub struct DataStream { - pub(crate) df: Arc, + pub df: Arc, pub(crate) context: Arc, } diff --git a/crates/core/src/logical_plan/mod.rs b/crates/core/src/logical_plan/mod.rs index 0f31476..1325e80 100644 --- a/crates/core/src/logical_plan/mod.rs +++ b/crates/core/src/logical_plan/mod.rs @@ -1,13 +1,13 @@ use std::sync::Arc; use std::time::Duration; -use datafusion_common::Result; +use datafusion::common::Result; -use datafusion_expr::builder::add_group_by_exprs_from_dependencies; -use datafusion_expr::expr_rewriter::normalize_cols; -use datafusion_expr::logical_plan::{Extension, LogicalPlan}; -use datafusion_expr::LogicalPlanBuilder; -use datafusion_expr::{Aggregate, Expr}; +use datafusion::logical_expr::builder::add_group_by_exprs_from_dependencies; +use datafusion::logical_expr::expr_rewriter::normalize_cols; +use datafusion::logical_expr::logical_plan::{Extension, LogicalPlan}; +use datafusion::logical_expr::LogicalPlanBuilder; +use datafusion::logical_expr::{Aggregate, Expr}; pub mod streaming_window; use streaming_window::{StreamingWindowPlanNode, StreamingWindowSchema, StreamingWindowType}; diff --git a/crates/core/src/logical_plan/streaming_window.rs b/crates/core/src/logical_plan/streaming_window.rs index 420b4ca..8bb2a93 100644 --- a/crates/core/src/logical_plan/streaming_window.rs +++ b/crates/core/src/logical_plan/streaming_window.rs @@ -6,9 +6,9 @@ use std::time::Duration; use arrow::datatypes::{DataType, Field, SchemaBuilder, TimeUnit}; +use datafusion::common::{DFSchema, DFSchemaRef, Result}; +use datafusion::logical_expr::{Aggregate, Expr}; use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNodeCore}; -use datafusion_common::{DFSchema, DFSchemaRef, Result}; -use datafusion_expr::{Aggregate, Expr}; //TODO: Avoid use of Aggregate here as we need to clone the internal expressions back and forth. #[derive(PartialEq, Eq, Hash)] diff --git a/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs b/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs index 5b409a0..97809bf 100644 --- a/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs +++ b/crates/core/src/physical_optimizer/coalesce_before_streaming_window_aggregate.rs @@ -1,9 +1,9 @@ use std::sync::Arc; -use datafusion_physical_expr::Partitioning; -use datafusion_physical_optimizer::PhysicalOptimizerRule; -use datafusion_physical_plan::repartition::RepartitionExec; -use datafusion_physical_plan::ExecutionPlanProperties; +use datafusion::physical_expr::Partitioning; +use datafusion::physical_optimizer::PhysicalOptimizerRule; +use datafusion::physical_plan::repartition::RepartitionExec; +use datafusion::physical_plan::ExecutionPlanProperties; use datafusion::common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion::error::Result; @@ -33,7 +33,7 @@ impl PhysicalOptimizerRule for CoaslesceBeforeStreamingAggregate { fn optimize( &self, plan: Arc, - config: &datafusion_common::config::ConfigOptions, + config: &datafusion::common::config::ConfigOptions, ) -> Result> { plan.transform(|original| { if let Some(streaming_aggr_exec) = @@ -41,9 +41,9 @@ impl PhysicalOptimizerRule for CoaslesceBeforeStreamingAggregate { { let input = streaming_aggr_exec.input(); let partitions = match input.output_partitioning() { - datafusion_physical_expr::Partitioning::RoundRobinBatch(size) => size, - datafusion_physical_expr::Partitioning::Hash(_, size) => size, - datafusion_physical_expr::Partitioning::UnknownPartitioning(size) => size, + datafusion::physical_expr::Partitioning::RoundRobinBatch(size) => size, + datafusion::physical_expr::Partitioning::Hash(_, size) => size, + datafusion::physical_expr::Partitioning::UnknownPartitioning(size) => size, }; if *partitions == 1 { return Ok(Transformed::no(original)); diff --git a/crates/core/src/physical_plan/streaming_window.rs b/crates/core/src/physical_plan/streaming_window.rs index 5b8f10e..0eeb0b2 100644 --- a/crates/core/src/physical_plan/streaming_window.rs +++ b/crates/core/src/physical_plan/streaming_window.rs @@ -19,27 +19,24 @@ use arrow_array::{ use arrow_ord::cmp; use arrow_schema::{DataType, Field, Schema, SchemaBuilder, SchemaRef, TimeUnit}; -use datafusion_common::{ +use datafusion::common::{ downcast_value, internal_err, stats::Precision, DataFusionError, Statistics, }; -use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::{ +use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; +use datafusion::physical_expr::{ equivalence::{collapse_lex_req, ProjectionMapping}, expressions::UnKnownColumn, AggregateExpr, Partitioning, PhysicalExpr, PhysicalSortRequirement, }; -use datafusion_physical_plan::windows::get_ordered_partition_by_indices; -use datafusion_physical_plan::{ +use datafusion::physical_plan::{ aggregates::{ aggregate_expressions, finalize_aggregation, get_finer_aggregate_exprs_requirement, AggregateMode, PhysicalGroupBy, }, - InputOrderMode, -}; -use datafusion_physical_plan::{ metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}, + windows::get_ordered_partition_by_indices, DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, ExecutionPlanProperties, - PlanProperties, + InputOrderMode, PlanProperties, }; use futures::{Stream, StreamExt}; use tracing::debug; @@ -79,7 +76,7 @@ impl DisplayAs for FranzWindowFrame { } } -use datafusion_common::Result; +use datafusion::common::Result; impl FranzWindowFrame { pub fn new( @@ -473,7 +470,7 @@ impl ExecutionPlan for FranzStreamingWindowExec { fn repartitioned( &self, _target_partitions: usize, - _config: &datafusion_common::config::ConfigOptions, + _config: &datafusion::common::config::ConfigOptions, ) -> Result>> { Ok(None) } diff --git a/crates/core/src/physical_plan/utils/accumulators.rs b/crates/core/src/physical_plan/utils/accumulators.rs index 772d283..caf938c 100644 --- a/crates/core/src/physical_plan/utils/accumulators.rs +++ b/crates/core/src/physical_plan/utils/accumulators.rs @@ -1,9 +1,10 @@ use std::sync::Arc; -use datafusion_expr::Accumulator; -use datafusion_physical_expr::AggregateExpr; +use datafusion::common::Result; +use datafusion::logical_expr::Accumulator; +use datafusion::physical_expr::AggregateExpr; + pub(crate) type AccumulatorItem = Box; -use datafusion_common::Result; pub(crate) fn create_accumulators( aggr_expr: &[Arc], diff --git a/crates/core/src/physical_plan/utils/mod.rs b/crates/core/src/physical_plan/utils/mod.rs index a2f772c..c691d87 100644 --- a/crates/core/src/physical_plan/utils/mod.rs +++ b/crates/core/src/physical_plan/utils/mod.rs @@ -1,6 +1,6 @@ use std::result; -use datafusion_common::DataFusionError; +use datafusion::common::DataFusionError; pub mod accumulators; pub mod time; diff --git a/crates/core/src/physical_plan/utils/time.rs b/crates/core/src/physical_plan/utils/time.rs index 4035ce0..0a97542 100644 --- a/crates/core/src/physical_plan/utils/time.rs +++ b/crates/core/src/physical_plan/utils/time.rs @@ -1,4 +1,4 @@ -use std::time::SystemTime; +use std::time::{Duration, SystemTime}; use arrow::{ compute::{max, min}, @@ -9,8 +9,7 @@ use arrow_array::{ TimestampMillisecondArray, }; use chrono::NaiveDateTime; -use datafusion_common::DataFusionError; -use std::time::Duration; +use datafusion::common::DataFusionError; #[derive(Debug, Clone)] pub enum TimestampUnit { diff --git a/crates/core/src/planner/streaming_window.rs b/crates/core/src/planner/streaming_window.rs index 9a5239e..34ec2d2 100644 --- a/crates/core/src/planner/streaming_window.rs +++ b/crates/core/src/planner/streaming_window.rs @@ -1,19 +1,20 @@ use async_trait::async_trait; -use datafusion_common::{internal_err, DFSchema}; -use datafusion_expr::Expr; -use datafusion_physical_expr::create_physical_expr; -use datafusion_physical_plan::aggregates::PhysicalGroupBy; use itertools::multiunzip; use std::sync::Arc; +use datafusion::common::{internal_err, DFSchema}; use datafusion::error::Result; use datafusion::execution::context::SessionState; +use datafusion::logical_expr::Expr; use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode}; -use datafusion::physical_plan::aggregates::AggregateMode; +use datafusion::physical_expr::create_physical_expr; +use datafusion::physical_plan::{ + aggregates::{AggregateMode, PhysicalGroupBy}, + ExecutionPlan, +}; use datafusion::physical_planner::{ create_aggregate_expr_and_maybe_filter, ExtensionPlanner, PhysicalPlanner, }; -use datafusion_physical_plan::ExecutionPlan; use crate::logical_plan::streaming_window::{StreamingWindowPlanNode, StreamingWindowType}; use crate::physical_plan::streaming_window::{FranzStreamingWindowExec, FranzStreamingWindowType}; diff --git a/crates/core/src/state_backend/rocksdb_backend.rs b/crates/core/src/state_backend/rocksdb_backend.rs index 69204e0..ae43ae0 100644 --- a/crates/core/src/state_backend/rocksdb_backend.rs +++ b/crates/core/src/state_backend/rocksdb_backend.rs @@ -1,11 +1,13 @@ -use std::{env, sync::OnceLock}; +use std::{ + env, + sync::{Arc, OnceLock}, +}; -use datafusion_common::DataFusionError; +use datafusion::common::DataFusionError; use log::debug; use rocksdb::{ BoundColumnFamily, ColumnFamilyDescriptor, DBWithThreadMode, MultiThreaded, Options, DB, }; -use std::sync::Arc; pub struct RocksDBBackend { db: DBWithThreadMode, diff --git a/crates/core/src/utils/row_encoder.rs b/crates/core/src/utils/row_encoder.rs index fea5a8f..6b168f1 100644 --- a/crates/core/src/utils/row_encoder.rs +++ b/crates/core/src/utils/row_encoder.rs @@ -1,6 +1,6 @@ use arrow::json::writer::{JsonFormat, Writer}; use datafusion::arrow::record_batch::RecordBatch; -use datafusion_common::Result; +use datafusion::common::Result; pub trait RowEncoder { fn encode(&self, batch: &RecordBatch) -> Result>>; diff --git a/examples/Cargo.toml b/examples/Cargo.toml index a94c834..2ed7ae9 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -6,11 +6,6 @@ publish = false [dependencies] datafusion = { workspace = true } -datafusion-common = { workspace = true } -datafusion-expr = { workspace = true } -datafusion-functions = { workspace = true } -datafusion-functions-aggregate = { workspace = true } -datafusion-physical-expr = { workspace = true } df-streams-core = { workspace = true } @@ -26,3 +21,4 @@ tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] } tempfile = { version = "3" } rdkafka = { workspace = true } rand = "0.8.5" +tokio-postgres = { version = "0.7.11", features = ["with-serde_json-1"] } diff --git a/examples/examples/csv_streaming.rs b/examples/examples/csv_streaming.rs index a0d01c4..aee299f 100644 --- a/examples/examples/csv_streaming.rs +++ b/examples/examples/csv_streaming.rs @@ -2,8 +2,8 @@ use datafusion::common::test_util::datafusion_test_data; use datafusion::dataframe::DataFrameWriteOptions; use datafusion::datasource::MemTable; use datafusion::error::Result; +use datafusion::logical_expr::{col, max, min}; use datafusion::prelude::*; -use datafusion_expr::{col, max, min}; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and /// fetching results with streaming aggregation and streaming window diff --git a/examples/examples/kafka_rideshare.rs b/examples/examples/kafka_rideshare.rs index e155ddf..eb858f0 100644 --- a/examples/examples/kafka_rideshare.rs +++ b/examples/examples/kafka_rideshare.rs @@ -2,9 +2,9 @@ #![allow(unused_variables)] use datafusion::error::Result; -use datafusion_expr::{col, max, min}; -use datafusion_functions::core::expr_ext::FieldAccessor; -use datafusion_functions_aggregate::count::count; +use datafusion::functions::core::expr_ext::FieldAccessor; +use datafusion::functions_aggregate::count::count; +use datafusion::logical_expr::{col, max, min}; use df_streams_core::context::Context; use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder}; diff --git a/examples/examples/simple_aggregation.rs b/examples/examples/simple_aggregation.rs index 3ce7878..257b9fb 100644 --- a/examples/examples/simple_aggregation.rs +++ b/examples/examples/simple_aggregation.rs @@ -2,7 +2,7 @@ use std::time::Duration; use datafusion::error::Result; use datafusion::functions_aggregate::average::avg; -use datafusion_expr::{col, max, min}; +use datafusion::logical_expr::{col, max, min}; use df_streams_core::context::Context; use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder}; diff --git a/examples/examples/stream_join.rs b/examples/examples/stream_join.rs index 75a69f8..223f26c 100644 --- a/examples/examples/stream_join.rs +++ b/examples/examples/stream_join.rs @@ -3,9 +3,9 @@ #![allow(unused_imports)] use datafusion::error::Result; -use datafusion_expr::{col, max, min}; -use datafusion_functions::core::expr_ext::FieldAccessor; -use datafusion_functions_aggregate::count::count; +use datafusion::functions::core::expr_ext::FieldAccessor; +use datafusion::functions_aggregate::count::count; +use datafusion::logical_expr::{col, max, min}; use df_streams_core::context::Context; use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};