Skip to content

Commit

Permalink
feat: populate plugin capabilities on install and edit
Browse files Browse the repository at this point in the history
  • Loading branch information
tiina303 authored and bretthoerner committed Dec 11, 2023
1 parent 01cc9d5 commit 4726f60
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 20 deletions.
6 changes: 6 additions & 0 deletions plugin-server/src/main/pluginsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,12 @@ export async function startPluginsServer(
'reset-available-features-cache': async (message) => {
await piscina?.broadcastTask({ task: 'resetAvailableFeaturesCache', args: JSON.parse(message) })
},
'populate-plugin-capabilities': async (message) => {
// We need this to be done in only once
if (hub?.capabilities.pluginScheduledTasks && piscina) {
await piscina?.broadcastTask({ task: 'populatePluginCapabilities', args: JSON.parse(message) })
}
},
})

await pubSub.start()
Expand Down
36 changes: 24 additions & 12 deletions plugin-server/src/utils/db/sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,7 @@ function pluginConfigsInForceQuery(specificField?: keyof PluginConfig): string {
)`
}

export async function getPluginRows(hub: Hub): Promise<Plugin[]> {
const { rows }: { rows: Plugin[] } = await hub.db.postgres.query(
PostgresUse.COMMON_READ,
// `posthog_plugin` columns have to be listed individually, as we want to exclude a few columns
// and Postgres syntax unfortunately doesn't have a column exclusion feature. The excluded columns are:
// - archive - this is a potentially large blob, only extracted in Django as a plugin server optimization
// - latest_tag - not used in this service
// - latest_tag_checked_at - not used in this service
`SELECT
const PLUGIN_SELECT = `SELECT
posthog_plugin.id,
posthog_plugin.name,
posthog_plugin.url,
Expand Down Expand Up @@ -61,7 +53,27 @@ export async function getPluginRows(hub: Hub): Promise<Plugin[]> {
LEFT JOIN posthog_pluginsourcefile psf__frontend_tsx
ON (psf__frontend_tsx.plugin_id = posthog_plugin.id AND psf__frontend_tsx.filename = 'frontend.tsx')
LEFT JOIN posthog_pluginsourcefile psf__site_ts
ON (psf__site_ts.plugin_id = posthog_plugin.id AND psf__site_ts.filename = 'site.ts')
ON (psf__site_ts.plugin_id = posthog_plugin.id AND psf__site_ts.filename = 'site.ts')`

export async function getPlugin(hub: Hub, pluginId: number): Promise<Plugin | undefined> {
const result = await hub.db.postgres.query(
PostgresUse.COMMON_READ,
`${PLUGIN_SELECT} WHERE posthog_plugin.id = $1`,
[pluginId],
'getPlugin'
)
return result.rows[0]
}

