Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(group-analytics): Add project field to group type #25600

Merged
merged 26 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
17a95d0
refactor(group-analytics): Add project field to group type
Twixes Oct 15, 2024
94a8fb0
Update plugin server tests
Twixes Oct 15, 2024
4a2027e
Parse `project_id` as int
Twixes Oct 15, 2024
3424a72
Update test_migrations_are_safe.py
Twixes Oct 15, 2024
bc2a8d9
Create index concurrently
Twixes Oct 21, 2024
53fea03
Merge branch 'master' into group-type-project
Twixes Oct 28, 2024
553ab46
Update query snapshots
github-actions[bot] Oct 28, 2024
b75a399
Update query snapshots
github-actions[bot] Oct 28, 2024
a742aaa
Update query snapshots
github-actions[bot] Oct 28, 2024
23c08ee
Update query snapshots
github-actions[bot] Oct 28, 2024
34d86d6
Update test_migrations_are_safe.py
Twixes Oct 28, 2024
d96fffd
Update test_migrations_are_safe.py
Twixes Oct 28, 2024
583a3eb
Update query snapshots
github-actions[bot] Oct 28, 2024
0b427f3
Update query snapshots
github-actions[bot] Oct 28, 2024
e3ad80f
Update query snapshots
github-actions[bot] Oct 29, 2024
2964953
Update query snapshots
github-actions[bot] Oct 29, 2024
6915d5d
Merge branch 'master' into group-type-project
Twixes Oct 29, 2024
f04c43b
Merge branch 'master' into group-type-project
Twixes Oct 29, 2024
de4ccaa
Update query snapshots
github-actions[bot] Oct 29, 2024
4269637
Update query snapshots
github-actions[bot] Oct 30, 2024
99e3659
Merge branch 'master' into group-type-project
Twixes Oct 30, 2024
b80fed5
Update query snapshots
github-actions[bot] Oct 30, 2024
66d2d84
Update query snapshots
github-actions[bot] Oct 30, 2024
1cb439e
Update query snapshots
github-actions[bot] Oct 30, 2024
7f39f46
Merge branch 'master' into group-type-project
Twixes Oct 31, 2024
ad4b3b2
Merge branch 'master' into group-type-project
Twixes Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0501_create_group_billing_team
posthog: 0502_grouptypemapping_project
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
10 changes: 5 additions & 5 deletions plugin-server/functional_tests/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,14 @@ export const fetchGroups = async (teamId: number) => {
return queryResult.data.map((group) => ({ ...group, group_properties: JSON.parse(group.group_properties) }))
}

