Skip to content

Commit

Permalink
Merge branch 'master' into feat/rejig
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldambra authored Oct 6, 2024
2 parents bf6e11b + f529a78 commit 7ead63e
Show file tree
Hide file tree
Showing 16 changed files with 288 additions and 21 deletions.
1 change: 1 addition & 0 deletions posthog/hogql/functions/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ def compare_types(arg_types: list[ConstantType], sig_arg_types: tuple[ConstantTy
"parseDateTime": HogQLFunctionMeta("parseDateTimeOrNull", 2, 3, tz_aware=True),
"parseDateTimeBestEffort": HogQLFunctionMeta("parseDateTime64BestEffortOrNull", 1, 2, tz_aware=True),
"toTypeName": HogQLFunctionMeta("toTypeName", 1, 1),
"cityHash64": HogQLFunctionMeta("cityHash64", 1, 1),
# dates and times
"toTimeZone": HogQLFunctionMeta("toTimeZone", 2, 2),
"timeZoneOf": HogQLFunctionMeta("timeZoneOf", 1, 1),
Expand Down
20 changes: 9 additions & 11 deletions posthog/hogql_queries/events_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,17 +114,15 @@ def to_query(self) -> ast.SelectQuery:
Person.objects.filter(team=self.team), self.query.personId
).first()
where_exprs.append(
parse_expr(
"cityHash64(distinct_id) in {list}", # Because the events table is partitioned by cityHash64(distinct_ids), using cityhash for the comparison is much quicker,
{
"list": ast.Constant(
value=[
ast.Call(name="cityHash64", args=[ast.Constant(value=id)])
for id in get_distinct_ids_for_subquery(person, self.team)
]
)
},
timings=self.timings,
ast.CompareOperation(
left=ast.Call(name="cityHash64", args=[ast.Field(chain=["distinct_id"])]),
right=ast.Tuple(
exprs=[
ast.Call(name="cityHash64", args=[ast.Constant(value=id)])
for id in get_distinct_ids_for_subquery(person, self.team)
]
),
op=ast.CompareOperationOp.In,
)
)
if self.query.filterTestAccounts:
Expand Down
10 changes: 6 additions & 4 deletions posthog/hogql_queries/test/test_events_query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,17 @@ def test_person_id_expands_to_distinct_ids(self):
# matching team
query_ast = EventsQueryRunner(query=query, team=self.team).to_query()
where_expr = cast(ast.CompareOperation, cast(ast.And, query_ast.where).exprs[0])
right_expr = cast(ast.Constant, where_expr.right)
self.assertEqual([x.args[0].value for x in right_expr.value], ["id1", "id2"])
right_expr = cast(ast.Tuple, where_expr.right)
self.assertEqual(
[cast(ast.Constant, cast(ast.Call, x).args[0]).value for x in right_expr.exprs], ["id1", "id2"]
)

# another team
another_team = Team.objects.create(organization=Organization.objects.create())
query_ast = EventsQueryRunner(query=query, team=another_team).to_query()
where_expr = cast(ast.CompareOperation, cast(ast.And, query_ast.where).exprs[0])
right_expr = cast(ast.Constant, where_expr.right)
self.assertEqual(right_expr.value, [])
right_expr = cast(ast.Tuple, where_expr.right)
self.assertEqual(right_expr.exprs, [])

def test_test_account_filters(self):
self.team.test_account_filters = [
Expand Down
2 changes: 2 additions & 0 deletions rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

56 changes: 56 additions & 0 deletions rust/common/types/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,59 @@ impl CapturedEvent {
format!("{}:{}", self.token, self.distinct_id)
}
}

/// Person-processing mode carried on a [`ClickHouseEvent`] in its `person_mode` field.
///
/// Because of `rename_all = "snake_case"`, the variants (de)serialize as the strings
/// `"full"`, `"propertyless"` and `"force_upgrade"` respectively.
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
#[serde(rename_all = "snake_case")]
pub enum PersonMode {
// NOTE(review): the semantics of each mode are decided by whoever produces these
// events and are not visible in this file - confirm before relying on them.
Full,
Propertyless,
ForceUpgrade,
}

/// An event record, (de)serializable with serde.
///
/// Every `Option` field is annotated with `skip_serializing_if = "Option::is_none"`,
/// so a `None` value is left out of the serialized output instead of being written
/// as `null`. On deserialization serde treats a missing `Option` field as `None`.
///
/// All timestamps are carried as plain `String`s; their exact format has not been
/// verified yet (see the TODOs on the individual fields).
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct ClickHouseEvent {
pub uuid: Uuid,
pub team_id: i32,
// Event name, e.g. "$exception".
pub event: String,
pub distinct_id: String,
// Event properties as a string. Presumably a JSON document that consumers parse
// in a second step - TODO confirm against the producer.
#[serde(skip_serializing_if = "Option::is_none")]
pub properties: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub person_id: Option<String>,
// TODO: verify timestamp format
pub timestamp: String,
// TODO: verify timestamp format
pub created_at: String,
pub elements_chain: String,
// TODO: verify timestamp format
#[serde(skip_serializing_if = "Option::is_none")]
pub person_created_at: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub person_properties: Option<String>,
// Properties for group slots 0 through 4; each is optional and stored as a string.
#[serde(skip_serializing_if = "Option::is_none")]
pub group0_properties: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub group1_properties: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub group2_properties: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub group3_properties: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub group4_properties: Option<String>,
// Creation timestamps for group slots 0 through 4; each is optional.
// TODO: verify timestamp format
#[serde(skip_serializing_if = "Option::is_none")]
pub group0_created_at: Option<String>,
// TODO: verify timestamp format
#[serde(skip_serializing_if = "Option::is_none")]
pub group1_created_at: Option<String>,
// TODO: verify timestamp format
#[serde(skip_serializing_if = "Option::is_none")]
pub group2_created_at: Option<String>,
// TODO: verify timestamp format
#[serde(skip_serializing_if = "Option::is_none")]
pub group3_created_at: Option<String>,
// TODO: verify timestamp format
#[serde(skip_serializing_if = "Option::is_none")]
pub group4_created_at: Option<String>,
// Required: deserialization fails if this key is absent or not a known mode string.
pub person_mode: PersonMode,
}
1 change: 1 addition & 0 deletions rust/common/types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ mod team;

// Events
pub use event::CapturedEvent;
pub use event::ClickHouseEvent;

// Teams
pub use team::Team;
2 changes: 2 additions & 0 deletions rust/cymbal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ metrics = { workspace = true }
common-metrics = { path = "../common/metrics" }
common-alloc = { path = "../common/alloc" }
common-kafka = { path = "../common/kafka" }
common-types = { path = "../common/types" }
thiserror = { workspace = true }
sqlx = { workspace = true }
serde_json = { workspace = true }
serde = { workspace = true }

[lints]
workspace = true
2 changes: 2 additions & 0 deletions rust/cymbal/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod app_context;
pub mod config;
pub mod error;
pub mod symbols;
pub mod traits;
49 changes: 43 additions & 6 deletions rust/cymbal/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ use std::{future::ready, sync::Arc};
use axum::{routing::get, Router};
use common_kafka::kafka_consumer::RecvErr;
use common_metrics::{serve, setup_metrics_routes};
use cymbal::{app_context::AppContext, config::Config, error::Error};
use common_types::ClickHouseEvent;
use cymbal::{
app_context::AppContext,
config::Config,
error::Error,
symbols::types::{PropertyView, RawFrame},
};
use envconfig::Envconfig;
use serde_json::Value;
use tokio::task::JoinHandle;
use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer};
Expand Down Expand Up @@ -57,22 +62,54 @@ async fn main() -> Result<(), Error> {
context.worker_liveness.report_healthy().await;
// Just grab the event as a serde_json::Value and immediately drop it,
// we can work out a real type for it later (once we're deployed etc)
let (_, offset): (Value, _) = match context.consumer.json_recv().await {
let (event, offset): (ClickHouseEvent, _) = match context.consumer.json_recv().await {
Ok(r) => r,
Err(RecvErr::Kafka(e)) => {
return Err(e.into()); // Just die if we recieve a Kafka error
}
Err(err) => {
// If we failed to parse the message, or it was empty, just log and continue, our
// consumer has already stored the offset for us.
metrics::counter!("error_tracking_errors", "cause" => "recv_err").increment(1);
metrics::counter!("cymbal_errors", "cause" => "recv_err").increment(1);
error!("Error receiving message: {:?}", err);
continue;
}
};
metrics::counter!("error_tracking_events_received").increment(1);
metrics::counter!("cymbal_events_received").increment(1);

// This is where the rest of the processing would go
offset.store().unwrap();

if event.event != "$exception" {
error!("event of type {}", event.event);
continue;
}

let Some(properties) = &event.properties else {
metrics::counter!("cymbal_errors", "cause" => "no_properties").increment(1);
continue;
};

let properties: PropertyView = match serde_json::from_str(properties) {
Ok(r) => r,
Err(err) => {
metrics::counter!("cymbal_errors", "cause" => "invalid_exception_properties")
.increment(1);
error!("Error parsing properties: {:?}", err);
continue;
}
};

let _stack_trace: Vec<RawFrame> =
match serde_json::from_str(&properties.exception_stack_trace_raw) {
Ok(r) => r,
Err(err) => {
metrics::counter!("cymbal_errors", "cause" => "invalid_stack_trace")
.increment(1);
error!("Error parsing stack trace: {:?}", err);
continue;
}
};

metrics::counter!("cymbal_stack_track_processed").increment(1);
}
}
25 changes: 25 additions & 0 deletions rust/cymbal/src/symbols/js.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use serde::{Deserialize, Serialize};

/// A minified JS stack frame. Just the minimal information needed to look up some
/// sourcemap for it and produce a "real" stack frame.
///
/// Deserialized from the keys `lineno`, `colno`, `filename`, `in_app` and `function`.
/// None of the fields is optional or has a default, so a frame missing any of these
/// keys fails to deserialize.
#[derive(Debug, Clone, Deserialize)]
pub struct RawJSFrame {
// Read from the `lineno` key.
#[serde(rename = "lineno")]
pub line: u32,
// Read from the `colno` key.
#[serde(rename = "colno")]
pub column: u32,
// Read from the `filename` key.
#[serde(rename = "filename")]
pub uri: String,
pub in_app: bool,
// Read from the `function` key.
#[serde(rename = "function")]
pub fn_name: String,
}

/// A serialize-only frame with the same five fields as `RawJSFrame`.
///
/// Unlike `RawJSFrame` there are no serde renames here, so the fields are written
/// out under their Rust names (`line`, `column`, `uri`, `in_app`, `fn_name`).
// NOTE(review): presumably the result of resolving a `RawJSFrame` against its
// sourcemap - nothing in this file constructs it yet, so confirm.
#[derive(Debug, Clone, Serialize)]
pub struct ProcessedFrame {
pub line: u32,
pub column: u32,
pub uri: String,
pub in_app: bool,
pub fn_name: String,
}
3 changes: 3 additions & 0 deletions rust/cymbal/src/symbols/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod js;
pub mod symbolifier;
pub mod types;
9 changes: 9 additions & 0 deletions rust/cymbal/src/symbols/symbolifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use super::types::{ProcessedStack, RawStack};

/// Unit struct acting as the namespace for stack symbolification.
pub struct Symbolifier;

impl Symbolifier {
/// Consumes a `RawStack` and is declared to return the matching `ProcessedStack`.
///
/// # Panics
///
/// Always panics for now: the body is an `unimplemented!()` stub.
pub fn symbolify(_stack: RawStack) -> ProcessedStack {
unimplemented!()
}
}
79 changes: 79 additions & 0 deletions rust/cymbal/src/symbols/types.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use std::collections::HashMap;

use serde::Deserialize;
use serde::Serialize;
use serde_json::Value;

use super::{js::RawJSFrame, symbolifier::Symbolifier};

/// A raw (unprocessed) stack frame of any supported language.
///
/// The enum is `#[serde(untagged)]`: the input carries no discriminator, and serde
/// picks the first variant whose payload deserializes successfully. With only the
/// `JavaScript` variant present, a `RawFrame` accepts exactly what `RawJSFrame` accepts.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum RawFrame {
JavaScript(RawJSFrame),
}

