
Commit 1f08df3: save work

emgeee committed Sep 12, 2024 (1 parent: e2a6beb)
Showing 15 changed files with 242 additions and 49 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

(Generated lockfile; diff not rendered.)

4 changes: 4 additions & 0 deletions crates/core/Cargo.toml
@@ -3,6 +3,10 @@ name = "denormalized"
 version = { workspace = true }
 edition = { workspace = true }
 
+[features]
+default = []
+python = []
+
 [dependencies]
 denormalized-common = { workspace = true }
 denormalized-orchestrator = { workspace = true }
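The new `python` feature gives the core crate a switch for compiling Python-specific code paths only when the bindings need them (the py-denormalized crate enables it below). A minimal sketch of how such a flag is typically consumed — the items actually gated by this commit are not shown in the diff, so the module here is hypothetical:

```rust
// Compiled only when built with `cargo build --features python`.
#[cfg(feature = "python")]
pub mod python_support {
    /// Hypothetical helper that exists solely for the Python bindings.
    pub fn binding_banner() -> String {
        format!("denormalized {} (python)", env!("CARGO_PKG_VERSION"))
    }
}
```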
4 changes: 2 additions & 2 deletions crates/core/src/datastream.rs
@@ -90,8 +90,8 @@ impl DataStream
         self,
         right: impl Joinable,
         join_type: JoinType,
-        left_cols: &[String],
-        right_cols: &[String],
+        left_cols: &[&str],
+        right_cols: &[&str],
         filter: Option<Expr>,
     ) -> Result<Self> {
         let (session_state, plan) = self.df.as_ref().clone().into_parts();
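Changing `left_cols`/`right_cols` from `&[String]` to `&[&str]` makes the core join API cheaper and friendlier to call: string literals can be passed directly instead of allocating owned `String`s. A hedged sketch of a call site under the new signature (the two streams are assumed to already exist; the column name is illustrative):

```rust
// Hypothetical call site: equi-join two DataStreams on "customer_id".
// Before this change the slices had to be built as &[String::from("customer_id")].
let joined = left_stream.join(
    right_stream,
    JoinType::Inner,
    &["customer_id"], // left_cols: &[&str]
    &["customer_id"], // right_cols: &[&str]
    None,             // filter: Option<Expr>
)?;
```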
5 changes: 3 additions & 2 deletions py-denormalized/Cargo.toml
@@ -10,12 +10,13 @@ name = "denormalized_python"
 crate-type = ["cdylib"]
 
 [dependencies]
-pyo3 = { workspace = true, features = ["experimental-async", "chrono"] }
+pyo3 = { workspace = true, features = ["extension-module", "abi3", "abi3-py311"] }
 chrono = { workspace = true }
-denormalized = { workspace = true }
+denormalized = { workspace = true, features = ["python"] }
 datafusion = { workspace = true, features = [
     "pyarrow",
     "avro",
+    "unicode_expressions",
 ] }
 datafusion-python = { path = "../../datafusion-python/" }
 tokio = { workspace = true }
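Two coordinated changes here are worth noting: the pyo3 features move from `experimental-async` to `extension-module` plus `abi3`/`abi3-py311` (stable-ABI wheels targeting CPython 3.11+, with pyo3's experimental async support dropped — matching the `async fn` → `fn` conversions in the Rust bindings below), and the core `denormalized` dependency now enables its new `python` feature.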
1 change: 1 addition & 0 deletions py-denormalized/pyproject.toml
@@ -13,6 +13,7 @@ classifiers = [
 dynamic = ["version"]
 dependencies = [
     "pyarrow>=17.0.0",
+    "datafusion>=40.1.0",
 ]
 
 [project.optional-dependencies]
16 changes: 16 additions & 0 deletions py-denormalized/python/denormalized/context.py
@@ -1,8 +1,24 @@
 from denormalized._internal import PyContext
 
+from denormalized.datastream import DataStream as DataStream
+
 class Context:
     """Context."""
 
     def __init__(self) -> None:
         """__init__."""
         self.ctx = PyContext()
+
+    def __repr__(self):
+        return self.ctx.__repr__()
+
+    def __str__(self):
+        return self.ctx.__str__()
+
+    def from_topic(self, topic: str, sample_json: str, bootstrap_servers: str) -> DataStream:
+        """Create a new context from a topic."""
+        py_ds = self.ctx.from_topic(topic, sample_json, bootstrap_servers)
+        ds = DataStream(py_ds)
+
+        return ds
+
6 changes: 6 additions & 0 deletions py-denormalized/python/denormalized/datastream.py
@@ -10,6 +10,12 @@ def __init__(self, ds: PyDataStream) -> None:
         """__init__."""
         self.ds = ds
 
+    def __repr__(self):
+        return self.ds.__repr__()
+
+    def __str__(self):
+        return self.ds.__str__()
+
     def schema(self) -> pa.Schema:
         """Schema."""
         return self.ds.schema()
16 changes: 14 additions & 2 deletions py-denormalized/python/examples/stream_aggregate.py
@@ -1,6 +1,7 @@
 import json
 
-from denormalized import Context
+from denormalized import Context, DataStream
+from datafusion import Expr
 
 sample_event = {
     "occurred_at_ms": 100,
@@ -9,5 +10,16 @@
 }
 
 ctx = Context()
+print(ctx)
+# ds = ctx.from_topic("temperature", json.dumps(sample_event), "localhost:9092")
+
 ds = ctx.from_topic("temperature", json.dumps(sample_event), "localhost:9092")
-print(ds.schema())
+# expr = Expr.literal(4)
+# print(expr)
+
+# print(ds.schema())
+
+from denormalized._internal import PyContext
+
+# ctx_internal = PyContext()
+# ctx_internal.foo()
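Note that this example assumes a Kafka broker reachable at localhost:9092 with a "temperature" topic; the commented-out PyContext lines exercise the internal binding directly and reflect the work-in-progress state this "save work" commit captures.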
5 changes: 5 additions & 0 deletions py-denormalized/requirements-dev.lock
@@ -12,6 +12,8 @@
 -e file:.
 asttokens==2.4.1
     # via stack-data
+datafusion==40.1.0
+    # via denormalized
 decorator==5.1.1
     # via ipython
 executing==2.0.1
@@ -41,6 +43,7 @@ ptyprocess==0.7.0
 pure-eval==0.2.3
     # via stack-data
 pyarrow==17.0.0
+    # via datafusion
     # via denormalized
 pygments==2.18.0
     # via ipython
@@ -52,5 +55,7 @@ stack-data==0.6.3
 traitlets==5.14.3
     # via ipython
     # via matplotlib-inline
+typing-extensions==4.12.2
+    # via datafusion
 wcwidth==0.2.13
     # via prompt-toolkit
5 changes: 5 additions & 0 deletions py-denormalized/requirements.lock
@@ -10,7 +10,12 @@
 # universal: false
 
 -e file:.
+datafusion==40.1.0
+    # via denormalized
 numpy==2.1.0
     # via pyarrow
 pyarrow==17.0.0
+    # via datafusion
     # via denormalized
+typing-extensions==4.12.2
+    # via datafusion
66 changes: 42 additions & 24 deletions py-denormalized/src/context.rs
@@ -5,9 +5,14 @@ use std::sync::Arc;
 
 use denormalized::context::Context;
 use denormalized::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
+use denormalized::datastream::DataStream;
 use denormalized::physical_plan::utils::time::TimestampUnit;
 
+use tokio::task::JoinHandle;
+
 use crate::datastream::PyDataStream;
+use crate::errors::py_denormalized_err;
+use crate::utils::{get_tokio_runtime, wait_for_future};
 
 #[pyclass(module = "denormalized", subclass)]
 #[derive(Clone)]
@@ -22,40 +27,53 @@ impl PyContext {
     /// creates a new PyDataFrame
     #[new]
     pub fn new() -> PyResult<Self> {
-        if let Ok(ds) = Context::new() {
-            Ok(Self {
-                context: Arc::new(ds),
-            })
-        } else {
-            Err(PyValueError::new_err("Failed to create new PyContext"))
-        }
+        Ok(Self {
+            context: Arc::new(Context::new()?),
+        })
     }
 
+    fn foo(&self, _py: Python) -> PyResult<String> {
+        println!("Fooooo");
+        Ok("foo wtf".to_string())
+    }
+
     fn __repr__(&self, _py: Python) -> PyResult<String> {
-        Ok("PyContext".to_string())
+        Ok("__repr__ PyContext".to_string())
     }
 
-    #[pyo3(signature = (topic, sample_json, bootstrap_servers))]
-    pub async fn from_topic(
+    fn __str__(&self, _py: Python) -> PyResult<String> {
+        Ok("__str__ PyContext".to_string())
+    }
+
+    pub fn from_topic(
         &self,
        topic: String,
        sample_json: String,
        bootstrap_servers: String,
+        py: Python,
     ) -> PyResult<PyDataStream> {
-        let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
-
-        let source_topic = topic_builder
-            .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
-            .with_encoding("json")?
-            .with_topic(topic)
-            .infer_schema_from_json(sample_json.as_str())?
-            .build_reader(ConnectionOpts::from([
-                ("auto.offset.reset".to_string(), "latest".to_string()),
-                ("group.id".to_string(), "sample_pipeline".to_string()),
-            ]))
-            .await?;
-
-        let ds = self.context.from_topic(source_topic).await.unwrap();
+        let context = self.context.clone();
+        let rt = &get_tokio_runtime(py).0;
+        let fut: JoinHandle<denormalized::common::error::Result<DataStream>> =
+            rt.spawn(async move {
+                let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
+
+                let source_topic = topic_builder
+                    .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
+                    .with_encoding("json")?
+                    .with_topic(topic)
+                    .infer_schema_from_json(sample_json.as_str())?
+                    .build_reader(ConnectionOpts::from([
+                        ("auto.offset.reset".to_string(), "latest".to_string()),
+                        ("group.id".to_string(), "sample_pipeline".to_string()),
+                    ]))
+                    .await?;
+
+                context.from_topic(source_topic).await
+            });
+
+        let ds = wait_for_future(py, fut).map_err(py_denormalized_err)??;
 
         Ok(PyDataStream::new(ds))
     }
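The rewritten `from_topic` is the heart of this commit: rather than exposing a pyo3 `async fn` (which required pyo3's `experimental-async` feature), the method now spawns its async work onto a shared tokio runtime and blocks the calling Python thread until the task finishes. A condensed sketch of that bridge, reusing `get_tokio_runtime` and `wait_for_future` from `crate::utils` as imported above — their exact signatures are not shown in this diff, so treat the snippet as illustrative rather than definitive:

```rust
// Sketch of the sync-over-async bridge used by from_topic.
pub fn run_blocking(&self, py: Python) -> PyResult<String> {
    let rt = &get_tokio_runtime(py).0; // shared tokio Runtime held by the module
    let fut: JoinHandle<denormalized::common::error::Result<String>> =
        rt.spawn(async move {
            // ... any .await-ing work goes here ...
            Ok("done".to_string())
        });
    // The two `?`s unwrap the spawned task's result and the task's own
    // inner Result, converting errors into Python exceptions along the way.
    let out = wait_for_future(py, fut).map_err(py_denormalized_err)??;
    Ok(out)
}
```

Blocking like this also means Python callers no longer need an event loop: `ctx.from_topic(...)` in the example script above is a plain synchronous call.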
28 changes: 18 additions & 10 deletions py-denormalized/src/datastream.rs
@@ -8,6 +8,7 @@ use datafusion::prelude::Expr;
 
 use denormalized::datastream::DataStream;
 
+use crate::errors::py_denormalized_err;
 use datafusion::arrow::pyarrow::PyArrowType;
 use datafusion_python::expr::{join::PyJoinType, PyExpr};
 
@@ -27,7 +28,11 @@ impl PyDataStream {
 #[pymethods]
 impl PyDataStream {
     fn __repr__(&self, _py: Python) -> PyResult<String> {
-        Ok("PyDataStream".to_string())
+        Ok("__repr__ PyDataStream".to_string())
+    }
+
+    fn __str__(&self, _py: Python) -> PyResult<String> {
+        Ok("__str__ PyDataStream".to_string())
     }
 
     fn schema(&self) -> PyArrowType<Schema> {
@@ -36,12 +41,8 @@ impl PyDataStream {
     pub fn select(&self, expr_list: Vec<PyExpr>) -> PyResult<Self> {
         let expr_list: Vec<_> = expr_list.into_iter().map(|e: PyExpr| e.expr).collect();
-        let ds = self
-            .ds
-            .as_ref()
-            .clone()
-            .select(expr_list)
-            .map_err(PyErr::from)?;
+
+        let ds = self.ds.as_ref().clone().select(expr_list)?;
         Ok(Self::new(ds))
     }
 
@@ -72,6 +73,9 @@
 
         let filter = filter.map(|f| f.into());
 
+        let left_cols = left_cols.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
+        let right_cols = right_cols.iter().map(|s| s.as_ref()).collect::<Vec<&str>>();
+
         let ds = self.ds.as_ref().clone().join(
             right_ds,
             join_type.into(),
@@ -113,7 +117,11 @@
         Ok(Self::new(ds))
     }
 
-    pub async fn print_stream(&self) -> PyResult<()> {
+    pub fn print_expr(&self, expr: PyExpr) -> () {
+        println!("{:?}", expr);
+    }
+
+    pub fn print_stream(&self) -> PyResult<()> {
         // Implement the method using the original Rust code
         todo!()
     }
@@ -128,12 +136,12 @@
         todo!()
     }
 
-    pub async fn print_physical_plan(&self) -> PyResult<Self> {
+    pub fn print_physical_plan(&self) -> PyResult<Self> {
         // Implement the method using the original Rust code
         todo!()
    }
 
-    pub async fn sink_kafka(&self, bootstrap_servers: String, topic: String) -> PyResult<()> {
+    pub fn sink_kafka(&self, bootstrap_servers: String, topic: String) -> PyResult<()> {
         // Implement the method using the original Rust code
         todo!()
     }