Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(messaging): test sending e-mails #25825

Merged
merged 36 commits into from
Oct 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
bc5e8a0
feat(messaging): actually send the message
mariusandra Oct 25, 2024
c5aa62b
Update query snapshots
github-actions[bot] Oct 25, 2024
c360b17
fix typo
mariusandra Oct 25, 2024
a742658
fix another flaker
mariusandra Oct 25, 2024
c60c167
Merge branch 'messaging-part-4' of github.com:PostHog/posthog into me…
mariusandra Oct 25, 2024
f012afe
remove broadcast code
mariusandra Oct 25, 2024
a59821a
Update query snapshots
github-actions[bot] Oct 25, 2024
0c573dd
Update UI snapshots for `chromium` (2)
github-actions[bot] Oct 25, 2024
dfd226f
Update UI snapshots for `chromium` (2)
github-actions[bot] Oct 25, 2024
7b3cc3b
Merge branch 'master' into messaging-part-4
mariusandra Oct 25, 2024
6c0903c
no change
mariusandra Oct 25, 2024
77dfb8b
fixes
mariusandra Oct 25, 2024
7b8a4c8
maybe not
mariusandra Oct 25, 2024
1e5341e
Update query snapshots
github-actions[bot] Oct 25, 2024
739d8bb
Update query snapshots
github-actions[bot] Oct 25, 2024
26292c5
test calling sendEmail for providers on invocation test
mariusandra Oct 25, 2024
57345ed
test email provider import
mariusandra Oct 25, 2024
800739a
test calling imported code
mariusandra Oct 25, 2024
4c43ca0
fistbump
mariusandra Oct 25, 2024
7d65366
Merge branch 'messaging-part-4' of github.com:PostHog/posthog into me…
mariusandra Oct 25, 2024
63964f7
Merge branch 'master' into messaging-part-4
mariusandra Oct 25, 2024
13693da
Revert "Update query snapshots"
mariusandra Oct 25, 2024
384beee
Revert "Update query snapshots"
mariusandra Oct 25, 2024
230df08
fix crash
mariusandra Oct 25, 2024
90fd457
fix template
mariusandra Oct 25, 2024
0178100
Update query snapshots
github-actions[bot] Oct 25, 2024
cf2e114
cleanup
mariusandra Oct 25, 2024
7fcbb34
Merge branch 'messaging-part-4' of github.com:PostHog/posthog into me…
mariusandra Oct 25, 2024
9baeaa4
Update query snapshots
github-actions[bot] Oct 25, 2024
48bd149
Update query snapshots
github-actions[bot] Oct 25, 2024
efa2252
Update UI snapshots for `chromium` (1)
github-actions[bot] Oct 25, 2024
709278c
Update query snapshots
github-actions[bot] Oct 25, 2024
636f554
Update UI snapshots for `chromium` (1)
github-actions[bot] Oct 25, 2024
8e6396a
fixes
mariusandra Oct 26, 2024
d923a8a
Update UI snapshots for `chromium` (1)
github-actions[bot] Oct 26, 2024
05ae44a
Update UI snapshots for `chromium` (1)
github-actions[bot] Oct 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hogvm/typescript/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@posthog/hogvm",
"version": "1.0.55",
"version": "1.0.56",
"description": "PostHog Hog Virtual Machine",
"types": "dist/index.d.ts",
"source": "src/index.ts",
Expand Down
5 changes: 4 additions & 1 deletion hogvm/typescript/src/execute.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ export function exec(input: any[] | VMState | Bytecodes, options?: ExecOptions):
let ops = vmState ? vmState.ops : 0
const timeout = options?.timeout ?? DEFAULT_TIMEOUT_MS
const maxAsyncSteps = options?.maxAsyncSteps ?? DEFAULT_MAX_ASYNC_STEPS
const rootGlobals: Record<string, any> = options?.globals ?? {}
const rootGlobals: Record<string, any> =
bytecodes.root?.globals && options?.globals
? { ...bytecodes.root.globals, ...options.globals }
: bytecodes.root?.globals ?? options?.globals ?? {}

