Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite committed Dec 25, 2024
1 parent 209685f commit aaadad3
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 16 deletions.
2 changes: 0 additions & 2 deletions plugin-server/functional_tests/scheduled-tasks-runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'invalidTaskType', pluginConfigId: 1 })),
key,
,
})

await waitForExpect(() => {
Expand All @@ -85,7 +84,6 @@ describe('dlq handling', () => {
topic: 'scheduled_tasks',
message: Buffer.from(JSON.stringify({ taskType: 'runEveryMinute', pluginConfigId: 'asdf' })),
key,
,
})

await waitForExpect(() => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ import {
} from '../../../config/kafka-topics'
import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer'
import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config'
import { KafkaProducerWrapper } from '../../../kafka/producer'
import { PluginServerService, PluginsServerConfig, RedisPool, TeamId, ValueMatcher } from '../../../types'
import { BackgroundRefresher } from '../../../utils/background-refresher'
import { KafkaProducerWrapper } from '../../../kafka/producer'
import { PostgresRouter } from '../../../utils/db/postgres'
import { createRedisPool } from '../../../utils/db/redis'
import { status } from '../../../utils/status'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import { KafkaConsumer, Message, MessageHeader, PartitionMetadata } from 'node-r
import path from 'path'
import { Counter } from 'prom-client'

import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
import { KafkaProducerWrapper } from '../../../kafka/producer'
import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types'
import { status } from '../../../utils/status'
import { captureIngestionWarning } from '../../../worker/ingestion/utils'
import { eventDroppedCounter } from '../metrics'
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import {
CdpProcessedEventsConsumer,
} from '../cdp/cdp-consumers'
import { defaultConfig } from '../config/config'
import { KafkaProducerWrapper } from '../kafka/producer'
import { Hub, PluginServerCapabilities, PluginServerService, PluginsServerConfig } from '../types'
import { closeHub, createHub, createKafkaClient } from '../utils/db/hub'
import { KafkaProducerWrapper } from '../kafka/producer'
import { PostgresRouter } from '../utils/db/postgres'
import { createRedisClient } from '../utils/db/redis'
import { cancelAllScheduledJobs } from '../utils/node-schedule'
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ import { DateTime } from 'luxon'
import { VM } from 'vm2'

import { EncryptedFields } from './cdp/encryption-utils'
import { KafkaProducerWrapper } from './kafka/producer'
import { ObjectStorage } from './main/services/object_storage'
import { Celery } from './utils/db/celery'
import { DB } from './utils/db/db'
import { KafkaProducerWrapper } from './kafka/producer'
import { PostgresRouter } from './utils/db/postgres'
import { UUID } from './utils/utils'
import { ActionManager } from './worker/ingestion/action-manager'
Expand Down
10 changes: 7 additions & 3 deletions plugin-server/src/utils/db/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1099,12 +1099,12 @@ export class DB {
return queryResult.data as PluginLogEntry[]
}

public queuePluginLogEntry(entry: LogEntryPayload): void {
public queuePluginLogEntry(entry: LogEntryPayload): Promise<void> {
const { pluginConfig, source, message, type, timestamp, instanceId } = entry
const configuredLogLevel = pluginConfig.plugin?.log_level || this.pluginsDefaultLogLevel

if (!shouldStoreLog(configuredLogLevel, type)) {
return
return Promise.resolve()
}

const parsedEntry = {
Expand All @@ -1122,7 +1122,7 @@ export class DB {
if (parsedEntry.message.length > 50_000) {
const { message, ...rest } = parsedEntry
status.warn('⚠️', 'Plugin log entry too long, ignoring.', rest)
return
return Promise.resolve()
}

pluginLogEntryCounter.labels({ plugin_id: String(pluginConfig.plugin_id), source }).inc()
Expand All @@ -1144,9 +1144,13 @@ export class DB {
entry: parsedEntry,
})
})

// TRICKY: We don't want to block the caller, so we return a promise that resolves immediately.
return Promise.resolve()
} catch (e) {
captureException(e, { tags: { team_id: entry.pluginConfig.team_id } })
console.error('Failed to produce message', e, parsedEntry)
return Promise.resolve()
}
}

Expand Down
6 changes: 2 additions & 4 deletions plugin-server/src/utils/db/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export class MessageSizeTooLarge extends Error {
readonly isRetriable = false
}

export function processError(
export async function processError(
server: Hub,
pluginConfig: PluginConfig | null,
error: Error | string,
Expand Down Expand Up @@ -60,16 +60,14 @@ export function processError(
event: event,
}

server.db.queuePluginLogEntry({
await server.db.queuePluginLogEntry({
pluginConfig,
source: PluginLogEntrySource.Plugin,
type: PluginLogEntryType.Error,
message: pluginError.stack ?? pluginError.message,
instanceId: server.instanceId,
timestamp: pluginError.time,
})

return Promise.resolve()
}

export function cleanErrorStackTrace(stack: string | undefined): string | undefined {
Expand Down
6 changes: 3 additions & 3 deletions plugin-server/src/worker/plugins/teardown.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export async function teardownPlugins(server: Hub, pluginConfig?: PluginConfig):
(async () => {
try {
await teardownPlugin()
server.db.queuePluginLogEntry({
await server.db.queuePluginLogEntry({
pluginConfig,
source: PluginLogEntrySource.System,
type: PluginLogEntryType.Debug,
Expand All @@ -23,7 +23,7 @@ export async function teardownPlugins(server: Hub, pluginConfig?: PluginConfig):
})
} catch (error) {
await processError(server, pluginConfig, error)
server.db.queuePluginLogEntry({
await server.db.queuePluginLogEntry({
pluginConfig,
source: PluginLogEntrySource.System,
type: PluginLogEntryType.Error,
Expand All @@ -34,7 +34,7 @@ export async function teardownPlugins(server: Hub, pluginConfig?: PluginConfig):
})()
)
} else {
server.db.queuePluginLogEntry({
await server.db.queuePluginLogEntry({
pluginConfig,
source: PluginLogEntrySource.System,
type: PluginLogEntryType.Debug,
Expand Down

0 comments on commit aaadad3

Please sign in to comment.