From 5efbeb5f697da7ca3e9032050336801157e90ff8 Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Thu, 8 Aug 2024 09:09:18 -0700
Subject: [PATCH] Add postgres examples

---
 examples/examples/seed_postgres_db.rs | 38 +++++++++++++++++++++
 examples/examples/sink_to_postgres.rs | 48 +++++++++++++++++++++++++++
 2 files changed, 86 insertions(+)
 create mode 100644 examples/examples/seed_postgres_db.rs
 create mode 100644 examples/examples/sink_to_postgres.rs

diff --git a/examples/examples/seed_postgres_db.rs b/examples/examples/seed_postgres_db.rs
new file mode 100644
index 0000000..9ea665a
--- /dev/null
+++ b/examples/examples/seed_postgres_db.rs
@@ -0,0 +1,38 @@
+use tokio_postgres::{Error, GenericClient, NoTls};
+
+/// docker run --name postgres -e POSTGRES_PASSWORD=password -e POSTGRES_DB=postgres_db -p 5432:5432 -d postgres:16-alpine
+#[tokio::main]
+async fn main() -> Result<(), Error> {
+    let (client, connection) = tokio_postgres::connect(
+        "host=localhost user=postgres password=password dbname=postgres_db",
+        NoTls,
+    )
+    .await?;
+
+    // The connection object performs the actual communication with the database,
+    // so spawn it off to run on its own.
+    tokio::spawn(async move {
+        if let Err(e) = connection.await {
+            eprintln!("connection error: {}", e);
+        }
+    });
+
+    let table_name = "companies";
+    client
+        .execute(
+            format!(
+                r#"CREATE TABLE IF NOT EXISTS {} (id SERIAL PRIMARY KEY, name VARCHAR)"#,
+                table_name
+            )
+            .as_str(),
+            &[],
+        )
+        .await?;
+
+    let stmt = client
+        .prepare(format!("INSERT INTO {} (name) VALUES ($1)", table_name).as_str())
+        .await?;
+    client.execute(&stmt, &[&"test"]).await?;
+
+    Ok(())
+}
diff --git a/examples/examples/sink_to_postgres.rs b/examples/examples/sink_to_postgres.rs
new file mode 100644
index 0000000..bffe124
--- /dev/null
+++ b/examples/examples/sink_to_postgres.rs
@@ -0,0 +1,48 @@
+use std::time::Duration;
+
+use datafusion::error::Result;
+use datafusion::functions_aggregate::average::avg;
+use datafusion::logical_expr::{col, max, min};
+
+use df_streams_core::context::Context;
+use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
+use df_streams_core::physical_plan::utils::time::TimestampUnit;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#;
+
+    let bootstrap_servers = String::from("localhost:9092");
+
+    let ctx = Context::new()?;
+
+    let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());
+
+    let source_topic = topic_builder
+        .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
+        .with_encoding("json")?
+        .with_topic(String::from("temperature"))
+        .infer_schema_from_json(sample_event)?
+        .build_reader(ConnectionOpts::from([
+            ("auto.offset.reset".to_string(), "earliest".to_string()),
+            ("group.id".to_string(), "sample_pipeline".to_string()),
+        ]))
+        .await?;
+
+    let ds = ctx.from_topic(source_topic).await?.streaming_window(
+        vec![],
+        vec![
+            min(col("temperature")).alias("min"),
+            max(col("temperature")).alias("max"),
+            avg(col("temperature")).alias("average"),
+        ],
+        Duration::from_millis(1_000), // 1 second window
+        None,
+    )?;
+
+    // ds.clone().print_stream().await?;
+
+    println!("{}", ds.df.schema());
+
+    Ok(())
+}