Skip to content

Commit

Permalink
feat(messaging): test sending e-mails (#25825)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusandra authored Oct 26, 2024
1 parent e4d3696 commit 4a35174
Show file tree
Hide file tree
Showing 21 changed files with 529 additions and 69 deletions.
2 changes: 1 addition & 1 deletion hogvm/typescript/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@posthog/hogvm",
"version": "1.0.55",
"version": "1.0.56",
"description": "PostHog Hog Virtual Machine",
"types": "dist/index.d.ts",
"source": "src/index.ts",
Expand Down
5 changes: 4 additions & 1 deletion hogvm/typescript/src/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ export function exec(input: any[] | VMState | Bytecodes, options?: ExecOptions):
let ops = vmState ? vmState.ops : 0
const timeout = options?.timeout ?? DEFAULT_TIMEOUT_MS
const maxAsyncSteps = options?.maxAsyncSteps ?? DEFAULT_MAX_ASYNC_STEPS
const rootGlobals: Record<string, any> = options?.globals ?? {}
const rootGlobals: Record<string, any> =
bytecodes.root?.globals && options?.globals
? { ...bytecodes.root.globals, ...options.globals }
: bytecodes.root?.globals ?? options?.globals ?? {}

if (callStack.length === 0) {
callStack.push({
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"@medv/finder": "^3.1.0",
"@microlink/react-json-view": "^1.21.3",
"@monaco-editor/react": "4.6.0",
"@posthog/hogvm": "^1.0.55",
"@posthog/hogvm": "^1.0.56",
"@posthog/icons": "0.8.5",
"@posthog/plugin-scaffold": "^1.4.4",
"@react-hook/size": "^2.1.2",
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"@maxmind/geoip2-node": "^3.4.0",
"@posthog/clickhouse": "^1.7.0",
"@posthog/cyclotron": "file:../rust/cyclotron-node",
"@posthog/hogvm": "^1.0.55",
"@posthog/hogvm": "^1.0.56",
"@posthog/plugin-scaffold": "1.4.4",
"@sentry/node": "^7.49.0",
"@sentry/profiling-node": "^0.3.0",
Expand Down
8 changes: 4 additions & 4 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion plugin-server/src/cdp/cdp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ export class CdpApi {
url: `${this.hub.SITE_URL ?? 'http://localhost:8000'}/project/${team.id}`,
},
},
compoundConfiguration
compoundConfiguration,
// The "email" hog functions export a "sendEmail" function that we must explicitly call
hogFunction.type === 'email' ? ['sendEmail', [globals.email]] : undefined
)

if (invocation.queue === 'fetch') {
Expand Down
51 changes: 47 additions & 4 deletions plugin-server/src/cdp/hog-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
HogFunctionQueueParametersFetchResponse,
HogFunctionType,
} from './types'
import { convertToHogFunctionFilterGlobal } from './utils'
import { buildExportedFunctionInvoker, convertToHogFunctionFilterGlobal } from './utils'

export const MAX_ASYNC_STEPS = 5
export const MAX_HOG_LOGS = 25
Expand Down Expand Up @@ -278,16 +278,59 @@ export class HogExecutor {
}

const sensitiveValues = this.getSensitiveValues(invocation.hogFunction, globals.inputs)
const invocationInput =
invocation.vmState ??
(invocation.functionToExecute
? buildExportedFunctionInvoker(
invocation.hogFunction.bytecode,
globals,
invocation.functionToExecute[0], // name
invocation.functionToExecute[1] // args
)
: invocation.hogFunction.bytecode)

try {
let hogLogs = 0
execRes = execHog(invocation.vmState ?? invocation.hogFunction.bytecode, {
globals,
execRes = execHog(invocationInput, {
globals: invocation.functionToExecute ? undefined : globals,
maxAsyncSteps: MAX_ASYNC_STEPS, // NOTE: This will likely be configurable in the future
asyncFunctions: {
// We need to pass these in but they don't actually do anything as it is a sync exec
fetch: async () => Promise.resolve(),
},
importBytecode: (module) => {
// TODO: more than one hardcoded module
if (module === 'provider/email') {
const provider = this.hogFunctionManager.getTeamHogEmailProvider(invocation.teamId)
if (!provider) {
throw new Error('No email provider configured')
}
try {
const providerGlobals = this.buildHogFunctionGlobals({
id: '',
teamId: invocation.teamId,
hogFunction: provider,
globals: {} as any,
queue: 'hog',
timings: [],
priority: 0,
} satisfies HogFunctionInvocation)

return {
bytecode: provider.bytecode,
globals: providerGlobals,
}
} catch (e) {
result.logs.push({
level: 'error',
timestamp: DateTime.now(),
message: `Error building inputs: ${e}`,
})
throw e
}
}
throw new Error(`Can't import unknown module: ${module}`)
},
functions: {
print: (...args) => {
hogLogs++
Expand Down Expand Up @@ -453,7 +496,7 @@ export class HogExecutor {
result.finished = true // Explicitly set to true to prevent infinite loops
status.error(
'🦔',
`[HogExecutor] Error executing function ${invocation.hogFunction.id} - ${invocation.hogFunction.name}. Event: '${invocation.globals.event.url}'`,
`[HogExecutor] Error executing function ${invocation.hogFunction.id} - ${invocation.hogFunction.name}. Event: '${invocation.globals.event?.url}'`,
err
)
}
Expand Down
4 changes: 4 additions & 0 deletions plugin-server/src/cdp/hog-function-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ export class HogFunctionManager {
return this.getTeamHogFunctions(teamId).filter((x) => x.type === 'destination' || !x.type)
}

public getTeamHogEmailProvider(teamId: Team['id']): HogFunctionType | undefined {
return this.getTeamHogFunctions(teamId).find((x) => x.type === 'email')
}

public getHogFunction(id: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/cdp/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ export type HogFunctionInvocation = {
// The current vmstate (set if the invocation is paused)
vmState?: VMState
timings: HogFunctionTiming[]
functionToExecute?: [string, any[]]
}

export type HogFunctionAsyncFunctionRequest = {
Expand Down
48 changes: 47 additions & 1 deletion plugin-server/src/cdp/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// NOTE: PostIngestionEvent is our context event - it should never be sent directly to an output, but rather transformed into a lightweight schema

import { CyclotronJob, CyclotronJobUpdate } from '@posthog/cyclotron'
import { Bytecodes } from '@posthog/hogvm'
import { captureException } from '@sentry/node'
import { DateTime } from 'luxon'
import RE2 from 're2'
Expand Down Expand Up @@ -273,7 +274,8 @@ export const prepareLogEntriesForClickhouse = (

export function createInvocation(
globals: HogFunctionInvocationGlobals,
hogFunction: HogFunctionType
hogFunction: HogFunctionType,
functionToExecute?: [string, any[]]
): HogFunctionInvocation {
// Add the source of the trigger to the globals
const modifiedGlobals: HogFunctionInvocationGlobals = {
Expand All @@ -292,6 +294,7 @@ export function createInvocation(
queue: 'hog',
priority: 1,
timings: [],
functionToExecute,
}
}

Expand Down Expand Up @@ -378,3 +381,46 @@ export function cyclotronJobToInvocation(job: CyclotronJob, hogFunction: HogFunc
timings: parsedState.timings,
}
}

/** Build bytecode that calls a function in another imported bytecode */
export function buildExportedFunctionInvoker(
exportBytecode: any[],
exportGlobals: any,
functionName: string,
args: any[]
): Bytecodes {
let argBytecodes: any[] = []
for (let i = 0; i < args.length; i++) {
argBytecodes = [
...argBytecodes,
33, // integer
i + 1, // (index in args array)
32, // string
'__args',
1, // get global
2, // (chain length)
]
}
const bytecode = [
'_H',
1,
...argBytecodes,
32, // string
'x',
2, // call global
'import',
1, // (arg count)
32, // string
functionName,
45, // get property
54, // call local
args.length,
35, // pop
]
return {
bytecodes: {
x: { bytecode: exportBytecode, globals: exportGlobals },
root: { bytecode, globals: { __args: args } },
},
}
}
38 changes: 38 additions & 0 deletions plugin-server/tests/cdp/cdp-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,5 +239,43 @@ describe('CDP API', () => {
],
})
})

it('call exported sendEmail for email provider functions', async () => {
hogFunction = await insertHogFunction({
...HOG_EXAMPLES.export_send_email,
...HOG_INPUTS_EXAMPLES.simple_fetch,
...HOG_FILTERS_EXAMPLES.no_filters,
})

mockFetch.mockImplementationOnce(() =>
Promise.resolve({
status: 201,
text: () => Promise.resolve(JSON.stringify({ real: true })),
})
)
const res = await supertest(app)
.post(`/api/projects/${hogFunction.team_id}/hog_functions/${hogFunction.id}/invocations`)
.send({ globals: { ...globals, email: { from: '[email protected]' } }, mock_async_functions: false })

expect(res.status).toEqual(200)
expect(res.body).toMatchObject({
status: 'success',
error: 'undefined',
logs: [
{
level: 'debug',
message: 'Executing function',
},
{
level: 'info',
message: '{"from":"[email protected]"}',
},
{
level: 'debug',
message: expect.stringContaining('Function completed in'),
},
],
})
})
})
})
2 changes: 1 addition & 1 deletion plugin-server/tests/cdp/cdp-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ describe('CDP E2E', () => {
Array [
"https://example.com/posthog-webhook",
Object {
"body": "{\\"event\\":{\\"uuid\\":\\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\",\\"event\\":\\"$pageview\\",\\"elements_chain\\":\\"\\",\\"distinct_id\\":\\"distinct_id\\",\\"url\\":\\"http://localhost:8000/events/1\\",\\"properties\\":{\\"$current_url\\":\\"https://posthog.com\\",\\"$lib_version\\":\\"1.0.0\\"},\\"timestamp\\":\\"2024-09-03T09:00:00Z\\"},\\"groups\\":{},\\"nested\\":{\\"foo\\":\\"http://localhost:8000/events/1\\"},\\"person\\":{\\"id\\":\\"uuid\\",\\"name\\":\\"test\\",\\"url\\":\\"http://localhost:8000/persons/1\\",\\"properties\\":{\\"email\\":\\"[email protected]\\"}},\\"event_url\\":\\"http://localhost:8000/events/1-test\\"}",
"body": "{\\"event\\":{\\"uuid\\":\\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\",\\"event\\":\\"$pageview\\",\\"elements_chain\\":\\"\\",\\"distinct_id\\":\\"distinct_id\\",\\"url\\":\\"http://localhost:8000/events/1\\",\\"properties\\":{\\"$current_url\\":\\"https://posthog.com\\",\\"$lib_version\\":\\"1.0.0\\"},\\"timestamp\\":\\"2024-09-03T09:00:00Z\\"},\\"groups\\":{},\\"nested\\":{\\"foo\\":\\"http://localhost:8000/events/1\\"},\\"person\\":{\\"id\\":\\"uuid\\",\\"name\\":\\"test\\",\\"url\\":\\"http://localhost:8000/persons/1\\",\\"properties\\":{\\"email\\":\\"[email protected]\\",\\"first_name\\":\\"Pumpkin\\"}},\\"event_url\\":\\"http://localhost:8000/events/1-test\\"}",
"headers": Object {
"version": "v=1.0.0",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ describe('CDP Processed Events Consumer', () => {
{
topic: 'log_entries_test',
value: {
message: "Suspending function due to async function call 'fetch'. Payload: 2002 bytes",
message: "Suspending function due to async function call 'fetch'. Payload: 2035 bytes",
log_source_id: fnFetchNoFilters.id,
},
},
Expand Down
Loading

0 comments on commit 4a35174

Please sign in to comment.