Skip to content

Commit

Permalink
feat(error tracking): rust consumer skeleton (#25235)
Browse files Browse the repository at this point in the history
Co-authored-by: David Newell <[email protected]>
  • Loading branch information
oliverb123 and daibhin authored Sep 30, 2024
1 parent 62b456a commit d7d28b8
Show file tree
Hide file tree
Showing 31 changed files with 528 additions and 385 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/rust-docker-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ jobs:
dockerfile: ./rust/Dockerfile
- image: property-defs-rs
dockerfile: ./rust/Dockerfile
- image: error-tracking-rs
dockerfile: ./rust/Dockerfile
runs-on: depot-ubuntu-22.04-4
permissions:
id-token: write # allow issuing OIDC tokens for this workflow run
Expand All @@ -46,6 +48,7 @@ jobs:
hook-janitor_digest: ${{ steps.digest.outputs.hook-janitor_digest }}
hook-worker_digest: ${{ steps.digest.outputs.hook-worker_digest }}
hook-migrator_digest: ${{ steps.digest.outputs.hook-migrator_digest }}
error-tracking-rs_digest: ${{ steps.digest.outputs.error-tracking-rs_digest }}

defaults:
run:
Expand Down Expand Up @@ -141,6 +144,10 @@ jobs:
values:
image:
sha: '${{ needs.build.outputs.property-defs-rs_digest }}'
# - release: error-tracking-rs - disabled until a charts in place, for now we just build
# values:
# image:
# sha: '${{ needs.build.outputs.error-tracking-rs_digest }}'
- release: hoghooks
values:
api_image:
Expand Down
31 changes: 30 additions & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ members = [
"common/metrics",
"common/dns",
"common/alloc",
"common/types",
"feature-flags",
"hook-api",
"hook-common",
Expand All @@ -17,6 +18,7 @@ members = [
"cyclotron-node",
"cyclotron-janitor",
"cyclotron-fetch",
"error-tracking",
]

[workspace.lints.rust]
Expand Down
1 change: 1 addition & 0 deletions rust/capture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ flate2 = { workspace = true }
governor = { workspace = true }
health = { path = "../common/health" }
common-alloc = { path = "../common/alloc" }
common-types = { path = "../common/types" }
metrics = { workspace = true }
metrics-exporter-prometheus = { workspace = true }
opentelemetry = { workspace = true }
Expand Down
36 changes: 0 additions & 36 deletions rust/capture/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use axum::http::StatusCode;
use axum::response::{IntoResponse, Response};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use time::OffsetDateTime;
use uuid::Uuid;

use crate::token::InvalidTokenReason;

Expand Down Expand Up @@ -94,37 +92,3 @@ impl IntoResponse for CaptureError {
.into_response()
}
}

#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum DataType {
AnalyticsMain,
AnalyticsHistorical,
ClientIngestionWarning,
HeatmapMain,
ExceptionMain,
SnapshotMain,
}
#[derive(Clone, Debug, Serialize, Eq, PartialEq)]
pub struct ProcessedEvent {
#[serde(skip_serializing)]
pub data_type: DataType,
pub uuid: Uuid,
pub distinct_id: String,
pub ip: String,
pub data: String,
pub now: String,
#[serde(
with = "time::serde::rfc3339::option",
skip_serializing_if = "Option::is_none"
)]
pub sent_at: Option<OffsetDateTime>,
pub token: String,
#[serde(skip_serializing)]
pub session_id: Option<String>,
}

impl ProcessedEvent {
pub fn key(&self) -> String {
format!("{}:{}", self.token, self.distinct_id)
}
}
59 changes: 40 additions & 19 deletions rust/capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::limiters::redis::RedisLimiter;
use crate::v0_request::{DataType, ProcessedEvent};
use async_trait::async_trait;

use health::HealthHandle;
use metrics::{counter, gauge, histogram};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
Expand All @@ -12,7 +14,7 @@ use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};

use crate::api::{CaptureError, DataType, ProcessedEvent};
use crate::api::CaptureError;
use crate::config::KafkaConfig;
use crate::limiters::overflow::OverflowLimiter;
use crate::prometheus::report_dropped_events;
Expand Down Expand Up @@ -195,15 +197,17 @@ impl KafkaSink {
}

