Skip to content

Commit

Permalink
Add dockerfile to run emit_measurements and kafka (#56)
Browse files Browse the repository at this point in the history
* Add dockerfile to run emit_measurements and kafka
emgeee authored Nov 12, 2024

Verified

This commit was signed with the committer’s verified signature.
eyusufatik Esad Yusuf Atik
1 parent 07c987e commit 82bcac2
Showing 12 changed files with 145 additions and 15 deletions.
6 changes: 6 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
target/
.git/
.gitignore
Dockerfile
.dockerignore
**/*.rs.bk
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/probably-nothing-labs/denormalized.git"
version = "0.0.1"
rust-version = "1.81.0"
description = "Embeddable stream processing engine"

[workspace.dependencies]
100 changes: 100 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
# syntax=docker/dockerfile:1
# This file builds an image that runs Kafka and the emit_measurements.rs script for generating fake data
#
# docker build -t emgeee/kafka_emit_measurements:latest .
#

# Stage 1: Build the Rust application
FROM rust:1.81.0-slim-bookworm AS builder
WORKDIR /usr/src/app

# Install build dependencies and zig (zig is the linker backend used by
# cargo-zigbuild to produce a static musl binary that runs on the Kafka image)
RUN apt-get update && apt-get install -y --no-install-recommends \
    cmake \
    g++ \
    libssl-dev \
    pkg-config \
    wget \
    xz-utils \
    && wget https://ziglang.org/download/0.13.0/zig-linux-x86_64-0.13.0.tar.xz \
    && tar -xf zig-linux-x86_64-0.13.0.tar.xz \
    && mv zig-linux-x86_64-0.13.0 /usr/local/zig \
    && rm zig-linux-x86_64-0.13.0.tar.xz \
    && rm -rf /var/lib/apt/lists/*

# Add zig to PATH
ENV PATH="/usr/local/zig/:$PATH"

# Install cargo-zigbuild and the musl target used for static cross-linking
RUN cargo install --locked cargo-zigbuild && \
    rustup target add x86_64-unknown-linux-musl

# Copy and build; stash the binary in /tmp and wipe the source tree so the
# build layer carries only the artifact
COPY . .
RUN cargo zigbuild --target x86_64-unknown-linux-musl --release --example emit_measurements && \
    cp target/x86_64-unknown-linux-musl/release/examples/emit_measurements /tmp/ && \
    rm -rf /usr/src/app/*

# Stage 2: Final image with Kafka and Rust binary
FROM confluentinc/cp-kafka:7.5.1
USER root

# Install minimal runtime dependencies. NOTE: no blanket `yum update` —
# security fixes should come from bumping the base image tag instead.
RUN yum install -y openssl-devel && \
    yum clean all && \
    rm -rf /var/cache/yum

# Copy only the binary from builder stage
COPY --from=builder /tmp/emit_measurements /usr/local/bin/

# Create startup script. The \$ escapes defer variable expansion from
# build time to container runtime.
COPY <<EOF /startup.sh
#!/bin/bash
set -e

# Generate Cluster ID if not provided
CLUSTER_ID=\${CLUSTER_ID:-\$(kafka-storage random-uuid)}
echo "Using Cluster ID: \$CLUSTER_ID"
export CLUSTER_ID

# Format storage directory; --ignore-formatted lets the container be
# restarted without failing on an already-formatted log directory
echo "Formatting storage directory..."
kafka-storage format --ignore-formatted -t "\$CLUSTER_ID" -c /etc/kafka/kraft/server.properties

# Start Kafka with KRaft in the background
/etc/confluent/docker/run &

# Wait for Kafka to be ready
until kafka-topics --bootstrap-server localhost:9092 --list &>/dev/null; do
  echo "Waiting for Kafka to be ready..."
  sleep 5
done

# Create topics with 1GB retention
echo "Creating temperature topic..."
kafka-topics --bootstrap-server localhost:9092 --create --if-not-exists --topic temperature --partitions 1 --replication-factor 1 --config retention.bytes=1073741824

echo "Creating humidity topic..."
kafka-topics --bootstrap-server localhost:9092 --create --if-not-exists --topic humidity --partitions 1 --replication-factor 1 --config retention.bytes=1073741824

# Start the Rust application; exec replaces the shell so the binary
# receives signals sent to the script's process
exec /usr/local/bin/emit_measurements
EOF

RUN chmod +x /startup.sh

# Kafka single-node KRaft configuration (broker and controller in one process)
ENV KAFKA_NODE_ID=1 \
    KAFKA_PROCESS_ROLES=broker,controller \
    KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
    KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
    KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
    KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
    KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1

# Expose Kafka port (documentation only; publish with -p 9092:9092)
EXPOSE 9092

CMD ["/startup.sh"]
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -90,9 +90,8 @@ Details about developing the python bindings can be found in [py-denormalized/RE

### Running an example

1. Start kafka in docker `docker run -p 9092:9092 --name kafka apache/kafka`
2. Start emitting some sample data: `cargo run --example emit_measurements`
3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation`
1. Build the custom docker image that contains an instance of kafka along with a script that emits some sample data to kafka: `docker build -t emgeee/kafka_emit_measurements:latest .`, then run it: `docker run -p 9092:9092 emgeee/kafka_emit_measurements:latest`
2. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation`

### Checkpointing

1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
@@ -12,6 +12,7 @@ datafusion = { workspace = true }
arrow = { workspace = true, features = ["prettyprint"] }
arrow-schema = { workspace = true }
arrow-array = { workspace = true }
anyhow = "1.0.86"
tracing = { workspace = true }
futures = { workspace = true }
tracing-log = { workspace = true }
6 changes: 3 additions & 3 deletions examples/examples/emit_measurements.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use rand::seq::SliceRandom;
use rdkafka::producer::FutureProducer;
use std::result::Result;
use std::time::{SystemTime, UNIX_EPOCH};

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;

use denormalized::prelude::*;
use denormalized_examples::Measurment;

/// This script emits test data to a kafka cluster
@@ -14,7 +14,7 @@ use denormalized_examples::Measurment;
/// Sample sensor data will then be emitted to two topics: `temperature` and `humidity`
/// This data is then read and processed by the other examples
#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> Result<(), anyhow::Error> {
let mut tasks = tokio::task::JoinSet::new();

let producer: FutureProducer = ClientConfig::new()
13 changes: 13 additions & 0 deletions py-denormalized/README.md
Original file line number Diff line number Diff line change
@@ -3,6 +3,19 @@ denormalized-python

Python bindings for [denormalized](https://github.com/probably-nothing-labs/denormalized)

Denormalized is a single node stream processing engine written in Rust. This directory contains the bindings for building pipelines using python.

## Getting Started

1. Install denormalized `pip install denormalized`
2. Build and start the custom docker image that contains an instance of kafka along with a script that emits some sample data to kafka: `docker build -t emgeee/kafka_emit_measurements:latest .`
3. Copy the [stream_aggregate.py](python/examples/stream_aggregate.py) example

This script will connect to the kafka instance running in docker and aggregate the metrics in realtime.

There are several other examples in the [examples/ folder](python/examples/) that demonstrate other capabilities including stream joins and UDAFs.


## Development

Make sure you're in the `py-denormalized/` directory.
7 changes: 6 additions & 1 deletion py-denormalized/pyproject.toml
Original file line number Diff line number Diff line change
@@ -24,7 +24,12 @@ features = ["pyo3/extension-module"]
module-name = "denormalized._d_internal"

[tool.rye]
dev-dependencies = ["pip>=24.2", "ipython>=8.26.0", "pytest>=8.3.2"]
dev-dependencies = [
"pip>=24.2",
"ipython>=8.26.0",
"pytest>=8.3.2",
"maturin>=1.7.4",
]

# Enable docstring linting using the google style guide
[tool.ruff.lint]
8 changes: 5 additions & 3 deletions py-denormalized/python/examples/stream_aggregate.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
"""stream_aggregate example."""
"""stream_aggregate example.
docker build -t emgeee/kafka_emit_measurements:latest .
"""

import json
import pprint as pp
import signal
import sys
import pprint as pp

from denormalized import Context
from denormalized.datafusion import col
@@ -12,7 +15,6 @@


def signal_handler(sig, frame):
print("You pressed Ctrl+C!")
sys.exit(0)


11 changes: 6 additions & 5 deletions py-denormalized/python/examples/stream_join.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
"""stream_aggregate example."""
"""stream_aggregate example.
docker build -t emgeee/kafka_emit_measurements:latest .
"""

import json
import pprint as pp
import signal
import sys
import pprint as pp

from denormalized import Context
from denormalized.datafusion import col, expr
from denormalized.datafusion import col
from denormalized.datafusion import functions as f
from denormalized.datafusion import lit


def signal_handler(sig, frame):
print("You pressed Ctrl+C!")
sys.exit(0)


1 change: 1 addition & 0 deletions py-denormalized/requirements-dev.lock
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@ jedi==0.19.1
# via ipython
matplotlib-inline==0.1.7
# via ipython
maturin==1.7.4
numpy==2.1.0
# via pyarrow
packaging==24.1

0 comments on commit 82bcac2

Please sign in to comment.