Skip to content

Commit

Permalink
feat: Inline plugins (plugins without running VM2) (#23443)
Browse files Browse the repository at this point in the history
Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
Co-authored-by: Ben White <[email protected]>
  • Loading branch information
3 people authored and thmsobrmlr committed Jul 25, 2024
1 parent a6571ff commit 2f3f4ea
Show file tree
Hide file tree
Showing 32 changed files with 845 additions and 141 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0448_add_mysql_externaldatasource_source_type
posthog: 0449_alter_plugin_organization_alter_plugin_plugin_type_and_more
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
2 changes: 1 addition & 1 deletion plugin-server/functional_tests/plugins.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ test.concurrent('plugins can use attachements', async () => {
key: 'testAttachment',
contents: 'test',
})
await enablePluginConfig(teamId, plugin.id)
await enablePluginConfig(teamId, pluginConfig.id)

await reloadPlugins()

Expand Down
3 changes: 3 additions & 0 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpProcessedEvents: true,
cdpFunctionCallbacks: true,
cdpFunctionOverflow: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
case PluginServerMode.ingestion:
Expand Down Expand Up @@ -89,6 +90,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
return {
pluginScheduledTasks: true,
appManagementSingleton: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
case PluginServerMode.cdp_processed_events:
Expand Down Expand Up @@ -121,6 +123,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
sessionRecordingBlobIngestion: true,
appManagementSingleton: true,
preflightSchedules: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
}
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { RustyHook } from '../worker/rusty-hook'
import { syncInlinePlugins } from '../worker/vm/inline/inline'
import { GraphileWorker } from './graphile-worker/graphile-worker'
import { loadPluginSchedule } from './graphile-worker/schedule'
import { startGraphileWorker } from './graphile-worker/worker-setup'
Expand Down Expand Up @@ -439,6 +440,13 @@ export async function startPluginsServer(
healthChecks['webhooks-ingestion'] = isWebhooksIngestionHealthy
}

if (capabilities.syncInlinePlugins) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
serverInstance = serverInstance ? serverInstance : { hub }

await syncInlinePlugins(hub)
}