async fn kafka_send(&self, event: ProcessedEvent) -> Result<DeliveryFuture, CaptureError> {
let (event, metadata) = (event.event, event.metadata);

let payload = serde_json::to_string(&event).map_err(|e| {
error!("failed to serialize event: {}", e);
CaptureError::NonRetryableSinkError
})?;

let token = event.token.clone();
let data_type = event.data_type;
let data_type = metadata.data_type;
let event_key = event.key();
let session_id = event.session_id.clone();
let session_id = metadata.session_id.clone();

drop(event); // Events can be EXTREMELY memory hungry

Expand Down Expand Up @@ -349,12 +353,14 @@ impl Event for KafkaSink {

#[cfg(test)]
mod tests {
use crate::api::{CaptureError, DataType, ProcessedEvent};
use crate::api::CaptureError;
use crate::config;
use crate::limiters::overflow::OverflowLimiter;
use crate::sinks::kafka::KafkaSink;
use crate::sinks::Event;
use crate::utils::uuid_v7;
use crate::v0_request::{DataType, ProcessedEvent, ProcessedEventMetadata};
use common_types::CapturedEvent;
use health::HealthRegistry;
use rand::distributions::Alphanumeric;
use rand::Rng;
Expand Down Expand Up @@ -405,18 +411,26 @@ mod tests {
// We test different cases in a single test to amortize the startup cost of the producer.

let (cluster, sink) = start_on_mocked_sink(Some(3000000)).await;
let event: ProcessedEvent = ProcessedEvent {
data_type: DataType::AnalyticsMain,
let event: CapturedEvent = CapturedEvent {
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
ip: "".to_string(),
data: "".to_string(),
now: "".to_string(),
sent_at: None,
token: "token1".to_string(),
};

let metadata = ProcessedEventMetadata {
data_type: DataType::AnalyticsMain,
session_id: None,
};

let event = ProcessedEvent {
event,
metadata: metadata.clone(),
};

// Wait for producer to be healthy, to keep kafka_message_timeout_ms short and tests faster
for _ in 0..20 {
if sink.send(event.clone()).await.is_ok() {
Expand All @@ -438,17 +452,21 @@ mod tests {
.take(2_000_000)
.map(char::from)
.collect();
let big_event: ProcessedEvent = ProcessedEvent {
data_type: DataType::AnalyticsMain,
let captured = CapturedEvent {
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
ip: "".to_string(),
data: big_data,
now: "".to_string(),
sent_at: None,
token: "token1".to_string(),
session_id: None,
};

let big_event = ProcessedEvent {
event: captured,
metadata: metadata.clone(),
};

sink.send(big_event)
.await
.expect("failed to send event larger than default max size");
Expand All @@ -459,17 +477,20 @@ mod tests {
.take(4_000_000)
.map(char::from)
.collect();
let big_event: ProcessedEvent = ProcessedEvent {
data_type: DataType::AnalyticsMain,
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
ip: "".to_string(),
data: big_data,
now: "".to_string(),
sent_at: None,
token: "token1".to_string(),
session_id: None,

let big_event = ProcessedEvent {
event: CapturedEvent {
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
ip: "".to_string(),
data: big_data,
now: "".to_string(),
sent_at: None,
token: "token1".to_string(),
},
metadata: metadata.clone(),
};

match sink.send(big_event).await {
Err(CaptureError::EventTooBig) => {} // Expected
Err(err) => panic!("wrong error code {}", err),
Expand Down
2 changes: 1 addition & 1 deletion rust/capture/src/sinks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use async_trait::async_trait;

use crate::api::{CaptureError, ProcessedEvent};
use crate::{api::CaptureError, v0_request::ProcessedEvent};

pub mod kafka;
pub mod print;
Expand Down
4 changes: 3 additions & 1 deletion rust/capture/src/sinks/print.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use async_trait::async_trait;

use metrics::{counter, histogram};
use tracing::log::info;

use crate::api::{CaptureError, ProcessedEvent};
use crate::api::CaptureError;
use crate::sinks::Event;
use crate::v0_request::ProcessedEvent;

pub struct PrintSink {}

Expand Down
Loading

0 comments on commit d7d28b8

Please sign in to comment.