Skip to content

Commit

Permalink
example works
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Nov 7, 2024
1 parent d521829 commit d55fde6
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 34 deletions.
5 changes: 5 additions & 0 deletions py-denormalized/python/denormalized/data_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ def with_column(self, name: str, predicate: Expr) -> "DataStream":
DataStream: A new DataStream with the additional column.
"""
return DataStream(self.ds.with_column(name, to_internal_expr(predicate)))

def drop_columns(self, columns: list[str]) -> "DataStream":
    """Return a stream without the given columns.

    Args:
        columns: Names of the columns to remove.

    Returns:
        DataStream: A new DataStream wrapping the result of the underlying
        stream's ``drop_columns`` call.
    """
    remaining = self.ds.drop_columns(columns)
    return DataStream(remaining)

def join_on(
self, right: "DataStream", join_type: str, on_exprs: list[Expr]
Expand Down
68 changes: 34 additions & 34 deletions py-denormalized/python/examples/stream_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import signal
import sys
import pprint as pp

from denormalized import Context
from denormalized.datafusion import col, expr
Expand All @@ -27,46 +28,45 @@ def signal_handler(sig, frame):


# Sink callback: converts the record batch to a dict with to_pydict() and prints it.
# NOTE(review): both the plain print and the pp.pprint call appear in this hunk; in a
# diff view these are presumably the removed/added pair, so only the pprint line is
# expected in the committed file -- confirm before relying on the double output.
def print_batch(rb):
print(rb.to_pydict())
pp.pprint(rb.to_pydict())


ctx = Context()
temperature_ds = ctx.from_topic(
"temperature", json.dumps(sample_event), bootstrap_server
).window(
[],
[
f.count(col("reading"), distinct=False, filter=None).alias("temp_count"),
],
4000,
None,
)

humidity_ds = ctx.from_topic(
"humidity", json.dumps(sample_event), bootstrap_server
).window(
[],
[
f.count(col("reading"), distinct=False, filter=None).alias("temp_count"),
],
4000,
None,
# Humidity side of the join. sensor_name and the window bounds are copied into
# humidity_-prefixed columns and the originals dropped, so the join further down can
# refer to each side's key columns by a distinct name.
humidity_ds = (
    ctx.from_topic("humidity", json.dumps(sample_event), bootstrap_server)
    .with_column("humidity_sensor", col("sensor_name"))
    .drop_columns(["sensor_name"])
    .window(
        [col("humidity_sensor")],
        [
            # Average (not count) so the value matches its "avg_humidity" alias and
            # mirrors the f.avg aggregate used for avg_temperature.
            f.avg(col("reading")).alias("avg_humidity"),
        ],
        4000,
        None,
    )
    .with_column("humidity_window_start_time", col("window_start_time"))
    .with_column("humidity_window_end_time", col("window_end_time"))
    .drop_columns(["window_start_time", "window_end_time"])
)

temperature_ds = temperature_ds.join(
humidity_ds, "left", ["sensor_name"], ["sensor_name"]
).sink(print_batch)

# temperature_ds = temperature_ds.window(
# [],
# [
# f.count(col("temperature.reading"), distinct=False, filter=None).alias(
# "temperature_count"
# ),
# f.count(col("humidity.reading"), distinct=False, filter=None).alias(
# "humidity_count"
# ),
# ],
# 4000,
# None,
# ).sink(print_batch)
# Window the temperature stream per sensor_name, averaging reading, then inner-join
# it with humidity_ds and hand each resulting batch to print_batch via sink().
joined_ds = (
temperature_ds.window(
[col("sensor_name")],
[
f.avg(col("reading")).alias("avg_temperature"),
],
# Same window arguments (4000, None) as the humidity_ds window.
4000,
None,
)
.join(
humidity_ds,
"inner",
# Key columns of this (temperature) side, followed by the humidity-side keys.
# NOTE(review): presumably the two lists are paired positionally -- confirm
# against DataStream.join.
["sensor_name", "window_start_time"],
["humidity_sensor", "humidity_window_start_time"],
)
.sink(print_batch)
)
7 changes: 7 additions & 0 deletions py-denormalized/src/datastream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,13 @@ impl PyDataStream {
let ds = self.ds.as_ref().clone().with_column(name, expr.into())?;
Ok(Self::new(ds))
}

/// Returns a new stream built from the wrapped stream's `drop_columns` call.
///
/// The owned column names are borrowed as `&str` before being passed on.
///
/// # Errors
///
/// Propagates any error returned by the wrapped stream's `drop_columns`.
pub fn drop_columns(&self, columns: Vec<String>) -> Result<Self> {
    let names: Vec<&str> = columns.iter().map(String::as_str).collect();
    let stream = self.ds.as_ref().clone();
    let trimmed = stream.drop_columns(&names)?;

    Ok(Self::new(trimmed))
}

pub fn join_on(
&self,
Expand Down

0 comments on commit d55fde6

Please sign in to comment.