Skip to content

Commit

Permalink
feat(propdefs): add filtering to allow for gradual rollout (#24820)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored and timgl committed Sep 10, 2024
1 parent 28f3367 commit 91cd660
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 13 deletions.
1 change: 0 additions & 1 deletion plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ export function getDefaultConfig(): PluginsServerConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: 10 * 1000, // 10 seconds
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: '',
DROP_EVENTS_BY_TOKEN: '',
SKIP_DEFINITIONS_FOR_TEAM_IDS: '',
PIPELINE_STEP_STALLED_LOG_TIMEOUT: 30,
RELOAD_PLUGIN_JITTER_MAX_MS: 60000,
RUSTY_HOOK_FOR_TEAMS: '',
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ export interface PluginsServerConfig extends CdpConfig {
EXTERNAL_REQUEST_TIMEOUT_MS: number
DROP_EVENTS_BY_TOKEN_DISTINCT_ID: string
DROP_EVENTS_BY_TOKEN: string
SKIP_DEFINITIONS_FOR_TEAM_IDS: string // Comma separated list of team IDs to skip processing property/event definitions for
RELOAD_PLUGIN_JITTER_MAX_MS: number
RUSTY_HOOK_FOR_TEAMS: string
RUSTY_HOOK_ROLLOUT_PERCENTAGE: number
Expand Down
55 changes: 47 additions & 8 deletions plugin-server/src/worker/ingestion/property-definitions-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ function willFitInPostgresColumn(str: string, maxLength = DJANGO_EVENT_MAX_CHARF
return unicodeCharacters.length <= maxLength
}

// TODO - this entire class should be removed once propdefs-rs is fully up and running. This class does
// 3 different things: Tracks 1st event seen, updates event/property definitions, and auto-creates group-types.
// propdefs-rs will handle the definitions portion of the work, and this should become a simple 1st event seen and group-types
// manager
export class PropertyDefinitionsManager {
db: DB
teamManager: TeamManager
Expand All @@ -74,6 +78,7 @@ export class PropertyDefinitionsManager {
eventPropertiesCache: LRU<string, Set<string>> // Map<JSON.stringify([TeamId, Event], Set<Property>>
eventLastSeenCache: LRU<string, number> // key: JSON.stringify([team_id, event]); value: parseInt(YYYYMMDD)
propertyDefinitionsCache: PropertyDefinitionsCache
teamIdsToSkip: Set<number> = new Set()
private readonly lruCacheSize: number

constructor(
Expand Down Expand Up @@ -103,13 +108,24 @@ export class PropertyDefinitionsManager {
updateAgeOnGet: true,
})
this.propertyDefinitionsCache = new PropertyDefinitionsCache(serverConfig)

const skipTeams = serverConfig.SKIP_DEFINITIONS_FOR_TEAM_IDS ?? ''

if (skipTeams.length > 0) {
this.teamIdsToSkip = new Set(skipTeams.split(',').map((id) => parseInt(id)))
}
}

public async updateEventNamesAndProperties(teamId: number, event: string, properties: Properties): Promise<void> {
if (EVENTS_WITHOUT_EVENT_DEFINITION.includes(event)) {
return
}

// We don't bail out early here because this code also handles creating new
// group-types and groupt-type indexes, and 1st event tracking, and we want
// to do that even IF we're not generating any definitions for the event.
const shouldSkip = this.teamIdsToSkip.has(teamId)

event = sanitizeString(event)
if (!willFitInPostgresColumn(event)) {
return
Expand All @@ -129,13 +145,28 @@ export class PropertyDefinitionsManager {
if (!team) {
return
}
await this.cacheEventNamesAndProperties(team.id, event)
await Promise.all([
this.syncEventDefinitions(team, event),
this.syncEventProperties(team, event, properties),
this.syncPropertyDefinitions(team, event, properties),
this.teamManager.setTeamIngestedEvent(team, properties),
])
if (!shouldSkip) {
// If we're skipping the definition updates for this team, we shouldn't bother with any
// prefetching or caching.
await this.cacheEventNamesAndProperties(team.id, event)
}

// We always track 1st event ingestion
const promises = [this.teamManager.setTeamIngestedEvent(team, properties)]

// Property definitions are more complicated - group-type-index resolution is done here, and
// that includes automatic group creation, which we want to do even if we're skipping the
// rest of the definition updates, so we run this and tell it to skip, rather than skipping
// it outright.
promises.push(this.syncPropertyDefinitions(team, event, properties, shouldSkip))

// Event and event-property definitions only do what they say on the tin, so we can skip them
if (!shouldSkip) {
promises.push(this.syncEventDefinitions(team, event))
promises.push(this.syncEventProperties(team, event, properties))
}

await Promise.all(promises)
} finally {
clearTimeout(timeout)
updateEventNamesAndPropertiesMsSummary.observe(Date.now() - timer.valueOf())
Expand Down Expand Up @@ -202,11 +233,19 @@ ON CONSTRAINT posthog_eventdefinition_team_id_name_80fa0b87_uniq DO UPDATE SET l
}
}

private async syncPropertyDefinitions(team: Team, event: string, properties: Properties) {
private async syncPropertyDefinitions(team: Team, event: string, properties: Properties, shouldSkip: boolean) {
const toInsert: Array<
[string, string, number, number | null, boolean, null, null, TeamId, PropertyType | null]
> = []
// This call to `getPropertyDefinitions` is the place where group-type-indexes are resolved, and by extension,
// where groups are auto-created. We want to ALWAYS do this, even if we're skipping the rest of the definition
// update process. We have to do it on every property, too, unfortunately.
for await (const definitions of this.getPropertyDefinitions(team.id, event, properties)) {
// Now that we've done the group-type-index resolution/creation, we can skip everything else.
if (shouldSkip) {
continue
}

let { key } = definitions
key = sanitizeString(key)
if (!willFitInPostgresColumn(key)) {
Expand Down
3 changes: 2 additions & 1 deletion pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 63 additions & 0 deletions rust/property-defs-rs/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::{num::ParseIntError, str::FromStr};

use envconfig::Envconfig;
use rdkafka::ClientConfig;

Expand Down Expand Up @@ -92,6 +94,16 @@ pub struct Config {

#[envconfig(from = "BIND_PORT", default = "3301")]
pub port: u16,

// The set of teams to opt-in or opt-out of property definitions processing (depending on the setting below)
#[envconfig(default = "")]
pub filtered_teams: TeamList,

// Whether the team list above is used to filter teams OUT of processing (opt-out) or IN to processing (opt-in).
// Defaults to opt-in for now, skipping all updates for teams not in the list. TODO - change this to opt-out
// once rollout is complete.
#[envconfig(default = "opt_in")]
pub filter_mode: TeamFilterMode,
}

#[derive(Envconfig, Clone)]
Expand Down Expand Up @@ -125,3 +137,54 @@ impl From<&KafkaConfig> for ClientConfig {
client_config
}
}

#[derive(Clone)]
pub struct TeamList {
pub teams: Vec<i32>,
}

impl FromStr for TeamList {
type Err = ParseIntError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut teams = Vec::new();
for team in s.trim().split(',') {
if team.is_empty() {
continue;
}
teams.push(team.parse()?);
}
Ok(TeamList { teams })
}
}

#[derive(Clone)]
pub enum TeamFilterMode {
OptIn,
OptOut,
}

impl FromStr for TeamFilterMode {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().trim() {
"opt_in" => Ok(TeamFilterMode::OptIn),
"opt_out" => Ok(TeamFilterMode::OptOut),
"opt-in" => Ok(TeamFilterMode::OptIn),
"opt-out" => Ok(TeamFilterMode::OptOut),
"optin" => Ok(TeamFilterMode::OptIn),
"optout" => Ok(TeamFilterMode::OptOut),
_ => Err(format!("Invalid team filter mode: {}", s)),
}
}
}

impl TeamFilterMode {
pub fn should_process(&self, list: &[i32], team_id: i32) -> bool {
match self {
TeamFilterMode::OptIn => list.contains(&team_id),
TeamFilterMode::OptOut => !list.contains(&team_id),
}
}
}
16 changes: 13 additions & 3 deletions rust/property-defs-rs/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ use envconfig::Envconfig;
use futures::future::ready;
use property_defs_rs::{
app_context::AppContext,
config::Config,
config::{Config, TeamFilterMode, TeamList},
message_to_event,
metrics_consts::{
BATCH_ACQUIRE_TIME, CACHE_CONSUMED, COMPACTED_UPDATES, EVENTS_RECEIVED, FORCED_SMALL_BATCH,
PERMIT_WAIT_TIME, RECV_DEQUEUED, TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE,
UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, WORKER_BLOCKED,
PERMIT_WAIT_TIME, RECV_DEQUEUED, SKIPPED_DUE_TO_TEAM_FILTER, TRANSACTION_LIMIT_SATURATION,
UPDATES_FILTERED_BY_CACHE, UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME,
WORKER_BLOCKED,
},
types::Update,
};
Expand Down Expand Up @@ -70,6 +71,8 @@ async fn spawn_producer_loop(
shared_cache: Arc<Cache<Update, ()>>,
skip_threshold: usize,
compaction_batch_size: usize,
team_filter_mode: TeamFilterMode,
team_list: TeamList,
) {
let mut batch = AHashSet::with_capacity(compaction_batch_size);
let mut last_send = tokio::time::Instant::now();
Expand All @@ -83,6 +86,11 @@ async fn spawn_producer_loop(
continue;
};

if !team_filter_mode.should_process(&team_list.teams, event.team_id) {
metrics::counter!(SKIPPED_DUE_TO_TEAM_FILTER).increment(1);
continue;
}

let updates = event.into_updates(skip_threshold);

metrics::counter!(EVENTS_RECEIVED).increment(1);
Expand Down Expand Up @@ -155,6 +163,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
cache.clone(),
config.update_count_skip_threshold,
config.compaction_batch_size,
config.filter_mode.clone(),
config.filtered_teams.clone(),
));
}

Expand Down
1 change: 1 addition & 0 deletions rust/property-defs-rs/src/metrics_consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ pub const UPDATE_TRANSACTION_TIME: &str = "prop_defs_update_transaction_time_ms"
pub const GROUP_TYPE_RESOLVE_TIME: &str = "prop_defs_group_type_resolve_time_ms";
pub const UPDATES_SKIPPED: &str = "prop_defs_skipped_updates";
pub const GROUP_TYPE_READS: &str = "prop_defs_group_type_reads";
pub const SKIPPED_DUE_TO_TEAM_FILTER: &str = "prop_defs_skipped_due_to_team_filter";

0 comments on commit 91cd660

Please sign in to comment.