if (callStack.length === 0) {
callStack.push({
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
"@medv/finder": "^3.1.0",
"@microlink/react-json-view": "^1.21.3",
"@monaco-editor/react": "4.6.0",
"@posthog/hogvm": "^1.0.55",
"@posthog/hogvm": "^1.0.56",
"@posthog/icons": "0.8.5",
"@posthog/plugin-scaffold": "^1.4.4",
"@react-hook/size": "^2.1.2",
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
"@maxmind/geoip2-node": "^3.4.0",
"@posthog/clickhouse": "^1.7.0",
"@posthog/cyclotron": "file:../rust/cyclotron-node",
"@posthog/hogvm": "^1.0.55",
"@posthog/hogvm": "^1.0.56",
"@posthog/plugin-scaffold": "1.4.4",
"@sentry/node": "^7.49.0",
"@sentry/profiling-node": "^0.3.0",
Expand Down
8 changes: 4 additions & 4 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion plugin-server/src/cdp/cdp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ export class CdpApi {
url: `${this.hub.SITE_URL ?? 'http://localhost:8000'}/project/${team.id}`,
},
},
compoundConfiguration
compoundConfiguration,
// The "email" hog functions export a "sendEmail" function that we must explicitly call
hogFunction.type === 'email' ? ['sendEmail', [globals.email]] : undefined
)

if (invocation.queue === 'fetch') {
Expand Down
51 changes: 47 additions & 4 deletions plugin-server/src/cdp/hog-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import {
HogFunctionQueueParametersFetchResponse,
HogFunctionType,
} from './types'
import { convertToHogFunctionFilterGlobal } from './utils'
import { buildExportedFunctionInvoker, convertToHogFunctionFilterGlobal } from './utils'

export const MAX_ASYNC_STEPS = 5
export const MAX_HOG_LOGS = 25
Expand Down Expand Up @@ -278,16 +278,59 @@ export class HogExecutor {
}

const sensitiveValues = this.getSensitiveValues(invocation.hogFunction, globals.inputs)
const invocationInput =
invocation.vmState ??
(invocation.functionToExecute
? buildExportedFunctionInvoker(
invocation.hogFunction.bytecode,
globals,
invocation.functionToExecute[0], // name
invocation.functionToExecute[1] // args
)
: invocation.hogFunction.bytecode)

try {
let hogLogs = 0
execRes = execHog(invocation.vmState ?? invocation.hogFunction.bytecode, {
globals,
execRes = execHog(invocationInput, {
globals: invocation.functionToExecute ? undefined : globals,
maxAsyncSteps: MAX_ASYNC_STEPS, // NOTE: This will likely be configurable in the future
asyncFunctions: {
// We need to pass these in but they don't actually do anything as it is a sync exec
fetch: async () => Promise.resolve(),
},
importBytecode: (module) => {
// TODO: more than one hardcoded module
if (module === 'provider/email') {
const provider = this.hogFunctionManager.getTeamHogEmailProvider(invocation.teamId)
if (!provider) {
throw new Error('No email provider configured')
}
try {
const providerGlobals = this.buildHogFunctionGlobals({
id: '',
teamId: invocation.teamId,
hogFunction: provider,
globals: {} as any,
queue: 'hog',
timings: [],
priority: 0,
} satisfies HogFunctionInvocation)

return {
bytecode: provider.bytecode,
globals: providerGlobals,
}
} catch (e) {
result.logs.push({
level: 'error',
timestamp: DateTime.now(),
message: `Error building inputs: ${e}`,
})
throw e
}
}
throw new Error(`Can't import unknown module: ${module}`)
},
functions: {
print: (...args) => {
hogLogs++
Expand Down Expand Up @@ -453,7 +496,7 @@ export class HogExecutor {
result.finished = true // Explicitly set to true to prevent infinite loops
status.error(
'🦔',
`[HogExecutor] Error executing function ${invocation.hogFunction.id} - ${invocation.hogFunction.name}. Event: '${invocation.globals.event.url}'`,
`[HogExecutor] Error executing function ${invocation.hogFunction.id} - ${invocation.hogFunction.name}. Event: '${invocation.globals.event?.url}'`,
err
)
}
Expand Down
4 changes: 4 additions & 0 deletions plugin-server/src/cdp/hog-function-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ export class HogFunctionManager {
return this.getTeamHogFunctions(teamId).filter((x) => x.type === 'destination' || !x.type)
}

public getTeamHogEmailProvider(teamId: Team['id']): HogFunctionType | undefined {
return this.getTeamHogFunctions(teamId).find((x) => x.type === 'email')
}

public getHogFunction(id: HogFunctionType['id']): HogFunctionType | undefined {
if (!this.ready) {
throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this')
Expand Down
1 change: 1 addition & 0 deletions plugin-server/src/cdp/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ export type HogFunctionInvocation = {
// The current vmstate (set if the invocation is paused)
vmState?: VMState
timings: HogFunctionTiming[]
functionToExecute?: [string, any[]]
}

export type HogFunctionAsyncFunctionRequest = {
Expand Down
48 changes: 47 additions & 1 deletion plugin-server/src/cdp/utils.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// NOTE: PostIngestionEvent is our context event - it should never be sent directly to an output, but rather transformed into a lightweight schema

import { CyclotronJob, CyclotronJobUpdate } from '@posthog/cyclotron'
import { Bytecodes } from '@posthog/hogvm'
import { captureException } from '@sentry/node'
import { DateTime } from 'luxon'
import RE2 from 're2'
Expand Down Expand Up @@ -273,7 +274,8 @@ export const prepareLogEntriesForClickhouse = (

export function createInvocation(
globals: HogFunctionInvocationGlobals,
hogFunction: HogFunctionType
hogFunction: HogFunctionType,
functionToExecute?: [string, any[]]
): HogFunctionInvocation {
// Add the source of the trigger to the globals
const modifiedGlobals: HogFunctionInvocationGlobals = {
Expand All @@ -292,6 +294,7 @@ export function createInvocation(
queue: 'hog',
priority: 1,
timings: [],
functionToExecute,
}
}

Expand Down Expand Up @@ -378,3 +381,46 @@ export function cyclotronJobToInvocation(job: CyclotronJob, hogFunction: HogFunc
timings: parsedState.timings,
}
}

/** Build bytecode that calls a function in another imported bytecode */
export function buildExportedFunctionInvoker(
exportBytecode: any[],
exportGlobals: any,
functionName: string,
args: any[]
): Bytecodes {
let argBytecodes: any[] = []
for (let i = 0; i < args.length; i++) {
argBytecodes = [
...argBytecodes,
33, // integer
i + 1, // (index in args array)
32, // string
'__args',
1, // get global
2, // (chain length)
]
}
const bytecode = [
'_H',
1,
...argBytecodes,
32, // string
'x',
2, // call global
'import',
1, // (arg count)
32, // string
functionName,
45, // get property
54, // call local
args.length,
35, // pop
]
return {
bytecodes: {
x: { bytecode: exportBytecode, globals: exportGlobals },
root: { bytecode, globals: { __args: args } },
},
}
}
38 changes: 38 additions & 0 deletions plugin-server/tests/cdp/cdp-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,5 +239,43 @@ describe('CDP API', () => {
],
})
})

it('call exported sendEmail for email provider functions', async () => {
hogFunction = await insertHogFunction({
...HOG_EXAMPLES.export_send_email,
...HOG_INPUTS_EXAMPLES.simple_fetch,
...HOG_FILTERS_EXAMPLES.no_filters,
})

mockFetch.mockImplementationOnce(() =>
Promise.resolve({
status: 201,
text: () => Promise.resolve(JSON.stringify({ real: true })),
})
)
const res = await supertest(app)
.post(`/api/projects/${hogFunction.team_id}/hog_functions/${hogFunction.id}/invocations`)
.send({ globals: { ...globals, email: { from: '[email protected]' } }, mock_async_functions: false })

expect(res.status).toEqual(200)
expect(res.body).toMatchObject({
status: 'success',
error: 'undefined',
logs: [
{
level: 'debug',
message: 'Executing function',
},
{
level: 'info',
message: '{"from":"[email protected]"}',
},
{
level: 'debug',
message: expect.stringContaining('Function completed in'),
},
],
})
})
})
})
2 changes: 1 addition & 1 deletion plugin-server/tests/cdp/cdp-e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ describe('CDP E2E', () => {
Array [
"https://example.com/posthog-webhook",
Object {
"body": "{\\"event\\":{\\"uuid\\":\\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\",\\"event\\":\\"$pageview\\",\\"elements_chain\\":\\"\\",\\"distinct_id\\":\\"distinct_id\\",\\"url\\":\\"http://localhost:8000/events/1\\",\\"properties\\":{\\"$current_url\\":\\"https://posthog.com\\",\\"$lib_version\\":\\"1.0.0\\"},\\"timestamp\\":\\"2024-09-03T09:00:00Z\\"},\\"groups\\":{},\\"nested\\":{\\"foo\\":\\"http://localhost:8000/events/1\\"},\\"person\\":{\\"id\\":\\"uuid\\",\\"name\\":\\"test\\",\\"url\\":\\"http://localhost:8000/persons/1\\",\\"properties\\":{\\"email\\":\\"[email protected]\\"}},\\"event_url\\":\\"http://localhost:8000/events/1-test\\"}",
"body": "{\\"event\\":{\\"uuid\\":\\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\",\\"event\\":\\"$pageview\\",\\"elements_chain\\":\\"\\",\\"distinct_id\\":\\"distinct_id\\",\\"url\\":\\"http://localhost:8000/events/1\\",\\"properties\\":{\\"$current_url\\":\\"https://posthog.com\\",\\"$lib_version\\":\\"1.0.0\\"},\\"timestamp\\":\\"2024-09-03T09:00:00Z\\"},\\"groups\\":{},\\"nested\\":{\\"foo\\":\\"http://localhost:8000/events/1\\"},\\"person\\":{\\"id\\":\\"uuid\\",\\"name\\":\\"test\\",\\"url\\":\\"http://localhost:8000/persons/1\\",\\"properties\\":{\\"email\\":\\"[email protected]\\",\\"first_name\\":\\"Pumpkin\\"}},\\"event_url\\":\\"http://localhost:8000/events/1-test\\"}",
"headers": Object {
"version": "v=1.0.0",
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ describe('CDP Processed Events Consumer', () => {
{
topic: 'log_entries_test',
value: {
message: "Suspending function due to async function call 'fetch'. Payload: 2002 bytes",
message: "Suspending function due to async function call 'fetch'. Payload: 2035 bytes",
log_source_id: fnFetchNoFilters.id,
},
},
Expand Down
Loading
Loading