feat: Add a debug table for clickhouse_events_json #23377
Conversation
The kafka_handle_error_mode is just a suggestion. I'd just review the ver field, which I don't think is needed in this case!
Great having this debug table available from now on 🙏 🚀
posthog/models/kafka_debug/sql.py (Outdated)
    payload String
)
ENGINE={kafka_engine(kafka_host=",".join(self.brokers), topic=self.topic, group=self.consumer_group)}
SETTINGS input_format_values_interpret_expressions=0, kafka_skip_broken_messages = 100
Do you think we can enable kafka_handle_error_mode='stream' to store the messages that fail parsing in the _error and _raw_message columns? That way we don't need to skip messages.
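For reference, a minimal ClickHouse sketch of what that stream mode looks like; the table, topic, consumer group, and view names here are illustrative, not the ones this PR generates:

```sql
-- Hypothetical Kafka-engine source table. With kafka_handle_error_mode = 'stream',
-- messages that fail to parse are not dropped: the virtual column _error carries
-- the parse error text and _raw_message carries the original message bytes.
CREATE TABLE events_json_debug_queue
(
    payload String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
         kafka_topic_list = 'clickhouse_events_json',
         kafka_group_name = 'clickhouse_debug_consumer',
         kafka_format = 'JSONEachRow',
         kafka_handle_error_mode = 'stream';

-- A consuming materialized view can then persist the error details next to the payload.
CREATE MATERIALIZED VIEW events_json_debug_mv TO events_json_debug AS
SELECT payload, _error AS error, _raw_message AS raw_message
FROM events_json_debug_queue;
```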
I don't think we should ever skip messages here since we are just grabbing the payload as a string (no JSON deserialization). I'll remove the kafka_skip_broken_messages setting.
posthog/models/kafka_debug/sql.py (Outdated)
        return f"{self.topic}_debug"

    def get_create_table_sql(self) -> str:
        engine = MergeTreeEngine(self.table_name, ver="timestamp")
I think we don't need the ver field if we go for a MergeTree table. Aren't we replicating this table's data to all the nodes? Or do you want to keep just the consumed data inside every instance?
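For context on the ver point: ver only has meaning for ReplacingMergeTree, where it decides which duplicate survives a background merge; a plain MergeTree keeps every row. A quick sketch with illustrative table names:

```sql
-- Plain MergeTree: every consumed row is kept, no deduplication, so no ver is needed.
CREATE TABLE events_json_debug
(
    payload   String,
    timestamp DateTime
)
ENGINE = MergeTree
ORDER BY timestamp;

-- ReplacingMergeTree(ver): rows sharing the same sorting key are collapsed during
-- background merges, keeping the row with the largest `timestamp`. Only here does a
-- version column like ver="timestamp" make sense.
CREATE TABLE events_json_debug_deduplicated
(
    payload   String,
    timestamp DateTime
)
ENGINE = ReplacingMergeTree(timestamp)
ORDER BY timestamp;
```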
Oh lol. This is what I get for doing Copilot too fast.
posthog/models/kafka_debug/sql.py (Outdated)
    _timestamp DateTime,
    _timestamp_ms Nullable(DateTime64(3)),
    _partition UInt64,
    _offset UInt64
Depending on whether we finally use the kafka_handle_error_mode='stream' mode, we would need two additional columns here for _error and _raw_message.
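If stream mode did end up being enabled, the destination table would need somewhere to land those two values; a sketch of how the column list could look (table name, column names, and types are assumed from the usual Kafka virtual columns, not taken from this PR):

```sql
CREATE TABLE events_json_debug
(
    payload       String,
    _timestamp    DateTime,
    _timestamp_ms Nullable(DateTime64(3)),
    _partition    UInt64,
    _offset       UInt64,
    _error        String,   -- parse error text, empty when the row parsed cleanly
    _raw_message  String    -- original message as delivered by Kafka
)
ENGINE = MergeTree
ORDER BY (_timestamp, _partition, _offset);
```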
That shouldn't be required here since we are just grabbing the payload, right? It's not actually going to try to deserialize the JSON.
I'm not sure, to be honest, since we are defining the Kafka engine with JSONEachRow. I don't know if ClickHouse will try to validate that every row is valid JSON before ingesting it.
I'm pretty sure it is safe because I've used it to debug bad payloads before. But let's just enable streaming for fun!
Oh ok! 💯 In that case, no problem; I just wasn't completely sure.
* master:
  * chore(data-warehouse): make sure exception is passed through at workflow step (#23409)
  * feat: add a launch compound for posthog with local billing (#23410)
  * chore: add backfill_personless_distinct_ids command (#23404)
  * fix: add missing billing tests (#23408)
  * Schema-Enforcer plugin not global (#23412)
  * chore: Enable person batch exports only on supported destinations (#23354)
  * fix: allow entering a custom value while property values load (#23405)
  * perf: Materialize elements_chain (#23170)
  * fix(experiments): provide `required_scope` for experiments API (#23385)
  * feat(survey): Allow events to repeatedly activate surveys (#23238)
  * chore: maybe this will stop them flapping (#23401)
  * chore(data-warehouse): Added number formatting for source settings (#23221)
  * fix(multi project flags): remove flag id from URL when switching projects (#23394)
Problem
We currently don't have any metrics to compare the number of events coming into Kafka with the number of events at rest in the sharded_events tables.
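As a rough illustration of the comparison this enables (a sketch only: the debug table name and the _timestamp columns are assumptions carried over from the review discussion; only sharded_events is named in this PR):

```sql
-- Count what landed in the Kafka debug table vs. what is at rest in sharded_events
-- over the same window, to spot ingestion gaps.
SELECT
    (SELECT count() FROM events_json_debug
     WHERE _timestamp >= now() - INTERVAL 1 HOUR) AS rows_seen_on_kafka,
    (SELECT count() FROM sharded_events
     WHERE _timestamp >= now() - INTERVAL 1 HOUR) AS rows_at_rest;
```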
Changes
This adds a Kafka debug table that:
Does this work well for both Cloud and self-hosted?
How did you test this code?