/// An unprocessed stack: an ordered list of `RawFrame`s under the `frames` key.
///
/// Stacks don't care what the "type" of the frames they contain is, and even permit
/// frames of different types to be mixed together, because we're going to end up "exploding"
/// them into their frame-set anyway, and dispatching a task per frame in a language-agnostic
/// way. Supporting mixed-type stacks is a side benefit of this - I don't know that we'll ever
/// see them, but we get the flexibility "for free".
#[derive(Debug, Deserialize)]
pub struct RawStack {
pub frames: Vec<RawFrame>,
}

/// A processed stack frame, one variant per language.
// NOTE(review): `JavaScript()` is an empty tuple variant carrying no data, which looks
// like a placeholder until a processed JS frame payload is wired in - confirm.
pub enum ProcessedFrame {
JavaScript(),
}

/// The processed counterpart of `RawStack`: a list of `ProcessedFrame`s.
pub struct ProcessedStack {
pub frames: Vec<ProcessedFrame>,
}

impl RawStack {
/// Consumes the raw stack and is declared to yield a `ProcessedStack`, given a
/// `Symbolifier` (currently unused, hence the leading underscore).
///
/// # Panics
///
/// Always panics when awaited for now: the body is an `unimplemented!()` stub.
pub async fn process(self, _sym: &Symbolifier) -> ProcessedStack {
unimplemented!()
}
}