export const createGroupType = async (teamId: number, index: number, groupType: string) => {
export const createGroupType = async (teamId: number, projectId: number, index: number, groupType: string) => {
await postgres.query(
PostgresUse.COMMON_WRITE,
`
INSERT INTO posthog_grouptypemapping (team_id, group_type, group_type_index)
VALUES ($1, $2, $3)
INSERT INTO posthog_grouptypemapping (team_id, project_id, group_type, group_type_index)
VALUES ($1, $2, $3, $4)
`,
[teamId, groupType, index],
[teamId, projectId, groupType, index],
'insertGroupType'
)
}
Expand Down Expand Up @@ -455,7 +455,7 @@ export const createOrganizationRaw = async (organizationProperties = {}) => {

await postgres.query(
PostgresUse.COMMON_WRITE,
`INSERT into posthog_organization
`INSERT into posthog_organization
(${keys})
VALUES (${values})
`,
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/functional_tests/webhooks.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ test.concurrent(`webhooks: fires slack webhook`, async () => {
})
const teamId = await createTeam(organizationId, `http://localhost:${server.address()?.port}`)
const user = await createUser(teamId, new UUIDT().toString())
await createGroupType(teamId, 0, 'organization')
await createGroupType(teamId, teamId, 0, 'organization')
await createGroup(teamId, 0, 'TestWebhookOrg', { name: 'test-webhooks' })
const action = await createAction({
team_id: teamId,
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,7 @@ export interface RawOrganization {
/** Usable Team model. */
export interface Team {
id: number
project_id: number
uuid: string
organization_id: string
name: string
Expand Down
20 changes: 12 additions & 8 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1342,14 +1342,18 @@ export class DB {
}

public async getTeamsInOrganizationsWithRootPluginAccess(): Promise<Team[]> {
return (
await this.postgres.query(
PostgresUse.COMMON_READ,
'SELECT * from posthog_team WHERE organization_id = (SELECT id from posthog_organization WHERE plugins_access_level = $1)',
[OrganizationPluginsAccessLevel.ROOT],
'getTeamsInOrganizationsWithRootPluginAccess'
)
).rows as Team[]
const selectResult = await this.postgres.query<Team>(
PostgresUse.COMMON_READ,
'SELECT * from posthog_team WHERE organization_id = (SELECT id from posthog_organization WHERE plugins_access_level = $1)',
[OrganizationPluginsAccessLevel.ROOT],
'getTeamsInOrganizationsWithRootPluginAccess'
)
for (const row of selectResult.rows) {
// pg returns int8 as a string, since it can be larger than JS's max safe integer,
// but this is not a problem for project_id, which is a long long way from that limit.
row.project_id = parseInt(row.project_id as unknown as string)
}
return selectResult.rows
}

public async addOrUpdatePublicJob(
Expand Down
20 changes: 13 additions & 7 deletions plugin-server/src/worker/ingestion/group-type-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,19 @@ export class GroupTypeManager {
}
}

public async fetchGroupTypeIndex(teamId: TeamId, groupType: string): Promise<GroupTypeIndex | null> {
public async fetchGroupTypeIndex(
teamId: TeamId,
projectId: TeamId,
groupType: string
): Promise<GroupTypeIndex | null> {
const groupTypes = await this.fetchGroupTypes(teamId)

if (groupType in groupTypes) {
return groupTypes[groupType]
} else {
const [groupTypeIndex, isInsert] = await this.insertGroupType(
teamId,
projectId,
groupType,
Object.keys(groupTypes).length
)
Expand All @@ -70,6 +75,7 @@ export class GroupTypeManager {

public async insertGroupType(
teamId: TeamId,
projectId: TeamId,
groupType: string,
index: number
): Promise<[GroupTypeIndex | null, boolean]> {
Expand All @@ -81,21 +87,21 @@ export class GroupTypeManager {
PostgresUse.COMMON_WRITE,
`
WITH insert_result AS (
INSERT INTO posthog_grouptypemapping (team_id, group_type, group_type_index)
VALUES ($1, $2, $3)
INSERT INTO posthog_grouptypemapping (team_id, project_id, group_type, group_type_index)
VALUES ($1, $2, $3, $4)
ON CONFLICT DO NOTHING
RETURNING group_type_index
)
SELECT group_type_index, 1 AS is_insert FROM insert_result
SELECT group_type_index, 1 AS is_insert FROM insert_result
UNION
SELECT group_type_index, 0 AS is_insert FROM posthog_grouptypemapping WHERE team_id = $1 AND group_type = $2;
SELECT group_type_index, 0 AS is_insert FROM posthog_grouptypemapping WHERE team_id = $1 AND group_type = $3;
`,
[teamId, groupType, index],
[teamId, projectId, groupType, index],
'insertGroupType'
)

if (insertGroupTypeResult.rows.length == 0) {
return await this.insertGroupType(teamId, groupType, index + 1)
return await this.insertGroupType(teamId, projectId, groupType, index + 1)
}

const { group_type_index, is_insert } = insertGroupTypeResult.rows[0]
Expand Down
3 changes: 2 additions & 1 deletion plugin-server/src/worker/ingestion/groups.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import { GroupTypeManager } from './group-type-manager'

export async function addGroupProperties(
teamId: TeamId,
projectId: TeamId,
properties: Properties,
groupTypeManager: GroupTypeManager
): Promise<Properties> {
for (const [groupType, groupIdentifier] of Object.entries(properties.$groups || {})) {
const columnIndex = await groupTypeManager.fetchGroupTypeIndex(teamId, groupType)
const columnIndex = await groupTypeManager.fetchGroupTypeIndex(teamId, projectId, groupType)
if (columnIndex !== null) {
// :TODO: Update event column instead
properties[`$group_${columnIndex}`] = groupIdentifier
Expand Down
20 changes: 15 additions & 5 deletions plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,12 @@ export class EventsProcessor {

if (this.pluginsServer.SKIP_UPDATE_EVENT_AND_PROPERTIES_STEP === false) {
try {
await this.groupAndFirstEventManager.updateGroupsAndFirstEvent(team.id, event, properties)
await this.groupAndFirstEventManager.updateGroupsAndFirstEvent(
team.id,
team.project_id,
event,
properties
)
} catch (err) {
Sentry.captureException(err, { tags: { team_id: team.id } })
status.warn('⚠️', 'Failed to update property definitions for an event', {
Expand All @@ -168,10 +173,10 @@ export class EventsProcessor {

if (processPerson) {
// Adds group_0 etc values to properties
properties = await addGroupProperties(team.id, properties, this.groupTypeManager)
properties = await addGroupProperties(team.id, team.project_id, properties, this.groupTypeManager)

if (event === '$groupidentify') {
await this.upsertGroup(team.id, properties, timestamp)
await this.upsertGroup(team.id, team.project_id, properties, timestamp)
}
}

Expand Down Expand Up @@ -278,13 +283,18 @@ export class EventsProcessor {
return [rawEvent, ack]
}

private async upsertGroup(teamId: number, properties: Properties, timestamp: DateTime): Promise<void> {
private async upsertGroup(
teamId: number,
projectId: number,
properties: Properties,
timestamp: DateTime
): Promise<void> {
if (!properties['$group_type'] || !properties['$group_key']) {
return
}

const { $group_type: groupType, $group_key: groupKey, $group_set: groupPropertiesToSet } = properties
const groupTypeIndex = await this.groupTypeManager.fetchGroupTypeIndex(teamId, groupType)
const groupTypeIndex = await this.groupTypeManager.fetchGroupTypeIndex(teamId, projectId, groupType)

if (groupTypeIndex !== null) {
await upsertGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ export class GroupAndFirstEventManager {
this.groupTypeManager = groupTypeManager
}

public async updateGroupsAndFirstEvent(teamId: number, event: string, properties: Properties): Promise<void> {
public async updateGroupsAndFirstEvent(
teamId: number,
projectId: number,
event: string,
properties: Properties
): Promise<void> {
if (EVENTS_WITHOUT_EVENT_DEFINITION.includes(event)) {
return
}
Expand Down Expand Up @@ -56,7 +61,9 @@ export class GroupAndFirstEventManager {
const { $group_type: groupType, $group_set: groupPropertiesToSet } = properties
if (groupType != null && groupPropertiesToSet != null) {
// This "fetch" is side-effecty, it inserts a group-type and assigns an index if one isn't found
const groupPromise = this.groupTypeManager.fetchGroupTypeIndex(teamId, groupType).then(() => {})
const groupPromise = this.groupTypeManager
.fetchGroupTypeIndex(teamId, projectId, groupType)
.then(() => {})
promises.push(groupPromise)
}
}
Expand Down
18 changes: 16 additions & 2 deletions plugin-server/src/worker/ingestion/team-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ export async function fetchTeam(client: PostgresRouter, teamId: Team['id']): Pro
`
SELECT
id,
project_id,
uuid,
organization_id,
name,
Expand All @@ -172,7 +173,13 @@ export async function fetchTeam(client: PostgresRouter, teamId: Team['id']): Pro
[teamId],
'fetchTeam'
)
return selectResult.rows[0] ?? null
if (selectResult.rows.length === 0) {
return null
}
// pg returns int8 as a string, since it can be larger than JS's max safe integer,
// but this is not a problem for project_id, which is a long long way from that limit.
selectResult.rows[0].project_id = parseInt(selectResult.rows[0].project_id as unknown as string)
return selectResult.rows[0]
}

export async function fetchTeamByToken(client: PostgresRouter, token: string): Promise<Team | null> {
Expand All @@ -181,6 +188,7 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P
`
SELECT
id,
project_id,
uuid,
organization_id,
name,
Expand All @@ -199,7 +207,13 @@ export async function fetchTeamByToken(client: PostgresRouter, token: string): P
[token],
'fetchTeamByToken'
)
return selectResult.rows[0] ?? null
if (selectResult.rows.length === 0) {
return null
}
// pg returns int8 as a string, since it can be larger than JS's max safe integer,
// but this is not a problem for project_id, which is a long long way from that limit.
selectResult.rows[0].project_id = parseInt(selectResult.rows[0].project_id as unknown as string)
return selectResult.rows[0]
}

export async function fetchTeamTokensWithRecordings(client: PostgresRouter): Promise<Record<string, TeamIDWithConfig>> {
Expand Down
18 changes: 10 additions & 8 deletions plugin-server/tests/helpers/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,16 @@ export async function createUserTeamAndOrganization(
}

export async function getTeams(hub: Hub): Promise<Team[]> {
return (
await hub.db.postgres.query(
PostgresUse.COMMON_READ,
'SELECT * FROM posthog_team ORDER BY id',
undefined,
'fetchAllTeams'
)
).rows
const selectResult = await hub.db.postgres.query<Team>(
PostgresUse.COMMON_READ,
'SELECT * FROM posthog_team ORDER BY id',
undefined,
'fetchAllTeams'
)
for (const row of selectResult.rows) {
row.project_id = parseInt(row.project_id as unknown as string)
}
return selectResult.rows
}

export async function getFirstTeam(hub: Hub): Promise<Team> {
Expand Down
2 changes: 2 additions & 0 deletions plugin-server/tests/main/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,7 @@ describe('DB', () => {
anonymize_ips: false,
api_token: 'token1',
id: teamId,
project_id: teamId,
ingested_event: true,
name: 'TEST PROJECT',
organization_id: organizationId,
Expand Down Expand Up @@ -884,6 +885,7 @@ describe('DB', () => {
anonymize_ips: false,
api_token: 'token2',
id: teamId,
project_id: teamId,
ingested_event: true,
name: 'TEST PROJECT',
organization_id: organizationId,
Expand Down
Loading
Loading