Add dockerfile to run emit_measurements and kafka #56

Merged · 4 commits · Nov 12, 2024
6 changes: 6 additions & 0 deletions .dockerignore
@@ -0,0 +1,6 @@
target/
.git/
.gitignore
Dockerfile
.dockerignore
**/*.rs.bk
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -17,6 +17,7 @@ license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/probably-nothing-labs/denormalized.git"
version = "0.0.1"
rust-version = "1.81.0"
description = "Embeddable stream processing engine"

[workspace.dependencies]
100 changes: 100 additions & 0 deletions Dockerfile
@@ -0,0 +1,100 @@
# This file builds an image that runs Kafka and the emit_measurements.rs script for generating fake data
#
# docker build -t emgeee/kafka_emit_measurements:latest .
#

# Stage 1: Build the Rust application
FROM rust:1.81.0-slim-bookworm AS builder
WORKDIR /usr/src/app

# Install build dependencies and zig
RUN apt-get update && apt-get install -y \
cmake \
g++ \
libssl-dev \
pkg-config \
wget \
xz-utils \
&& wget https://ziglang.org/download/0.13.0/zig-linux-x86_64-0.13.0.tar.xz \
&& tar -xf zig-linux-x86_64-0.13.0.tar.xz \
&& mv zig-linux-x86_64-0.13.0 /usr/local/zig \
&& rm zig-linux-x86_64-0.13.0.tar.xz \
&& rm -rf /var/lib/apt/lists/*

# Add zig to PATH
ENV PATH="/usr/local/zig/:$PATH"

# Install cargo-zigbuild
RUN cargo install --locked cargo-zigbuild && \
rustup target add x86_64-unknown-linux-musl

# Copy and build
COPY . .
RUN cargo zigbuild --target x86_64-unknown-linux-musl --release --example emit_measurements && \
cp target/x86_64-unknown-linux-musl/release/examples/emit_measurements /tmp/ && \
rm -rf /usr/src/app/*

# Stage 2: Final image with Kafka and Rust binary
FROM confluentinc/cp-kafka:7.5.1
USER root

# Install minimal dependencies
RUN yum update -y && \
yum install -y openssl-devel && \
yum clean all && \
rm -rf /var/cache/yum

# Copy only the binary from builder stage
COPY --from=builder /tmp/emit_measurements /usr/local/bin/

# Create startup script
COPY <<EOF /startup.sh
#!/bin/bash
# Generate Cluster ID if not provided
CLUSTER_ID=\${CLUSTER_ID:-\$(kafka-storage random-uuid)}
echo "Using Cluster ID: \$CLUSTER_ID"
export CLUSTER_ID

# Format storage directory
echo "Formatting storage directory..."
kafka-storage format -t \$CLUSTER_ID -c /etc/kafka/kraft/server.properties

# Start Kafka with KRaft
/etc/confluent/docker/run &

# Wait for Kafka to be ready
until kafka-topics --bootstrap-server localhost:9092 --list &>/dev/null; do
echo "Waiting for Kafka to be ready..."
sleep 5
done

# Create topics with 1GB retention
echo "Creating temperature topic..."
kafka-topics --bootstrap-server localhost:9092 --create --if-not-exists --topic temperature --partitions 1 --replication-factor 1 --config retention.bytes=1073741824

echo "Creating humidity topic..."
kafka-topics --bootstrap-server localhost:9092 --create --if-not-exists --topic humidity --partitions 1 --replication-factor 1 --config retention.bytes=1073741824

# Start the Rust application
/usr/local/bin/emit_measurements
EOF

RUN chmod +x /startup.sh

# Kafka configuration
ENV KAFKA_NODE_ID=1 \
KAFKA_PROCESS_ROLES=broker,controller \
KAFKA_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=1 \
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=1

# Expose Kafka port
EXPOSE 9092

CMD ["/startup.sh"]
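
To try the image locally, build it and then run it with the broker port published. A minimal sketch; the `docker run` flags are an assumption based on the `EXPOSE 9092` directive and the `localhost:9092` advertised listener:

```sh
# Build the image (command taken from the Dockerfile header comment)
docker build -t emgeee/kafka_emit_measurements:latest .

# Run it, publishing the Kafka port so local examples can connect
# (port mapping assumed from EXPOSE 9092 / KAFKA_ADVERTISED_LISTENERS)
docker run --rm -p 9092:9092 emgeee/kafka_emit_measurements:latest
```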
5 changes: 2 additions & 3 deletions README.md
@@ -90,9 +90,8 @@ Details about developing the python bindings can be found in [py-denormalized/RE

### Running an example

1. Start kafka in docker `docker run -p 9092:9092 --name kafka apache/kafka`
2. Start emitting some sample data: `cargo run --example emit_measurements`
3. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation`
1. Start the custom docker image that contains an instance of Kafka along with a script that emits sample data to Kafka: `docker build -t emgeee/kafka_emit_measurements:latest .`
2. Run a [simple streaming aggregation](./examples/examples/simple_aggregation.rs) on the data using denormalized: `cargo run --example simple_aggregation`

### Checkpointing

1 change: 1 addition & 0 deletions examples/Cargo.toml
@@ -12,6 +12,7 @@ datafusion = { workspace = true }
arrow = { workspace = true, features = ["prettyprint"] }
arrow-schema = { workspace = true }
arrow-array = { workspace = true }
anyhow = "1.0.86"
tracing = { workspace = true }
futures = { workspace = true }
tracing-log = { workspace = true }
6 changes: 3 additions & 3 deletions examples/examples/emit_measurements.rs
@@ -1,11 +1,11 @@
use rand::seq::SliceRandom;
use rdkafka::producer::FutureProducer;
use std::result::Result;
use std::time::{SystemTime, UNIX_EPOCH};

use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;

use denormalized::prelude::*;
use denormalized_examples::Measurment;

/// This script emits test data to a kafka cluster
@@ -14,7 +14,7 @@ use denormalized_examples::Measurment;
/// Sample sensor data will then be emitted to two topics: `temperature` and `humidity`
/// This data is then read and processed by the other examples
#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> Result<(), anyhow::Error> {
let mut tasks = tokio::task::JoinSet::new();

let producer: FutureProducer = ClientConfig::new()
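
For reviewers unfamiliar with this example, here is a minimal, hypothetical sketch of the producer loop it implements, using only the rdkafka items visible in the diff (`ClientConfig`, `FutureProducer`, `FutureRecord`). The JSON field names are placeholders; the real payload comes from `denormalized_examples::Measurment`:

```rust
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Connect to the broker started by the Docker image above.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()?;

    loop {
        // Placeholder reading; the real example serializes a Measurment struct.
        let ts = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
        let payload = format!(r#"{{"occurred_at_ms":{ts},"reading":22.5}}"#);

        // Send with no extra queue timeout; a keyless record needs the key
        // type spelled out, hence the `()` annotation.
        producer
            .send(
                FutureRecord::<(), _>::to("temperature").payload(&payload),
                Duration::from_secs(0),
            )
            .await
            .map_err(|(err, _msg)| err)?;

        tokio::time::sleep(Duration::from_millis(100)).await;
    }
}
```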
13 changes: 13 additions & 0 deletions py-denormalized/README.md
@@ -3,6 +3,19 @@ denormalized-python

Python bindings for [denormalized](https://github.com/probably-nothing-labs/denormalized)

Denormalized is a single-node stream processing engine written in Rust. This directory contains the bindings for building pipelines using Python.

## Getting Started

1. Install denormalized `pip install denormalized`
2. Start the custom docker image that contains an instance of Kafka along with a script that emits sample data to Kafka: `docker build -t emgeee/kafka_emit_measurements:latest .`
3. Copy the [stream_aggregate.py](python/examples/stream_aggregate.py) example

This script will connect to the Kafka instance running in Docker and aggregate the metrics in real time.

There are several other examples in the [examples/ folder](python/examples/) that demonstrate other capabilities including stream joins and UDAFs.


## Development

Make sure you're in the `py-denormalized/` directory.
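
As a quick orientation for the getting-started steps above, a rough sketch of the shape of stream_aggregate.py. The method names, signatures, and sample field names here are assumptions inferred from the linked example and should be checked against python/examples/stream_aggregate.py:

```python
import json

from denormalized import Context
from denormalized.datafusion import col
from denormalized.datafusion import functions as f

# Sample event used to infer the topic's JSON schema (field names assumed).
sample_event = {"occurred_at_ms": 0, "sensor_name": "foo", "reading": 0.0}


def print_batch(batch):
    """Print each aggregated record batch as it is produced."""
    print(batch)


ds = Context().from_topic("temperature", json.dumps(sample_event), "localhost:9092")
ds.window(
    [col("sensor_name")],                      # group by sensor
    [f.count(col("reading")).alias("count")],  # aggregate expression(s)
    1000,                                      # window size in ms (assumed)
    None,                                      # slide (assumed: tumbling)
).sink(print_batch)
```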
7 changes: 6 additions & 1 deletion py-denormalized/pyproject.toml
@@ -24,7 +24,12 @@ features = ["pyo3/extension-module"]
module-name = "denormalized._d_internal"

[tool.rye]
dev-dependencies = ["pip>=24.2", "ipython>=8.26.0", "pytest>=8.3.2"]
dev-dependencies = [
"pip>=24.2",
"ipython>=8.26.0",
"pytest>=8.3.2",
"maturin>=1.7.4",
]

# Enable docstring linting using the google style guide
[tool.ruff.lint]
8 changes: 5 additions & 3 deletions py-denormalized/python/examples/stream_aggregate.py
@@ -1,9 +1,12 @@
"""stream_aggregate example."""
"""stream_aggregate example.

docker build -t emgeee/kafka_emit_measurements:latest .
"""

import json
import pprint as pp
import signal
import sys
import pprint as pp

from denormalized import Context
from denormalized.datafusion import col
@@ -12,7 +15,6 @@


def signal_handler(sig, frame):
print("You pressed Ctrl+C!")
sys.exit(0)


11 changes: 6 additions & 5 deletions py-denormalized/python/examples/stream_join.py
@@ -1,18 +1,19 @@
"""stream_aggregate example."""
"""stream_aggregate example.

docker build -t emgeee/kafka_emit_measurements:latest .
"""

import json
import pprint as pp
import signal
import sys
import pprint as pp

from denormalized import Context
from denormalized.datafusion import col, expr
from denormalized.datafusion import col
from denormalized.datafusion import functions as f
from denormalized.datafusion import lit


def signal_handler(sig, frame):
print("You pressed Ctrl+C!")
sys.exit(0)


1 change: 1 addition & 0 deletions py-denormalized/requirements-dev.lock
@@ -25,6 +25,7 @@ jedi==0.19.1
# via ipython
matplotlib-inline==0.1.7
# via ipython
maturin==1.7.4
numpy==2.1.0
# via pyarrow
packaging==24.1