diff --git a/Cargo.lock b/Cargo.lock index ab6bf26..9e042d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1599,6 +1599,7 @@ dependencies = [ name = "denormalized-examples" version = "0.0.1" dependencies = [ + "anyhow", "arrow", "arrow-array", "arrow-schema", diff --git a/Dockerfile b/Dockerfile index 94c0a9c..edad36e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -77,19 +77,18 @@ EOF RUN chmod +x /startup.sh -ENV KAFKA_NODE_ID=1 -ENV KAFKA_PROCESS_ROLES=broker,controller -ENV KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 -ENV KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER -ENV KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT -ENV KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -ENV KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 -ENV KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT -ENV KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 -ENV KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 -ENV KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 - - +# Kafka configuration +ENV KAFKA_NODE_ID=1 \ + KAFKA_PROCESS_ROLES=broker,controller \ + KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \ + KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \ + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \ + KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \ + KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \ + KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \ + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \ + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \ + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1 # Expose Kafka port EXPOSE 9092 diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 2c1a95d..1363491 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -12,6 +12,7 @@ datafusion = { workspace = true } arrow = { workspace = true, features = ["prettyprint"] } arrow-schema = { workspace = true } arrow-array = { workspace = true } +anyhow = "1.0.86" tracing = { workspace = true } futures = { workspace = true } tracing-log = { workspace = true } diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs index 6285243..d80e68f 100644 --- a/examples/examples/emit_measurements.rs +++ b/examples/examples/emit_measurements.rs @@ -1,11 +1,11 @@ use rand::seq::SliceRandom; -use rdkafka::producer::FutureProducer; use std::time::{SystemTime, UNIX_EPOCH}; +use std::result::Result; use rdkafka::config::ClientConfig; use rdkafka::producer::FutureRecord; +use rdkafka::producer::FutureProducer; -use denormalized::prelude::*; use denormalized_examples::Measurment; /// This script emits test data to a kafka cluster @@ -14,7 +14,7 @@ use denormalized_examples::Measurment; /// Sample sensor data will then be emitted to two topics: `temperature` and `humidity` /// This data is read processed by other exmpales #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> Result<(), anyhow::Error> { let mut tasks = tokio::task::JoinSet::new(); let producer: FutureProducer = ClientConfig::new()