/// Typed view over the `$exception_*` keys of an event's properties object.
///
/// Each named field is mapped to its `$`-prefixed key via `serde(rename)`. None of
/// them is optional or has a default, so deserialization fails if any of these keys
/// is missing or has the wrong type. All remaining keys are collected into `other`
/// through `serde(flatten)` and are written back out on serialization.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PropertyView {
#[serde(rename = "$exception_type")]
pub exception_type: String,
#[serde(rename = "$exception_message")]
pub exception_message: String,
// Kept as a string here; the test module in this file parses it as a JSON array
// of `RawFrame`s.
#[serde(rename = "$exception_stack_trace_raw")]
pub exception_stack_trace_raw: String,
#[serde(rename = "$exception_level")]
pub exception_level: String,
#[serde(rename = "$exception_source")]
pub exception_source: String,
#[serde(rename = "$exception_lineno")]
pub exception_line: u32,
#[serde(rename = "$exception_colno")]
pub exception_col: u32,
// Catch-all for every property not named above. Private, so it is only reachable
// from inside this module.
#[serde(flatten)]
other: HashMap<String, Value>,
}

#[cfg(test)]
mod test {
    use common_types::ClickHouseEvent;

    use crate::symbols::types::{PropertyView, RawFrame};

    /// Smoke test for the three-stage decode of a captured exception event:
    /// event JSON -> `ClickHouseEvent`, its `properties` string -> `PropertyView`,
    /// and the raw stack trace string -> `Vec<RawFrame>`.
    ///
    /// Any stage failing to parse panics via `unwrap`, which fails the test. The
    /// decoded frames are only printed, not asserted on.
    #[test]
    fn it_symbolifies() {
        let fixture: &'static str = include_str!("../../tests/static/raw_js_stack.json");

        // Stage 1: the outer event envelope.
        let event: ClickHouseEvent = serde_json::from_str(fixture).unwrap();

        // Stage 2: the properties payload, itself JSON encoded as a string.
        let properties_json = event.properties.unwrap();
        let view: PropertyView = serde_json::from_str(&properties_json).unwrap();

        // Stage 3: the raw stack trace, again JSON encoded as a string.
        let frames: Vec<RawFrame> =
            serde_json::from_str(&view.exception_stack_trace_raw).unwrap();

        println!("{:?}", frames);
    }
}
13 changes: 13 additions & 0 deletions rust/cymbal/src/traits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use crate::symbols::types::RawStack;

/// An "exception" is anything that can self-identify with a "fingerprint".
pub trait Exception {
/// Returns the identifying fingerprint of this exception as a `String`.
fn id(&self) -> String;
/// Consumes the exception and returns it as a raw `serde_json::Value`.
fn to_raw(self) -> serde_json::Value;
}

// Some exceptions have a raw stack trace we can process. If they do, they
// implement `Stacked` on top of `Exception` to expose it.

/// An [`Exception`] that can also provide its raw stack trace.
pub trait Stacked: Exception {
/// Returns the raw stack trace of this exception as an owned `RawStack`.
fn stack(&self) -> RawStack;
}
Loading

0 comments on commit 7ead63e

Please sign in to comment.