Skip to content

Commit

Permalink
fix: Move redis cache outside of pg transaction (#17509)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiina303 authored Sep 19, 2023
1 parent 240b545 commit 1cdcedb
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 64 deletions.
15 changes: 1 addition & 14 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1348,8 +1348,7 @@ export class DB {
propertiesLastUpdatedAt: PropertiesLastUpdatedAt,
propertiesLastOperation: PropertiesLastOperation,
version: number,
tx?: TransactionClient,
options: { cache?: boolean } = { cache: true }
tx?: TransactionClient
): Promise<void> {
const result = await this.postgres.query(
tx ?? PostgresUse.COMMON_WRITE,
Expand All @@ -1375,13 +1374,6 @@ export class DB {
if (result.rows.length === 0) {
throw new RaceConditionError('Parallel posthog_group inserts, retry')
}

if (options?.cache) {
await this.updateGroupCache(teamId, groupTypeIndex, groupKey, {
properties: groupProperties,
created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouse),
})
}
}

public async updateGroup(
Expand Down Expand Up @@ -1418,11 +1410,6 @@ export class DB {
],
'upsertGroup'
)

await this.updateGroupCache(teamId, groupTypeIndex, groupKey, {
properties: groupProperties,
created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouse),
})
}

public async upsertGroupClickhouse(
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/worker/ingestion/process-event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ export class EventsProcessor {
err,
})
}
// Adds group_0 etc values to properties
properties = await addGroupProperties(team.id, properties, this.groupTypeManager)

if (event === '$groupidentify') {
Expand Down
26 changes: 16 additions & 10 deletions plugin-server/src/worker/ingestion/properties-updater.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import { Properties } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'

import { Group, GroupTypeIndex, TeamId } from '../../types'
import { Group, GroupTypeIndex, TeamId, TimestampFormat } from '../../types'
import { DB } from '../../utils/db/db'
import { PostgresUse } from '../../utils/db/postgres'
import { RaceConditionError } from '../../utils/utils'
import { castTimestampOrNow, RaceConditionError } from '../../utils/utils'

interface PropertiesUpdate {
updated: boolean
Expand Down Expand Up @@ -70,14 +70,20 @@ export async function upsertGroup(
)

if (propertiesUpdate.updated) {
await db.upsertGroupClickhouse(
teamId,
groupTypeIndex,
groupKey,
propertiesUpdate.properties,
createdAt,
version
)
await Promise.all([
db.updateGroupCache(teamId, groupTypeIndex, groupKey, {
properties: propertiesUpdate.properties,
created_at: castTimestampOrNow(createdAt, TimestampFormat.ClickHouse),
}),
db.upsertGroupClickhouse(
teamId,
groupTypeIndex,
groupKey,
propertiesUpdate.properties,
createdAt,
version
),
])
}
} catch (error) {
if (error instanceof RaceConditionError) {
Expand Down
40 changes: 0 additions & 40 deletions plugin-server/tests/main/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -631,46 +631,6 @@ describe('DB', () => {
version: 2,
})
})

describe('with caching', () => {
it('insertGroup() and updateGroup() update cache', async () => {
expect(await fetchGroupCache(2, 0, 'group_key')).toEqual(null)

await db.insertGroup(
2,
0,
'group_key',
{ prop: 'val' },
TIMESTAMP,
{ prop: TIMESTAMP.toISO() },
{ prop: PropertyUpdateOperation.Set },
1,
undefined,
{ cache: true }
)

expect(await fetchGroupCache(2, 0, 'group_key')).toEqual({
created_at: CLICKHOUSE_TIMESTAMP,
properties: { prop: 'val' },
})

await db.updateGroup(
2,
0,
'group_key',
{ prop: 'newVal', prop2: 2 },
TIMESTAMP,
{ prop: TIMESTAMP.toISO(), prop2: TIMESTAMP.toISO() },
{ prop: PropertyUpdateOperation.Set, prop2: PropertyUpdateOperation.Set },
2
)

expect(await fetchGroupCache(2, 0, 'group_key')).toEqual({
created_at: CLICKHOUSE_TIMESTAMP,
properties: { prop: 'newVal', prop2: 2 },
})
})
})
})

describe('updateGroupCache()', () => {
Expand Down

0 comments on commit 1cdcedb

Please sign in to comment.