if (hub && serverInstance) {
pubSub = new PubSub(hub, {
[hub.PLUGINS_RELOAD_PUBSUB_CHANNEL]: async () => {
Expand Down
19 changes: 10 additions & 9 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { TeamManager } from './worker/ingestion/team-manager'
import { RustyHook } from './worker/rusty-hook'
import { PluginsApiKeyManager } from './worker/vm/extensions/helpers/api-key-manager'
import { RootAccessManager } from './worker/vm/extensions/helpers/root-acess-manager'
import { LazyPluginVM } from './worker/vm/lazy'
import { PluginInstance } from './worker/vm/lazy'

export { Element } from '@posthog/plugin-scaffold' // Re-export Element from scaffolding, for backwards compat.

Expand Down Expand Up @@ -314,7 +314,7 @@ export interface Hub extends PluginsServerConfig {
// diagnostics
lastActivity: number
lastActivityType: string
statelessVms: StatelessVmMap
statelessVms: StatelessInstanceMap
conversionBufferEnabledTeams: Set<number>
// functions
enqueuePluginJob: (job: EnqueuedPluginJob) => Promise<void>
Expand Down Expand Up @@ -344,6 +344,7 @@ export interface PluginServerCapabilities {
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud
http?: boolean
mmdb?: boolean
syncInlinePlugins?: boolean
}

export type EnqueuedJob = EnqueuedPluginJob | GraphileWorkerCronScheduleJob
Expand Down Expand Up @@ -394,9 +395,9 @@ export interface JobSpec {

export interface Plugin {
id: number
organization_id: string
organization_id?: string
name: string
plugin_type: 'local' | 'respository' | 'custom' | 'source'
plugin_type: 'local' | 'respository' | 'custom' | 'source' | 'inline'
description?: string
is_global: boolean
is_preinstalled?: boolean
Expand Down Expand Up @@ -443,7 +444,7 @@ export interface PluginConfig {
order: number
config: Record<string, unknown>
attachments?: Record<string, PluginAttachment>
vm?: LazyPluginVM | null
instance?: PluginInstance | null
created_at: string
updated_at?: string
// We're migrating to a new functions that take PostHogEvent instead of PluginEvent
Expand Down Expand Up @@ -528,7 +529,7 @@ export interface PluginTask {
__ignoreForAppMetrics?: boolean
}

export type VMMethods = {
export type PluginMethods = {
setupPlugin?: () => Promise<void>
teardownPlugin?: () => Promise<void>
getSettings?: () => PluginSettings
Expand All @@ -538,7 +539,7 @@ export type VMMethods = {
}

// Helper when ensuring that a required method is implemented
export type VMMethodsConcrete = Required<VMMethods>
export type PluginMethodsConcrete = Required<PluginMethods>

export enum AlertLevel {
P0 = 0,
Expand All @@ -565,7 +566,7 @@ export interface Alert {
}
export interface PluginConfigVMResponse {
vm: VM
methods: VMMethods
methods: PluginMethods
tasks: Record<PluginTaskType, Record<string, PluginTask>>
vmResponseVariable: string
usedImports: Set<string>
Expand Down Expand Up @@ -1150,7 +1151,7 @@ export enum PropertyUpdateOperation {
SetOnce = 'set_once',
}

export type StatelessVmMap = Record<PluginId, LazyPluginVM>
export type StatelessInstanceMap = Record<PluginId, PluginInstance>

export enum OrganizationPluginsAccessLevel {
NONE = 0,
Expand Down
98 changes: 96 additions & 2 deletions plugin-server/src/utils/db/sql.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Hub, Plugin, PluginAttachmentDB, PluginCapabilities, PluginConfig, PluginConfigId } from '../../types'
import { InlinePluginDescription } from '../../worker/vm/inline/inline'
import { PostgresUse } from './postgres'

function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string {
Expand Down Expand Up @@ -58,6 +59,49 @@ const PLUGIN_SELECT = `SELECT
LEFT JOIN posthog_pluginsourcefile psf__site_ts
ON (psf__site_ts.plugin_id = posthog_plugin.id AND psf__site_ts.filename = 'site.ts')`

const PLUGIN_UPSERT_RETURNING = `INSERT INTO posthog_plugin
(
name,
url,
tag,
from_json,
from_web,
error,
plugin_type,
organization_id,
is_global,
capabilities,
public_jobs,
is_stateless,
log_level,
description,
is_preinstalled,
config_schema,
updated_at,
created_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, NOW(), NOW())
ON CONFLICT (url)
DO UPDATE SET
name = $1,
tag = $3,
from_json = $4,
from_web = $5,
error = $6,
plugin_type = $7,
organization_id = $8,
is_global = $9,
capabilities = $10,
public_jobs = $11,
is_stateless = $12,
log_level = $13,
description = $14,
is_preinstalled = $15,
config_schema = $16,
updated_at = NOW()
RETURNING *
`

export async function getPlugin(hub: Hub, pluginId: number): Promise<Plugin | undefined> {
const result = await hub.db.postgres.query(
PostgresUse.COMMON_READ,
Expand All @@ -68,14 +112,14 @@ export async function getPlugin(hub: Hub, pluginId: number): Promise<Plugin | un
return result.rows[0]
}

export async function getPluginRows(hub: Hub): Promise<Plugin[]> {
export async function getActivePluginRows(hub: Hub): Promise<Plugin[]> {
const { rows }: { rows: Plugin[] } = await hub.db.postgres.query(
PostgresUse.COMMON_READ,
`${PLUGIN_SELECT}
WHERE posthog_plugin.id IN (${pluginConfigsInForceQuery('plugin_id')}
GROUP BY posthog_pluginconfig.plugin_id)`,
undefined,
'getPluginRows'
'getActivePluginRows'
)

return rows
Expand Down Expand Up @@ -124,3 +168,53 @@ export async function disablePlugin(hub: Hub, pluginConfigId: PluginConfigId): P
)
await hub.db.redisPublish(hub.PLUGINS_RELOAD_PUBSUB_CHANNEL, 'reload!')
}

// Given an inline plugin description, upsert it into the known plugins table, returning the full
// Plugin object. Matching is done based on plugin url, not id, since that varies by region.
export async function upsertInlinePlugin(hub: Hub, inline: InlinePluginDescription): Promise<Plugin> {
const fullPlugin: Plugin = {
id: 0,
name: inline.name,
url: inline.url,
tag: inline.tag,
from_json: false,
from_web: false,
error: undefined,
plugin_type: 'inline',
organization_id: undefined,
is_global: inline.is_global,
capabilities: inline.capabilities,
public_jobs: undefined,
is_stateless: inline.is_stateless,
log_level: inline.log_level,
description: inline.description,
is_preinstalled: inline.is_preinstalled,
config_schema: inline.config_schema,
}

const { rows }: { rows: Plugin[] } = await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`${PLUGIN_UPSERT_RETURNING}`,
[
fullPlugin.name,
fullPlugin.url,
fullPlugin.tag,
fullPlugin.from_json,
fullPlugin.from_web,
fullPlugin.error,
fullPlugin.plugin_type,
fullPlugin.organization_id,
fullPlugin.is_global,
fullPlugin.capabilities,
fullPlugin.public_jobs,
fullPlugin.is_stateless,
fullPlugin.log_level,
fullPlugin.description,
fullPlugin.is_preinstalled,
JSON.stringify(fullPlugin.config_schema),
],
'upsertInlinePlugin'
)

return rows[0]
}
16 changes: 11 additions & 5 deletions plugin-server/src/worker/plugins/loadPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ export async function loadPlugin(hub: Hub, pluginConfig: PluginConfig): Promise<
const isLocalPlugin = plugin?.plugin_type === 'local'

if (!plugin) {
pluginConfig.vm?.failInitialization!()
pluginConfig.instance?.failInitialization!()
return false
}

// Inline plugins don't need "loading", and have no source files.
if (plugin.plugin_type === 'inline') {
await pluginConfig.instance?.initialize!('', pluginDigest(plugin))
return true
}

try {
// load config json
const configJson = isLocalPlugin
Expand All @@ -32,7 +38,7 @@ export async function loadPlugin(hub: Hub, pluginConfig: PluginConfig): Promise<
try {
config = JSON.parse(configJson)
} catch (e) {
pluginConfig.vm?.failInitialization!()
pluginConfig.instance?.failInitialization!()
await processError(hub, pluginConfig, `Could not load "plugin.json" for ${pluginDigest(plugin)}`)
return false
}
Expand All @@ -46,11 +52,11 @@ export async function loadPlugin(hub: Hub, pluginConfig: PluginConfig): Promise<
readFileIfExists(hub.BASE_DIR, plugin, 'index.ts')
: plugin.source__index_ts
if (pluginSource) {
void pluginConfig.vm?.initialize!(pluginSource, pluginDigest(plugin))
void pluginConfig.instance?.initialize!(pluginSource, pluginDigest(plugin))
return true
} else {
// always call this if no backend app present, will signal that the VM is done
pluginConfig.vm?.failInitialization!()
pluginConfig.instance?.failInitialization!()

// if there is a frontend or site app, don't save an error if no backend app
const hasFrontend = isLocalPlugin
Expand All @@ -72,7 +78,7 @@ export async function loadPlugin(hub: Hub, pluginConfig: PluginConfig): Promise<
}
}
} catch (error) {
pluginConfig.vm?.failInitialization!()
pluginConfig.instance?.failInitialization!()
await processError(hub, pluginConfig, error)
}
return false
Expand Down
6 changes: 3 additions & 3 deletions plugin-server/src/worker/plugins/loadPluginsFromDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { PluginAttachment } from '@posthog/plugin-scaffold'
import { Summary } from 'prom-client'

import { Hub, Plugin, PluginConfig, PluginConfigId, PluginId, PluginMethod, TeamId } from '../../types'
import { getPluginAttachmentRows, getPluginConfigRows, getPluginRows } from '../../utils/db/sql'
import { getActivePluginRows, getPluginAttachmentRows, getPluginConfigRows } from '../../utils/db/sql'

const loadPluginsMsSummary = new Summary({
name: 'load_plugins_ms',
Expand All @@ -29,7 +29,7 @@ export async function loadPluginsFromDB(
hub: Hub
): Promise<Pick<Hub, 'plugins' | 'pluginConfigs' | 'pluginConfigsPerTeam'>> {
const startTimer = new Date()
const pluginRows = await getPluginRows(hub)
const pluginRows = await getActivePluginRows(hub)
const plugins = new Map<PluginId, Plugin>()

for (const row of pluginRows) {
Expand Down Expand Up @@ -78,7 +78,7 @@ export async function loadPluginsFromDB(
...row,
plugin: plugin,
attachments: attachmentsPerConfig.get(row.id) || {},
vm: null,
instance: null,
method,
}
pluginConfigs.set(row.id, pluginConfig)
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/plugins/loadSchedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export async function loadSchedule(server: Hub): Promise<void> {
let count = 0

for (const [id, pluginConfig] of server.pluginConfigs) {
const tasks = (await pluginConfig.vm?.getScheduledTasks()) ?? {}
const tasks = (await pluginConfig.instance?.getScheduledTasks()) ?? {}
for (const [taskName, task] of Object.entries(tasks)) {
if (task && taskName in pluginSchedule) {
pluginSchedule[taskName].push(id)
Expand Down
Loading

0 comments on commit 2f3f4ea

Please sign in to comment.