From 82bcac2503e1d3efe35c18bd40d5d95464484bd1 Mon Sep 17 00:00:00 2001
From: Matt Green
Date: Tue, 12 Nov 2024 11:16:42 -0800
Subject: [PATCH] Add dockerfile to run emit_measurements and kafka (#56)

* Add dockerfile to run emit_measurements and kafka
---
 .dockerignore                            |   6 ++
 Cargo.lock                               |   1 +
 Cargo.toml                               |   1 +
 Dockerfile                               | 100 ++++++++++++++++++
 README.md                                |   5 +-
 examples/Cargo.toml                      |   1 +
 examples/examples/emit_measurements.rs   |   6 +-
 py-denormalized/README.md                |  13 +++
 py-denormalized/pyproject.toml           |   7 +-
 .../python/examples/stream_aggregate.py  |   8 +-
 .../python/examples/stream_join.py       |  11 +-
 py-denormalized/requirements-dev.lock    |   1 +
 12 files changed, 145 insertions(+), 15 deletions(-)
 create mode 100644 .dockerignore
 create mode 100644 Dockerfile

diff --git a/.dockerignore b/.dockerignore
new file mode 100644
index 0000000..4ff6a5b
--- /dev/null
+++ b/.dockerignore
@@ -0,0 +1,6 @@
+target/
+.git/
+.gitignore
+Dockerfile
+.dockerignore
+**/*.rs.bk
diff --git a/Cargo.lock b/Cargo.lock
index ab6bf26..9e042d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1599,6 +1599,7 @@ dependencies = [
 name = "denormalized-examples"
 version = "0.0.1"
 dependencies = [
+ "anyhow",
  "arrow",
  "arrow-array",
  "arrow-schema",
diff --git a/Cargo.toml b/Cargo.toml
index f3900f1..b263184 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,6 +17,7 @@ license = "Apache-2.0"
 readme = "README.md"
 repository = "https://github.com/probably-nothing-labs/denormalized.git"
 version = "0.0.1"
+rust-version = "1.81.0"
 description = "Embeddable stream processing engine"
 
 [workspace.dependencies]
diff --git a/Dockerfile b/Dockerfile
new file mode 100644
index 0000000..c1ecd5d
--- /dev/null
+++ b/Dockerfile
@@ -0,0 +1,100 @@
+# This file builds an image that runs Kafka and the emit_measurements.rs script for generating fake data
+#
+# docker build -t emgeee/kafka_emit_measurements:latest .
+#
+
+# Stage 1: Build the Rust application
+FROM rust:1.81.0-slim-bookworm AS builder
+WORKDIR /usr/src/app
+
+# Install build dependencies and zig
+RUN apt-get update && apt-get install -y \
+    cmake \
+    g++ \
+    libssl-dev \
+    pkg-config \
+    wget \
+    xz-utils \
+    && wget https://ziglang.org/download/0.13.0/zig-linux-x86_64-0.13.0.tar.xz \
+    && tar -xf zig-linux-x86_64-0.13.0.tar.xz \
+    && mv zig-linux-x86_64-0.13.0 /usr/local/zig \
+    && rm zig-linux-x86_64-0.13.0.tar.xz \
+    && rm -rf /var/lib/apt/lists/*
+
+# Add zig to PATH
+ENV PATH="/usr/local/zig/:$PATH"
+
+# Install cargo-zigbuild
+RUN cargo install --locked cargo-zigbuild && \
+    rustup target add x86_64-unknown-linux-musl
+
+# Copy and build
+COPY . .
+RUN cargo zigbuild --target x86_64-unknown-linux-musl --release --example emit_measurements && \
+    cp target/x86_64-unknown-linux-musl/release/examples/emit_measurements /tmp/ && \
+    rm -rf /usr/src/app/*
+
+# Stage 2: Final image with Kafka and Rust binary
+FROM confluentinc/cp-kafka:7.5.1
+USER root
+
+# Install minimal dependencies
+RUN yum update -y && \
+    yum install -y openssl-devel && \
+    yum clean all && \
+    rm -rf /var/cache/yum
+
+# Copy only the binary from builder stage
+COPY --from=builder /tmp/emit_measurements /usr/local/bin/
+
+# Create startup script
+COPY <<EOF /startup.sh
+#!/bin/bash
+# Start the Kafka broker in the background
+/etc/confluent/docker/run &
+
+# Wait for the broker to accept requests
+while ! kafka-topics --bootstrap-server localhost:9092 --list &>/dev/null; do
+  echo "Waiting for Kafka to be ready..."
+  sleep 5
+done
+
+# Create topics with 1GB retention
+echo "Creating temperature topic..."
+kafka-topics --bootstrap-server localhost:9092 --create --if-not-exists --topic temperature --partitions 1 --replication-factor 1 --config retention.bytes=1073741824
+
+echo "Creating humidity topic..."
+kafka-topics --bootstrap-server localhost:9092 --create --if-not-exists --topic humidity --partitions 1 --replication-factor 1 --config retention.bytes=1073741824
+
+# Start the Rust application
+/usr/local/bin/emit_measurements
+EOF
+
+RUN chmod +x /startup.sh
+
+# Kafka configuration
+ENV KAFKA_NODE_ID=1 \
+    KAFKA_PROCESS_ROLES=broker,controller \
+    KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
+    KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
+    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
+    KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
+    KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
+    KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
+    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
+    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
+    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1
+
+# Expose Kafka port
+EXPOSE 9092
+
+CMD ["/startup.sh"]
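Note: after building the image above, `docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest` starts the broker plus the emitter, and the quickest way to confirm data is flowing is to tail the two topics with a minimal consumer. The following is a sketch, not part of this patch, assuming the `rdkafka` crate (with its default tokio support) and a hypothetical `smoke-test` consumer group:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Connect to the broker the container advertises on localhost:9092.
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "smoke-test") // hypothetical group id
        .set("auto.offset.reset", "earliest")
        .create()?;
    consumer.subscribe(&["temperature", "humidity"])?;

    // Print a handful of records to confirm the emitter is producing.
    for _ in 0..5 {
        let msg = consumer.recv().await?;
        if let Some(payload) = msg.payload() {
            println!("{}: {}", msg.topic(), String::from_utf8_lossy(payload));
        }
    }
    Ok(())
}
```

Setting `auto.offset.reset` to `earliest` makes the check work even if the emitter has already been running for a while.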
diff --git a/README.md b/README.md
index 3ba7348..55ad026 100644
--- a/README.md
+++ b/README.md
@@ -90,9 +90,8 @@ Details about developing the python bindings can be found in [py-denormalized/RE
 
 ### Running an example
 
-1. Start kafka in docker `docker run -p 9092:9092 --name kafka apache/kafka`
-2. Start emitting some sample data: `cargo run --example emit_measurements`
-3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation`
+1. Build and start the custom docker image that contains an instance of kafka along with a script that emits some sample data to kafka: `docker build -t emgeee/kafka_emit_measurements:latest . && docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest`
+2. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation`
 
 ### Checkpointing
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 2c1a95d..1363491 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -12,6 +12,7 @@ datafusion = { workspace = true }
 arrow = { workspace = true, features = ["prettyprint"] }
 arrow-schema = { workspace = true }
 arrow-array = { workspace = true }
+anyhow = "1.0.86"
 tracing = { workspace = true }
 futures = { workspace = true }
 tracing-log = { workspace = true }
diff --git a/examples/examples/emit_measurements.rs b/examples/examples/emit_measurements.rs
index 6285243..c68b044 100644
--- a/examples/examples/emit_measurements.rs
+++ b/examples/examples/emit_measurements.rs
@@ -1,11 +1,11 @@
 use rand::seq::SliceRandom;
-use rdkafka::producer::FutureProducer;
+use std::result::Result;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use rdkafka::config::ClientConfig;
+use rdkafka::producer::FutureProducer;
 use rdkafka::producer::FutureRecord;
 
-use denormalized::prelude::*;
 use denormalized_examples::Measurment;
 
 /// This script emits test data to a kafka cluster
@@ -14,7 +14,7 @@ use denormalized_examples::Measurment;
 /// Sample sensor data will then be emitted to two topics: `temperature` and `humidity`
 /// This data is read and processed by other examples
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> Result<(), anyhow::Error> {
     let mut tasks = tokio::task::JoinSet::new();
 
     let producer: FutureProducer = ClientConfig::new()
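The hunks above only touch imports and the `main` signature. For context, the core of such an emitter is a produce loop along these lines; a sketch under the same `rdkafka`/`anyhow` dependencies, with an illustrative JSON payload (the real script serializes the `Measurment` struct, whose definition this patch does not show):

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    loop {
        // Illustrative payload; the real script samples random sensor data.
        let now_ms = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
        let payload = format!(r#"{{"occurred_at_ms": {now_ms}, "reading": 21.5}}"#);

        // Send without a key; the record lands on the `temperature` topic.
        producer
            .send(
                FutureRecord::<(), _>::to("temperature").payload(&payload),
                Duration::from_secs(0),
            )
            .await
            .map_err(|(err, _msg)| err)?;

        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
```

A zero queue timeout makes `send` error out rather than block when the producer's internal queue is full; the real script presumably spreads sends across the `JoinSet` visible in the context lines above.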
diff --git a/py-denormalized/README.md b/py-denormalized/README.md
index 1c40bd6..1bc40a0 100644
--- a/py-denormalized/README.md
+++ b/py-denormalized/README.md
@@ -3,6 +3,19 @@ denormalized-python
 
 Python bindings for [denormalized](https://github.com/probably-nothing-labs/denormalized)
 
+Denormalized is a single node stream processing engine written in Rust. This directory contains the bindings for building pipelines using Python.
+
+## Getting Started
+
+1. Install denormalized: `pip install denormalized`
+2. Build and start the custom docker image that contains an instance of kafka along with a script that emits some sample data to kafka: `docker build -t emgeee/kafka_emit_measurements:latest . && docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest`
+3. Copy and run the [stream_aggregate.py](python/examples/stream_aggregate.py) example
+
+This script will connect to the kafka instance running in docker and aggregate the metrics in real time.
+
+There are several other examples in the [examples/ folder](python/examples/) that demonstrate other capabilities including stream joins and UDAFs.
+
+
 ## Development
 
 Make sure you're in the `py-denormalized/` directory.
diff --git a/py-denormalized/pyproject.toml b/py-denormalized/pyproject.toml
index 88735ca..de58d3e 100644
--- a/py-denormalized/pyproject.toml
+++ b/py-denormalized/pyproject.toml
@@ -24,7 +24,12 @@ features = ["pyo3/extension-module"]
 module-name = "denormalized._d_internal"
 
 [tool.rye]
-dev-dependencies = ["pip>=24.2", "ipython>=8.26.0", "pytest>=8.3.2"]
+dev-dependencies = [
+    "pip>=24.2",
+    "ipython>=8.26.0",
+    "pytest>=8.3.2",
+    "maturin>=1.7.4",
+]
 
 # Enable docstring linting using the google style guide
 [tool.ruff.lint]
diff --git a/py-denormalized/python/examples/stream_aggregate.py b/py-denormalized/python/examples/stream_aggregate.py
index 99c769d..6e7d5fd 100644
--- a/py-denormalized/python/examples/stream_aggregate.py
+++ b/py-denormalized/python/examples/stream_aggregate.py
@@ -1,9 +1,12 @@
-"""stream_aggregate example."""
+"""stream_aggregate example.
+
+docker build -t emgeee/kafka_emit_measurements:latest .
+"""
 import json
+import pprint as pp
 import signal
 import sys
-import pprint as pp
 
 from denormalized import Context
 from denormalized.datafusion import col
@@ -12,7 +15,6 @@ def signal_handler(sig, frame):
-    print("You pressed Ctrl+C!")
     sys.exit(0)
diff --git a/py-denormalized/python/examples/stream_join.py b/py-denormalized/python/examples/stream_join.py
index 1155a62..10804aa 100644
--- a/py-denormalized/python/examples/stream_join.py
+++ b/py-denormalized/python/examples/stream_join.py
@@ -1,18 +1,19 @@
-"""stream_aggregate example."""
+"""stream_join example.
+
+docker build -t emgeee/kafka_emit_measurements:latest .
+"""
 import json
+import pprint as pp
 import signal
 import sys
-import pprint as pp
 
 from denormalized import Context
-from denormalized.datafusion import col, expr
+from denormalized.datafusion import col
 from denormalized.datafusion import functions as f
-from denormalized.datafusion import lit
 
 
 def signal_handler(sig, frame):
-    print("You pressed Ctrl+C!")
     sys.exit(0)
diff --git a/py-denormalized/requirements-dev.lock b/py-denormalized/requirements-dev.lock
index 1cfbc8f..c1d8781 100644
--- a/py-denormalized/requirements-dev.lock
+++ b/py-denormalized/requirements-dev.lock
@@ -25,6 +25,7 @@ jedi==0.19.1
     # via ipython
 matplotlib-inline==0.1.7
     # via ipython
+maturin==1.7.4
 numpy==2.1.0
     # via pyarrow
 packaging==24.1
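One more reference point: the Python examples above consume JSON records from the `temperature` and `humidity` topics this image creates. A sketch of the record shape, assuming `serde` (derive feature) and `serde_json`; the field names are illustrative assumptions, since the authoritative definition is `denormalized_examples::Measurment`, which this patch does not include:

```rust
use serde::{Deserialize, Serialize};

// Assumed record shape; field names are illustrative, not authoritative.
#[derive(Debug, Serialize, Deserialize)]
struct Measurement {
    occurred_at_ms: u64, // event time in ms since the Unix epoch (assumed)
    sensor_name: String, // which sensor produced the reading (assumed)
    reading: f64,        // the sensor value itself (assumed)
}

fn main() -> serde_json::Result<()> {
    // Round-trip one record to show the expected wire format.
    let raw = r#"{"occurred_at_ms": 1731441402000, "sensor_name": "thermostat_1", "reading": 21.5}"#;
    let m: Measurement = serde_json::from_str(raw)?;
    println!("parsed: {m:?}");
    println!("serialized: {}", serde_json::to_string(&m)?);
    Ok(())
}
```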