diff --git a/plugin-server/src/utils/db/db.ts b/plugin-server/src/utils/db/db.ts index c2621998e3e8a..a5873a7a8b8ec 100644 --- a/plugin-server/src/utils/db/db.ts +++ b/plugin-server/src/utils/db/db.ts @@ -1348,8 +1348,7 @@ export class DB { propertiesLastUpdatedAt: PropertiesLastUpdatedAt, propertiesLastOperation: PropertiesLastOperation, version: number, - tx?: TransactionClient, - options: { cache?: boolean } = { cache: true } + tx?: TransactionClient ): Promise { const result = await this.postgres.query( tx ?? PostgresUse.COMMON_WRITE, @@ -1375,13 +1374,6 @@ export class DB { if (result.rows.length === 0) { throw new RaceConditionError('Parallel posthog_group inserts, retry') } - - if (options?.cache) { - await this.updateGroupCache(teamId, groupTypeIndex, groupKey, { - properties: groupProperties, - created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouse), - }) - } } public async updateGroup( @@ -1418,11 +1410,6 @@ export class DB { ], 'upsertGroup' ) - - await this.updateGroupCache(teamId, groupTypeIndex, groupKey, { - properties: groupProperties, - created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouse), - }) } public async upsertGroupClickhouse( diff --git a/plugin-server/src/worker/ingestion/process-event.ts b/plugin-server/src/worker/ingestion/process-event.ts index 44327a6a8bfd5..155404701790e 100644 --- a/plugin-server/src/worker/ingestion/process-event.ts +++ b/plugin-server/src/worker/ingestion/process-event.ts @@ -140,6 +140,7 @@ export class EventsProcessor { err, }) } + // Adds group_0 etc values to properties properties = await addGroupProperties(team.id, properties, this.groupTypeManager) if (event === '$groupidentify') { diff --git a/plugin-server/src/worker/ingestion/properties-updater.ts b/plugin-server/src/worker/ingestion/properties-updater.ts index 663fb3e27c85e..4eb3004b9b33e 100644 --- a/plugin-server/src/worker/ingestion/properties-updater.ts +++ b/plugin-server/src/worker/ingestion/properties-updater.ts @@ -1,10 +1,10 @@ import { Properties } from '@posthog/plugin-scaffold' import { DateTime } from 'luxon' -import { Group, GroupTypeIndex, TeamId } from '../../types' +import { Group, GroupTypeIndex, TeamId, TimestampFormat } from '../../types' import { DB } from '../../utils/db/db' import { PostgresUse } from '../../utils/db/postgres' -import { RaceConditionError } from '../../utils/utils' +import { castTimestampOrNow, RaceConditionError } from '../../utils/utils' interface PropertiesUpdate { updated: boolean @@ -70,14 +70,20 @@ export async function upsertGroup( ) if (propertiesUpdate.updated) { - await db.upsertGroupClickhouse( - teamId, - groupTypeIndex, - groupKey, - propertiesUpdate.properties, - createdAt, - version - ) + await Promise.all([ + db.updateGroupCache(teamId, groupTypeIndex, groupKey, { + properties: propertiesUpdate.properties, + created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouse), + }), + db.upsertGroupClickhouse( + teamId, + groupTypeIndex, + groupKey, + propertiesUpdate.properties, + createdAt, + version + ), + ]) } } catch (error) { if (error instanceof RaceConditionError) { diff --git a/plugin-server/tests/main/db.test.ts b/plugin-server/tests/main/db.test.ts index a2a570ce0af07..5050dac1be4f9 100644 --- a/plugin-server/tests/main/db.test.ts +++ b/plugin-server/tests/main/db.test.ts @@ -631,46 +631,6 @@ describe('DB', () => { version: 2, }) }) - - describe('with caching', () => { - it('insertGroup() and updateGroup() update cache', async () => { - expect(await fetchGroupCache(2, 0, 'group_key')).toEqual(null) - - await db.insertGroup( - 2, - 0, - 'group_key', - { prop: 'val' }, - TIMESTAMP, - { prop: TIMESTAMP.toISO() }, - { prop: PropertyUpdateOperation.Set }, - 1, - undefined, - { cache: true } - ) - - expect(await fetchGroupCache(2, 0, 'group_key')).toEqual({ - created_at: CLICKHOUSE_TIMESTAMP, - properties: { prop: 'val' }, - }) - - await db.updateGroup( - 2, - 0, - 'group_key', - { prop: 'newVal', prop2: 2 }, - TIMESTAMP, - { prop: TIMESTAMP.toISO(), prop2: TIMESTAMP.toISO() }, - { prop: PropertyUpdateOperation.Set, prop2: PropertyUpdateOperation.Set }, - 2 - ) - - expect(await fetchGroupCache(2, 0, 'group_key')).toEqual({ - created_at: CLICKHOUSE_TIMESTAMP, - properties: { prop: 'newVal', prop2: 2 }, - }) - }) - }) }) describe('updateGroupCache()', () => {