Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: PostHog/posthog
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: c3b65af89d52365821784871d0ca87ea8b27935f
Choose a base ref
..
head repository: PostHog/posthog
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 6c4b2d9b9c17dea692f6270e0b1d7cdf8384f608
Choose a head ref
11 changes: 8 additions & 3 deletions plugin-server/src/utils/periodic-task.ts
Original file line number Diff line number Diff line change
@@ -17,10 +17,15 @@ export class PeriodicTask {
try {
status.debug('🔄', 'Periodic task starting...', { task })
while (!this.abortController.signal.aborted) {
const startTimeMs = +Date.now()
const startTimeMs = Date.now()
await task()
const waitTimeMs = Math.max(intervalMs - startTimeMs, minimumWaitMs)
status.debug('🔄', `Next evaluation in ${waitTimeMs / 1000}s`, { task })
const durationMs = Date.now() - startTimeMs
const waitTimeMs = Math.max(intervalMs - durationMs, minimumWaitMs)
status.debug(
'🔄',
`Task completed in ${durationMs / 1000}s, next evaluation in ${waitTimeMs / 1000}s`,
{ task }
)
await Promise.race([sleep(waitTimeMs), abortRequested])
}
status.info('✅', 'Periodic task stopped by request.', { task })
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ import { PluginEvent } from '@posthog/plugin-scaffold'
import { DateTime } from 'luxon'
import { Person } from 'types'

import { defaultConfig } from '../../../config/config'
import { normalizeEvent } from '../../../utils/event'
import { status } from '../../../utils/status'
import { DeferredPersonOverrideWriter, PersonOverrideWriter, PersonState } from '../person-state'
@@ -25,7 +24,7 @@ export async function processPersonsStep(

let overridesWriter: PersonOverrideWriter | DeferredPersonOverrideWriter | undefined = undefined
if (runner.poEEmbraceJoin) {
if (defaultConfig.POE_DEFERRED_WRITES_ENABLED) {
if (runner.hub.POE_DEFERRED_WRITES_ENABLED) {
overridesWriter = new DeferredPersonOverrideWriter(runner.hub.db.postgres)
} else {
overridesWriter = new PersonOverrideWriter(runner.hub.db.postgres)
2 changes: 1 addition & 1 deletion plugin-server/src/worker/ingestion/person-state.ts
Original file line number Diff line number Diff line change
@@ -4,13 +4,13 @@ import { ProducerRecord } from 'kafkajs'
import { DateTime } from 'luxon'
import { Counter } from 'prom-client'
import { KafkaProducerWrapper } from 'utils/db/kafka-producer-wrapper'
import { PeriodicTask } from 'utils/periodic-task'

import { KAFKA_PERSON_OVERRIDE } from '../../config/kafka-topics'
import { Person, PropertyUpdateOperation, TimestampFormat } from '../../types'
import { DB } from '../../utils/db/db'
import { PostgresRouter, PostgresUse, TransactionClient } from '../../utils/db/postgres'
import { timeoutGuard } from '../../utils/db/utils'
import { PeriodicTask } from '../../utils/periodic-task'
import { promiseRetry } from '../../utils/retries'
import { status } from '../../utils/status'
import { castTimestampOrNow, UUIDT } from '../../utils/utils'