From 1ed7987e06e230bc9a7ae6da76c777bac3f72fee Mon Sep 17 00:00:00 2001 From: Tiina Turban Date: Thu, 16 Nov 2023 16:11:36 +0100 Subject: [PATCH] feat: Remove exportEvents --- .../functional_tests/exports-v1.test.ts | 122 +- plugin-server/src/config/config.ts | 4 - .../on-event-handler-consumer.ts | 8 +- plugin-server/src/types.ts | 12 - plugin-server/src/utils/event.ts | 19 +- .../src/worker/ingestion/app-metrics.ts | 2 +- .../src/worker/plugins/loadPluginsFromDB.ts | 2 +- plugin-server/src/worker/vm/capabilities.ts | 4 +- plugin-server/src/worker/vm/lazy.ts | 4 - .../src/worker/vm/upgrades/export-events.ts | 138 -- .../export-historical-events-v2.ts | 772 ----------- .../export-historical-events.ts | 372 ----- .../vm/upgrades/utils/export-events-buffer.ts | 93 -- .../upgrades/utils/fetchEventsForInterval.ts | 105 -- .../src/worker/vm/upgrades/utils/utils.ts | 79 -- plugin-server/src/worker/vm/vm.ts | 17 - .../tests/historical-export-e2e.test.ts | 148 -- plugin-server/tests/worker/buffer.test.ts | 73 - .../tests/worker/capabilities.test.ts | 16 +- plugin-server/tests/worker/plugins.test.ts | 22 - plugin-server/tests/worker/vm.test.ts | 224 --- .../export-historical-events-v2.test.ts.snap | 52 - .../export-historical-events-v2.test.ts | 1221 ----------------- .../export-historical-events.test.ts | 127 -- .../utils/fetchEventsForInterval.test.ts | 116 -- 25 files changed, 40 insertions(+), 3712 deletions(-) delete mode 100644 plugin-server/src/worker/vm/upgrades/export-events.ts delete mode 100644 plugin-server/src/worker/vm/upgrades/historical-export/export-historical-events-v2.ts delete mode 100644 plugin-server/src/worker/vm/upgrades/historical-export/export-historical-events.ts delete mode 100644 plugin-server/src/worker/vm/upgrades/utils/export-events-buffer.ts delete mode 100644 plugin-server/src/worker/vm/upgrades/utils/fetchEventsForInterval.ts delete mode 100644 plugin-server/src/worker/vm/upgrades/utils/utils.ts delete mode 100644 plugin-server/tests/historical-export-e2e.test.ts delete mode 100644 plugin-server/tests/worker/vm/upgrades/historical-export/__snapshots__/export-historical-events-v2.test.ts.snap delete mode 100644 plugin-server/tests/worker/vm/upgrades/historical-export/export-historical-events-v2.test.ts delete mode 100644 plugin-server/tests/worker/vm/upgrades/historical-export/export-historical-events.test.ts delete mode 100644 plugin-server/tests/worker/vm/upgrades/utils/fetchEventsForInterval.test.ts diff --git a/plugin-server/functional_tests/exports-v1.test.ts b/plugin-server/functional_tests/exports-v1.test.ts index bf2dedd865e05..37bed7e22b694 100644 --- a/plugin-server/functional_tests/exports-v1.test.ts +++ b/plugin-server/functional_tests/exports-v1.test.ts @@ -3,7 +3,6 @@ import { createServer, Server } from 'http' import { UUIDT } from '../src/utils/utils' import { capture, createAndReloadPluginConfig, createOrganization, createPlugin, createTeam } from './api' import { waitForExpect } from './expectations' -import { produce } from './kafka' let organizationId: string let server: Server @@ -43,10 +42,10 @@ test.concurrent(`exports: exporting events on ingestion`, async () => { plugin_type: 'source', is_global: false, source__index_ts: ` - export const exportEvents = async (events, { global, config }) => { + export const onEvent = async (event, { global, config }) => { await fetch( "http://localhost:${server.address()?.port}/${teamId}", - {method: "POST", body: JSON.stringify(events)} + {method: "POST", body: JSON.stringify(event)} ) } `, @@ -67,14 +66,11 @@ test.concurrent(`exports: exporting events on ingestion`, async () => { }, }) - // Then check that the exportEvents function was called + // Then check that the onEvent function was called await waitForExpect( () => { - const exportEvents = webHookCalledWith[`/${teamId}`] - expect(exportEvents.length).toBeGreaterThan(0) - const exportedEvents = exportEvents[0] - - expect(exportedEvents).toEqual([ + const onEvents = webHookCalledWith[`/${teamId}`] + expect(onEvents).toEqual([ expect.objectContaining({ distinct_id: distinctId, team_id: teamId, @@ -102,10 +98,10 @@ test.concurrent(`exports: exporting $autocapture events on ingestion`, async () plugin_type: 'source', is_global: false, source__index_ts: ` - export const exportEvents = async (events, { global, config }) => { + export const onEvent = async (event, { global, config }) => { await fetch( "http://localhost:${server.address()?.port}/${teamId}", - {method: "POST", body: JSON.stringify(events)} + {method: "POST", body: JSON.stringify(event)} ) } `, @@ -128,13 +124,11 @@ test.concurrent(`exports: exporting $autocapture events on ingestion`, async () }, }) - // Then check that the exportEvents function was called + // Then check that the onEvent function was called await waitForExpect( () => { - const exportEvents = webHookCalledWith[`/${teamId}`] - expect(exportEvents.length).toBeGreaterThan(0) - const exportedEvents = exportEvents[0] - expect(exportedEvents).toEqual([ + const onEvents = webHookCalledWith[`/${teamId}`] + expect(onEvents).toEqual([ expect.objectContaining({ distinct_id: distinctId, team_id: teamId, @@ -163,99 +157,3 @@ test.concurrent(`exports: exporting $autocapture events on ingestion`, async () 1_000 ) }) - -test.concurrent(`exports: historical exports`, async () => { - const teamId = await createTeam(organizationId) - const distinctId = new UUIDT().toString() - const uuid = new UUIDT().toString() - - const plugin = await createPlugin({ - organization_id: organizationId, - name: 'export plugin', - plugin_type: 'source', - is_global: false, - source__index_ts: ` - export const exportEvents = async (events, { global, config }) => { - await fetch( - "http://localhost:${server.address()?.port}/${teamId}", - {method: "POST", body: JSON.stringify(events)} - ) - } - `, - }) - const pluginConfig = await createAndReloadPluginConfig(teamId, plugin.id) - - // First let's capture an event and wait for it to be ingested so - // so we can check that the historical event is the same as the one - // passed to processEvent on initial ingestion. - await capture({ - teamId, - distinctId, - uuid, - event: '$autocapture', - properties: { - name: 'hehe', - uuid: new UUIDT().toString(), - $elements: [{ tag_name: 'div', nth_child: 1, nth_of_type: 2, $el_text: '💻' }], - }, - }) - - // Then check that the exportEvents function was called - const [exportedEvent] = await waitForExpect( - () => { - const exportEvents = webHookCalledWith[`/${teamId}`] - expect(exportEvents.length).toBeGreaterThan(0) - return exportEvents[0] - }, - 60_000, - 1_000 - ) - - // NOTE: the frontend doesn't actually push to this queue but rather - // adds directly to PostgreSQL using the graphile-worker stored - // procedure `add_job`. I'd rather keep these tests graphile - // unaware. - await produce({ - topic: 'jobs', - message: Buffer.from( - JSON.stringify({ - type: 'Export historical events', - pluginConfigId: pluginConfig.id, - pluginConfigTeam: teamId, - payload: { - dateFrom: new Date(Date.now() - 60000).toISOString(), - dateTo: new Date(Date.now()).toISOString(), - }, - }) - ), - key: teamId.toString(), - }) - - // Then check that the exportEvents function was called with the - // same data that was used with the non-historical export, with the - // additions of details related to the historical export. - await waitForExpect( - () => { - const historicallyExportedEvents = webHookCalledWith[`/${teamId}`].filter((events) => - events.some((event) => event.properties['$$is_historical_export_event']) - ) - expect(historicallyExportedEvents.length).toBeGreaterThan(0) - - const historicallyExportedEvent = historicallyExportedEvents[0] - expect(historicallyExportedEvent).toEqual([ - expect.objectContaining({ - ...exportedEvent, - ip: '', // NOTE: for some reason this is "" when exported historically, but null otherwise. - properties: { - ...exportedEvent.properties, - $$is_historical_export_event: true, - $$historical_export_timestamp: expect.any(String), - $$historical_export_source_db: 'clickhouse', - }, - }), - ]) - }, - 60_000, - 1_000 - ) -}) diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index aa2ada4a10e49..434cb21eb2685 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -124,10 +124,6 @@ export function getDefaultConfig(): PluginsServerConfig { PLUGIN_SERVER_MODE: null, PLUGIN_LOAD_SEQUENTIALLY: false, KAFKAJS_LOG_LEVEL: 'WARN', - HISTORICAL_EXPORTS_ENABLED: true, - HISTORICAL_EXPORTS_MAX_RETRY_COUNT: 15, - HISTORICAL_EXPORTS_INITIAL_FETCH_TIME_WINDOW: 10 * 60 * 1000, - HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER: 1.5, APP_METRICS_GATHERED_FOR_ALL: isDevEnv() ? true : false, MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: 0, USE_KAFKA_FOR_SCHEDULED_TASKS: true, diff --git a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts index 66e5385baae60..80f4347bcb383 100644 --- a/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/on-event-handler-consumer.ts @@ -24,9 +24,7 @@ export const startAsyncOnEventHandlerConsumer = async ({ }) => { /* Consumes analytics events from the Kafka topic `clickhouse_events_json` - and processes any onEvent plugin handlers configured for the team. This - also includes `exportEvents` handlers defined in plugins as these are - also handled via modifying `onEvent` to call `exportEvents`. + and processes any onEvent plugin handlers configured for the team. At the moment this is just a wrapper around `IngestionConsumer`. We may want to further remove that abstraction in the future. @@ -61,9 +59,7 @@ export const startAsyncWebhooksHandlerConsumer = async ({ }) => { /* Consumes analytics events from the Kafka topic `clickhouse_events_json` - and processes any onEvent plugin handlers configured for the team. This - also includes `exportEvents` handlers defined in plugins as these are - also handled via modifying `onEvent` to call `exportEvents`. + and processes any onEvent plugin handlers configured for the team. At the moment this is just a wrapper around `IngestionConsumer`. We may want to further remove that abstraction in the future. diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 786c4d7decc8b..68bef340d1e06 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -2,7 +2,6 @@ import { ReaderModel } from '@maxmind/geoip2-node' import ClickHouse from '@posthog/clickhouse' import { Element, - Meta, PluginAttachment, PluginConfigSchema, PluginEvent, @@ -193,10 +192,6 @@ export interface PluginsServerConfig { PLUGIN_SERVER_MODE: PluginServerMode | null PLUGIN_LOAD_SEQUENTIALLY: boolean // could help with reducing memory usage spikes on startup KAFKAJS_LOG_LEVEL: 'NOTHING' | 'DEBUG' | 'INFO' | 'WARN' | 'ERROR' - HISTORICAL_EXPORTS_ENABLED: boolean // enables historical exports for export apps - HISTORICAL_EXPORTS_MAX_RETRY_COUNT: number - HISTORICAL_EXPORTS_INITIAL_FETCH_TIME_WINDOW: number - HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER: number APP_METRICS_GATHERED_FOR_ALL: boolean // whether to gather app metrics for all teams MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: number USE_KAFKA_FOR_SCHEDULED_TASKS: boolean // distribute scheduled tasks across the scheduler workers @@ -490,7 +485,6 @@ export type VMMethods = { teardownPlugin?: () => Promise getSettings?: () => PluginSettings onEvent?: (event: ProcessedPluginEvent) => Promise - exportEvents?: (events: PluginEvent[]) => Promise composeWebhook?: (event: PostHogEvent) => Webhook | null processEvent?: (event: PluginEvent) => Promise } @@ -526,12 +520,6 @@ export interface PluginConfigVMResponse { usedImports: Set } -export interface PluginConfigVMInternalResponse { - methods: VMMethods - tasks: Record> - meta: M -} - export interface EventUsage { event: string usage_count: number | null diff --git a/plugin-server/src/utils/event.ts b/plugin-server/src/utils/event.ts index ca6ff219880c3..1b97c1baa7bf4 100644 --- a/plugin-server/src/utils/event.ts +++ b/plugin-server/src/utils/event.ts @@ -2,8 +2,7 @@ import { PluginEvent, PostHogEvent, ProcessedPluginEvent } from '@posthog/plugin import { DateTime } from 'luxon' import { Message } from 'node-rdkafka' -import { ClickHouseEvent, PipelineEvent, PostIngestionEvent, RawClickHouseEvent } from '../types' -import { convertDatabaseElementsToRawElements } from '../worker/vm/upgrades/utils/fetchEventsForInterval' +import { ClickHouseEvent, Element, PipelineEvent, PostIngestionEvent, RawClickHouseEvent } from '../types' import { chainToElements } from './db/elements-chain' import { personInitialAndUTMProperties } from './db/utils' import { @@ -12,6 +11,22 @@ import { clickHouseTimestampToISO, } from './utils' +interface RawElement extends Element { + $el_text?: string +} + +const convertDatabaseElementsToRawElements = (elements: RawElement[]): RawElement[] => { + for (const element of elements) { + if (element.attributes && element.attributes.attr__class) { + element.attr_class = element.attributes.attr__class + } + if (element.text) { + element.$el_text = element.text + } + } + return elements +} + export function convertToProcessedPluginEvent(event: PostIngestionEvent): ProcessedPluginEvent { return { distinct_id: event.distinctId, diff --git a/plugin-server/src/worker/ingestion/app-metrics.ts b/plugin-server/src/worker/ingestion/app-metrics.ts index 36791e235b242..d8f52a7401150 100644 --- a/plugin-server/src/worker/ingestion/app-metrics.ts +++ b/plugin-server/src/worker/ingestion/app-metrics.ts @@ -15,7 +15,7 @@ export interface AppMetricIdentifier { pluginConfigId: number jobId?: string // Keep in sync with posthog/queries/app_metrics/serializers.py - category: 'processEvent' | 'onEvent' | 'exportEvents' | 'scheduledTask' | 'webhook' | 'composeWebhook' + category: 'processEvent' | 'onEvent' | 'scheduledTask' | 'webhook' | 'composeWebhook' } export interface AppMetric extends AppMetricIdentifier { diff --git a/plugin-server/src/worker/plugins/loadPluginsFromDB.ts b/plugin-server/src/worker/plugins/loadPluginsFromDB.ts index 282a20389882c..3f556b7e6b160 100644 --- a/plugin-server/src/worker/plugins/loadPluginsFromDB.ts +++ b/plugin-server/src/worker/plugins/loadPluginsFromDB.ts @@ -70,7 +70,7 @@ export async function loadPluginsFromDB( let method = undefined if (plugin.capabilities?.methods) { const methods = plugin.capabilities.methods - if (methods?.some((method) => [PluginMethod.onEvent.toString(), 'exportEvents'].includes(method))) { + if (methods?.some((method) => [PluginMethod.onEvent.toString()].includes(method))) { method = PluginMethod.onEvent } else if (methods?.some((method) => [PluginMethod.composeWebhook.toString()].includes(method))) { method = PluginMethod.composeWebhook diff --git a/plugin-server/src/worker/vm/capabilities.ts b/plugin-server/src/worker/vm/capabilities.ts index d4144665b8735..bca6cc36f1e4a 100644 --- a/plugin-server/src/worker/vm/capabilities.ts +++ b/plugin-server/src/worker/vm/capabilities.ts @@ -47,9 +47,7 @@ function shouldSetupPlugin(serverCapability: keyof PluginServerCapabilities, plu return (pluginCapabilities.jobs || []).length > 0 } if (serverCapability === 'processAsyncOnEventHandlers') { - return pluginCapabilities.methods?.some((method) => - ['onEvent', 'exportEvents', 'composeWebhook'].includes(method) - ) + return pluginCapabilities.methods?.some((method) => ['onEvent', 'composeWebhook'].includes(method)) } return false diff --git a/plugin-server/src/worker/vm/lazy.ts b/plugin-server/src/worker/vm/lazy.ts index 85cfde7ccf64a..459edd0fcc3a9 100644 --- a/plugin-server/src/worker/vm/lazy.ts +++ b/plugin-server/src/worker/vm/lazy.ts @@ -68,10 +68,6 @@ export class LazyPluginVM { this.initVm() } - public async getExportEvents(): Promise { - return await this.getVmMethod('exportEvents') - } - public async getOnEvent(): Promise { return await this.getVmMethod('onEvent') } diff --git a/plugin-server/src/worker/vm/upgrades/export-events.ts b/plugin-server/src/worker/vm/upgrades/export-events.ts deleted file mode 100644 index c41d417362963..0000000000000 --- a/plugin-server/src/worker/vm/upgrades/export-events.ts +++ /dev/null @@ -1,138 +0,0 @@ -import { Plugin, PluginEvent, PluginMeta, ProcessedPluginEvent } from '@posthog/plugin-scaffold' -import { Counter } from 'prom-client' - -import { Hub, PluginConfig, PluginConfigVMInternalResponse, PluginTaskType } from '../../../types' -import { isTestEnv } from '../../../utils/env-utils' -import { stringClamp } from '../../../utils/utils' -import { ExportEventsBuffer } from './utils/export-events-buffer' - -export const MAXIMUM_RETRIES = 3 -const EXPORT_BUFFER_BYTES_MINIMUM = 1 -const EXPORT_BUFFER_BYTES_DEFAULT = 900 * 1024 // 900 KiB -const EXPORT_BUFFER_BYTES_MAXIMUM = 100 * 1024 * 1024 -const EXPORT_BUFFER_SECONDS_MINIMUM = 1 -const EXPORT_BUFFER_SECONDS_MAXIMUM = 600 -const EXPORT_BUFFER_SECONDS_DEFAULT = isTestEnv() ? 0 : 10 - -export const appRetriesCounter = new Counter({ - name: 'export_app_retries', - help: 'Count of events retries processing onEvent apps, by team and plugin.', - labelNames: ['team_id', 'plugin_id'], -}) - -type ExportEventsUpgrade = Plugin<{ - global: { - exportEventsBuffer: ExportEventsBuffer - exportEventsToIgnore: Set - exportEventsWithRetry: (payload: ExportEventsJobPayload, meta: PluginMeta) => Promise - } - config: { - exportEventsBufferBytes: string - exportEventsBufferSeconds: string - exportEventsToIgnore: string - } - jobs: { - exportEventsWithRetry: ExportEventsJobPayload - } -}> - -interface ExportEventsJobPayload extends Record { - batch: PluginEvent[] - batchId: number - retriesPerformedSoFar: number -} - -/** - * Inject export abstraction code into plugin VM if it has method `exportEvents`: - * - add `global`/`config`/`jobs` stuff specified in the `ExportEventsUpgrade` type above, - * - patch `onEvent` with code to add the event to a buffer. - */ -export function upgradeExportEvents( - hub: Hub, - pluginConfig: PluginConfig, - response: PluginConfigVMInternalResponse> -): void { - const { methods, tasks, meta } = response - - const uploadBytes = stringClamp( - meta.config.exportEventsBufferBytes, - EXPORT_BUFFER_BYTES_DEFAULT, - EXPORT_BUFFER_BYTES_MINIMUM, - EXPORT_BUFFER_BYTES_MAXIMUM - ) - const uploadSeconds = stringClamp( - meta.config.exportEventsBufferSeconds, - EXPORT_BUFFER_SECONDS_DEFAULT, - EXPORT_BUFFER_SECONDS_MINIMUM, - EXPORT_BUFFER_SECONDS_MAXIMUM - ) - - meta.global.exportEventsToIgnore = new Set( - meta.config.exportEventsToIgnore - ? meta.config.exportEventsToIgnore.split(',').map((event: string) => event.trim()) - : null - ) - - meta.global.exportEventsBuffer = new ExportEventsBuffer(hub, pluginConfig, { - limit: uploadBytes, - timeoutSeconds: uploadSeconds, - onFlush: async (batch) => { - const jobPayload = { - batch, - batchId: Math.floor(Math.random() * 1000000), - retriesPerformedSoFar: 0, - } - // Running the first export code directly, without a job in between - await meta.global.exportEventsWithRetry(jobPayload, meta) - }, - }) - - meta.global.exportEventsWithRetry = async ( - payload: ExportEventsJobPayload, - // eslint-disable-next-line @typescript-eslint/no-unused-vars - meta: PluginMeta - ) => { - try { - await methods.exportEvents?.(payload.batch) - await hub.appMetrics.queueMetric({ - teamId: pluginConfig.team_id, - pluginConfigId: pluginConfig.id, - category: 'exportEvents', - successes: payload.batch.length, - }) - } catch (err) { - // We've disabled all retries as we move exportEvents to a new system - await hub.appMetrics.queueError( - { - teamId: pluginConfig.team_id, - pluginConfigId: pluginConfig.id, - category: 'exportEvents', - failures: payload.batch.length, - }, - { - error: err, - eventCount: payload.batch.length, - } - ) - } - } - - tasks.job['exportEventsWithRetry'] = { - name: 'exportEventsWithRetry', - type: PluginTaskType.Job, - exec: (payload) => meta.global.exportEventsWithRetry(payload as ExportEventsJobPayload, meta), - } - - const oldOnEvent = methods.onEvent - methods.onEvent = async (event: ProcessedPluginEvent) => { - if (!meta.global.exportEventsToIgnore.has(event.event)) { - await meta.global.exportEventsBuffer.add(event, JSON.stringify(event).length) - } - await oldOnEvent?.(event) - } - - const oldTeardownPlugin = methods.teardownPlugin - methods.teardownPlugin = async () => { - await Promise.all([meta.global.exportEventsBuffer.flush(), oldTeardownPlugin?.()]) - } -} diff --git a/plugin-server/src/worker/vm/upgrades/historical-export/export-historical-events-v2.ts b/plugin-server/src/worker/vm/upgrades/historical-export/export-historical-events-v2.ts deleted file mode 100644 index 2a5cc2816cf2b..0000000000000 --- a/plugin-server/src/worker/vm/upgrades/historical-export/export-historical-events-v2.ts +++ /dev/null @@ -1,772 +0,0 @@ -/* -Historical exports (v2) work the following way: - -- User triggers a `Export historical events V2` job from the UI. - This saves the time range as the running export with parallelism options. -- `runEveryMinute` acts as a coordinator: It takes the time range job runs on, splits it into chunks, - ensures that enough pieces are running, reports progress and finalizes the export. - - If a certain running chunk hasn't reported progress in a while, it is also restarted. -- `exportHistoricalEvents` job is responsible for exporting data between particular start and end points (chunk) - - It tracks its progress under `statusKey` - - It dynamically resizes the time window we fetch data to minimize jobs that need to be scheduled and clickhouse queries - - It calls plugins `exportEvents` with each batch of events it finds - - It handles retries by retrying RetryErrors up to 15 times - -Error handling: -- Failing to fetch events from clickhouse stops the export outright -- For every batch of events fetched, `exportEvents` RetryError is retried up to 15 times -- Unknown errors raised by `exportEvents` cause export to fail -- We periodically check whether a running chunk has made progress. If not, the chunk is restarted - -Note: -- parallelism is only settable by superusers to avoid abuse. -- Double-processing might be possible if a task is queued in graphile worker for a long time -*/ - -import { Plugin, PluginEvent, PluginMeta, RetryError } from '@posthog/plugin-scaffold' -import * as Sentry from '@sentry/node' -import { DateTime } from 'luxon' - -import { - Hub, - ISOTimestamp, - JobSpec, - PluginConfig, - PluginConfigVMInternalResponse, - PluginLogEntry, - PluginLogEntrySource, - PluginLogEntryType, - PluginTask, - PluginTaskType, -} from '../../../../types' -import { createPluginActivityLog } from '../../../../utils/db/activity-log' -import { processError } from '../../../../utils/db/error' -import { isTestEnv } from '../../../../utils/env-utils' -import { status } from '../../../../utils/status' -import { fetchEventsForInterval } from '../utils/fetchEventsForInterval' - -const TEN_MINUTES = 1000 * 60 * 10 -const TWELVE_HOURS = 1000 * 60 * 60 * 12 -export const EVENTS_PER_RUN_SMALL = 500 -export const EVENTS_PER_RUN_BIG = 10000 - -export const EXPORT_PARAMETERS_KEY = 'EXPORT_PARAMETERS' -export const EXPORT_COORDINATION_KEY = 'EXPORT_COORDINATION' - -export const INTERFACE_JOB_NAME = 'Export historical events V2' - -export const JOB_SPEC: JobSpec = { - payload: { - dateRange: { - title: 'Export date range', - type: 'daterange', - required: true, - }, - parallelism: { - title: 'Parallelism', - type: 'number', - default: 1, - staff_only: true, - }, - }, -} - -export interface TestFunctions { - exportHistoricalEvents: (payload: ExportHistoricalEventsJobPayload) => Promise - getTimestampBoundaries: (payload: ExportHistoricalEventsUIPayload) => [ISOTimestamp, ISOTimestamp] - nextCursor: (payload: ExportHistoricalEventsJobPayload, eventCount: number) => OffsetParams - coordinateHistoricalExport: (update?: CoordinationUpdate) => Promise - calculateCoordination: ( - params: ExportParams, - done: Array, - running: Array - ) => Promise - getExportDateRange: (params: ExportParams) => Array<[ISOTimestamp, ISOTimestamp]> - progressBar: (progress: number, length?: number) => string - stopExport: (params: ExportParams, message: string, status: 'success' | 'fail') => Promise - shouldResume: (status: ExportChunkStatus, now: number) => void -} - -export type ExportHistoricalEventsUpgradeV2 = Plugin<{ - global: { - _testFunctions: TestFunctions - } -}> - -export interface ExportHistoricalEventsJobPayload { - // Current cursor to what's being exported - timestampCursor: number - - // The lower and upper bound of the timestamp interval to be processed - startTime: number - endTime: number - - // The offset *within* a given timestamp interval - offset: number - - // how many retries a payload has had (max = 15) - retriesPerformedSoFar: number - - // used for ensuring only one "export task" is running if the server restarts - exportId: string | number - - // Time frame to fetch events for. - fetchTimeInterval: number - - // Key to report export status to - statusKey: string -} - -type OffsetParams = Pick - -export interface ExportHistoricalEventsUIPayload { - dateRange: [string, string] - parallelism?: number - // API-generated token - $job_id?: string -} - -export interface ExportParams { - id: string | number - parallelism: number - dateFrom: ISOTimestamp - dateTo: ISOTimestamp - abortMessage?: string -} - -interface CoordinationPayload { - running?: Array - done?: Array - progress?: number -} - -interface CoordinationUpdate { - hasChanges: boolean - done: Array - running: Array - toStartRunning: Array<[ISOTimestamp, ISOTimestamp]> - toResume: Array - progress: number - exportIsDone: boolean -} - -export interface ExportChunkStatus extends ExportHistoricalEventsJobPayload { - done: boolean - progress: number - // When was this status recorded - statusTime: number -} - -export function addHistoricalEventsExportCapabilityV2( - hub: Hub, - pluginConfig: PluginConfig, - response: PluginConfigVMInternalResponse> -) { - const { methods, tasks, meta } = response - - const currentPublicJobs = pluginConfig.plugin?.public_jobs || {} - - // Set the number of events to fetch per chunk, defaulting to 500 unless - // the plugin indicates bigger batches are preferable (notably plugins writing - // to blob storage with a fixed cost per batch), in which case we use 10000. - // - // It also has the other benefit of using fewer requests to ClickHouse. In - // its current implementation the querying logic for pulling pages of - // events from ClickHouse will read a much larger amount of data from disk - // than is required, due to us trying to order the dataset by `timestamp` - // and this not being included in the `sharded_events` table sort key. - let eventsPerRun = EVENTS_PER_RUN_SMALL - if (methods.getSettings && methods.getSettings()?.handlesLargeBatches) { - eventsPerRun = EVENTS_PER_RUN_BIG - } - - // If public job hasn't been registered or has changed, update it! - if ( - Object.keys(currentPublicJobs[INTERFACE_JOB_NAME]?.payload || {}).length != - Object.keys(JOB_SPEC.payload!).length - ) { - hub.promiseManager.trackPromise( - hub.db.addOrUpdatePublicJob(pluginConfig.plugin_id, INTERFACE_JOB_NAME, JOB_SPEC), - 'exports v2 addOrUpdatePublicJob' - ) - } - const oldRunEveryMinute = tasks.schedule.runEveryMinute - - tasks.job[INTERFACE_JOB_NAME] = { - name: INTERFACE_JOB_NAME, - type: PluginTaskType.Job, - exec: async (payload: ExportHistoricalEventsUIPayload) => { - const id = payload.$job_id || String(Math.floor(Math.random() * 10000 + 1)) - const parallelism = Number(payload.parallelism ?? 1) - const [dateFrom, dateTo] = getTimestampBoundaries(payload) - const params: ExportParams = { - id, - parallelism, - dateFrom, - dateTo, - } - - // only let one export run at a time - const alreadyRunningExport = await getExportParameters() - if (!!alreadyRunningExport) { - await stopExport(params, 'Export already running, not starting another.', 'fail', { keepEntry: true }) - return - } - - // Clear old (conflicting) storage - await meta.storage.del(EXPORT_COORDINATION_KEY) - await meta.storage.set(EXPORT_PARAMETERS_KEY, params) - - createLog(`Starting export ${dateFrom} - ${dateTo}. id=${id}, parallelism=${parallelism}`, { - type: PluginLogEntryType.Info, - }) - - await coordinateHistoricalExport() - }, - } as unknown as PluginTask // :KLUDGE: Work around typing limitations - - tasks.job['exportHistoricalEventsV2'] = { - name: 'exportHistoricalEventsV2', - type: PluginTaskType.Job, - exec: (payload) => exportHistoricalEvents(payload as ExportHistoricalEventsJobPayload), - } - - tasks.schedule.runEveryMinute = { - name: 'runEveryMinute', - type: PluginTaskType.Schedule, - exec: async () => { - await oldRunEveryMinute?.exec?.() - await coordinateHistoricalExport() - }, - // :TRICKY: We don't want to track app metrics for runEveryMinute for historical exports _unless_ plugin also has `runEveryMinute` - __ignoreForAppMetrics: !oldRunEveryMinute || !!oldRunEveryMinute.__ignoreForAppMetrics, - } - - async function coordinateHistoricalExport(update?: CoordinationUpdate) { - const params = await getExportParameters() - - if (!params) { - // No export running! - return - } - - if (params.abortMessage) { - // For manually triggering the export to abort - await stopExport(params, `Export aborted: ${params.abortMessage}`, 'fail') - return - } - - const { done, running } = (await meta.storage.get(EXPORT_COORDINATION_KEY, {})) as CoordinationPayload - update = update || (await calculateCoordination(params, done || [], running || [])) - - createLog(`Export progress: ${progressBar(update.progress)} (${Math.round(1000 * update.progress) / 10}%)`, { - type: PluginLogEntryType.Info, - }) - - if (update.exportIsDone) { - await stopExport(params, 'Export has finished! 💯', 'success') - return - } - - if (update.hasChanges) { - await Promise.all( - update.toStartRunning.map(async ([startDate, endDate]) => { - createLog(`Starting job to export ${startDate} to ${endDate}`, { type: PluginLogEntryType.Debug }) - - const payload: ExportHistoricalEventsJobPayload = { - timestampCursor: new Date(startDate).getTime(), - startTime: new Date(startDate).getTime(), - endTime: new Date(endDate).getTime(), - offset: 0, - retriesPerformedSoFar: 0, - exportId: params.id, - fetchTimeInterval: hub.HISTORICAL_EXPORTS_INITIAL_FETCH_TIME_WINDOW, - statusKey: `EXPORT_DATE_STATUS_${startDate}`, - } - await startChunk(payload, 0) - }) - ) - - await Promise.all( - update.toResume.map(async (payload: ExportChunkStatus) => { - createLog( - `Export chunk from ${dateRange( - payload.startTime, - payload.endTime - )} seems inactive, restarting!`, - { type: PluginLogEntryType.Debug } - ) - await startChunk(payload, payload.progress) - }) - ) - } - - await meta.storage.set(EXPORT_COORDINATION_KEY, { - done: update.done, - running: update.running, - progress: update.progress, - }) - } - - async function calculateCoordination( - params: ExportParams, - done: Array, - running: Array - ): Promise { - const now = Date.now() - const allDates = getExportDateRange(params) - - let hasChanges = false - const doneDates = new Set(done) - const runningDates = new Set(running) - const progressPerDay = 1.0 / allDates.length - - let progress = progressPerDay * done.length - const toResume: Array = [] - - for (const date of running || []) { - const dateStatus = (await meta.storage.get(`EXPORT_DATE_STATUS_${date}`, null)) as ExportChunkStatus | null - - if (dateStatus?.done) { - hasChanges = true - doneDates.add(date) - runningDates.delete(date) - progress += progressPerDay - continue - } else { - progress += progressPerDay * (dateStatus?.progress ?? 0) - } - - if (dateStatus && shouldResume(dateStatus, now)) { - // :TODO: Temporary debugging code - createLog(`toResume found: now=${now}, dateStatus=${JSON.stringify(dateStatus)}`, { - type: PluginLogEntryType.Debug, - }) - hasChanges = true - toResume.push(dateStatus) - } - } - - const toStartRunning: Array<[ISOTimestamp, ISOTimestamp]> = [] - - if (runningDates.size < params.parallelism && doneDates.size + runningDates.size < allDates.length) { - for (const [startDate, endDate] of allDates) { - if (!doneDates.has(startDate) && !runningDates.has(startDate)) { - runningDates.add(startDate) - toStartRunning.push([startDate, endDate]) - hasChanges = true - - if (runningDates.size === params.parallelism) { - break - } - } - } - } - - return { - hasChanges, - done: Array.from(doneDates.values()), - running: Array.from(runningDates.values()), - toStartRunning, - toResume, - progress, - exportIsDone: doneDates.size === allDates.length, - } - } - - async function startChunk(payload: ExportHistoricalEventsJobPayload, progress: number): Promise { - // Save for detecting retries - await meta.storage.set(payload.statusKey, { - ...payload, - done: false, - progress, - statusTime: Date.now(), - } as ExportChunkStatus) - - // Start the job - await meta.jobs.exportHistoricalEventsV2(payload).runNow() - } - - async function exportHistoricalEvents(payload: ExportHistoricalEventsJobPayload): Promise { - status.info('ℹ️', 'Running export historical events', { - pluginConfigId: pluginConfig.id, - payload, - }) - - const activeExportParameters = await getExportParameters() - if (activeExportParameters?.id != payload.exportId) { - // This export has finished or has been stopped - return - } - - if (activeExportParameters.abortMessage) { - // For manually triggering the export to abort - createLog(`Export manually aborted ${activeExportParameters.abortMessage}`, { - type: PluginLogEntryType.Info, - }) - return - } - - if (payload.timestampCursor >= payload.endTime) { - createLog(`Finished exporting chunk from ${dateRange(payload.startTime, payload.endTime)}`, { - type: PluginLogEntryType.Debug, - }) - await meta.storage.set(payload.statusKey, { - ...payload, - done: true, - progress: 1, - statusTime: Date.now(), - } as ExportChunkStatus) - - return - } - - const progress = (payload.timestampCursor - payload.startTime) / (payload.endTime - payload.startTime) - - await meta.storage.set(payload.statusKey, { - ...payload, - done: false, - progress: progress, - statusTime: Date.now(), - } as ExportChunkStatus) - - let events: PluginEvent[] = [] - - try { - events = await fetchEventsForInterval( - hub.db, - pluginConfig.team_id, - new Date(payload.timestampCursor), - payload.offset, - payload.fetchTimeInterval, - eventsPerRun - ) - } catch (error) { - Sentry.captureException(error, { tags: { team_id: pluginConfig.team_id } }) - - await handleFetchError(error, activeExportParameters, payload) - return - } - - // We bump the statusTime every minute to let the coordinator know we are still - // alive and we don't need to be resumed. - const interval = setInterval(async () => { - const now = Date.now() - createLog(`Still running, updating ${payload.statusKey} statusTime for plugin ${pluginConfig.id} to ${now}`) - await meta.storage.set(payload.statusKey, { - ...payload, - done: false, - progress: progress, - statusTime: now, - } as ExportChunkStatus) - }, 60 * 1000) - - if (events.length > 0) { - try { - await methods.exportEvents!(events) - - createLog( - `Successfully processed events ${payload.offset}-${payload.offset + events.length} from ${dateRange( - payload.timestampCursor, - payload.timestampCursor + payload.fetchTimeInterval - )}.`, - { type: PluginLogEntryType.Debug } - ) - await hub.appMetrics.queueMetric({ - teamId: pluginConfig.team_id, - pluginConfigId: pluginConfig.id, - jobId: payload.exportId.toString(), - category: 'exportEvents', - successes: payload.retriesPerformedSoFar == 0 ? events.length : 0, - successesOnRetry: payload.retriesPerformedSoFar == 0 ? 0 : events.length, - }) - } catch (error) { - clearInterval(interval) - - await handleExportError(error, activeExportParameters, payload, events.length) - return - } - } - - clearInterval(interval) - - const { timestampCursor, fetchTimeInterval, offset } = nextCursor(payload, events.length) - - await meta.jobs - .exportHistoricalEventsV2({ - ...payload, - retriesPerformedSoFar: 0, - timestampCursor, - offset, - fetchTimeInterval, - } as ExportHistoricalEventsJobPayload) - .runIn(1, 'seconds') - } - - async function handleExportError( - error: Error, - params: ExportParams, - payload: ExportHistoricalEventsJobPayload, - eventCount: number - ): Promise { - if (error instanceof RetryError && payload.retriesPerformedSoFar + 1 < hub.HISTORICAL_EXPORTS_MAX_RETRY_COUNT) { - const nextRetrySeconds = retryDelaySeconds(payload.retriesPerformedSoFar) - - createLog( - `Failed processing events ${payload.offset}-${payload.offset + eventCount} from ${dateRange( - payload.timestampCursor, - payload.timestampCursor + payload.fetchTimeInterval - )}. Retrying in ${nextRetrySeconds}s`, - { - type: PluginLogEntryType.Warn, - } - ) - - await meta.jobs - .exportHistoricalEventsV2({ - ...payload, - retriesPerformedSoFar: payload.retriesPerformedSoFar + 1, - } as ExportHistoricalEventsJobPayload) - .runIn(nextRetrySeconds, 'seconds') - } else { - if (error instanceof RetryError) { - const message = `Exporting chunk ${dateRange(payload.startTime, payload.endTime)} failed after ${ - hub.HISTORICAL_EXPORTS_MAX_RETRY_COUNT - } retries. Stopping export.` - await stopExport(params, message, 'fail') - await processError(hub, pluginConfig, message) - } else { - await stopExport(params, `exportEvents returned unknown error, stopping export. error=${error}`, 'fail') - await processError(hub, pluginConfig, error) - } - await hub.appMetrics.queueError( - { - teamId: pluginConfig.team_id, - pluginConfigId: pluginConfig.id, - jobId: payload.exportId.toString(), - category: 'exportEvents', - failures: eventCount, - }, - { - error, - eventCount, - } - ) - } - } - - async function handleFetchError( - error: Error, - params: ExportParams, - payload: ExportHistoricalEventsJobPayload - ): Promise { - if (error instanceof RetryError && payload.retriesPerformedSoFar + 1 < hub.HISTORICAL_EXPORTS_MAX_RETRY_COUNT) { - const nextRetrySeconds = retryDelaySeconds(payload.retriesPerformedSoFar) - - createLog( - `Failed to fetch events from ${dateRange( - payload.timestampCursor, - payload.timestampCursor + payload.fetchTimeInterval - )}. Retrying in ${nextRetrySeconds}s`, - { - type: PluginLogEntryType.Warn, - } - ) - - await meta.jobs - .exportHistoricalEventsV2({ - ...payload, - retriesPerformedSoFar: payload.retriesPerformedSoFar + 1, - } as ExportHistoricalEventsJobPayload) - .runIn(nextRetrySeconds, 'seconds') - } else { - if (error instanceof RetryError) { - const message = `Fetching chunk ${dateRange(payload.startTime, payload.endTime)} failed after ${ - hub.HISTORICAL_EXPORTS_MAX_RETRY_COUNT - } retries. Stopping export.` - await stopExport(params, message, 'fail') - await processError(hub, pluginConfig, message) - } else { - await processError(hub, pluginConfig, error) - await stopExport(params, 'Failed fetching events. Stopping export - please try again later.', 'fail') - } - await hub.appMetrics.queueError( - { - teamId: pluginConfig.team_id, - pluginConfigId: pluginConfig.id, - jobId: payload.exportId.toString(), - category: 'exportEvents', - failures: 1, - }, - { - error, - eventCount: 1, - } - ) - } - } - - async function stopExport( - params: ExportParams, - message: string, - status: 'success' | 'fail', - options: { keepEntry?: boolean } = {} - ) { - if (!options.keepEntry) { - await meta.storage.del(EXPORT_PARAMETERS_KEY) - } - - const payload = status == 'success' ? params : { ...params, failure_reason: message } - await createPluginActivityLog( - hub, - pluginConfig.team_id, - pluginConfig.id, - status === 'success' ? 'export_success' : 'export_fail', - { - trigger: { - job_id: params.id.toString(), - job_type: INTERFACE_JOB_NAME, - payload, - }, - } - ) - - createLog(message, { - type: status === 'success' ? PluginLogEntryType.Info : PluginLogEntryType.Error, - }) - } - - function getTimestampBoundaries(payload: ExportHistoricalEventsUIPayload): [ISOTimestamp, ISOTimestamp] { - const min = DateTime.fromISO(payload.dateRange[0], { zone: 'UTC' }) - // :TRICKY: UI shows the end date to be inclusive - const max = DateTime.fromISO(payload.dateRange[1], { zone: 'UTC' }).plus({ days: 1 }) - - if (!min.isValid || !max.isValid) { - createLog(`'dateRange' should be two dates in ISO string format.`, { - type: PluginLogEntryType.Error, - }) - throw new Error(`'dateRange' should be two dates in ISO string format.`) - } - return [min.toISO(), max.toISO()] as [ISOTimestamp, ISOTimestamp] - } - - function retryDelaySeconds(retriesPerformedSoFar: number): number { - return 2 ** retriesPerformedSoFar * 3 - } - - function shouldResume(status: ExportChunkStatus, now: number): boolean { - // When a export hasn't updated in 10 minutes plus whatever time is spent on retries, it's likely already timed out or died - // Note that status updates happen every time the export makes _any_ progress - // NOTE from the future: we discovered that 10 minutes was not enough time as we have exports running for longer - // without failing, and this logic was triggering multiple simultaneous resumes. Simultaneous resumes start to fight to update - // the status, and cause duplicate data to be exported. Overall, a nightmare. - // To mitigate this, we have historialExportEvents update the status as it waits. - return now >= status.statusTime + TEN_MINUTES + retryDelaySeconds(status.retriesPerformedSoFar + 1) * 1000 - } - - function nextCursor(payload: ExportHistoricalEventsJobPayload, eventCount: number): OffsetParams { - // More on the same time window - if (eventCount === eventsPerRun) { - return { - timestampCursor: payload.timestampCursor, - fetchTimeInterval: payload.fetchTimeInterval, - offset: payload.offset + eventsPerRun, - } - } - - const nextCursor = payload.timestampCursor + payload.fetchTimeInterval - let nextFetchInterval = payload.fetchTimeInterval - // If we're fetching too small of a window at a time, increase window to fetch - if (payload.offset === 0 && eventCount < eventsPerRun * 0.5) { - nextFetchInterval = Math.min( - Math.floor(payload.fetchTimeInterval * hub.HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER), - TWELVE_HOURS - ) - } - // If time window seems too large, reduce it - if (payload.offset > 2 * eventsPerRun) { - nextFetchInterval = Math.max( - Math.floor(payload.fetchTimeInterval / hub.HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER), - TEN_MINUTES - ) - } - - // If we would end up fetching too many events next time, reduce fetch interval - if (nextCursor + nextFetchInterval > payload.endTime) { - nextFetchInterval = payload.endTime - nextCursor - } - - return { - timestampCursor: nextCursor, - fetchTimeInterval: nextFetchInterval, - offset: 0, - } - } - - function getExportDateRange({ dateFrom, dateTo }: ExportParams): Array<[ISOTimestamp, ISOTimestamp]> { - const result: Array<[ISOTimestamp, ISOTimestamp]> = [] - let date = dateFrom - while (date < dateTo) { - let nextDate = DateTime.fromISO(date).toUTC().plus({ days: 1 }).startOf('day').toISO() as ISOTimestamp - if (nextDate > dateTo) { - nextDate = dateTo - } - result.push([date, nextDate]) - date = nextDate - } - - return result - } - - function progressBar(progress: number, length = 20): string { - const filledBar = Math.round(progress * length) - - const progressBarCompleted = Array.from({ length: filledBar }) - .map(() => '■') - .join('') - const progressBarRemaining = Array.from({ length: length - filledBar }) - .map(() => '□') - .join('') - - return progressBarCompleted + progressBarRemaining - } - - function dateRange(startTime: number, endTime: number): string { - return `${new Date(startTime).toISOString()} to ${new Date(endTime).toISOString()}` - } - - async function getExportParameters(): Promise { - return (await meta.storage.get(EXPORT_PARAMETERS_KEY, null)) as ExportParams | null - } - - function createLog(message: string, overrides: Partial = {}) { - hub.promiseManager.trackPromise( - hub.db.queuePluginLogEntry({ - pluginConfig, - message: message, - source: PluginLogEntrySource.System, - type: PluginLogEntryType.Log, - instanceId: hub.instanceId, - ...overrides, - }), - 'exports v2 - createLog' - ) - } - - if (isTestEnv()) { - meta.global._testFunctions = { - exportHistoricalEvents, - getTimestampBoundaries, - nextCursor, - coordinateHistoricalExport, - calculateCoordination, - getExportDateRange, - progressBar, - stopExport, - shouldResume, - } - } - - // NOTE: we return the eventsPerRun, purely for testing purposes - return { eventsPerRun } -} diff --git a/plugin-server/src/worker/vm/upgrades/historical-export/export-historical-events.ts b/plugin-server/src/worker/vm/upgrades/historical-export/export-historical-events.ts deleted file mode 100644 index 0c87e61c6cc56..0000000000000 --- a/plugin-server/src/worker/vm/upgrades/historical-export/export-historical-events.ts +++ /dev/null @@ -1,372 +0,0 @@ -import { PluginEvent, PluginMeta, RetryError } from '@posthog/plugin-scaffold' -import * as Sentry from '@sentry/node' - -import { - Hub, - JobSpec, - PluginConfig, - PluginConfigVMInternalResponse, - PluginLogEntrySource, - PluginLogEntryType, - PluginTask, - PluginTaskType, -} from '../../../../types' -import { fetchEventsForInterval } from '../utils/fetchEventsForInterval' -import { - ExportHistoricalEventsJobPayload, - ExportHistoricalEventsUpgrade, - fetchTimestampBoundariesForTeam, -} from '../utils/utils' - -const TEN_MINUTES = 1000 * 60 * 10 -const EVENTS_TIME_INTERVAL = TEN_MINUTES -const EVENTS_PER_RUN = 500 - -const TIMESTAMP_CURSOR_KEY = 'timestamp_cursor' -const MAX_UNIX_TIMESTAMP_KEY = 'max_timestamp' -const MIN_UNIX_TIMESTAMP_KEY = 'min_timestamp' -const EXPORT_RUNNING_KEY = 'is_export_running' -const RUN_EVERY_MINUTE_LAST_RUN_KEY = 'run_every_minute_last' -const BATCH_ID_CURSOR_KEY = 'batch_id' -const OLD_TIMESTAMP_CURSOR_KEY = 'old_timestamp_cursor' - -const INTERFACE_JOB_NAME = 'Export historical events' - -const JOB_SPEC: JobSpec = { - payload: { - dateFrom: { - title: 'Export start date', - type: 'date', - required: true, - }, - dateTo: { - title: 'Export end date', - type: 'date', - required: true, - }, - }, -} - -export function addHistoricalEventsExportCapability( - hub: Hub, - pluginConfig: PluginConfig, - response: PluginConfigVMInternalResponse> -): void { - const { methods, tasks, meta } = response - - const currentPublicJobs = pluginConfig.plugin?.public_jobs || {} - - // If public job hasn't been registered or has changed, update it! - if ( - Object.keys(currentPublicJobs[INTERFACE_JOB_NAME]?.payload || {}).length !== - Object.keys(JOB_SPEC.payload!).length - ) { - hub.promiseManager.trackPromise( - hub.db.addOrUpdatePublicJob(pluginConfig.plugin_id, INTERFACE_JOB_NAME, JOB_SPEC), - 'exports addOrUpdatePublicJob' - ) - } - - const oldSetupPlugin = methods.setupPlugin - - const oldRunEveryMinute = tasks.schedule.runEveryMinute - - methods.setupPlugin = async () => { - await meta.utils.cursor.init(BATCH_ID_CURSOR_KEY) - - const storedTimestampCursor = await meta.storage.get(TIMESTAMP_CURSOR_KEY, null) - await meta.storage.set(OLD_TIMESTAMP_CURSOR_KEY, storedTimestampCursor || 0) - await meta.storage.set(RUN_EVERY_MINUTE_LAST_RUN_KEY, Date.now() + TEN_MINUTES) - - await oldSetupPlugin?.() - } - - tasks.schedule.runEveryMinute = { - name: 'runEveryMinute', - type: PluginTaskType.Schedule, - exec: async () => { - await oldRunEveryMinute?.exec?.() - - const lastRun = await meta.storage.get(RUN_EVERY_MINUTE_LAST_RUN_KEY, 0) - const exportShouldBeRunning = await meta.storage.get(EXPORT_RUNNING_KEY, false) - - const have10MinutesPassed = Date.now() - Number(lastRun) < TEN_MINUTES - - // only run every 10 minutes _if_ an export is in progress - if (!exportShouldBeRunning || !have10MinutesPassed) { - return - } - - const oldTimestampCursor = await meta.storage.get(OLD_TIMESTAMP_CURSOR_KEY, 0) - const currentTimestampCursor = await meta.storage.get(TIMESTAMP_CURSOR_KEY, 0) - - // if the cursor hasn't been incremented after 10 minutes that means we didn't pick up from - // where we left off automatically after a restart, or something else has gone wrong - // thus, kick off a new export chain with a new batchId - if (exportShouldBeRunning && oldTimestampCursor === currentTimestampCursor) { - const batchId = await meta.utils.cursor.increment(BATCH_ID_CURSOR_KEY) - createLog(`Restarting export after noticing inactivity. Batch ID: ${batchId}`) - await meta.jobs - .exportHistoricalEvents({ retriesPerformedSoFar: 0, incrementTimestampCursor: true, batchId }) - .runNow() - } - - // set the old timestamp cursor to the current one so we can see if it changed in 10 minutes - await meta.storage.set(OLD_TIMESTAMP_CURSOR_KEY, currentTimestampCursor) - - await meta.storage.set(RUN_EVERY_MINUTE_LAST_RUN_KEY, Date.now()) - }, - - // :TRICKY: We don't want to track app metrics for runEveryMinute for historical exports _unless_ plugin also has `runEveryMinute` - __ignoreForAppMetrics: !oldRunEveryMinute || !!oldRunEveryMinute.__ignoreForAppMetrics, - } - - tasks.job['exportHistoricalEvents'] = { - name: 'exportHistoricalEvents', - type: PluginTaskType.Job, - exec: (payload) => meta.global.exportHistoricalEvents(payload as ExportHistoricalEventsJobPayload), - } - - tasks.job[INTERFACE_JOB_NAME] = { - name: INTERFACE_JOB_NAME, - type: PluginTaskType.Job, - // TODO: Accept timestamp as payload - exec: async (payload: ExportHistoricalEventsJobPayload) => { - // only let one export run at a time - const exportAlreadyRunning = await meta.storage.get(EXPORT_RUNNING_KEY, false) - if (exportAlreadyRunning) { - return - } - - await meta.storage.set(RUN_EVERY_MINUTE_LAST_RUN_KEY, Date.now() + TEN_MINUTES) - await meta.storage.set(EXPORT_RUNNING_KEY, true) - - // get rid of all state pertaining to a previous run - await meta.storage.del(TIMESTAMP_CURSOR_KEY) - await meta.storage.del(MAX_UNIX_TIMESTAMP_KEY) - await meta.storage.del(MIN_UNIX_TIMESTAMP_KEY) - meta.global.maxTimestamp = null - meta.global.minTimestamp = null - - await meta.global.initTimestampsAndCursor(payload) - - const batchId = await meta.utils.cursor.increment(BATCH_ID_CURSOR_KEY) - - await meta.jobs - .exportHistoricalEvents({ retriesPerformedSoFar: 0, incrementTimestampCursor: true, batchId: batchId }) - .runNow() - }, - } as unknown as PluginTask // :KLUDGE: Work around typing limitations - - meta.global.exportHistoricalEvents = async (payload: ExportHistoricalEventsJobPayload): Promise => { - if (payload.retriesPerformedSoFar >= 15) { - // create some log error here - return - } - - // this is handling for duplicates when the plugin server restarts - const currentBatchId = await meta.storage.get(BATCH_ID_CURSOR_KEY, 0) - if (currentBatchId !== payload.batchId) { - return - } - - let timestampCursor = payload.timestampCursor - let intraIntervalOffset = payload.intraIntervalOffset ?? 0 - - // this ensures minTimestamp and timestampLimit are not null - // each thread will set them the first time they run this job - // we do this to prevent us from doing 2 additional queries - // to postgres each time the job runs - await meta.global.setTimestampBoundaries() - - // This is the first run OR we're done with an interval - if (payload.incrementTimestampCursor || !timestampCursor) { - // Done with a timestamp interval, reset offset - intraIntervalOffset = 0 - - // This ensures we never process an interval twice - const incrementedCursor = await meta.utils.cursor.increment(TIMESTAMP_CURSOR_KEY, EVENTS_TIME_INTERVAL) - - meta.global.updateProgressBar(incrementedCursor) - - timestampCursor = Number(incrementedCursor) - } - - if (timestampCursor > meta.global.maxTimestamp!) { - await meta.storage.del(EXPORT_RUNNING_KEY) - createLog(`Done exporting all events`) - return - } - - let events: PluginEvent[] = [] - - let fetchEventsError: Error | unknown | null = null - try { - events = await fetchEventsForInterval( - hub.db, - pluginConfig.team_id, - new Date(timestampCursor), - intraIntervalOffset, - EVENTS_TIME_INTERVAL, - EVENTS_PER_RUN - ) - } catch (error) { - fetchEventsError = error - Sentry.captureException(error, { tags: { team_id: pluginConfig.team_id } }) - } - - let exportEventsError: Error | unknown | null = null - - if (fetchEventsError) { - await meta.storage.del(EXPORT_RUNNING_KEY) - createLog(`Failed fetching events. Stopping export - please try again later.`) - return - } else { - if (events.length > 0) { - try { - await methods.exportEvents!(events) - } catch (error) { - exportEventsError = error - } - } - } - - if (exportEventsError instanceof RetryError) { - const nextRetrySeconds = 2 ** payload.retriesPerformedSoFar * 3 - - // "Failed processing events 0-100 from 2021-08-19T12:34:26.061Z to 2021-08-19T12:44:26.061Z. Retrying in 3s" - createLog( - `Failed processing events ${intraIntervalOffset}-${intraIntervalOffset + events.length} from ${new Date( - timestampCursor - ).toISOString()} to ${new Date( - timestampCursor + EVENTS_TIME_INTERVAL - ).toISOString()}. Retrying in ${nextRetrySeconds}s` - ) - - await meta.jobs - .exportHistoricalEvents({ - intraIntervalOffset, - timestampCursor, - retriesPerformedSoFar: payload.retriesPerformedSoFar + 1, - }) - .runIn(nextRetrySeconds, 'seconds') - } else if (!exportEventsError) { - const incrementTimestampCursor = events.length === 0 - - await meta.jobs - .exportHistoricalEvents({ - timestampCursor, - incrementTimestampCursor, - retriesPerformedSoFar: 0, - intraIntervalOffset: intraIntervalOffset + EVENTS_PER_RUN, - batchId: payload.batchId, - }) - .runIn(1, 'seconds') - } - - if (events.length > 0) { - createLog( - `Successfully processed events ${intraIntervalOffset}-${ - intraIntervalOffset + events.length - } from ${new Date(timestampCursor).toISOString()} to ${new Date( - timestampCursor + EVENTS_TIME_INTERVAL - ).toISOString()}.` - ) - } - } - - // initTimestampsAndCursor decides what timestamp boundaries to use before - // the export starts. if a payload is passed with boundaries, we use that, - // but if no payload is specified, we use the boundaries determined at setupPlugin - meta.global.initTimestampsAndCursor = async (payload?: ExportHistoricalEventsJobPayload) => { - // initTimestampsAndCursor will only run on **one** thread, because of our guard against - // multiple exports. as a result, we need to set the boundaries on postgres, and - // only set them in global when the job runs, so all threads have global state in sync - - // Fetch the max and min timestamps for a team's events - const timestampBoundaries = await fetchTimestampBoundariesForTeam(hub.db, pluginConfig.team_id, '_timestamp') - - if (payload && payload.dateFrom) { - try { - const dateFrom = new Date(payload.dateFrom).getTime() - await meta.utils.cursor.init(TIMESTAMP_CURSOR_KEY, dateFrom - EVENTS_TIME_INTERVAL) - await meta.storage.set(MIN_UNIX_TIMESTAMP_KEY, dateFrom) - } catch (error) { - createLog(`'dateFrom' should be an timestamp in ISO string format.`) - throw error - } - } else { - // no timestamp override specified via the payload, default to the first event ever ingested - if (!timestampBoundaries) { - throw new Error( - `Unable to determine the lower timestamp bound for the export automatically. Please specify a 'dateFrom' value.` - ) - } - - const dateFrom = timestampBoundaries.min.getTime() - await meta.utils.cursor.init(TIMESTAMP_CURSOR_KEY, dateFrom - EVENTS_TIME_INTERVAL) - await meta.storage.set(MIN_UNIX_TIMESTAMP_KEY, dateFrom) - } - - if (payload && payload.dateTo) { - try { - await meta.storage.set(MAX_UNIX_TIMESTAMP_KEY, new Date(payload.dateTo).getTime()) - } catch (error) { - createLog(`'dateTo' should be an timestamp in ISO string format.`) - throw error - } - } else { - // no timestamp override specified via the payload, default to the last event before the plugin was enabled - if (!timestampBoundaries) { - throw new Error( - `Unable to determine the upper timestamp bound for the export automatically. Please specify a 'dateTo' value.` - ) - } - await meta.storage.set(MAX_UNIX_TIMESTAMP_KEY, timestampBoundaries.max.getTime()) - } - } - - // this ensures we have the global object correctly set on every thread - // without having to always do a postgres query when an export job for an - // inteval is triggered - meta.global.setTimestampBoundaries = async () => { - if (!meta.global.maxTimestamp) { - const storedTimestampLimit = await meta.storage.get(MAX_UNIX_TIMESTAMP_KEY, null) - meta.global.maxTimestamp = Number(storedTimestampLimit) - } - - if (!meta.global.minTimestamp) { - const storedMinTimestamp = await meta.storage.get(MIN_UNIX_TIMESTAMP_KEY, null) - meta.global.minTimestamp = Number(storedMinTimestamp) - } - } - - meta.global.updateProgressBar = (incrementedCursor) => { - const progressNumerator = incrementedCursor - meta.global.minTimestamp! - const progressDenominator = meta.global.maxTimestamp! - meta.global.minTimestamp! - - const progress = progressDenominator === 0 ? 20 : Math.round(progressNumerator / progressDenominator) * 20 - const percentage = Math.round((1000 * progressNumerator) / progressDenominator) / 10 - - const progressBarCompleted = Array.from({ length: progress }) - .map(() => '■') - .join('') - const progressBarRemaining = Array.from({ length: 20 - progress }) - .map(() => '□') - .join('') - createLog(`Export progress: ${progressBarCompleted}${progressBarRemaining} (${percentage}%)`) - } - - function createLog(message: string, type: PluginLogEntryType = PluginLogEntryType.Log) { - hub.promiseManager.trackPromise( - hub.db.queuePluginLogEntry({ - pluginConfig, - message: `(${hub.instanceId}) ${message}`, - source: PluginLogEntrySource.System, - type: type, - instanceId: hub.instanceId, - }), - 'exports createLog' - ) - } -} diff --git a/plugin-server/src/worker/vm/upgrades/utils/export-events-buffer.ts b/plugin-server/src/worker/vm/upgrades/utils/export-events-buffer.ts deleted file mode 100644 index 281b4b5dab460..0000000000000 --- a/plugin-server/src/worker/vm/upgrades/utils/export-events-buffer.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { runInTransaction } from '../../../../sentry' -import { Hub, PluginConfig } from '../../../../types' -import { timeoutGuard } from '../../../../utils/db/utils' - -export type BufferOptions = { - limit: number - timeoutSeconds: number - onFlush?: (objects: any[], points: number) => void | Promise -} - -export class ExportEventsBuffer { - buffer: any[] - timeout: NodeJS.Timeout | null - points: number - options: BufferOptions - pluginConfig: PluginConfig - hub: Hub - - constructor(hub: Hub, pluginConfig: PluginConfig, opts?: Partial) { - this.buffer = [] - this.timeout = null - this.points = 0 - this.options = { - limit: 10, - timeoutSeconds: 60, - ...opts, - } - this.pluginConfig = pluginConfig - this.hub = hub - } - - public async add(object: Record, points = 1): Promise { - // flush existing if adding would make us go over the limit - if (this.points && this.points + points > this.options.limit) { - await this.flush() - } - - // add the object to the buffer - this.points += points - this.buffer.push(object) - - if (this.points > this.options.limit) { - // flush (again?) if we are now over the limit - await this.flush() - } else if (!this.timeout) { - // if not, make sure there's a flush timeout - this.timeout = setTimeout(async () => await this.flush(), this.options.timeoutSeconds * 1000) - } - } - - public async flush(): Promise { - const oldBuffer = this.buffer - const oldPoints = this.points - this.buffer = [] - this.points = 0 - - this.hub.promiseManager.trackPromise( - this._flush(oldBuffer, oldPoints, new Date()), - 'ExportEventsBuffer flush logs' - ) - await this.hub.promiseManager.awaitPromisesIfNeeded() - } - - public async _flush(oldBuffer: any[], oldPoints: number, _: Date): Promise { - if (this.timeout) { - clearTimeout(this.timeout) - this.timeout = null - } - - const slowTimeout = timeoutGuard( - `ExportEventsBuffer flush promise running for more than 5 minutes`, - { - plugin_id: this.pluginConfig.plugin_id, - team_id: this.pluginConfig.team_id, - plugin_config_id: this.pluginConfig.id, - }, - 300_000 - ) - try { - await runInTransaction( - { - name: 'export-events-buffer', - op: 'ExportEventsBuffer.flush', - }, - async () => { - await this.options.onFlush?.(oldBuffer, oldPoints) - } - ) - } finally { - clearTimeout(slowTimeout) - } - } -} diff --git a/plugin-server/src/worker/vm/upgrades/utils/fetchEventsForInterval.ts b/plugin-server/src/worker/vm/upgrades/utils/fetchEventsForInterval.ts deleted file mode 100644 index 16353c424a5c6..0000000000000 --- a/plugin-server/src/worker/vm/upgrades/utils/fetchEventsForInterval.ts +++ /dev/null @@ -1,105 +0,0 @@ -import { RetryError } from '@posthog/plugin-scaffold' -import { DateTime } from 'luxon' - -import { Element, RawClickHouseEvent, TimestampFormat } from '../../../../types' -import { DB } from '../../../../utils/db/db' -import { parseRawClickHouseEvent } from '../../../../utils/event' -import { status } from '../../../../utils/status' -import { castTimestampToClickhouseFormat } from '../../../../utils/utils' -import { HistoricalExportEvent } from './utils' - -export interface RawElement extends Element { - $el_text?: string -} - -export const fetchEventsForInterval = async ( - db: DB, - teamId: number, - timestampLowerBound: Date, - offset: number, - eventsTimeInterval: number, - eventsPerRun: number -): Promise => { - const timestampUpperBound = new Date(timestampLowerBound.getTime() + eventsTimeInterval) - - const chTimestampLower = castTimestampToClickhouseFormat( - DateTime.fromISO(timestampLowerBound.toISOString()), - TimestampFormat.ClickHouseSecondPrecision - ) - const chTimestampHigher = castTimestampToClickhouseFormat( - DateTime.fromISO(timestampUpperBound.toISOString()), - TimestampFormat.ClickHouseSecondPrecision - ) - - // :TODO: Adding tag messes up the return value? - const fetchEventsQuery = ` - SELECT - event, - uuid, - team_id, - distinct_id, - properties, - timestamp, - created_at, - elements_chain - FROM events - WHERE team_id = ${teamId} - AND timestamp >= '${chTimestampLower}' - AND timestamp < '${chTimestampHigher}' - ORDER BY timestamp - LIMIT ${eventsPerRun} - OFFSET ${offset}` - - let clickhouseFetchEventsResult: { data: RawClickHouseEvent[] } - - try { - clickhouseFetchEventsResult = await db.clickhouseQuery(fetchEventsQuery) - } catch (error) { - // TODO: add more specific error handling based on the error from - // `clickhouseQuery` (e.g. if it's a timeout, we should retry, if it's a - // query syntax error, we should not retry) - status.error('🔥', 'clickhouse_export_fetch_failure', { error }) - throw new RetryError("Couldn't fetch events from ClickHouse") - } - - return clickhouseFetchEventsResult.data.map(convertClickhouseEventToPluginEvent) -} - -const convertClickhouseEventToPluginEvent = (event: RawClickHouseEvent): HistoricalExportEvent => { - const clickhouseEvent = parseRawClickHouseEvent(event) - const parsedEvent = { - uuid: clickhouseEvent.uuid, - team_id: clickhouseEvent.team_id, - distinct_id: clickhouseEvent.distinct_id, - properties: clickhouseEvent.properties, - elements: - clickhouseEvent.event === '$autocapture' && clickhouseEvent.elements_chain - ? convertDatabaseElementsToRawElements(clickhouseEvent.elements_chain) - : undefined, - timestamp: clickhouseEvent.timestamp.toISO(), - now: DateTime.now().toISO(), - event: clickhouseEvent.event || '', - ip: clickhouseEvent.properties['$ip'] || '', - site_url: '', - } - return addHistoricalExportEventProperties(parsedEvent) -} - -const addHistoricalExportEventProperties = (event: HistoricalExportEvent): HistoricalExportEvent => { - event.properties['$$historical_export_source_db'] = 'clickhouse' - event.properties['$$is_historical_export_event'] = true - event.properties['$$historical_export_timestamp'] = new Date().toISOString() - return event -} - -export const convertDatabaseElementsToRawElements = (elements: RawElement[]): RawElement[] => { - for (const element of elements) { - if (element.attributes && element.attributes.attr__class) { - element.attr_class = element.attributes.attr__class - } - if (element.text) { - element.$el_text = element.text - } - } - return elements -} diff --git a/plugin-server/src/worker/vm/upgrades/utils/utils.ts b/plugin-server/src/worker/vm/upgrades/utils/utils.ts deleted file mode 100644 index 7109288dd4de2..0000000000000 --- a/plugin-server/src/worker/vm/upgrades/utils/utils.ts +++ /dev/null @@ -1,79 +0,0 @@ -import { PluginEvent, Properties } from '@posthog/plugin-scaffold' -import { Plugin } from '@posthog/plugin-scaffold' -import * as Sentry from '@sentry/node' -import { DateTime } from 'luxon' -import { Client } from 'pg' - -import { DB } from '../../../../utils/db/db' - -export interface TimestampBoundaries { - min: Date - max: Date -} - -export interface ExportHistoricalEventsJobPayload extends Record { - // The lower bound of the timestamp interval to be processed - timestampCursor?: number - - // The offset *within* a given timestamp interval - intraIntervalOffset?: number - - // how many retries a payload has had (max = 15) - retriesPerformedSoFar: number - - // tells us we're ready to pick up a new interval - incrementTimestampCursor: boolean - - // used for ensuring only one "export task" is running if the server restarts - batchId: number -} - -export interface HistoricalExportEvent extends PluginEvent { - properties: Properties // can't be undefined -} - -export type ExportHistoricalEventsUpgrade = Plugin<{ - global: { - pgClient: Client - eventsToIgnore: Set - sanitizedTableName: string - exportHistoricalEvents: (payload: ExportHistoricalEventsJobPayload) => Promise - initTimestampsAndCursor: (payload: ExportHistoricalEventsJobPayload | undefined) => Promise - setTimestampBoundaries: () => Promise - updateProgressBar: (incrementedCursor: number) => void - timestampBoundariesForTeam: TimestampBoundaries - maxTimestamp: number | null - minTimestamp: number | null - } -}> - -export const clickhouseEventTimestampToDate = (timestamp: string): Date => { - return new Date(DateTime.fromFormat(timestamp, 'yyyy-MM-dd HH:mm:ss').toISO()) -} - -export const fetchTimestampBoundariesForTeam = async ( - db: DB, - teamId: number, - column: 'timestamp' | '_timestamp' -): Promise => { - try { - const clickhouseFetchTimestampsResult = await db.clickhouseQuery(` - SELECT min(${column}) as min, max(${column}) as max - FROM events - WHERE team_id = ${teamId}`) - - const min = clickhouseFetchTimestampsResult.data[0].min - const max = clickhouseFetchTimestampsResult.data[0].max - - const minDate = new Date(clickhouseEventTimestampToDate(min)) - const maxDate = new Date(clickhouseEventTimestampToDate(max)) - - const isValidMin = minDate.getTime() !== new Date(0).getTime() - const isValidMax = maxDate.getTime() !== new Date(0).getTime() - - return isValidMin && isValidMax ? { min: minDate, max: maxDate } : null - } catch (e) { - Sentry.captureException(e, { tags: { team_id: teamId } }) - return null - } -} diff --git a/plugin-server/src/worker/vm/vm.ts b/plugin-server/src/worker/vm/vm.ts index ef790d33e26db..c3b304049046f 100644 --- a/plugin-server/src/worker/vm/vm.ts +++ b/plugin-server/src/worker/vm/vm.ts @@ -13,9 +13,6 @@ import { createStorage } from './extensions/storage' import { createUtils } from './extensions/utilities' import { AVAILABLE_IMPORTS } from './imports' import { transformCode } from './transforms' -import { upgradeExportEvents } from './upgrades/export-events' -import { addHistoricalEventsExportCapability } from './upgrades/historical-export/export-historical-events' -import { addHistoricalEventsExportCapabilityV2 } from './upgrades/historical-export/export-historical-events-v2' export class TimeoutError extends RetryError { name = 'TimeoutError' @@ -191,7 +188,6 @@ export function createPluginConfigVM( const __methods = { setupPlugin: __asyncFunctionGuard(__bindMeta('setupPlugin'), 'setupPlugin'), teardownPlugin: __asyncFunctionGuard(__bindMeta('teardownPlugin'), 'teardownPlugin'), - exportEvents: __asyncFunctionGuard(__bindMeta('exportEvents'), 'exportEvents'), onEvent: __asyncFunctionGuard(__bindMeta('onEvent'), 'onEvent'), processEvent: __asyncFunctionGuard(__bindMeta('processEvent'), 'processEvent'), composeWebhook: __bindMeta('composeWebhook'), @@ -235,19 +231,6 @@ export function createPluginConfigVM( const vmResponse = vm.run(responseVar) const { methods, tasks } = vmResponse - const exportEventsExists = !!methods.exportEvents - - if (exportEventsExists) { - upgradeExportEvents(hub, pluginConfig, vmResponse) - statsdTiming('vm_setup_sync_section') - - if (hub.HISTORICAL_EXPORTS_ENABLED) { - addHistoricalEventsExportCapability(hub, pluginConfig, vmResponse) - addHistoricalEventsExportCapabilityV2(hub, pluginConfig, vmResponse) - } - } else { - statsdTiming('vm_setup_sync_section') - } statsdTiming('vm_setup_full') vmSetupMsSummary.labels(String(pluginConfig.plugin?.id)).observe(new Date().getTime() - timer.getTime()) diff --git a/plugin-server/tests/historical-export-e2e.test.ts b/plugin-server/tests/historical-export-e2e.test.ts deleted file mode 100644 index aabaab77886f1..0000000000000 --- a/plugin-server/tests/historical-export-e2e.test.ts +++ /dev/null @@ -1,148 +0,0 @@ -import { PluginEvent } from '@posthog/plugin-scaffold' - -import { defaultConfig } from '../src/config/config' -import { startPluginsServer } from '../src/main/pluginsServer' -import { EnqueuedPluginJob, Hub, LogLevel, PluginsServerConfig } from '../src/types' -import { UUIDT } from '../src/utils/utils' -import { EventPipelineRunner } from '../src/worker/ingestion/event-pipeline/runner' -import Piscina, { makePiscina } from '../src/worker/piscina' -import { writeToFile } from '../src/worker/vm/extensions/test-utils' -import { delayUntilEventIngested, resetTestDatabaseClickhouse } from './helpers/clickhouse' -import { resetGraphileWorkerSchema } from './helpers/graphile-worker' -import { resetKafka } from './helpers/kafka' -import { pluginConfig39 } from './helpers/plugins' -import { resetTestDatabase } from './helpers/sql' - -jest.mock('../src/utils/status') -jest.setTimeout(60000) // 60 sec timeout - -const { console: testConsole } = writeToFile - -const extraServerConfig: Partial = { - WORKER_CONCURRENCY: 2, - LOG_LEVEL: LogLevel.Log, - CONVERSION_BUFFER_ENABLED: false, - HISTORICAL_EXPORTS_ENABLED: true, - HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER: 2, - HISTORICAL_EXPORTS_INITIAL_FETCH_TIME_WINDOW: 8 * 60 * 60 * 1000, // 8 hours -} - -const indexJs = ` -import { console as testConsole } from 'test-utils/write-to-file' - -export async function exportEvents(events) { - for (const event of events) { - if (event.properties && event.properties['$$is_historical_export_event']) { - testConsole.log('exported historical event', event) - } - } -} -` - -describe('Historical Export (v2)', () => { - let hub: Hub - let stopServer: () => Promise - let piscina: Piscina - - beforeAll(async () => { - await resetKafka(extraServerConfig) - }) - - beforeEach(async () => { - console.info = jest.fn() - - testConsole.reset() - await Promise.all([ - await resetTestDatabase(indexJs), - await resetTestDatabaseClickhouse(extraServerConfig), - await resetGraphileWorkerSchema(defaultConfig), - ]) - - const startResponse = await startPluginsServer(extraServerConfig, makePiscina, undefined) - hub = startResponse.hub! - piscina = startResponse.piscina! - stopServer = startResponse.stop! - }) - - afterEach(async () => { - await stopServer() - }) - - afterAll(async () => { - await resetGraphileWorkerSchema(defaultConfig) - }) - - async function ingestEvent(timestamp: string, overrides: Partial = {}) { - const pluginEvent: PluginEvent = { - event: 'some_event', - distinct_id: 'some_user', - site_url: '', - team_id: 2, - timestamp: timestamp, - now: timestamp, - ip: '', - uuid: new UUIDT().toString(), - ...overrides, - } as any as PluginEvent - - const runner = new EventPipelineRunner(hub, pluginEvent) - await runner.runEventPipeline(pluginEvent) - } - - it('exports a batch of events in a time range', async () => { - await ingestEvent('2021-07-28T00:00:00.000Z') // To avoid parallel person processing which we don't handle - await Promise.all([ - ingestEvent('2021-08-01T00:00:00.000Z', { properties: { foo: 'bar' } }), - ingestEvent('2021-08-02T02:00:00.000Z'), - ingestEvent('2021-08-03T09:00:00.000Z'), - ingestEvent('2021-08-03T15:00:00.000Z'), - ingestEvent('2021-08-04T23:00:00.000Z'), - ingestEvent('2021-08-04T23:59:59.000Z'), - ingestEvent('2021-08-05T00:00:00.000Z'), - ingestEvent('2021-08-05T01:00:00.000Z'), - ]) - - await hub.kafkaProducer.flush() - await delayUntilEventIngested(() => hub.db.fetchEvents(), 9) - - await piscina.run({ - task: 'runPluginJob', - args: { - job: { - type: 'Export historical events V2', - payload: { - dateRange: ['2021-08-01', '2021-08-04'], - parallelism: 5, - $operation: 'start', - }, - pluginConfigId: pluginConfig39.id, - pluginConfigTeam: pluginConfig39.team_id, - timestamp: 0, - } as EnqueuedPluginJob, - }, - }) - - await delayUntilEventIngested(() => Promise.resolve(testConsole.read()), 6, 1000, 50) - - const exportedEventLogs = testConsole.read() as Array<[string, any]> - exportedEventLogs.sort((e1, e2) => e1[1].timestamp.localeCompare(e2[1].timestamp)) - - const timestamps = exportedEventLogs.map(([, event]) => event.timestamp) - expect(timestamps).toEqual([ - '2021-08-01T00:00:00.000Z', - '2021-08-02T02:00:00.000Z', - '2021-08-03T09:00:00.000Z', - '2021-08-03T15:00:00.000Z', - '2021-08-04T23:00:00.000Z', - '2021-08-04T23:59:59.000Z', - ]) - expect(exportedEventLogs[0][1].properties).toEqual( - expect.objectContaining({ - foo: 'bar', - $$historical_export_source_db: 'clickhouse', - $$is_historical_export_event: true, - $$historical_export_timestamp: expect.any(String), - }) - ) - }) -}) diff --git a/plugin-server/tests/worker/buffer.test.ts b/plugin-server/tests/worker/buffer.test.ts index 4a38a4fe591e7..90100a2bac472 100644 --- a/plugin-server/tests/worker/buffer.test.ts +++ b/plugin-server/tests/worker/buffer.test.ts @@ -1,8 +1,5 @@ import { delay } from '../../src/utils/utils' import { PromiseManager } from '../../src/worker/vm/promise-manager' -import { pluginConfig39 } from '../helpers/plugins' -import { Hub } from './../../src/types' -import { ExportEventsBuffer } from './../../src/worker/vm/upgrades/utils/export-events-buffer' jest.setTimeout(100000) @@ -40,73 +37,3 @@ describe('PromiseManager', () => { expect(promiseManager.pendingPromises.size).toEqual(1) }) }) - -describe('ExportEventsBuffer', () => { - let promiseManager: PromiseManager - let mockHub: Hub - let exportEventsBuffer: ExportEventsBuffer - - beforeEach(() => { - promiseManager = new PromiseManager({ MAX_PENDING_PROMISES_PER_WORKER: 1 } as any) - mockHub = { promiseManager } as any - exportEventsBuffer = new ExportEventsBuffer(mockHub, pluginConfig39, { limit: 2 }) - }) - - afterEach(async () => { - await Promise.all(promiseManager.pendingPromises) - }) - - test('add and flush work as expected', async () => { - jest.spyOn(promiseManager, 'trackPromise') - jest.spyOn(exportEventsBuffer, 'flush') - - exportEventsBuffer._flush = jest.fn(async () => { - await delay(3000) - }) - - await exportEventsBuffer.add({ event: 'event1' }, 1) - expect(exportEventsBuffer.points).toEqual(1) - expect(exportEventsBuffer.buffer.length).toEqual(1) - expect(exportEventsBuffer.flush).not.toHaveBeenCalled() - - await exportEventsBuffer.add({ event: 'event2' }, 1) - expect(exportEventsBuffer.points).toEqual(2) - expect(exportEventsBuffer.buffer.length).toEqual(2) - expect(exportEventsBuffer.flush).not.toHaveBeenCalled() - - await exportEventsBuffer.add({ event: 'event3' }, 1) - expect(exportEventsBuffer.points).toEqual(1) - expect(exportEventsBuffer.buffer.length).toEqual(1) - expect(exportEventsBuffer.buffer).toEqual([{ event: 'event3' }]) - expect(exportEventsBuffer._flush).toHaveBeenCalledWith( - [{ event: 'event1' }, { event: 'event2' }], - 2, - expect.any(Date) - ) - }) - - test('flush works correctly with promise manager', async () => { - jest.spyOn(promiseManager, 'trackPromise') - jest.spyOn(exportEventsBuffer, 'flush') - - exportEventsBuffer._flush = jest.fn(async () => { - await delay(3000) - }) - - // add a promise - promiseManager.trackPromise(delay(3000)) - expect(promiseManager.pendingPromises.size).toEqual(1) - - await exportEventsBuffer.add({ event: 'event1' }, 1) - expect(exportEventsBuffer.points).toEqual(1) - expect(exportEventsBuffer.buffer.length).toEqual(1) - expect(exportEventsBuffer.flush).not.toHaveBeenCalled() - expect(promiseManager.trackPromise).toHaveBeenCalledTimes(1) - expect(promiseManager.pendingPromises.size).toEqual(1) - - await exportEventsBuffer.add({ event: 'event2' }, 2) - expect(exportEventsBuffer.flush).toHaveBeenCalled() - expect(promiseManager.trackPromise).toHaveBeenCalledTimes(2) - expect(promiseManager.pendingPromises.size).toEqual(1) - }) -}) diff --git a/plugin-server/tests/worker/capabilities.test.ts b/plugin-server/tests/worker/capabilities.test.ts index 7ba3d3b840178..6dadbef2e88da 100644 --- a/plugin-server/tests/worker/capabilities.test.ts +++ b/plugin-server/tests/worker/capabilities.test.ts @@ -100,7 +100,7 @@ describe('capabilities', () => { const shouldSetupPlugin = shouldSetupPluginInServer( { ingestion: true }, { - methods: ['onEvent', 'exportEvents'], + methods: ['onEvent'], scheduled_tasks: ['runEveryMinute'], jobs: ['someJob'], } @@ -122,7 +122,7 @@ describe('capabilities', () => { const shouldSetupPlugin = shouldSetupPluginInServer( { ingestionOverflow: true }, { - methods: ['onEvent', 'exportEvents'], + methods: ['onEvent'], scheduled_tasks: ['runEveryMinute'], jobs: ['someJob'], } @@ -144,7 +144,7 @@ describe('capabilities', () => { const shouldSetupPlugin = shouldSetupPluginInServer( { ingestionHistorical: true }, { - methods: ['onEvent', 'exportEvents'], + methods: ['onEvent'], scheduled_tasks: ['runEveryMinute'], jobs: ['someJob'], } @@ -184,7 +184,7 @@ describe('capabilities', () => { }) describe('processAsyncOnEventHandlers', () => { - it.each(['onEvent', 'exportEvents'])( + it.each(['onEvent'])( 'returns true if plugin has %s and the server has processAsyncOnEventHandlers capability', (method) => { const shouldSetupPlugin = shouldSetupPluginInServer( @@ -195,7 +195,7 @@ describe('capabilities', () => { } ) - it('returns false if plugin has none of onEvent or exportEvents and the server has only processAsyncOnEventHandlers capability', () => { + it('returns false if plugin has none of onEvent and the server has only processAsyncOnEventHandlers capability', () => { const shouldSetupPlugin = shouldSetupPluginInServer( { processAsyncOnEventHandlers: true }, { methods: [] } @@ -203,8 +203,8 @@ describe('capabilities', () => { expect(shouldSetupPlugin).toEqual(false) }) - it.each(['onEvent', 'exportEvents'])( - 'returns true if plugin has %s and the server has processAsyncOnEventHandlers capability', + it.each(['onEvent'])( + 'onEvent returns true if plugin has %s and the server has processAsyncOnEventHandlers capability', (method) => { const shouldSetupPlugin = shouldSetupPluginInServer( { processAsyncOnEventHandlers: true }, @@ -214,7 +214,7 @@ describe('capabilities', () => { } ) - it('returns false if plugin has none of onEvent or exportEvents and the server has only processAsyncOnEventHandlers capability', () => { + it('returns false if plugin has none of onEvent and the server has only processAsyncOnEventHandlers capability', () => { const shouldSetupPlugin = shouldSetupPluginInServer( { processAsyncOnEventHandlers: true }, { methods: [] } diff --git a/plugin-server/tests/worker/plugins.test.ts b/plugin-server/tests/worker/plugins.test.ts index 4b169eb9f33a6..47f0596228a85 100644 --- a/plugin-server/tests/worker/plugins.test.ts +++ b/plugin-server/tests/worker/plugins.test.ts @@ -82,7 +82,6 @@ describe('plugins', () => { const vm = await pluginConfig.vm!.resolveInternalVm expect(Object.keys(vm!.methods).sort()).toEqual([ 'composeWebhook', - 'exportEvents', 'getSettings', 'onEvent', 'processEvent', @@ -732,27 +731,6 @@ describe('plugins', () => { expect(newPluginConfig.plugin!.capabilities).toEqual(pluginConfig.plugin!.capabilities) }) - test.skip('exportEvents automatically sets metrics', async () => { - getPluginRows.mockReturnValueOnce([ - mockPluginWithSourceFiles(` - export function exportEvents() {} - `), - ]) - getPluginConfigRows.mockReturnValueOnce([pluginConfig39]) - getPluginAttachmentRows.mockReturnValueOnce([pluginAttachment1]) - - await setupPlugins(hub) - const pluginConfig = hub.pluginConfigs.get(39)! - - expect(pluginConfig.plugin!.metrics).toEqual({ - events_delivered_successfully: 'sum', - events_seen: 'sum', - other_errors: 'sum', - retry_errors: 'sum', - undelivered_events: 'sum', - }) - }) - describe('loadSchedule()', () => { const mockConfig = (tasks: any) => ({ vm: { getScheduledTasks: () => Promise.resolve(tasks) } }) diff --git a/plugin-server/tests/worker/vm.test.ts b/plugin-server/tests/worker/vm.test.ts index 273523f65744b..7e3769de61328 100644 --- a/plugin-server/tests/worker/vm.test.ts +++ b/plugin-server/tests/worker/vm.test.ts @@ -59,7 +59,6 @@ describe('vm tests', () => { expect(Object.keys(vm).sort()).toEqual(['methods', 'tasks', 'usedImports', 'vm', 'vmResponseVariable']) expect(Object.keys(vm.methods).sort()).toEqual([ 'composeWebhook', - 'exportEvents', 'getSettings', 'onEvent', 'processEvent', @@ -1055,229 +1054,6 @@ describe('vm tests', () => { expect(fetch).toHaveBeenCalledWith('https://google.com/results.json?query=onEvent', undefined) }) - describe('exportEvents', () => { - beforeEach(() => { - jest.spyOn(hub.appMetrics, 'queueMetric') - }) - - test('normal operation', async () => { - const indexJs = ` - async function exportEvents (events, meta) { - await fetch('https://export.com/results.json?query=' + events[0].event + '&events=' + events.length) - } - ` - await resetTestDatabase(indexJs) - const vm = await createReadyPluginConfigVm( - hub, - { - ...pluginConfig39, - config: { - ...pluginConfig39.config, - exportEventsBufferBytes: '10000', - exportEventsBufferSeconds: '1', - exportEventsToIgnore: `${defaultEvent.event},otherEvent`, - }, - }, - indexJs - ) - - await vm.methods.onEvent!(defaultEvent) - await vm.methods.onEvent!({ ...defaultEvent, event: 'otherEvent' }) - await vm.methods.onEvent!({ ...defaultEvent, event: 'otherEvent2' }) - await vm.methods.onEvent!({ ...defaultEvent, event: 'otherEvent3' }) - await delay(1010) - expect(fetch).toHaveBeenCalledWith('https://export.com/results.json?query=otherEvent2&events=2', undefined) - expect(hub.appMetrics.queueMetric).toHaveBeenCalledWith({ - teamId: pluginConfig39.team_id, - pluginConfigId: pluginConfig39.id, - category: 'exportEvents', - successes: 2, - }) - - // adds exportEventsWithRetry job and onEvent function - expect(Object.keys(vm.tasks.job)).toEqual(expect.arrayContaining(['exportEventsWithRetry'])) - expect(Object.keys(vm.tasks.schedule)).toEqual(['runEveryMinute']) - expect( - Object.keys(vm.methods) - .filter((m) => !!vm.methods[m as keyof typeof vm.methods]) - .sort() - ).toEqual(expect.arrayContaining(['exportEvents', 'onEvent', 'teardownPlugin'])) - }) - - test('works with onEvent', async () => { - // the exportEvents upgrade patches onEvent, testing that the old one still works - const indexJs = ` - async function exportEvents (events, meta) { - await fetch('https://export.com/results.json?query=' + events[0].event + '&events=' + events.length) - } - async function onEvent (event, meta) { - await fetch('https://onevent.com/') - } - ` - await resetTestDatabase(indexJs) - const vm = await createReadyPluginConfigVm( - hub, - { - ...pluginConfig39, - config: { - ...pluginConfig39.config, - exportEventsBufferBytes: '10000', - exportEventsBufferSeconds: '1', - exportEventsToIgnore: defaultEvent.event, - }, - }, - indexJs - ) - const event: ProcessedPluginEvent = { - ...defaultEvent, - event: 'exported', - } - await vm.methods.onEvent!(event) - await vm.methods.onEvent!(defaultEvent) - await vm.methods.onEvent!(event) - await delay(1010) - expect(fetch).toHaveBeenCalledTimes(4) - expect(fetch).toHaveBeenCalledWith('https://onevent.com/', undefined) - expect(fetch).toHaveBeenCalledWith('https://export.com/results.json?query=exported&events=2', undefined) - }) - - test('buffers bytes with exportEventsBufferBytes', async () => { - const indexJs = ` - async function exportEvents (events, meta) { - // console.log(meta.config) - await fetch('https://export.com/?length=' + JSON.stringify(events).length + '&count=' + events.length) - } - ` - await resetTestDatabase(indexJs) - const vm = await createReadyPluginConfigVm( - hub, - { - ...pluginConfig39, - config: { - ...pluginConfig39.config, - exportEventsBufferBytes: '1000', - exportEventsBufferSeconds: '1', - exportEventsToIgnore: defaultEvent.event, - }, - }, - indexJs - ) - const event: ProcessedPluginEvent = { - uuid: new UUIDT().toString(), - distinct_id: 'my_id', - ip: '127.0.0.1', - team_id: 3, - timestamp: new Date().toISOString(), - event: 'exported', - properties: {}, - } - for (let i = 0; i < 100; i++) { - await vm.methods.onEvent!(event) - } - await delay(1010) - - // This tests that the requests were broken up correctly according to the exportEventsBufferBytes config - // If you add data to the event above you should see more requests, and vice versa - expect(fetch).toHaveBeenCalledTimes(20) - expect((fetch as any).mock.calls).toEqual([ - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ['https://export.com/?length=866&count=5'], - ]) - }) - - test('buffers bytes with very tiny exportEventsBufferBytes', async () => { - const indexJs = ` - async function exportEvents (events, meta) { - // console.log(meta.config) - await fetch('https://export.com/?length=' + JSON.stringify(events).length + '&count=' + events.length) - } - ` - await resetTestDatabase(indexJs) - const vm = await createReadyPluginConfigVm( - hub, - { - ...pluginConfig39, - config: { - ...pluginConfig39.config, - exportEventsBufferBytes: '1', - exportEventsBufferSeconds: '1', - exportEventsToIgnore: defaultEvent.event, - }, - }, - indexJs - ) - const event: ProcessedPluginEvent = { - uuid: new UUIDT().toString(), - distinct_id: 'my_id', - ip: '127.0.0.1', - team_id: 3, - timestamp: new Date().toISOString(), - event: 'exported', - properties: {}, - } - for (let i = 0; i < 100; i++) { - await vm.methods.onEvent!(event) - } - await delay(1010) - - expect(fetch).toHaveBeenCalledTimes(100) - expect((fetch as any).mock.calls).toEqual( - Array.from(Array(100)).map(() => ['https://export.com/?length=174&count=1']) - ) - }) - - test('flushes on teardown', async () => { - const indexJs = ` - async function exportEvents (events, meta) { - await fetch('https://export.com/results.json?query=' + events[0].event + '&events=' + events.length) - } - ` - await resetTestDatabase(indexJs) - const vm = await createReadyPluginConfigVm( - hub, - { - ...pluginConfig39, - config: { - ...pluginConfig39.config, - exportEventsBufferBytes: '10000', - exportEventsBufferSeconds: '1000', - exportEventsToIgnore: '', - }, - }, - indexJs - ) - await vm.methods.onEvent!(defaultEvent) - expect(fetch).not.toHaveBeenCalledWith( - 'https://export.com/results.json?query=default event&events=1', - undefined - ) - - await vm.methods.teardownPlugin!() - expect(fetch).toHaveBeenCalledWith( - 'https://export.com/results.json?query=default event&events=1', - undefined - ) - }) - }) - test('imports', async () => { const indexJs = ` const urlImport = require('url'); diff --git a/plugin-server/tests/worker/vm/upgrades/historical-export/__snapshots__/export-historical-events-v2.test.ts.snap b/plugin-server/tests/worker/vm/upgrades/historical-export/__snapshots__/export-historical-events-v2.test.ts.snap deleted file mode 100644 index be76415898682..0000000000000 --- a/plugin-server/tests/worker/vm/upgrades/historical-export/__snapshots__/export-historical-events-v2.test.ts.snap +++ /dev/null @@ -1,52 +0,0 @@ -// Jest Snapshot v1, https://goo.gl/fbAQLP - -exports[`addHistoricalEventsExportCapabilityV2() exportHistoricalEvents() calls exportEvents and logs with fetched events 1`] = ` -Array [ - Array [ - Object { - "category": "exportEvents", - "jobId": "1", - "pluginConfigId": 39, - "successes": 3, - "successesOnRetry": 0, - "teamId": 2, - }, - ], -] -`; - -exports[`addHistoricalEventsExportCapabilityV2() exportHistoricalEvents() stops processing after HISTORICAL_EXPORTS_MAX_RETRY_COUNT retries 1`] = ` -Array [ - Array [ - Object { - "category": "exportEvents", - "failures": 3, - "jobId": "1", - "pluginConfigId": 39, - "teamId": 2, - }, - Object { - "error": [RetryError: Retry error], - "eventCount": 3, - }, - ], -] -`; - -exports[`addHistoricalEventsExportCapabilityV2() exportHistoricalEvents() stops processing date if an unknown error was raised in exportEvents 1`] = ` -Array [ - Array [ - Object { - "category": "exportEvents", - "failures": 3, - "jobId": "1", - "pluginConfigId": 39, - "teamId": 2, - }, - Object { - "error": [Error: Unknown error], - "eventCount": 3, - }, - ], -] -`; diff --git a/plugin-server/tests/worker/vm/upgrades/historical-export/export-historical-events-v2.test.ts b/plugin-server/tests/worker/vm/upgrades/historical-export/export-historical-events-v2.test.ts deleted file mode 100644 index 10ed56848df53..0000000000000 --- a/plugin-server/tests/worker/vm/upgrades/historical-export/export-historical-events-v2.test.ts +++ /dev/null @@ -1,1221 +0,0 @@ -import { PluginMeta, RetryError } from '@posthog/plugin-scaffold' - -import { - Hub, - ISOTimestamp, - PluginConfig, - PluginConfigVMInternalResponse, - PluginTaskType, -} from '../../../../../src/types' -import { createPluginActivityLog } from '../../../../../src/utils/db/activity-log' -import { createHub } from '../../../../../src/utils/db/hub' -import { createStorage } from '../../../../../src/worker/vm/extensions/storage' -import { createUtils } from '../../../../../src/worker/vm/extensions/utilities' -import { - addHistoricalEventsExportCapabilityV2, - EVENTS_PER_RUN_SMALL, - EXPORT_COORDINATION_KEY, - EXPORT_PARAMETERS_KEY, - ExportHistoricalEventsJobPayload, - ExportHistoricalEventsUpgradeV2, - INTERFACE_JOB_NAME, - JOB_SPEC, - TestFunctions, -} from '../../../../../src/worker/vm/upgrades/historical-export/export-historical-events-v2' -import { fetchEventsForInterval } from '../../../../../src/worker/vm/upgrades/utils/fetchEventsForInterval' -import { plugin60, pluginConfig39 } from '../../../../helpers/plugins' -import { resetTestDatabase } from '../../../../helpers/sql' - -jest.mock('../../../../../src/utils/status') -jest.mock('../../../../../src/worker/vm/upgrades/utils/fetchEventsForInterval') -jest.mock('../../../../../src/utils/db/activity-log') - -const ONE_HOUR = 1000 * 60 * 60 - -describe('addHistoricalEventsExportCapabilityV2()', () => { - let hub: Hub - let closeHub: () => Promise - let vm: PluginConfigVMInternalResponse> - let runNow: jest.Mock, runIn: jest.Mock - - beforeEach(() => { - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-expect-error - vm = undefined - }) - - beforeAll(async () => { - ;[hub, closeHub] = await createHub() - hub.kafkaProducer.queueMessage = jest.fn() - hub.kafkaProducer.flush = jest.fn() - jest.spyOn(hub.db, 'queuePluginLogEntry') - jest.spyOn(hub.appMetrics, 'queueMetric') - jest.spyOn(hub.appMetrics, 'queueError') - - jest.spyOn(Date, 'now').mockReturnValue(1_000_000_000) - }) - - afterAll(async () => { - await hub.promiseManager.awaitPromisesIfNeeded() - await closeHub() - }) - - function storage() { - return createStorage(hub, pluginConfig39) - } - - function createVM(pluginConfig: PluginConfig = pluginConfig39, schedule = {}) { - runIn = jest.fn() - runNow = jest.fn() - - const mockVM = { - methods: { - exportEvents: jest.fn(), - }, - tasks: { - schedule, - job: {}, - }, - meta: { - storage: storage(), - utils: createUtils(hub, pluginConfig.id), - jobs: { - exportHistoricalEventsV2: jest.fn().mockReturnValue({ runNow, runIn }), - }, - global: {}, - }, - } as unknown as PluginConfigVMInternalResponse> - - addHistoricalEventsExportCapabilityV2(hub, pluginConfig, mockVM) - - vm = mockVM - } - - function getTestMethod(name: T): TestFunctions[T] { - // @ts-expect-error testing-related schenanigans - return (...args: any[]) => { - if (!vm) { - createVM() - } - // @ts-expect-error testing-related schenanigans - return vm.meta.global._testFunctions[name](...args) - } - } - - describe('exportHistoricalEvents()', () => { - const exportHistoricalEvents = getTestMethod('exportHistoricalEvents') - const exportParams = { - id: 1, - parallelism: 3, - dateFrom: '2021-10-29T00:00:00.000Z' as ISOTimestamp, - dateTo: '2021-11-01T05:00:00.000Z' as ISOTimestamp, - } - - const defaultPayload: ExportHistoricalEventsJobPayload = { - timestampCursor: 1635724800000, - startTime: 1635724800000, - endTime: 1635742800000, - exportId: 1, - fetchTimeInterval: ONE_HOUR, - offset: 0, - retriesPerformedSoFar: 0, - statusKey: 'statusKey', - } - - beforeEach(async () => { - await resetTestDatabase() - await storage().set(EXPORT_PARAMETERS_KEY, exportParams) - }) - - afterEach(() => { - jest.clearAllTimers() - jest.useRealTimers() - }) - - it('stores current progress in storage under `statusKey`', async () => { - jest.mocked(fetchEventsForInterval).mockResolvedValue([]) - - await exportHistoricalEvents({ ...defaultPayload, timestampCursor: 1635730000000 }) - expect(await storage().get('statusKey', null)).toEqual({ - ...defaultPayload, - timestampCursor: 1635730000000, - done: false, - progress: expect.closeTo(0.28888), - statusTime: Date.now(), - }) - }) - - it('logs and marks part of export done if reached the end', async () => { - await exportHistoricalEvents({ ...defaultPayload, timestampCursor: defaultPayload.endTime }) - - expect(fetchEventsForInterval).not.toHaveBeenCalled() - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining( - 'Finished exporting chunk from 2021-11-01T00:00:00.000Z to 2021-11-01T05:00:00.000Z' - ), - }) - ) - expect(await storage().get('statusKey', null)).toEqual({ - ...defaultPayload, - timestampCursor: defaultPayload.endTime, - done: true, - progress: 1, - statusTime: Date.now(), - }) - }) - - it('calls exportEvents and logs with fetched events', async () => { - createVM() - - jest.useFakeTimers({ - // These are required otherwise queries and other things were breaking. - doNotFake: ['setImmediate', 'clearImmediate', 'clearInterval', 'nextTick', 'Date'], - }) - - jest.spyOn(vm.meta.storage, 'set') - jest.spyOn(global, 'clearInterval') - - const defaultProgress = - (defaultPayload.timestampCursor - defaultPayload.startTime) / - (defaultPayload.endTime - defaultPayload.startTime) - - jest.mocked(vm.methods.exportEvents).mockImplementationOnce(async () => { - let advanced = 0 - while (advanced < 3) { - // The +1 accounts for the first status update that happens once at the beginning of - // exportHistoricalEvents. - expect(vm.meta.storage.set).toHaveBeenCalledTimes(advanced + 1) - - expect(await storage().get('statusKey', null)).toEqual({ - ...defaultPayload, - timestampCursor: defaultPayload.startTime, - done: false, - progress: defaultProgress, - statusTime: Date.now(), - }) - - advanced = advanced + 1 - jest.advanceTimersByTime(60 * 1000) - } - return - }) - jest.mocked(fetchEventsForInterval).mockResolvedValue([1, 2, 3]) - - await exportHistoricalEvents(defaultPayload) - - expect(clearInterval).toHaveBeenCalledTimes(1) - expect(vm.methods.exportEvents).toHaveBeenCalledWith([1, 2, 3]) - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining( - 'Successfully processed events 0-3 from 2021-11-01T00:00:00.000Z to 2021-11-01T01:00:00.000Z.' - ), - }) - ) - expect(jest.mocked(hub.appMetrics.queueMetric).mock.calls).toMatchSnapshot() - }) - - it('does not call exportEvents or log if no events in time range', async () => { - jest.mocked(fetchEventsForInterval).mockResolvedValue([]) - jest.spyOn(global, 'clearInterval') - - await exportHistoricalEvents(defaultPayload) - - expect(clearInterval).toHaveBeenCalledTimes(1) - expect(vm.methods.exportEvents).not.toHaveBeenCalled() - expect(hub.db.queuePluginLogEntry).not.toHaveBeenCalled() - }) - - it('stops export if events fetch fails', async () => { - jest.mocked(fetchEventsForInterval).mockRejectedValue(new Error('Fetch failed')) - await storage().set(EXPORT_PARAMETERS_KEY, { - id: 1, - parallelism: 3, - dateFrom: '2021-10-29T00:00:00.000Z' as ISOTimestamp, - dateTo: '2021-11-01T05:00:00.000Z' as ISOTimestamp, - }) - - await exportHistoricalEvents(defaultPayload) - - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining( - 'Failed fetching events. Stopping export - please try again later.' - ), - }) - ) - expect(await storage().get(EXPORT_PARAMETERS_KEY, null)).toEqual(null) - }) - - it('schedules a retry if exportEvents raises a RetryError', async () => { - createVM() - - jest.spyOn(global, 'clearInterval') - jest.mocked(fetchEventsForInterval).mockResolvedValue([1, 2, 3]) - jest.mocked(vm.methods.exportEvents).mockRejectedValue(new RetryError('Retry error')) - - await exportHistoricalEvents(defaultPayload) - - expect(clearInterval).toHaveBeenCalledTimes(1) - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining( - 'Failed processing events 0-3 from 2021-11-01T00:00:00.000Z to 2021-11-01T01:00:00.000Z.' - ), - }) - ) - expect(vm.meta.jobs.exportHistoricalEventsV2).toHaveBeenCalledWith({ - ...defaultPayload, - retriesPerformedSoFar: 1, - }) - expect(runIn).toHaveBeenCalledWith(3, 'seconds') - }) - - it('schedules a retry with exponential backoff for exportEvents RetryError', async () => { - createVM() - - jest.mocked(fetchEventsForInterval).mockResolvedValue([1, 2, 3]) - jest.mocked(vm.methods.exportEvents).mockRejectedValue(new RetryError('Retry error')) - - await exportHistoricalEvents({ ...defaultPayload, retriesPerformedSoFar: 5 }) - - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining( - 'Failed processing events 0-3 from 2021-11-01T00:00:00.000Z to 2021-11-01T01:00:00.000Z.' - ), - }) - ) - expect(vm.meta.jobs.exportHistoricalEventsV2).toHaveBeenCalledWith({ - ...defaultPayload, - retriesPerformedSoFar: 6, - }) - expect(runIn).toHaveBeenCalledWith(96, 'seconds') - }) - - it('schedules a retry with exponential backoff for fetchEventsForInterval RetryError', async () => { - createVM() - - jest.mocked(fetchEventsForInterval).mockRejectedValue(new RetryError('Retry error')) - - await exportHistoricalEvents({ ...defaultPayload, retriesPerformedSoFar: 5 }) - - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining( - 'Failed to fetch events from 2021-11-01T00:00:00.000Z to 2021-11-01T01:00:00.000Z.' - ), - }) - ) - expect(vm.meta.jobs.exportHistoricalEventsV2).toHaveBeenCalledWith({ - ...defaultPayload, - retriesPerformedSoFar: 6, - }) - expect(runIn).toHaveBeenCalledWith(96, 'seconds') - }) - - it('stops processing date if an unknown error was raised in exportEvents', async () => { - createVM() - - jest.spyOn(global, 'clearInterval') - jest.mocked(fetchEventsForInterval).mockResolvedValue([1, 2, 3]) - jest.mocked(vm.methods.exportEvents).mockRejectedValue(new Error('Unknown error')) - - await exportHistoricalEvents(defaultPayload) - - expect(clearInterval).toHaveBeenCalledTimes(1) - expect(vm.meta.jobs.exportHistoricalEventsV2).not.toHaveBeenCalled() - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining( - 'exportEvents returned unknown error, stopping export. error=Error: Unknown error' - ), - }) - ) - expect(jest.mocked(hub.appMetrics.queueError).mock.calls).toMatchSnapshot() - - expect(await storage().get(EXPORT_PARAMETERS_KEY, null)).toEqual(null) - }) - - it('stops processing after HISTORICAL_EXPORTS_MAX_RETRY_COUNT retries', async () => { - createVM() - - jest.mocked(fetchEventsForInterval).mockResolvedValue([1, 2, 3]) - jest.mocked(vm.methods.exportEvents).mockRejectedValue(new RetryError('Retry error')) - - await exportHistoricalEvents({ - ...defaultPayload, - retriesPerformedSoFar: hub.HISTORICAL_EXPORTS_MAX_RETRY_COUNT - 1, - }) - - expect(vm.meta.jobs.exportHistoricalEventsV2).not.toHaveBeenCalled() - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining( - `Exporting chunk 2021-11-01T00:00:00.000Z to 2021-11-01T05:00:00.000Z failed after ${hub.HISTORICAL_EXPORTS_MAX_RETRY_COUNT} retries. Stopping export.` - ), - }) - ) - expect(jest.mocked(hub.appMetrics.queueError).mock.calls).toMatchSnapshot() - - expect(await storage().get(EXPORT_PARAMETERS_KEY, null)).toEqual(null) - }) - - it('does nothing if no export is running', async () => { - await storage().del(EXPORT_PARAMETERS_KEY) - - await exportHistoricalEvents(defaultPayload) - - expect(fetchEventsForInterval).not.toHaveBeenCalled() - expect(hub.db.queuePluginLogEntry).not.toHaveBeenCalled() - }) - - it('stops export if abortMessage is set', async () => { - await storage().set(EXPORT_PARAMETERS_KEY, { ...exportParams, abortMessage: 'test ABORT' }) - - await exportHistoricalEvents(defaultPayload) - - expect(fetchEventsForInterval).not.toHaveBeenCalled() - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining('test ABORT'), - }) - ) - }) - - it('does nothing if a different export is running', async () => { - await exportHistoricalEvents({ ...defaultPayload, exportId: 779 }) - - expect(fetchEventsForInterval).not.toHaveBeenCalled() - expect(hub.db.queuePluginLogEntry).not.toHaveBeenCalled() - }) - - describe('calling next time window', () => { - it('calls next time range if this range was empty', async () => { - jest.mocked(fetchEventsForInterval).mockResolvedValue([]) - - await exportHistoricalEvents(defaultPayload) - - expect(vm.meta.jobs.exportHistoricalEventsV2).toHaveBeenCalledWith({ - ...defaultPayload, - timestampCursor: defaultPayload.timestampCursor + defaultPayload.fetchTimeInterval, - offset: 0, - fetchTimeInterval: - defaultPayload.fetchTimeInterval * hub.HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER, - }) - }) - - it('calls next time range if this range had some events', async () => { - jest.mocked(fetchEventsForInterval).mockResolvedValue(new Array(400)) - - await exportHistoricalEvents(defaultPayload) - - expect(vm.meta.jobs.exportHistoricalEventsV2).toHaveBeenCalledWith({ - ...defaultPayload, - timestampCursor: defaultPayload.timestampCursor + defaultPayload.fetchTimeInterval, - offset: 0, - fetchTimeInterval: defaultPayload.fetchTimeInterval, - }) - }) - - it('increases offset if this range had full page of events', async () => { - jest.mocked(fetchEventsForInterval).mockResolvedValue(new Array(500)) - - await exportHistoricalEvents(defaultPayload) - - expect(vm.meta.jobs.exportHistoricalEventsV2).toHaveBeenCalledWith({ - ...defaultPayload, - timestampCursor: defaultPayload.timestampCursor, - offset: 500, - }) - }) - - it('resets `retriesPerformedSoFar` and `offset` when page increases', async () => { - jest.mocked(fetchEventsForInterval).mockResolvedValue(new Array(300)) - - await exportHistoricalEvents({ - ...defaultPayload, - offset: 1000, - retriesPerformedSoFar: 10, - }) - - expect(vm.meta.jobs.exportHistoricalEventsV2).toHaveBeenCalledWith({ - ...defaultPayload, - timestampCursor: defaultPayload.timestampCursor + defaultPayload.fetchTimeInterval, - offset: 0, - retriesPerformedSoFar: 0, - }) - }) - }) - }) - - describe('coordinateHistoricalExport()', () => { - const coordinateHistoricalExport = getTestMethod('coordinateHistoricalExport') - - beforeEach(async () => { - await resetTestDatabase() - }) - - it('does nothing if export isnt running / is done', async () => { - await coordinateHistoricalExport() - - expect(await storage().get(EXPORT_COORDINATION_KEY, null)).toEqual(null) - expect(hub.db.queuePluginLogEntry).not.toHaveBeenCalled() - }) - - describe('export is running', () => { - const params = { - id: 1, - parallelism: 3, - dateFrom: '2021-10-29T00:00:00.000Z' as ISOTimestamp, - dateTo: '2021-11-01T05:00:00.000Z' as ISOTimestamp, - } - - beforeEach(async () => { - await storage().set(EXPORT_PARAMETERS_KEY, params) - }) - - it('logs progress of the export and does not start excessive jobs', async () => { - await coordinateHistoricalExport({ - hasChanges: false, - exportIsDone: false, - progress: 0.7553, - done: [], - running: [], - toStartRunning: [], - toResume: [], - }) - - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledTimes(1) - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining('Export progress: ■■■■■■■■■■■■■■■□□□□□ (75.5%)'), - }) - ) - - expect(vm.meta.jobs.exportHistoricalEventsV2).not.toHaveBeenCalled() - expect(await storage().get(EXPORT_PARAMETERS_KEY, null)).toEqual(params) - }) - - it('starts up new jobs and updates coordination data if needed', async () => { - await coordinateHistoricalExport({ - hasChanges: true, - exportIsDone: false, - progress: 0.7553, - done: [ - '2021-10-29T00:00:00.000Z', - '2021-10-30T00:00:00.000Z', - '2021-10-31T00:00:00.000Z', - ] as ISOTimestamp[], - running: ['2021-11-01T00:00:00.000Z'] as ISOTimestamp[], - toStartRunning: [['2021-11-01T00:00:00.000Z', '2021-11-01T05:00:00.000Z']] as Array< - [ISOTimestamp, ISOTimestamp] - >, - toResume: [], - }) - - expect(vm.meta.jobs.exportHistoricalEventsV2).toHaveBeenCalledWith({ - endTime: 1635742800000, - exportId: 1, - fetchTimeInterval: hub.HISTORICAL_EXPORTS_INITIAL_FETCH_TIME_WINDOW, - offset: 0, - retriesPerformedSoFar: 0, - startTime: 1635724800000, - timestampCursor: 1635724800000, - statusKey: 'EXPORT_DATE_STATUS_2021-11-01T00:00:00.000Z', - }) - - expect(await storage().get('EXPORT_DATE_STATUS_2021-11-01T00:00:00.000Z', null)).toEqual( - expect.objectContaining({ - done: false, - progress: 0, - statusTime: Date.now(), - }) - ) - expect(await storage().get(EXPORT_COORDINATION_KEY, null)).toEqual({ - done: ['2021-10-29T00:00:00.000Z', '2021-10-30T00:00:00.000Z', '2021-10-31T00:00:00.000Z'], - running: ['2021-11-01T00:00:00.000Z'], - progress: 0.7553, - }) - }) - - it('resumes tasks and updates coordination if needed', async () => { - const toResumePayload = { - done: false, - progress: 0.5, - statusTime: 5_000_000_000, - endTime: 1635742800000, - exportId: 1, - fetchTimeInterval: hub.HISTORICAL_EXPORTS_INITIAL_FETCH_TIME_WINDOW, - offset: 0, - retriesPerformedSoFar: 0, - startTime: 1635724800000, - timestampCursor: 1635724800000, - statusKey: 'EXPORT_DATE_STATUS_2021-11-01T00:00:00.000Z', - } - - await coordinateHistoricalExport({ - hasChanges: true, - exportIsDone: false, - progress: 0.7553, - done: [ - '2021-10-29T00:00:00.000Z', - '2021-10-30T00:00:00.000Z', - '2021-10-31T00:00:00.000Z', - ] as ISOTimestamp[], - running: ['2021-11-01T00:00:00.000Z'] as ISOTimestamp[], - toStartRunning: [], - toResume: [toResumePayload], - }) - - expect(vm.meta.jobs.exportHistoricalEventsV2).toHaveBeenCalledWith(toResumePayload) - expect(await storage().get('EXPORT_DATE_STATUS_2021-11-01T00:00:00.000Z', null)).toEqual( - expect.objectContaining({ - done: false, - progress: 0.5, - statusTime: Date.now(), - }) - ) - }) - - it('handles export being completed', async () => { - await coordinateHistoricalExport({ - hasChanges: false, - exportIsDone: true, - progress: 1, - done: [], - running: [], - toStartRunning: [], - toResume: [], - }) - - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining('Export has finished!'), - }) - ) - expect(await storage().get(EXPORT_PARAMETERS_KEY, null)).toEqual(null) - }) - - it('stops export if abortMessage is set', async () => { - await storage().set(EXPORT_PARAMETERS_KEY, { ...params, abortMessage: 'test aborting' }) - - await coordinateHistoricalExport({ - hasChanges: true, - exportIsDone: false, - progress: 0.7553, - done: [ - '2021-10-29T00:00:00.000Z', - '2021-10-30T00:00:00.000Z', - '2021-10-31T00:00:00.000Z', - ] as ISOTimestamp[], - running: ['2021-11-01T00:00:00.000Z'] as ISOTimestamp[], - toStartRunning: [['2021-11-01T00:00:00.000Z', '2021-11-01T05:00:00.000Z']] as Array< - [ISOTimestamp, ISOTimestamp] - >, - toResume: [], - }) - - expect(vm.meta.jobs.exportHistoricalEventsV2).not.toHaveBeenCalled() - expect(hub.db.queuePluginLogEntry).toHaveBeenCalledWith( - expect.objectContaining({ - message: expect.stringContaining('test aborting'), - }) - ) - expect(await storage().get(EXPORT_PARAMETERS_KEY, null)).toEqual(null) - - // verify second call also nothing happens - await coordinateHistoricalExport({ - hasChanges: true, - exportIsDone: false, - progress: 0.7553, - done: [ - '2021-10-29T00:00:00.000Z', - '2021-10-30T00:00:00.000Z', - '2021-10-31T00:00:00.000Z', - ] as ISOTimestamp[], - running: ['2021-11-01T00:00:00.000Z'] as ISOTimestamp[], - toStartRunning: [['2021-11-01T00:00:00.000Z', '2021-11-01T05:00:00.000Z']] as Array< - [ISOTimestamp, ISOTimestamp] - >, - toResume: [], - }) - - expect(vm.meta.jobs.exportHistoricalEventsV2).not.toHaveBeenCalled() - expect(await storage().get(EXPORT_PARAMETERS_KEY, null)).toEqual(null) - }) - }) - }) - - describe('calculateCoordination()', () => { - const calculateCoordination = getTestMethod('calculateCoordination') - - const params = { - id: 1, - parallelism: 3, - dateFrom: '2021-10-29T00:00:00.000Z' as ISOTimestamp, - dateTo: '2021-11-01T05:00:00.000Z' as ISOTimestamp, - } - - beforeEach(async () => { - await resetTestDatabase() - }) - - it('does nothing if enough tasks running', async () => { - const result = await calculateCoordination(params, [], [ - '2021-10-29T00:00:00.000Z', - '2021-10-30T00:00:00.000Z', - '2021-10-31T00:00:00.000Z', - ] as ISOTimestamp[]) - - expect(result).toEqual({ - hasChanges: false, - done: [], - running: ['2021-10-29T00:00:00.000Z', '2021-10-30T00:00:00.000Z', '2021-10-31T00:00:00.000Z'], - toStartRunning: [], - toResume: [], - progress: 0, - exportIsDone: false, - }) - }) - - it('kicks off new tasks if theres room', async () => { - const result = await calculateCoordination(params, [], []) - - expect(result).toEqual({ - hasChanges: true, - done: [], - running: ['2021-10-29T00:00:00.000Z', '2021-10-30T00:00:00.000Z', '2021-10-31T00:00:00.000Z'], - toStartRunning: [ - ['2021-10-29T00:00:00.000Z', '2021-10-30T00:00:00.000Z'], - ['2021-10-30T00:00:00.000Z', '2021-10-31T00:00:00.000Z'], - ['2021-10-31T00:00:00.000Z', '2021-11-01T00:00:00.000Z'], - ], - toResume: [], - progress: 0, - exportIsDone: false, - }) - }) - - it('marks running tasks as done and counts progress', async () => { - await storage().set('EXPORT_DATE_STATUS_2021-10-29T00:00:00.000Z', { - done: false, - progress: 0.5, - statusTime: Date.now() - 60_000, - }) - await storage().set('EXPORT_DATE_STATUS_2021-10-30T00:00:00.000Z', { - done: true, - progress: 1, - statusTime: Date.now() - 60_000, - }) - - const result = await calculateCoordination(params, [], [ - '2021-10-29T00:00:00.000Z', - '2021-10-30T00:00:00.000Z', - '2021-10-31T00:00:00.000Z', - ] as ISOTimestamp[]) - - expect(result).toEqual({ - hasChanges: true, - done: ['2021-10-30T00:00:00.000Z'], - running: ['2021-10-29T00:00:00.000Z', '2021-10-31T00:00:00.000Z', '2021-11-01T00:00:00.000Z'], - toStartRunning: [['2021-11-01T00:00:00.000Z', '2021-11-01T05:00:00.000Z']], - toResume: [], - progress: 0.375, - exportIsDone: false, - }) - }) - - it('notifies if export is done after marking running tasks as done', async () => { - await storage().set('EXPORT_DATE_STATUS_2021-10-30T00:00:00.000Z', { - done: true, - progress: 1, - }) - - const result = await calculateCoordination( - params, - ['2021-10-29T00:00:00.000Z', '2021-10-31T00:00:00.000Z', '2021-11-01T00:00:00.000Z'] as ISOTimestamp[], - ['2021-10-30T00:00:00.000Z'] as ISOTimestamp[] - ) - - expect(result).toEqual({ - hasChanges: true, - done: expect.arrayContaining([ - '2021-10-29T00:00:00.000Z', - '2021-10-30T00:00:00.000Z', - '2021-10-31T00:00:00.000Z', - '2021-11-01T00:00:00.000Z', - ]), - running: [], - toStartRunning: [], - toResume: [], - progress: 1, - exportIsDone: true, - }) - }) - - it('resumes running task after a long enough of a delay', async () => { - const dateStatus = { - done: false, - progress: 0.5, - statusTime: Date.now() - 70 * 60 * 1000, - retriesPerformedSoFar: 0, - } - await storage().set('EXPORT_DATE_STATUS_2021-10-29T00:00:00.000Z', dateStatus) - - const result = await calculateCoordination(params, [], [ - '2021-10-29T00:00:00.000Z', - '2021-10-30T00:00:00.000Z', - '2021-10-31T00:00:00.000Z', - ] as ISOTimestamp[]) - - expect(result).toEqual({ - hasChanges: true, - done: [], - running: ['2021-10-29T00:00:00.000Z', '2021-10-30T00:00:00.000Z', '2021-10-31T00:00:00.000Z'], - toStartRunning: [], - toResume: [dateStatus], - progress: 0.125, - exportIsDone: false, - }) - }) - - it('does not resume tasks that are done', async () => { - const dateStatus = { - done: true, - progress: 1, - statusTime: Date.now() - 70 * 60 * 1000, - retriesPerformedSoFar: 0, - } - await storage().set('EXPORT_DATE_STATUS_2021-10-29T00:00:00.000Z', dateStatus) - - const result = await calculateCoordination(params, [], [ - '2021-10-29T00:00:00.000Z', - '2021-10-30T00:00:00.000Z', - '2021-10-31T00:00:00.000Z', - ] as ISOTimestamp[]) - - expect(result).toEqual({ - hasChanges: true, - done: ['2021-10-29T00:00:00.000Z'], - running: ['2021-10-30T00:00:00.000Z', '2021-10-31T00:00:00.000Z', '2021-11-01T00:00:00.000Z'], - toStartRunning: [['2021-11-01T00:00:00.000Z', '2021-11-01T05:00:00.000Z']], - toResume: [], - progress: 0.25, - exportIsDone: false, - }) - }) - }) - - describe('nextCursor()', () => { - const nextCursor = getTestMethod('nextCursor') - - const defaultPayload: ExportHistoricalEventsJobPayload = { - timestampCursor: 0, - startTime: 0, - endTime: 1_000_000_000, - offset: 0, - retriesPerformedSoFar: 0, - exportId: 0, - fetchTimeInterval: ONE_HOUR, - statusKey: 'abc', - } - - it('increases only offset if more in current time range', () => { - expect(nextCursor(defaultPayload, EVENTS_PER_RUN_SMALL)).toEqual({ - timestampCursor: defaultPayload.timestampCursor, - fetchTimeInterval: ONE_HOUR, - offset: EVENTS_PER_RUN_SMALL, - }) - }) - it('increases only offset if more in current time range on a late page', () => { - expect(nextCursor({ ...defaultPayload, offset: 5 * EVENTS_PER_RUN_SMALL }, EVENTS_PER_RUN_SMALL)).toEqual({ - timestampCursor: defaultPayload.timestampCursor, - fetchTimeInterval: ONE_HOUR, - offset: 6 * EVENTS_PER_RUN_SMALL, - }) - }) - - it('returns existing fetchTimeInterval if time range mostly full', () => { - expect(nextCursor(defaultPayload, EVENTS_PER_RUN_SMALL * 0.9)).toEqual({ - timestampCursor: defaultPayload.timestampCursor + defaultPayload.fetchTimeInterval, - fetchTimeInterval: ONE_HOUR, - offset: 0, - }) - }) - - it('increases fetchTimeInterval if time range mostly empty', () => { - expect(nextCursor(defaultPayload, EVENTS_PER_RUN_SMALL * 0.1)).toEqual({ - timestampCursor: defaultPayload.timestampCursor + defaultPayload.fetchTimeInterval, - fetchTimeInterval: ONE_HOUR * hub.HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER, - offset: 0, - }) - }) - - it('does not increase fetchTimeInterval beyond 12 hours', () => { - const payload = { - ...defaultPayload, - fetchTimeInterval: 11.5 * 60 * 60 * 1000, // 11.5 hours - } - expect(nextCursor(payload, EVENTS_PER_RUN_SMALL * 0.1)).toEqual({ - timestampCursor: payload.timestampCursor + payload.fetchTimeInterval, - fetchTimeInterval: 12 * 60 * 60 * 1000, - offset: 0, - }) - }) - - it('decreases fetchTimeInterval if on a late page and no more to fetch', () => { - expect(nextCursor({ ...defaultPayload, offset: 5 * EVENTS_PER_RUN_SMALL }, 10)).toEqual({ - timestampCursor: defaultPayload.timestampCursor + defaultPayload.fetchTimeInterval, - fetchTimeInterval: ONE_HOUR / hub.HISTORICAL_EXPORTS_FETCH_WINDOW_MULTIPLIER, - offset: 0, - }) - }) - - it('does not decrease fetchTimeInterval below 10 minutes', () => { - const payload = { - ...defaultPayload, - offset: 5 * EVENTS_PER_RUN_SMALL, - fetchTimeInterval: 10.5 * 60 * 1000, // 10.5 minutes - } - - expect(nextCursor(payload, 10)).toEqual({ - timestampCursor: payload.timestampCursor + payload.fetchTimeInterval, - fetchTimeInterval: 10 * 60 * 1000, - offset: 0, - }) - }) - - it('reduces fetchTimeInterval if it would result going beyond endTime', () => { - const payload = { - ...defaultPayload, - endTime: 6_500_000, - timestampCursor: 5_000_000, - fetchTimeInterval: 1_000_000, - offset: 0, - } - - expect(nextCursor(payload, 10)).toEqual({ - timestampCursor: 6_000_000, - fetchTimeInterval: 500_000, - offset: 0, - }) - }) - - it('make sure to use a larger batch size if the plugin recommends it', () => { - // NOTE: this doesn't actually check that this value is used in the - // requests to ClickHouse, but :fingercrossed: it's good enough. - createVM() - - // When no settings are returned, the default small batch size is used - let eventsPerRun = addHistoricalEventsExportCapabilityV2( - hub, - { plugin: { name: 'S3 Export Plugin' } } as any, - vm - ).eventsPerRun - expect(eventsPerRun).toEqual(500) - - // Set the handlesLargeBatches flag to true and expect a big batch size - vm.methods.getSettings = jest.fn().mockReturnValue({ - handlesLargeBatches: true, - }) - eventsPerRun = addHistoricalEventsExportCapabilityV2( - hub, - { plugin: { name: 'S3 Export Plugin' } } as any, - vm - ).eventsPerRun - expect(eventsPerRun).toEqual(10000) - - // Keep the default of 500 if the flag is false - vm.methods.getSettings = jest.fn().mockReturnValue({ - handlesLargeBatches: false, - }) - eventsPerRun = addHistoricalEventsExportCapabilityV2( - hub, - { plugin: { name: 'foo' } } as any, - vm - ).eventsPerRun - expect(eventsPerRun).toEqual(500) - }) - }) - - describe('getTimestampBoundaries()', () => { - const getTimestampBoundaries = getTestMethod('getTimestampBoundaries') - - it('returns timestamp boundaries passed into interface job, increasing the end date by a day', () => { - expect( - getTimestampBoundaries({ - dateRange: ['2021-10-29', '2021-11-30'], - }) - ).toEqual(['2021-10-29T00:00:00.000Z', '2021-12-01T00:00:00.000Z']) - }) - - it('raises an error for invalid timestamp formats', () => { - expect(() => - getTimestampBoundaries({ - dateRange: ['foo', 'bar'], - }) - ).toThrow("'dateRange' should be two dates in ISO string format.") - }) - }) - - describe('getExportDateRange()', () => { - const getExportDateRange = getTestMethod('getExportDateRange') - - it('returns values in range from start of the date', () => { - expect( - getExportDateRange({ - id: 1, - parallelism: 1, - dateFrom: '2021-10-29T00:00:00.000Z' as ISOTimestamp, - dateTo: '2021-10-29T00:00:00.000Z' as ISOTimestamp, - }) - ).toEqual([]) - - expect( - getExportDateRange({ - id: 1, - parallelism: 1, - dateFrom: '2021-10-29T00:00:00.000Z' as ISOTimestamp, - dateTo: '2021-11-02T00:00:00.000Z' as ISOTimestamp, - }) - ).toEqual([ - ['2021-10-29T00:00:00.000Z', '2021-10-30T00:00:00.000Z'], - ['2021-10-30T00:00:00.000Z', '2021-10-31T00:00:00.000Z'], - ['2021-10-31T00:00:00.000Z', '2021-11-01T00:00:00.000Z'], - ['2021-11-01T00:00:00.000Z', '2021-11-02T00:00:00.000Z'], - ]) - }) - - it('handles partial-day ranges gracefully', () => { - expect( - getExportDateRange({ - id: 1, - parallelism: 1, - dateFrom: '2021-10-29T01:00:00.000Z' as ISOTimestamp, - dateTo: '2021-10-30T05:55:00.000Z' as ISOTimestamp, - }) - ).toEqual([ - ['2021-10-29T01:00:00.000Z', '2021-10-30T00:00:00.000Z'], - ['2021-10-30T00:00:00.000Z', '2021-10-30T05:55:00.000Z'], - ]) - }) - }) - - describe('progressBar()', () => { - const progressBar = getTestMethod('progressBar') - - it('calculates progress correctly', () => { - expect(progressBar(0)).toEqual('□□□□□□□□□□□□□□□□□□□□') - expect(progressBar(1)).toEqual('■■■■■■■■■■■■■■■■■■■■') - expect(progressBar(0.5)).toEqual('■■■■■■■■■■□□□□□□□□□□') - expect(progressBar(0.7)).toEqual('■■■■■■■■■■■■■■□□□□□□') - expect(progressBar(0.12)).toEqual('■■□□□□□□□□□□□□□□□□□□') - expect(progressBar(0.12, 10)).toEqual('■□□□□□□□□□') - }) - }) - - describe('stopExport()', () => { - const stopExport = getTestMethod('stopExport') - - const params = { - id: 1, - parallelism: 3, - dateFrom: '2021-10-29T00:00:00.000Z' as ISOTimestamp, - dateTo: '2021-11-01T05:00:00.000Z' as ISOTimestamp, - } - - it('unsets EXPORT_PARAMETERS_KEY', async () => { - await storage().set(EXPORT_PARAMETERS_KEY, params) - - await stopExport(params, '', 'success') - - expect(await storage().get(EXPORT_PARAMETERS_KEY, null)).toEqual(null) - }) - - it('captures activity for export success', async () => { - await stopExport(params, '', 'success') - - expect(createPluginActivityLog).toHaveBeenCalledWith( - hub, - pluginConfig39.team_id, - pluginConfig39.id, - 'export_success', - { - trigger: { - job_id: '1', - job_type: INTERFACE_JOB_NAME, - payload: params, - }, - } - ) - }) - - it('captures activity for export failure', async () => { - await stopExport(params, 'Some error message', 'fail') - - expect(createPluginActivityLog).toHaveBeenCalledWith( - hub, - pluginConfig39.team_id, - pluginConfig39.id, - 'export_fail', - { - trigger: { - job_id: '1', - job_type: INTERFACE_JOB_NAME, - payload: { - ...params, - failure_reason: 'Some error message', - }, - }, - } - ) - }) - }) - - describe('shouldResume()', () => { - const shouldResume = getTestMethod('shouldResume') - - it('resumes task when a bit over 10 minutes have passed', () => { - const status = { - statusTime: 10_000_000_000, - retriesPerformedSoFar: 0, - } as any - - expect(shouldResume(status, 10_000_000_000)).toEqual(false) - expect(shouldResume(status, 9_000_000_000)).toEqual(false) - expect(shouldResume(status, 10_000_060_000)).toEqual(false) - expect(shouldResume(status, 10_000_590_000)).toEqual(false) - expect(shouldResume(status, 10_000_600_000)).toEqual(false) - expect(shouldResume(status, 10_003_660_000)).toEqual(true) - }) - - it('accounts for retries exponential backoff', () => { - const status = { - statusTime: 10_000_000_000, - retriesPerformedSoFar: 10, - } as any - - expect(shouldResume(status, 10_000_660_000)).toEqual(false) - // Roughly 2**11*3 seconds are waited between retry 10 and 11 - expect(shouldResume(status, 10_006_000_000)).toEqual(false) - expect(shouldResume(status, 10_006_200_000)).toEqual(false) - }) - }) - - describe('updating public jobs', () => { - beforeEach(() => { - jest.spyOn(hub.db, 'addOrUpdatePublicJob') - }) - - it('updates when public job has not been yet registered', () => { - const pluginConfig: PluginConfig = { - ...pluginConfig39, - plugin: { - ...plugin60, - public_jobs: {}, - }, - } - createVM(pluginConfig) - - expect(hub.db.addOrUpdatePublicJob).toHaveBeenCalledWith( - pluginConfig39.plugin_id, - INTERFACE_JOB_NAME, - JOB_SPEC - ) - }) - - it('updates when public job definition has changed', () => { - const pluginConfig: PluginConfig = { - ...pluginConfig39, - plugin: { - ...plugin60, - public_jobs: { [INTERFACE_JOB_NAME]: { payload: {} } }, - }, - } - createVM(pluginConfig) - - expect(hub.db.addOrUpdatePublicJob).toHaveBeenCalledWith( - pluginConfig39.plugin_id, - INTERFACE_JOB_NAME, - JOB_SPEC - ) - }) - - it('does not update if public job has already been registered', () => { - const pluginConfig: PluginConfig = { - ...pluginConfig39, - plugin: { - ...plugin60, - public_jobs: { [INTERFACE_JOB_NAME]: JOB_SPEC }, - }, - } - createVM(pluginConfig) - - expect(hub.db.addOrUpdatePublicJob).not.toHaveBeenCalled() - }) - }) - - describe('tasks.schedule.runEveryMinute()', () => { - it('sets __ignoreForAppMetrics if runEveryMinute was not previously defined', async () => { - createVM() - - expect(vm.tasks.schedule.runEveryMinute).toEqual({ - name: 'runEveryMinute', - type: PluginTaskType.Schedule, - exec: expect.any(Function), - __ignoreForAppMetrics: true, - }) - - await vm.tasks.schedule.runEveryMinute.exec() - }) - - it('calls original method and does not set __ignoreForAppMetrics if runEveryMinute was previously defined in plugin', async () => { - const pluginRunEveryMinute = jest.fn() - - createVM(pluginConfig39, { - runEveryMinute: { - name: 'runEveryMinute', - type: PluginTaskType.Schedule, - exec: pluginRunEveryMinute, - }, - }) - - expect(vm.tasks.schedule.runEveryMinute).toEqual({ - name: 'runEveryMinute', - type: PluginTaskType.Schedule, - exec: expect.any(Function), - __ignoreForAppMetrics: false, - }) - - await vm.tasks.schedule.runEveryMinute.exec() - - expect(pluginRunEveryMinute).toHaveBeenCalled() - }) - - it('calls original method and sets __ignoreForAppMetrics if runEveryMinute was previously also wrapped', async () => { - const pluginRunEveryMinute = jest.fn() - - createVM(pluginConfig39, { - runEveryMinute: { - name: 'runEveryMinute', - type: PluginTaskType.Schedule, - exec: pluginRunEveryMinute, - __ignoreForAppMetrics: true, - }, - }) - - expect(vm.tasks.schedule.runEveryMinute).toEqual({ - name: 'runEveryMinute', - type: PluginTaskType.Schedule, - exec: expect.any(Function), - __ignoreForAppMetrics: true, - }) - - await vm.tasks.schedule.runEveryMinute.exec() - - expect(pluginRunEveryMinute).toHaveBeenCalled() - }) - }) -}) diff --git a/plugin-server/tests/worker/vm/upgrades/historical-export/export-historical-events.test.ts b/plugin-server/tests/worker/vm/upgrades/historical-export/export-historical-events.test.ts deleted file mode 100644 index 02fd27eab837f..0000000000000 --- a/plugin-server/tests/worker/vm/upgrades/historical-export/export-historical-events.test.ts +++ /dev/null @@ -1,127 +0,0 @@ -import { PluginMeta } from '@posthog/plugin-scaffold' -import deepmerge from 'deepmerge' - -import { Hub, PluginConfig, PluginConfigVMInternalResponse } from '../../../../../src/types' -import { createHub } from '../../../../../src/utils/db/hub' -import { createStorage } from '../../../../../src/worker/vm/extensions/storage' -import { createUtils } from '../../../../../src/worker/vm/extensions/utilities' -import { addHistoricalEventsExportCapability } from '../../../../../src/worker/vm/upgrades/historical-export/export-historical-events' -import { ExportHistoricalEventsUpgrade } from '../../../../../src/worker/vm/upgrades/utils/utils' -import { pluginConfig39 } from '../../../../helpers/plugins' - -jest.mock('../../../../../src/utils/status') - -describe('addHistoricalEventsExportCapability()', () => { - let hub: Hub - let closeHub: () => Promise - let _pluginConfig39: PluginConfig - - beforeEach(async () => { - ;[hub, closeHub] = await createHub() - - _pluginConfig39 = { ...pluginConfig39 } - }) - - afterEach(async () => { - await closeHub() - }) - - function addCapabilities(overrides?: any) { - const mockVM = deepmerge(overrides, { - methods: { - exportEvents: jest.fn(), - }, - tasks: { - schedule: {}, - job: {}, - }, - meta: { - storage: createStorage(hub, _pluginConfig39), - utils: createUtils(hub, _pluginConfig39.id), - jobs: { - exportHistoricalEvents: jest.fn().mockReturnValue(jest.fn()), - }, - global: {}, - }, - }) as unknown as PluginConfigVMInternalResponse> - - addHistoricalEventsExportCapability(hub, _pluginConfig39, mockVM) - - return mockVM - } - - it('adds new methods, scheduled tasks and jobs', () => { - const vm = addCapabilities() - - expect(Object.keys(vm.methods)).toEqual(['exportEvents', 'setupPlugin']) - expect(Object.keys(vm.tasks.schedule)).toEqual(['runEveryMinute']) - expect(Object.keys(vm.tasks.job)).toEqual(['exportHistoricalEvents', 'Export historical events']) - expect(Object.keys(vm.meta.global)).toEqual([ - 'exportHistoricalEvents', - 'initTimestampsAndCursor', - 'setTimestampBoundaries', - 'updateProgressBar', - ]) - }) - - it('registers public job spec theres not currently a spec', () => { - const addOrUpdatePublicJobSpy = jest.spyOn(hub.db, 'addOrUpdatePublicJob') - addCapabilities() - - expect(addOrUpdatePublicJobSpy).toHaveBeenCalledWith(60, 'Export historical events', { - payload: { - dateFrom: { required: true, title: 'Export start date', type: 'date' }, - dateTo: { required: true, title: 'Export end date', type: 'date' }, - }, - }) - }) - - it('updates plugin job spec if current spec is outdated', () => { - const addOrUpdatePublicJobSpy = jest.spyOn(hub.db, 'addOrUpdatePublicJob') - - _pluginConfig39.plugin = { - public_jobs: { - 'Export historical events': { payload: { foo: 'bar' } }, - }, - } as any - - addCapabilities() - - expect(addOrUpdatePublicJobSpy).toHaveBeenCalledWith(60, 'Export historical events', { - payload: { - dateFrom: { required: true, title: 'Export start date', type: 'date' }, - dateTo: { required: true, title: 'Export end date', type: 'date' }, - }, - }) - }) - - it('does not update plugin job spec if current spec matches stored spec', () => { - const addOrUpdatePublicJobSpy = jest.spyOn(hub.db, 'addOrUpdatePublicJob') - - _pluginConfig39.plugin = { - public_jobs: { - 'Export historical events': { - payload: { - dateFrom: { required: true, title: 'Export start date', type: 'date' }, - dateTo: { required: true, title: 'Export end date', type: 'date' }, - }, - }, - }, - } as any - - addCapabilities() - - expect(addOrUpdatePublicJobSpy).not.toHaveBeenCalled() - }) - - describe('setupPlugin()', () => { - it('calls original setupPlugin()', async () => { - const setupPlugin = jest.fn() - const vm = addCapabilities({ methods: { setupPlugin } }) - - await vm.methods.setupPlugin!() - - expect(setupPlugin).toHaveBeenCalled() - }) - }) -}) diff --git a/plugin-server/tests/worker/vm/upgrades/utils/fetchEventsForInterval.test.ts b/plugin-server/tests/worker/vm/upgrades/utils/fetchEventsForInterval.test.ts deleted file mode 100644 index 2c9ad2d8b486b..0000000000000 --- a/plugin-server/tests/worker/vm/upgrades/utils/fetchEventsForInterval.test.ts +++ /dev/null @@ -1,116 +0,0 @@ -import { PluginEvent } from '@posthog/plugin-scaffold' - -import { Hub } from '../../../../../src/types' -import { createHub } from '../../../../../src/utils/db/hub' -import { UUIDT } from '../../../../../src/utils/utils' -import { EventPipelineRunner } from '../../../../../src/worker/ingestion/event-pipeline/runner' -import { fetchEventsForInterval } from '../../../../../src/worker/vm/upgrades/utils/fetchEventsForInterval' -import { HistoricalExportEvent } from '../../../../../src/worker/vm/upgrades/utils/utils' -import { delayUntilEventIngested, resetTestDatabaseClickhouse } from '../../../../helpers/clickhouse' -import { resetTestDatabase } from '../../../../helpers/sql' - -jest.mock('../../../../../src/utils/status') - -const THIRTY_MINUTES = 1000 * 60 * 30 - -describe('fetchEventsForInterval()', () => { - let hub: Hub - let closeServer: () => Promise - - beforeEach(async () => { - await resetTestDatabase() - await resetTestDatabaseClickhouse() - ;[hub, closeServer] = await createHub() - }) - - afterEach(async () => { - await closeServer() - }) - - async function ingestEvent(timestamp: string, overrides: Partial = {}) { - const pluginEvent: PluginEvent = { - event: 'some_event', - distinct_id: 'some_user', - site_url: '', - team_id: 2, - timestamp: timestamp, - now: timestamp, - ip: '', - uuid: new UUIDT().toString(), - ...overrides, - } as any as PluginEvent - - const runner = new EventPipelineRunner(hub, pluginEvent) - await runner.runEventPipeline(pluginEvent) - } - - function extract( - events: Array, - key: T - ): Array { - return events.map((event) => event[key]) - } - - it('fetches events and parses them', async () => { - // To avoid parallel person processing which we don't handle we're doing one first alone - await ingestEvent('2021-06-01T00:00:00.000Z') // too old - await Promise.all([ - ingestEvent('2021-09-01T00:00:00.000Z'), // too new - - ingestEvent('2021-08-01T00:01:00.000Z'), - ingestEvent('2021-08-01T00:02:00.000Z', { properties: { foo: 'bar' } }), - ingestEvent('2021-08-01T00:03:00.000Z'), - ingestEvent('2021-08-01T00:29:59.000Z'), - ingestEvent('2021-08-01T00:33:00.000Z'), - ]) - - await hub.kafkaProducer.flush() - await delayUntilEventIngested(() => hub.db.fetchEvents(), 7) - - const events = await fetchEventsForInterval( - hub.db, - 2, - new Date('2021-08-01T00:00:00.000Z'), - 0, - THIRTY_MINUTES, - 2 - ) - - expect(events.length).toEqual(2) - expect(extract(events, 'timestamp')).toEqual(['2021-08-01T00:01:00.000Z', '2021-08-01T00:02:00.000Z']) - expect(extract(events, 'properties')).toEqual([ - { - $$historical_export_source_db: 'clickhouse', - $$historical_export_timestamp: expect.any(String), - $$is_historical_export_event: true, - }, - { - $$historical_export_source_db: 'clickhouse', - $$historical_export_timestamp: expect.any(String), - $$is_historical_export_event: true, - foo: 'bar', - }, - ]) - - const offsetEvents = await fetchEventsForInterval( - hub.db, - 2, - new Date('2021-08-01T00:00:00.000Z'), - 2, - THIRTY_MINUTES, - 2 - ) - expect(offsetEvents.length).toEqual(2) - expect(extract(offsetEvents, 'timestamp')).toEqual(['2021-08-01T00:03:00.000Z', '2021-08-01T00:29:59.000Z']) - - const offsetEvents2 = await fetchEventsForInterval( - hub.db, - 2, - new Date('2021-08-01T00:00:00.000Z'), - 4, - THIRTY_MINUTES, - 2 - ) - expect(offsetEvents2.length).toEqual(0) - }) -})