Skip to content

Commit

Permalink
chore: Nuke promiseManager (#19094)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiina303 authored Dec 5, 2023
1 parent f0aeefb commit 8f81c31
Show file tree
Hide file tree
Showing 10 changed files with 3 additions and 443 deletions.
26 changes: 2 additions & 24 deletions plugin-server/src/main/graphile-worker/graphile-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import { Pool } from 'pg'

import { EnqueuedJob, Hub } from '../../types'
import { instrument } from '../../utils/metrics'
import { runRetriableFunction } from '../../utils/retries'
import { status } from '../../utils/status'
import { createPostgresPool } from '../../utils/utils'
import { graphileEnqueueJobCounter } from './metrics'
Expand Down Expand Up @@ -60,36 +59,15 @@ export class GraphileWorker {
await this.migrate()
}

async enqueue(
jobName: string,
job: EnqueuedJob,
instrumentationContext?: InstrumentationContext,
retryOnFailure = false
): Promise<void> {
async enqueue(jobName: string, job: EnqueuedJob, instrumentationContext?: InstrumentationContext): Promise<void> {
const jobType = 'type' in job ? job.type : 'buffer'

let jobPayload: Record<string, any> = {}
if ('payload' in job) {
jobPayload = job.payload
}

let enqueueFn = () => this._enqueue(jobName, job)

// This branch will be removed once we implement a Kafka queue for all jobs
// as we've done for buffer events (see e.g. anonymous-event-buffer-consumer.ts)
if (retryOnFailure) {
enqueueFn = () =>
runRetriableFunction({
hub: this.hub,
metricName: `job_queues_enqueue_${jobName}`,
maxAttempts: 10,
retryBaseMs: 6000,
retryMultiplier: 2,
tryFn: async () => this._enqueue(jobName, job),
catchFn: () => status.error('🔴', 'Exhausted attempts to enqueue job.'),
payload: job,
})
}
const enqueueFn = () => this._enqueue(jobName, job)

await instrument(
this.hub.statsd,
Expand Down
2 changes: 0 additions & 2 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import { TeamManager } from './worker/ingestion/team-manager'
import { PluginsApiKeyManager } from './worker/vm/extensions/helpers/api-key-manager'
import { RootAccessManager } from './worker/vm/extensions/helpers/root-acess-manager'
import { LazyPluginVM } from './worker/vm/lazy'
import { PromiseManager } from './worker/vm/promise-manager'

export { Element } from '@posthog/plugin-scaffold' // Re-export Element from scaffolding, for backwards compat.

Expand Down Expand Up @@ -263,7 +262,6 @@ export interface Hub extends PluginsServerConfig {
organizationManager: OrganizationManager
pluginsApiKeyManager: PluginsApiKeyManager
rootAccessManager: RootAccessManager
promiseManager: PromiseManager
eventsProcessor: EventsProcessor
appMetrics: AppMetrics
// geoip database, setup in workers
Expand Down
4 changes: 0 additions & 4 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import { status } from '../status'
import { createRedisPool, UUIDT } from '../utils'
import { PluginsApiKeyManager } from './../../worker/vm/extensions/helpers/api-key-manager'
import { RootAccessManager } from './../../worker/vm/extensions/helpers/root-acess-manager'
import { PromiseManager } from './../../worker/vm/promise-manager'
import { DB } from './db'
import { KafkaProducerWrapper } from './kafka-producer-wrapper'
import { PostgresRouter } from './postgres'
Expand Down Expand Up @@ -135,8 +134,6 @@ export async function createHub(
status.warn('🪣', `Object storage could not be created`)
}

const promiseManager = new PromiseManager(serverConfig)

const db = new DB(
postgres,
redisPool,
Expand Down Expand Up @@ -195,7 +192,6 @@ export async function createHub(
organizationManager,
pluginsApiKeyManager,
rootAccessManager,
promiseManager,
conversionBufferEnabledTeams,
pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true),
poeEmbraceJoinForTeams: buildIntegerMatcher(process.env.POE_EMBRACE_JOIN_FOR_TEAMS, true),
Expand Down
115 changes: 0 additions & 115 deletions plugin-server/src/utils/retries.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
import { RetryError } from '@posthog/plugin-scaffold'

import { runInTransaction } from '../sentry'
import { Hub } from '../types'
import { status } from '../utils/status'
import { AppMetricIdentifier, ErrorWithContext } from '../worker/ingestion/app-metrics'
import { sleep } from './utils'

// Simple retries in our code
Expand Down Expand Up @@ -39,116 +34,6 @@ export function getNextRetryMs(baseMs: number, multiplier: number, attempt: numb
return baseMs * multiplier ** (attempt - 1)
}

export interface RetriableFunctionDefinition {
payload: Record<string, any>
tryFn: () => void | Promise<void>
catchFn?: (error: Error | RetryError) => void | Promise<void>
finallyFn?: (attempts: number) => void | Promise<void>
}

export interface RetryParams {
maxAttempts: number
retryBaseMs: number
retryMultiplier: number
}

export interface MetricsDefinition {
metricName: string
appMetric?: AppMetricIdentifier
appMetricErrorContext?: Omit<ErrorWithContext, 'error'>
}

export type RetriableFunctionPayload = RetriableFunctionDefinition &
Partial<RetryParams> &
MetricsDefinition & { hub: Hub }

function iterateRetryLoop(retriableFunctionPayload: RetriableFunctionPayload, attempt = 1): Promise<void> {
const {
metricName,
hub,
payload,
tryFn,
catchFn,
finallyFn,
maxAttempts = process.env.PLUGINS_RETRY_ATTEMPTS ? parseInt(process.env.PLUGINS_RETRY_ATTEMPTS) : 3,
retryBaseMs = 3000,
retryMultiplier = 2,
appMetric,
appMetricErrorContext,
} = retriableFunctionPayload
return runInTransaction(
{
name: 'retryLoop',
op: metricName,
description: '?',
data: {
metricName,
payload,
attempt,
},
},
async () => {
let nextIterationPromise: Promise<void> | undefined
try {
await tryFn()
if (appMetric) {
await hub.appMetrics.queueMetric({
...appMetric,
successes: attempt == 1 ? 1 : 0,
successesOnRetry: attempt == 1 ? 0 : 1,
})
}
} catch (error) {
if (error instanceof RetryError) {
error._attempt = attempt
error._maxAttempts = maxAttempts
}
if (error instanceof RetryError && attempt < maxAttempts) {
const nextRetryMs = getNextRetryMs(retryBaseMs, retryMultiplier, attempt)
nextIterationPromise = new Promise((resolve, reject) =>
setTimeout(() => {
// This is not awaited directly so that attempts beyond the first one don't stall the payload queue
iterateRetryLoop(retriableFunctionPayload, attempt + 1)
.then(resolve)
.catch(reject)
}, nextRetryMs)
)
hub.promiseManager.trackPromise(nextIterationPromise, 'retries')
await hub.promiseManager.awaitPromisesIfNeeded()
} else {
await catchFn?.(error)
if (appMetric) {
await hub.appMetrics.queueError(
{
...appMetric,
failures: 1,
},
{
error,
...appMetricErrorContext,
}
)
}
}
}
if (!nextIterationPromise) {
await finallyFn?.(attempt)
}
}
)
}

/** Run function with `RetryError` handling. */
export async function runRetriableFunction(retriableFunctionPayload: RetriableFunctionPayload): Promise<void> {
const { finallyFn } = retriableFunctionPayload
await iterateRetryLoop({
...retriableFunctionPayload,
finallyFn: async (attempts) => {
await finallyFn?.(attempts)
},
})
}

/**
* Retry a function, respecting `error.isRetriable`.
*/
Expand Down
35 changes: 0 additions & 35 deletions plugin-server/src/worker/vm/promise-manager.ts

This file was deleted.

17 changes: 0 additions & 17 deletions plugin-server/tests/main/jobs/graphile-worker.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { GraphileWorker } from '../../../src/main/graphile-worker/graphile-worker'
import { EnqueuedJob, Hub, JobName } from '../../../src/types'
import { runRetriableFunction } from '../../../src/utils/retries'
import { UUID } from '../../../src/utils/utils'
import { PromiseManager } from '../../../src/worker/vm/promise-manager'

jest.mock('../../../src/utils/retries')
jest.mock('../../../src/utils/status')
Expand All @@ -20,7 +18,6 @@ jest.mock('graphile-worker', () => {

const mockHub: Hub = {
instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'),
promiseManager: new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any),
JOB_QUEUES: 'fs',
} as Hub

Expand All @@ -36,22 +33,8 @@ describe('graphileWorker', () => {
jest.spyOn(graphileWorker, '_enqueue').mockImplementation(() => Promise.resolve())
await graphileWorker.enqueue(JobName.PLUGIN_JOB, { type: 'foo' } as EnqueuedJob)

expect(runRetriableFunction).not.toHaveBeenCalled()
expect(graphileWorker._enqueue).toHaveBeenCalledWith(JobName.PLUGIN_JOB, { type: 'foo' })
})

it('calls runRetriableFunction with the correct parameters if retryOnFailure=true', async () => {
jest.spyOn(graphileWorker, '_enqueue').mockImplementation(() => Promise.resolve())
await graphileWorker.enqueue(JobName.PLUGIN_JOB, { type: 'foo' } as EnqueuedJob, undefined, true)
expect(runRetriableFunction).toHaveBeenCalled()
const runRetriableFunctionArgs = jest.mocked(runRetriableFunction).mock.calls[0][0]

expect(runRetriableFunctionArgs.metricName).toEqual('job_queues_enqueue_pluginJob')
expect(runRetriableFunctionArgs.payload).toEqual({ type: 'foo' })
expect(runRetriableFunctionArgs.tryFn).not.toBeUndefined()
expect(runRetriableFunctionArgs.catchFn).not.toBeUndefined()
expect(runRetriableFunctionArgs.finallyFn).toBeUndefined()
})
})

describe('syncState()', () => {
Expand Down
2 changes: 0 additions & 2 deletions plugin-server/tests/main/jobs/schedule.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@ import { runScheduledTasks } from '../../../src/main/graphile-worker/schedule'
import { Hub } from '../../../src/types'
import { KafkaProducerWrapper } from '../../../src/utils/db/kafka-producer-wrapper'
import { UUID } from '../../../src/utils/utils'
import { PromiseManager } from '../../../src/worker/vm/promise-manager'

const mockHub: Hub = {
instanceId: new UUID('F8B2F832-6639-4596-ABFC-F9664BC88E84'),
promiseManager: new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any),
JOB_QUEUES: 'fs',
} as Hub

Expand Down
Loading

0 comments on commit 8f81c31

Please sign in to comment.