Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Webhooks groups properties #23822

Merged
merged 3 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions plugin-server/functional_tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,44 @@ export const fetchGroups = async (teamId: number) => {
return queryResult.data.map((group) => ({ ...group, group_properties: JSON.parse(group.group_properties) }))
}

export const createGroupType = async (teamId: number, index: number, groupType: string) => {
await postgres.query(
PostgresUse.COMMON_WRITE,
`
INSERT INTO posthog_grouptypemapping (team_id, group_type, group_type_index)
VALUES ($1, $2, $3)
`,
[teamId, groupType, index],
'insertGroupType'
)
}

export const createGroup = async (
teamId: number,
groupTypeIndex: number,
groupKey: string,
groupProperties: Record<string, any>
) => {
await postgres.query(
PostgresUse.COMMON_WRITE,
`
INSERT INTO posthog_group (team_id, group_key, group_type_index, group_properties, created_at, properties_last_updated_at, properties_last_operation, version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`,
[
teamId,
groupKey,
groupTypeIndex,
JSON.stringify(groupProperties),
new Date().toISOString(),
JSON.stringify({}),
JSON.stringify({}),
1,
],
'upsertGroup'
)
}

export const fetchPostgresPersons = async (teamId: number) => {
const { rows } = await postgres.query(
PostgresUse.COMMON_WRITE,
Expand Down
14 changes: 10 additions & 4 deletions plugin-server/functional_tests/webhooks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import { UUIDT } from '../src/utils/utils'
import {
capture,
createAction,
createGroup,
createGroupType,
createHook,
createOrganization,
createOrganizationRaw,
createTeam,
createUser,
Expand Down Expand Up @@ -39,9 +40,13 @@ test.concurrent(`webhooks: fires slack webhook`, async () => {

const distinctId = new UUIDT().toString()

const organizationId = await createOrganization()
const organizationId = await createOrganizationRaw({
available_product_features: `array ['{ "key": "group_analytics", "name": "group_analytics" }'::jsonb]`,
})
const teamId = await createTeam(organizationId, `http://localhost:${server.address()?.port}`)
const user = await createUser(teamId, new UUIDT().toString())
await createGroupType(teamId, 0, 'organization')
await createGroup(teamId, 0, 'TestWebhookOrg', { name: 'test-webhooks' })
const action = await createAction({
team_id: teamId,
name: 'slack',
Expand All @@ -51,7 +56,7 @@ test.concurrent(`webhooks: fires slack webhook`, async () => {
deleted: false,
post_to_slack: true,
slack_message_format:
'[event.name] with [event.properties.name] was triggered by [person.properties.email]',
'[event.name] with [event.properties.name] was triggered by [person.properties.email] of [groups.organization.properties.name]',
created_by_id: user.id,
is_calculating: false,
last_calculated_at: new Date().toISOString(),
Expand Down Expand Up @@ -86,6 +91,7 @@ test.concurrent(`webhooks: fires slack webhook`, async () => {
$current_url: 'http://localhost:8000',
$elements: [{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: 'text' }],
$set: { email: '[email protected]' },
$groups: { organization: 'TestWebhookOrg' },
},
})

Expand All @@ -96,7 +102,7 @@ test.concurrent(`webhooks: fires slack webhook`, async () => {
await new Promise((resolve) => setTimeout(resolve, 1000))
}

expect(webHookCalledWith).toEqual({ text: `$autocapture with hehe was triggered by [email protected]` })
expect(webHookCalledWith).toEqual({ text: `$autocapture with hehe was triggered by [email protected] of test-webhooks` })
} finally {
server.close()
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import * as Sentry from '@sentry/node'
import { EachBatchPayload, KafkaMessage } from 'kafkajs'
import { QueryResult } from 'pg'
import { Counter } from 'prom-client'
import { ActionMatcher } from 'worker/ingestion/action-matcher'
import { GroupTypeManager } from 'worker/ingestion/group-type-manager'
import { OrganizationManager } from 'worker/ingestion/organization-manager'

import { GroupTypeToColumnIndex, PostIngestionEvent, RawClickHouseEvent } from '../../../types'
import { DependencyUnavailableError } from '../../../utils/db/error'
import { PostgresRouter, PostgresUse } from '../../../utils/db/postgres'
import { convertToPostIngestionEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { pipelineStepErrorCounter, pipelineStepMsSummary } from '../../../worker/ingestion/event-pipeline/metrics'
Expand Down Expand Up @@ -65,12 +67,21 @@ export async function eachBatchWebhooksHandlers(
hookCannon: HookCommander,
concurrency: number,
groupTypeManager: GroupTypeManager,
organizationManager: OrganizationManager
organizationManager: OrganizationManager,
postgres: PostgresRouter
): Promise<void> {
await eachBatchHandlerHelper(
payload,
(teamId) => actionMatcher.hasWebhooks(teamId),
(event) => eachMessageWebhooksHandlers(event, actionMatcher, hookCannon, groupTypeManager, organizationManager),
(event) =>
eachMessageWebhooksHandlers(
event,
actionMatcher,
hookCannon,
groupTypeManager,
organizationManager,
postgres
),
concurrency,
'webhooks'
)
Expand Down Expand Up @@ -140,26 +151,80 @@ export async function eachBatchHandlerHelper(
}
}

async function addGroupPropertiesToPostIngestionEvent(
event: PostIngestionEvent,
groupTypeManager: GroupTypeManager,
organizationManager: OrganizationManager,
postgres: PostgresRouter
): Promise<PostIngestionEvent> {
let groupTypes: GroupTypeToColumnIndex | undefined = undefined
if (await organizationManager.hasAvailableFeature(event.teamId, 'group_analytics')) {
// If the organization has group analytics enabled then we enrich the event with group data
groupTypes = await groupTypeManager.fetchGroupTypes(event.teamId)
}

let groups: PostIngestionEvent['groups'] = undefined
if (groupTypes) {
groups = {}

for (const [groupType, columnIndex] of Object.entries(groupTypes)) {
const groupKey = (event.properties[`$groups`] || {})[groupType]
if (!groupKey) {
continue
}

const queryString = `SELECT group_properties FROM posthog_group WHERE team_id = $1 AND group_type_index = $2 AND group_key = $3`

const selectResult: QueryResult = await postgres.query(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just noting for others that the previous group code used a Redis cache before Postgres: https://github.com/PostHog/posthog/pull/22962/files#diff-0769dccc3458a7eb730036617ac32002072330fcdc5ec5a1d11364769d5dd5b6L488

Since this is just for Webhooks now and we're using a PG read replica this seems fine for now. If anything, it would be nice to do one query for all of them rather than a query each...

PostgresUse.COMMON_READ,
queryString,
[event.teamId, columnIndex, groupKey],
'fetchGroup'
)

const groupProperties = selectResult.rows.length > 0 ? selectResult.rows[0].group_properties : {}

if (groupKey && groupProperties) {
groups[groupType] = {
index: columnIndex,
key: groupKey,
type: groupType,
properties: groupProperties,
}
}
}
}

return {
...event,
groups,
}
}

export async function eachMessageWebhooksHandlers(
clickHouseEvent: RawClickHouseEvent,
actionMatcher: ActionMatcher,
hookCannon: HookCommander,
groupTypeManager: GroupTypeManager,
organizationManager: OrganizationManager
organizationManager: OrganizationManager,
postgres: PostgresRouter
): Promise<void> {
if (!actionMatcher.hasWebhooks(clickHouseEvent.team_id)) {
// exit early if no webhooks nor resthooks
return
}

let groupTypes: GroupTypeToColumnIndex | undefined = undefined

if (await organizationManager.hasAvailableFeature(clickHouseEvent.team_id, 'group_analytics')) {
// If the organization has group analytics enabled then we enrich the event with group data
groupTypes = await groupTypeManager.fetchGroupTypes(clickHouseEvent.team_id)
}

const event = convertToPostIngestionEvent(clickHouseEvent, groupTypes)
const eventWithoutGroups = convertToPostIngestionEvent(clickHouseEvent)
// This is very inefficient, we always pull group properties for all groups (up to 5) for this event
// from PG if a webhook is defined for this team.
// Instead we should be lazily loading group properties only when needed, but this is the fastest way to fix this consumer
// that will be deprecated in the near future by CDP/Hog
const event = await addGroupPropertiesToPostIngestionEvent(
eventWithoutGroups,
groupTypeManager,
organizationManager,
postgres
)

await runInstrumentedFunction({
func: () => runWebhooks(actionMatcher, hookCannon, event),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ export const startAsyncWebhooksHandlerConsumer = async ({
hookCannon,
concurrency,
groupTypeManager,
organizationManager
organizationManager,
postgres
),
})

Expand Down
36 changes: 2 additions & 34 deletions plugin-server/src/utils/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,7 @@ import { PluginEvent, PostHogEvent, ProcessedPluginEvent } from '@posthog/plugin
import { Message } from 'node-rdkafka'

import { setUsageInNonPersonEventsCounter } from '../main/ingestion-queues/metrics'
import {
ClickHouseEvent,
GroupTypeToColumnIndex,
HookPayload,
PipelineEvent,
PostIngestionEvent,
RawClickHouseEvent,
} from '../types'
import { ClickHouseEvent, HookPayload, PipelineEvent, PostIngestionEvent, RawClickHouseEvent } from '../types'
import { chainToElements } from './db/elements-chain'
import { personInitialAndUTMProperties, sanitizeString } from './db/utils'
import {
Expand Down Expand Up @@ -110,36 +103,12 @@ export function convertToPostHogEvent(event: PostIngestionEvent): PostHogEvent {

// NOTE: PostIngestionEvent is our context event - it should never be sent directly to an output, but rather transformed into a lightweight schema
// that we can keep to as a contract
export function convertToPostIngestionEvent(
event: RawClickHouseEvent,
groupTypes?: GroupTypeToColumnIndex
): PostIngestionEvent {
export function convertToPostIngestionEvent(event: RawClickHouseEvent): PostIngestionEvent {
const properties = event.properties ? JSON.parse(event.properties) : {}
if (event.elements_chain) {
properties['$elements_chain'] = event.elements_chain
}

let groups: PostIngestionEvent['groups'] = undefined

if (groupTypes) {
groups = {}

for (const [groupType, columnIndex] of Object.entries(groupTypes)) {
const groupKey = (properties[`$groups`] || {})[groupType]
const groupProperties = event[`group${columnIndex}_properties`]

// TODO: Check that groupProperties always exist if the event is in that group
if (groupKey && groupProperties) {
groups[groupType] = {
index: columnIndex,
key: groupKey,
type: groupType,
properties: JSON.parse(groupProperties),
}
}
}
}

return {
eventUuid: event.uuid,
event: event.event!,
Expand All @@ -153,7 +122,6 @@ export function convertToPostIngestionEvent(
? clickHouseTimestampSecondPrecisionToISO(event.person_created_at)
: null,
person_properties: event.person_properties ? JSON.parse(event.person_properties) : {},
groups,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ const clickhouseEvent: RawClickHouseEvent = {
person_id: 'F99FA0A1-E0C2-4CFE-A09A-4C3C4327A4CC',
person_created_at: '2020-02-20 02:15:00' as ClickHouseTimestampSecondPrecision, // Match createEvent ts format
person_properties: '{}',
group0_properties: JSON.stringify({ name: 'PostHog' }),
person_mode: 'full',
}

Expand All @@ -47,6 +46,30 @@ describe('eachMessageWebhooksHandlers', () => {
[],
'testTag'
)
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`
INSERT INTO posthog_group (team_id, group_key, group_type_index, group_properties, created_at, properties_last_updated_at, properties_last_operation, version)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`,
[
2,
'org_posthog',
0,
JSON.stringify({ name: 'PostHog' }),
new Date().toISOString(),
JSON.stringify({}),
JSON.stringify({}),
1,
],
'upsertGroup'
)
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`UPDATE posthog_team SET slack_incoming_webhook = 'https://webhook.example.com/'`,
[],
'testTag'
)
})

afterEach(async () => {
Expand Down Expand Up @@ -120,7 +143,8 @@ describe('eachMessageWebhooksHandlers', () => {
actionMatcher,
hookCannon,
groupTypeManager,
organizationManager
organizationManager,
hub.postgres
)

// NOTE: really it would be nice to verify that fire has been called
Expand Down
Loading