Skip to content

Commit

Permalink
Add postgres examples
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Aug 8, 2024
1 parent 57db66f commit 5efbeb5
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
38 changes: 38 additions & 0 deletions examples/examples/seed_postgres_db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use tokio_postgres::{Error, GenericClient, NoTls};

/// Seed a local Postgres instance with a sample table and a single row.
///
/// Start the database first with:
/// docker run --name postgres -e POSTGRES_PASSWORD=password -e POSTGRES_DB=postgres_db -p 5432:5432 -d postgres:16-alpine
#[tokio::main]
async fn main() -> Result<(), Error> {
    // `client` issues queries; `conn` drives the actual wire protocol.
    let (client, conn) = tokio_postgres::connect(
        "host=localhost user=postgres password=password dbname=postgres_db",
        NoTls,
    )
    .await?;

    // The connection future must be polled for the client to make progress,
    // so run it on its own background task.
    tokio::spawn(async move {
        if let Err(err) = conn.await {
            eprintln!("connection error: {}", err);
        }
    });

    let table = "companies";

    // SQL identifiers cannot be bound as parameters, so the table name is
    // spliced into the DDL text directly (safe here: it is a hard-coded constant).
    let ddl = format!(
        r#"CREATE TABLE IF NOT EXISTS {} (id SERIAL PRIMARY KEY, name VARCHAR)"#,
        table
    );
    client.execute(ddl.as_str(), &[]).await?;

    // Insert one sample row through a prepared statement.
    let insert = client
        .prepare(format!("INSERT INTO {} (name) VALUES ($1)", table).as_str())
        .await?;
    client.execute(&insert, &[&"test"]).await?;

    Ok(())
}
48 changes: 48 additions & 0 deletions examples/examples/sink_to_postgres.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use std::time::Duration;

use datafusion::error::Result;
use datafusion::functions_aggregate::average::avg;
use datafusion::logical_expr::{col, max, min};

use df_streams_core::context::Context;
use df_streams_core::datasource::kafka::{ConnectionOpts, KafkaTopicBuilder};
use df_streams_core::physical_plan::utils::time::TimestampUnit;

/// Read JSON temperature events from a local Kafka topic and run a streaming
/// windowed min/max/avg aggregation over them.
///
/// NOTE(review): despite the file name (`sink_to_postgres.rs`), this example
/// does not yet write anything to Postgres — it only builds the pipeline and
/// prints the resulting output schema. Confirm whether a sink step is pending.
#[tokio::main]
async fn main() -> Result<()> {
    // Sample payload used only to infer the topic's schema below.
    let sample_event = r#"{"occurred_at_ms": 1715201766763, "temperature": 87.2}"#;

    let bootstrap_servers = String::from("localhost:9092");

    let ctx = Context::new()?;

    let mut topic_builder = KafkaTopicBuilder::new(bootstrap_servers.clone());

    // Build a reader for the "temperature" topic: JSON-encoded messages with
    // event time taken from the `occurred_at_ms` field (i64 milliseconds),
    // consuming from the earliest offset as consumer group "sample_pipeline".
    let source_topic = topic_builder
        .with_timestamp(String::from("occurred_at_ms"), TimestampUnit::Int64Millis)
        .with_encoding("json")?
        .with_topic(String::from("temperature"))
        .infer_schema_from_json(sample_event)?
        .build_reader(ConnectionOpts::from([
            ("auto.offset.reset".to_string(), "earliest".to_string()),
            ("group.id".to_string(), "sample_pipeline".to_string()),
        ]))
        .await?;

    // Windowed aggregation: min/max/average of `temperature` per window,
    // with no grouping columns.
    let ds = ctx.from_topic(source_topic).await?.streaming_window(
        vec![],
        vec![
            min(col("temperature")).alias("min"),
            max(col("temperature")).alias("max"),
            avg(col("temperature")).alias("average"),
        ],
        Duration::from_millis(1_000), // 1 second window (previous comment incorrectly said 5 seconds)
        None,
    )?;

    // Uncomment to stream results to stdout instead of just printing the schema.
    // ds.clone().print_stream().await?;

    println!("{}", ds.df.schema());

    Ok(())
}

0 comments on commit 5efbeb5

Please sign in to comment.