export async function getPluginRows(hub: Hub): Promise<Plugin[]> {
const { rows }: { rows: Plugin[] } = await hub.db.postgres.query(
PostgresUse.COMMON_READ,
// `posthog_plugin` columns have to be listed individually, as we want to exclude a few columns
// and Postgres syntax unfortunately doesn't have a column exclusion feature. The excluded columns are:
// - archive - this is a potentially large blob, only extracted in Django as a plugin server optimization
// - latest_tag - not used in this service
// - latest_tag_checked_at - not used in this service
`${PLUGIN_SELECT}
WHERE posthog_plugin.id IN (${pluginConfigsInForceQuery('plugin_id')}
GROUP BY posthog_pluginconfig.plugin_id)`,
undefined,
Expand Down Expand Up @@ -94,13 +106,13 @@ export async function getPluginConfigRows(hub: Hub): Promise<PluginConfig[]> {

export async function setPluginCapabilities(
hub: Hub,
pluginConfig: PluginConfig,
pluginId: number,
capabilities: PluginCapabilities
): Promise<void> {
await hub.db.postgres.query(
PostgresUse.COMMON_WRITE,
'UPDATE posthog_plugin SET capabilities = ($1) WHERE id = $2',
[capabilities, pluginConfig.plugin_id],
[capabilities, pluginId],
'setPluginCapabilities'
)
}
Expand Down
4 changes: 4 additions & 0 deletions plugin-server/src/worker/tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { loadSchedule } from './plugins/loadSchedule'
import { runPluginTask, runProcessEvent } from './plugins/run'
import { setupPlugins } from './plugins/setup'
import { teardownPlugins } from './plugins/teardown'
import { populatePluginCapabilities } from './vm/lazy'

type TaskRunner = (hub: Hub, args: any) => Promise<any> | any

Expand Down Expand Up @@ -88,6 +89,9 @@ export const workerTasks: Record<string, TaskRunner> = {
resetAvailableFeaturesCache: (hub, args: { organization_id: string }) => {
hub.organizationManager.resetAvailableFeatureCache(args.organization_id)
},
populatePluginCapabilities: async (hub, args: { plugin_id: string }) => {
await populatePluginCapabilities(hub, Number(args.plugin_id))
},
// Exported only for tests
_testsRunProcessEvent: async (hub, args: { event: PluginEvent }) => {
return runProcessEvent(hub, args.event)
Expand Down
10 changes: 5 additions & 5 deletions plugin-server/src/worker/vm/capabilities.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import { PluginCapabilities, PluginConfigVMResponse, VMMethods } from '../../types'
import { PluginCapabilities, PluginTask, PluginTaskType, VMMethods } from '../../types'
import { PluginServerCapabilities } from './../../types'

const PROCESS_EVENT_CAPABILITIES = new Set(['ingestion', 'ingestionOverflow', 'ingestionHistorical'])

export function getVMPluginCapabilities(vm: PluginConfigVMResponse): PluginCapabilities {
export function getVMPluginCapabilities(
methods: VMMethods,
tasks: Record<PluginTaskType, Record<string, PluginTask>>
): PluginCapabilities {
const capabilities: Required<PluginCapabilities> = { scheduled_tasks: [], jobs: [], methods: [] }

const tasks = vm?.tasks
const methods = vm?.methods

if (methods) {
for (const [key, value] of Object.entries(methods)) {
if (value as VMMethods[keyof VMMethods] | undefined) {
Expand Down
40 changes: 37 additions & 3 deletions plugin-server/src/worker/vm/lazy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
VMMethods,
} from '../../types'
import { processError } from '../../utils/db/error'
import { disablePlugin, setPluginCapabilities } from '../../utils/db/sql'
import { disablePlugin, getPlugin, setPluginCapabilities } from '../../utils/db/sql'
import { instrument } from '../../utils/metrics'
import { getNextRetryMs } from '../../utils/retries'
import { status } from '../../utils/status'
Expand Down Expand Up @@ -307,12 +307,46 @@ export class LazyPluginVM {
}

private async updatePluginCapabilitiesIfNeeded(vm: PluginConfigVMResponse): Promise<void> {
const capabilities = getVMPluginCapabilities(vm)
const capabilities = getVMPluginCapabilities(vm.methods, vm.tasks)

const prevCapabilities = this.pluginConfig.plugin!.capabilities
if (!equal(prevCapabilities, capabilities)) {
await setPluginCapabilities(this.hub, this.pluginConfig, capabilities)
await setPluginCapabilities(this.hub, this.pluginConfig.plugin_id, capabilities)
this.pluginConfig.plugin!.capabilities = capabilities
}
}
}

export async function populatePluginCapabilities(hub: Hub, pluginId: number): Promise<void> {
status.info('🔌', `Populating plugin capabilities for plugin ID ${pluginId}...`)
const plugin = await getPlugin(hub, pluginId)
if (!plugin) {
status.error('🔌', `Plugin with ID ${pluginId} not found for populating capabilities.`)
return
}
if (!plugin.source__index_ts) {
status.error('🔌', `Plugin with ID ${pluginId} has no index.ts file for populating capabilities.`)
return
}

const { methods, tasks } = createPluginConfigVM(
hub,
{
id: 0,
plugin: plugin,
plugin_id: plugin.id,
team_id: 0,
enabled: false,
order: 0,
created_at: '0',
config: {},
},
plugin.source__index_ts || ''
)
const capabilities = getVMPluginCapabilities(methods, tasks)

const prevCapabilities = plugin.capabilities
if (!equal(prevCapabilities, capabilities)) {
await setPluginCapabilities(hub, pluginId, capabilities)
}
}
6 changes: 6 additions & 0 deletions posthog/api/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from posthog.plugins import can_configure_plugins, can_install_plugins, parse_url
from posthog.plugins.access import can_globally_manage_plugins
from posthog.queries.app_metrics.app_metrics import TeamPluginsDeliveryRateQuery
from posthog.redis import get_client
from posthog.utils import format_query_params_absolute_url

# Keep this in sync with: frontend/scenes/plugins/utils.ts
Expand Down Expand Up @@ -439,6 +440,11 @@ def update_source(self, request: request.Request, **kwargs):
if performed_changes:
plugin.updated_at = now()
plugin.save()
# Trigger capabilities update in plugin server, in case the app source changed the methods etc
get_client().publish(
"populate-plugin-capabilities",
json.dumps({"plugin_id": str(plugin.id)}),
)
return Response(response)

@action(methods=["POST"], detail=True)
Expand Down
6 changes: 6 additions & 0 deletions posthog/models/plugin.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import json
import os
from dataclasses import dataclass
from enum import Enum
Expand Down Expand Up @@ -29,6 +30,7 @@
load_json_file,
parse_url,
)
from posthog.redis import get_client

from .utils import UUIDModel, sane_repr

Expand Down Expand Up @@ -129,6 +131,10 @@ def install(self, **kwargs) -> "Plugin":
plugin = Plugin.objects.create(**kwargs)
if plugin_json:
PluginSourceFile.objects.sync_from_plugin_archive(plugin, plugin_json)
get_client().publish(
"populate-plugin-capabilities",
json.dumps({"plugin_id": str(plugin.id)}),
)
return plugin


Expand Down

0 comments on commit 4726f60

Please sign in to comment.