Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Inline plugins (plugins without running VM2) #23443

Merged
merged 14 commits into from
Jul 24, 2024
2 changes: 1 addition & 1 deletion latest_migrations.manifest
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0448_add_mysql_externaldatasource_source_type
posthog: 0449_alter_plugin_organization_alter_plugin_plugin_type_and_more
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
2 changes: 1 addition & 1 deletion plugin-server/functional_tests/plugins.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ test.concurrent('plugins can use attachements', async () => {
key: 'testAttachment',
contents: 'test',
})
await enablePluginConfig(teamId, plugin.id)
await enablePluginConfig(teamId, pluginConfig.id)

await reloadPlugins()

Expand Down
3 changes: 3 additions & 0 deletions plugin-server/src/capabilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpProcessedEvents: true,
cdpFunctionCallbacks: true,
cdpFunctionOverflow: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
case PluginServerMode.ingestion:
Expand Down Expand Up @@ -89,6 +90,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
return {
pluginScheduledTasks: true,
appManagementSingleton: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
case PluginServerMode.cdp_processed_events:
Expand Down Expand Up @@ -121,6 +123,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
sessionRecordingBlobIngestion: true,
appManagementSingleton: true,
preflightSchedules: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
}
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import { OrganizationManager } from '../worker/ingestion/organization-manager'
import { TeamManager } from '../worker/ingestion/team-manager'
import Piscina, { makePiscina as defaultMakePiscina } from '../worker/piscina'
import { RustyHook } from '../worker/rusty-hook'
import { syncInlinePlugins } from '../worker/vm/inline/inline'
import { GraphileWorker } from './graphile-worker/graphile-worker'
import { loadPluginSchedule } from './graphile-worker/schedule'
import { startGraphileWorker } from './graphile-worker/worker-setup'
Expand Down Expand Up @@ -439,6 +440,13 @@ export async function startPluginsServer(
healthChecks['webhooks-ingestion'] = isWebhooksIngestionHealthy
}

if (capabilities.syncInlinePlugins) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
serverInstance = serverInstance ? serverInstance : { hub }

await syncInlinePlugins(hub)
}

if (hub && serverInstance) {
pubSub = new PubSub(hub, {
[hub.PLUGINS_RELOAD_PUBSUB_CHANNEL]: async () => {
Expand Down
19 changes: 10 additions & 9 deletions plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import { TeamManager } from './worker/ingestion/team-manager'
import { RustyHook } from './worker/rusty-hook'
import { PluginsApiKeyManager } from './worker/vm/extensions/helpers/api-key-manager'
import { RootAccessManager } from './worker/vm/extensions/helpers/root-acess-manager'
import { LazyPluginVM } from './worker/vm/lazy'
import { PluginInstance } from './worker/vm/lazy'

export { Element } from '@posthog/plugin-scaffold' // Re-export Element from scaffolding, for backwards compat.

Expand Down Expand Up @@ -314,7 +314,7 @@ export interface Hub extends PluginsServerConfig {
// diagnostics
lastActivity: number
lastActivityType: string
statelessVms: StatelessVmMap
statelessVms: StatelessInstanceMap
conversionBufferEnabledTeams: Set<number>
// functions
enqueuePluginJob: (job: EnqueuedPluginJob) => Promise<void>
Expand Down Expand Up @@ -344,6 +344,7 @@ export interface PluginServerCapabilities {
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud
http?: boolean
mmdb?: boolean
syncInlinePlugins?: boolean
}

export type EnqueuedJob = EnqueuedPluginJob | GraphileWorkerCronScheduleJob
Expand Down Expand Up @@ -394,9 +395,9 @@ export interface JobSpec {

export interface Plugin {
id: number
organization_id: string
organization_id?: string
name: string
plugin_type: 'local' | 'respository' | 'custom' | 'source'
plugin_type: 'local' | 'respository' | 'custom' | 'source' | 'inline'
description?: string
is_global: boolean
is_preinstalled?: boolean
Expand Down Expand Up @@ -443,7 +444,7 @@ export interface PluginConfig {
order: number
config: Record<string, unknown>
attachments?: Record<string, PluginAttachment>
vm?: LazyPluginVM | null
instance?: PluginInstance | null
created_at: string
updated_at?: string
// We're migrating to a new functions that take PostHogEvent instead of PluginEvent
Expand Down Expand Up @@ -528,7 +529,7 @@ export interface PluginTask {
__ignoreForAppMetrics?: boolean
}

export type VMMethods = {
export type PluginMethods = {
setupPlugin?: () => Promise<void>
teardownPlugin?: () => Promise<void>
getSettings?: () => PluginSettings
Expand All @@ -538,7 +539,7 @@ export type VMMethods = {
}

// Helper when ensuring that a required method is implemented
export type VMMethodsConcrete = Required<VMMethods>
export type PluginMethodsConcrete = Required<PluginMethods>

export enum AlertLevel {
P0 = 0,
Expand All @@ -565,7 +566,7 @@ export interface Alert {
}
export interface PluginConfigVMResponse {
vm: VM
methods: VMMethods
methods: PluginMethods
tasks: Record<PluginTaskType, Record<string, PluginTask>>
vmResponseVariable: string
usedImports: Set<string>
Expand Down Expand Up @@ -1150,7 +1151,7 @@ export enum PropertyUpdateOperation {
SetOnce = 'set_once',
}

export type StatelessVmMap = Record<PluginId, LazyPluginVM>
export type StatelessInstanceMap = Record<PluginId, PluginInstance>

export enum OrganizationPluginsAccessLevel {
NONE = 0,
Expand Down
98 changes: 96 additions & 2 deletions plugin-server/src/utils/db/sql.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Hub, Plugin, PluginAttachmentDB, PluginCapabilities, PluginConfig, PluginConfigId } from '../../types'
import { InlinePluginDescription } from '../../worker/vm/inline/inline'
import { PostgresUse } from './postgres'

function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string {
Expand Down Expand Up @@ -58,6 +59,49 @@ const PLUGIN_SELECT = `SELECT
LEFT JOIN posthog_pluginsourcefile psf__site_ts
ON (psf__site_ts.plugin_id = posthog_plugin.id AND psf__site_ts.filename = 'site.ts')`

const PLUGIN_UPSERT_RETURNING = `INSERT INTO posthog_plugin
(
name,
url,
tag,
from_json,
from_web,
error,
plugin_type,
organization_id,
is_global,
capabilities,
public_jobs,
is_stateless,
log_level,
description,
is_preinstalled,
config_schema,
updated_at,
created_at
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, NOW(), NOW())
ON CONFLICT (url)
DO UPDATE SET
name = $1,
tag = $3,
from_json = $4,
from_web = $5,
error = $6,
plugin_type = $7,
organization_id = $8,
is_global = $9,
capabilities = $10,
public_jobs = $11,
is_stateless = $12,
log_level = $13,
description = $14,
is_preinstalled = $15,
config_schema = $16,
updated_at = NOW()
RETURNING *
`

export async function getPlugin(hub: Hub, pluginId: number): Promise<Plugin | undefined> {
const result = await hub.db.postgres.query(
PostgresUse.COMMON_READ,
Expand All @@ -68,14 +112,14 @@ export async function getPlugin(hub: Hub, pluginId: number): Promise<Plugin | un
return result.rows[0]
}

export async function getPluginRows(hub: Hub): Promise<Plugin[]> {
export async function getActivePluginRows(hub: Hub): Promise<Plugin[]> {
const { rows }: { rows: Plugin[] } = await hub.db.postgres.query(
PostgresUse.COMMON_READ,
`${PLUGIN_SELECT}
WHERE posthog_plugin.id IN (${pluginConfigsInForceQuery('plugin_id')}
GROUP BY posthog_pluginconfig.plugin_id)`,
undefined,
'getPluginRows'
'getActivePluginRows'
)

return rows
Expand Down Expand Up @@ -124,3 +168,53 @@ export async function disablePlugin(hub: Hub, pluginConfigId: PluginConfigId): P
)
await hub.db.redisPublish(hub.PLUGINS_RELOAD_PUBSUB_CHANNEL, 'reload!')
}

// Given an inline plugin description, upsert it into the known plugins table, returning the full
// Plugin object. Matching is done based on plugin url, not id, since that varies by region.
export async function upsertInlinePlugin(hub: Hub, inline: InlinePluginDescription): Promise<Plugin> {
const fullPlugin: Plugin = {
id: 0,
name: inline.name,
url: inline.url,
tag: inline.tag,
from_json: false,
from_web: false,
error: undefined,
plugin_type: 'inline',
organization_id: undefined,
is_global: inline.is_global,
capabilities: inline.capabilities,
public_jobs: undefined,
is_stateless: inline.is_stateless,
log_level: inline.log_level,
description: inline.description,
is_preinstalled: inline.is_preinstalled,
config_schema: inline.config_schema,
}

const { rows }: { rows: Plugin[] } = await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
`${PLUGIN_UPSERT_RETURNING}`,
[
fullPlugin.name,
fullPlugin.url,
fullPlugin.tag,
fullPlugin.from_json,
fullPlugin.from_web,
fullPlugin.error,
fullPlugin.plugin_type,
fullPlugin.organization_id,
fullPlugin.is_global,
fullPlugin.capabilities,
fullPlugin.public_jobs,
fullPlugin.is_stateless,
fullPlugin.log_level,
fullPlugin.description,
fullPlugin.is_preinstalled,
JSON.stringify(fullPlugin.config_schema),
],
'upsertInlinePlugin'
)

return rows[0]
}
16 changes: 11 additions & 5 deletions plugin-server/src/worker/plugins/loadPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,16 @@ export async function loadPlugin(hub: Hub, pluginConfig: PluginConfig): Promise<
const isLocalPlugin = plugin?.plugin_type === 'local'

if (!plugin) {
pluginConfig.vm?.failInitialization!()
pluginConfig.instance?.failInitialization!()
return false
}

// Inline plugins don't need "loading", and have no source files.
if (plugin.plugin_type === 'inline') {
await pluginConfig.instance?.initialize!('', pluginDigest(plugin))
return true
}

try {
// load config json
const configJson = isLocalPlugin
Expand All @@ -32,7 +38,7 @@ export async function loadPlugin(hub: Hub, pluginConfig: PluginConfig): Promise<
try {
config = JSON.parse(configJson)
} catch (e) {
pluginConfig.vm?.failInitialization!()
pluginConfig.instance?.failInitialization!()
await processError(hub, pluginConfig, `Could not load "plugin.json" for ${pluginDigest(plugin)}`)
return false
}
Expand All @@ -46,11 +52,11 @@ export async function loadPlugin(hub: Hub, pluginConfig: PluginConfig): Promise<
readFileIfExists(hub.BASE_DIR, plugin, 'index.ts')
: plugin.source__index_ts
if (pluginSource) {
void pluginConfig.vm?.initialize!(pluginSource, pluginDigest(plugin))
void pluginConfig.instance?.initialize!(pluginSource, pluginDigest(plugin))
return true
} else {
// always call this if no backend app present, will signal that the VM is done
pluginConfig.vm?.failInitialization!()
pluginConfig.instance?.failInitialization!()

// if there is a frontend or site app, don't save an error if no backend app
const hasFrontend = isLocalPlugin
Expand All @@ -72,7 +78,7 @@ export async function loadPlugin(hub: Hub, pluginConfig: PluginConfig): Promise<
}
}
} catch (error) {
pluginConfig.vm?.failInitialization!()
pluginConfig.instance?.failInitialization!()
await processError(hub, pluginConfig, error)
}
return false
Expand Down
6 changes: 3 additions & 3 deletions plugin-server/src/worker/plugins/loadPluginsFromDB.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { PluginAttachment } from '@posthog/plugin-scaffold'
import { Summary } from 'prom-client'

import { Hub, Plugin, PluginConfig, PluginConfigId, PluginId, PluginMethod, TeamId } from '../../types'
import { getPluginAttachmentRows, getPluginConfigRows, getPluginRows } from '../../utils/db/sql'
import { getActivePluginRows, getPluginAttachmentRows, getPluginConfigRows } from '../../utils/db/sql'

const loadPluginsMsSummary = new Summary({
name: 'load_plugins_ms',
Expand All @@ -29,7 +29,7 @@ export async function loadPluginsFromDB(
hub: Hub
): Promise<Pick<Hub, 'plugins' | 'pluginConfigs' | 'pluginConfigsPerTeam'>> {
const startTimer = new Date()
const pluginRows = await getPluginRows(hub)
const pluginRows = await getActivePluginRows(hub)
const plugins = new Map<PluginId, Plugin>()

for (const row of pluginRows) {
Expand Down Expand Up @@ -78,7 +78,7 @@ export async function loadPluginsFromDB(
...row,
plugin: plugin,
attachments: attachmentsPerConfig.get(row.id) || {},
vm: null,
instance: null,
method,
}
pluginConfigs.set(row.id, pluginConfig)
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/plugins/loadSchedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ export async function loadSchedule(server: Hub): Promise<void> {
let count = 0

for (const [id, pluginConfig] of server.pluginConfigs) {
const tasks = (await pluginConfig.vm?.getScheduledTasks()) ?? {}
const tasks = (await pluginConfig.instance?.getScheduledTasks()) ?? {}
for (const [taskName, task] of Object.entries(tasks)) {
if (task && taskName in pluginSchedule) {
pluginSchedule[taskName].push(id)
Expand Down
Loading
Loading