Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(plugin-server): Validate fetch hostnames #17183

Merged
merged 12 commits into from
Sep 18, 2023
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 6 additions & 2 deletions plugin-server/jest.setup.fetch-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import fetch from 'node-fetch'

import { status } from './src/utils/status'

jest.mock('node-fetch')
jest.mock('node-fetch', () => ({
__esModule: true,
...jest.requireActual('node-fetch'), // Only mock fetch(), leave Request, Response, FetchError, etc. alone
default: jest.fn(),
}))

beforeEach(() => {
const responsesToUrls = {
Expand All @@ -21,7 +25,7 @@ beforeEach(() => {
]),
}

fetch.mockImplementation(
jest.mocked(fetch).mockImplementation(
(url, options) =>
new Promise((resolve) =>
resolve({
Expand Down
1 change: 1 addition & 0 deletions plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
"eslint-plugin-prettier": "^4.2.1",
"eslint-plugin-promise": "^6.0.0",
"eslint-plugin-simple-import-sort": "^7.0.0",
"ipaddr.js": "^2.1.0",
"jest": "^28.1.1",
"nodemon": "^2.0.22",
"parse-prometheus-text-format": "^1.1.1",
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export function getDefaultConfig(): PluginsServerConfig {
CONVERSION_BUFFER_ENABLED_TEAMS: '',
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '',
BUFFER_CONVERSION_SECONDS: isDevEnv() ? 2 : 60, // KEEP IN SYNC WITH posthog/settings/ingestion.py
FETCH_HOSTNAME_GUARD_TEAMS: '',
PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min
KAFKA_HEALTHCHECK_SECONDS: 20,
OBJECT_STORAGE_ENABLED: true,
Expand All @@ -129,7 +130,7 @@ export function getDefaultConfig(): PluginsServerConfig {
APP_METRICS_GATHERED_FOR_ALL: isDevEnv() ? true : false,
MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: 0,
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag
CLOUD_DEPLOYMENT: null,

STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
STARTUP_PROFILE_CPU: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ export const startAsyncWebhooksHandlerConsumer = async ({
const actionManager = new ActionManager(postgres)
await actionManager.prepare()
const actionMatcher = new ActionMatcher(postgres, actionManager, statsd)
const hookCannon = new HookCommander(postgres, teamManager, organizationManager, statsd)
const hookCannon = new HookCommander(
postgres,
teamManager,
organizationManager,
new Set(serverConfig.FETCH_HOSTNAME_GUARD_TEAMS.split(',').filter(String).map(Number)),
statsd
)
const concurrency = serverConfig.TASKS_PER_WORKER || 20

const pubSub = new PubSub(serverConfig, {
Expand Down
5 changes: 4 additions & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export interface PluginsServerConfig {
CONVERSION_BUFFER_ENABLED_TEAMS: string
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: string
BUFFER_CONVERSION_SECONDS: number
FETCH_HOSTNAME_GUARD_TEAMS: string
PERSON_INFO_CACHE_TTL: number
KAFKA_HEALTHCHECK_SECONDS: number
OBJECT_STORAGE_ENABLED: boolean // Disables or enables the use of object storage. It will become mandatory to use object storage
Expand All @@ -201,7 +202,8 @@ export interface PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: boolean // distribute scheduled tasks across the scheduler workers
EVENT_OVERFLOW_BUCKET_CAPACITY: number
EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: number
CLOUD_DEPLOYMENT: string
/** Label of the PostHog Cloud environment. Null if not running PostHog Cloud. @example 'US' */
CLOUD_DEPLOYMENT: string | null

// dump profiles to disk, covering the first N seconds of runtime
STARTUP_PROFILE_DURATION_SECONDS: number
Expand Down Expand Up @@ -266,6 +268,7 @@ export interface Hub extends PluginsServerConfig {
lastActivityType: string
statelessVms: StatelessVmMap
conversionBufferEnabledTeams: Set<number>
fetchHostnameGuardTeams: Set<number>
// functions
enqueuePluginJob: (job: EnqueuedPluginJob) => Promise<void>
// ValueMatchers used for various opt-in/out features
Expand Down
4 changes: 4 additions & 0 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export async function createHub(
const conversionBufferEnabledTeams = new Set(
serverConfig.CONVERSION_BUFFER_ENABLED_TEAMS.split(',').filter(String).map(Number)
)
const fetchHostnameGuardTeams = new Set(
serverConfig.FETCH_HOSTNAME_GUARD_TEAMS.split(',').filter(String).map(Number)
)

const statsd: StatsD | undefined = createStatsdClient(serverConfig, threadId)

Expand Down Expand Up @@ -181,6 +184,7 @@ export async function createHub(
rootAccessManager,
promiseManager,
conversionBufferEnabledTeams,
fetchHostnameGuardTeams,
pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true),
}

Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/utils/env-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export const isTestEnv = (): boolean => determineNodeEnv() === NodeEnv.Test
export const isDevEnv = (): boolean => determineNodeEnv() === NodeEnv.Development
export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Production

/** Whether the CLOUD_DEPLOYMENT environment variable is set to a non-empty value (i.e. running as PostHog Cloud). */
export const isCloud = (): boolean => Boolean(process.env.CLOUD_DEPLOYMENT)

export function isIngestionOverflowEnabled(): boolean {
const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED
return stringToBoolean(ingestionOverflowEnabled)
Expand Down
68 changes: 60 additions & 8 deletions plugin-server/src/utils/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,73 @@
// This module wraps node-fetch with a sentry tracing-aware extension

import fetch, { FetchError, Request, Response } from 'node-fetch'
import { LookupAddress } from 'dns'
import dns from 'dns/promises'
import * as ipaddr from 'ipaddr.js'
import fetch, { type RequestInfo, type RequestInit, type Response, FetchError, Request } from 'node-fetch'
import { URL } from 'url'

import { runInSpan } from '../sentry'

function fetchWrapper(...args: Parameters<typeof fetch>): Promise<Response> {
const request = new Request(...args)
return runInSpan(
export async function trackedFetch(url: RequestInfo, init?: RequestInit): Promise<Response> {
const request = new Request(url, init)
return await runInSpan(
{
op: 'fetch',
description: `${request.method} ${request.url}`,
},
() => fetch(...args)
async () => await fetch(url, init)
)
}

fetchWrapper.isRedirect = fetch.isRedirect
fetchWrapper.FetchError = FetchError
trackedFetch.isRedirect = fetch.isRedirect
trackedFetch.FetchError = FetchError

export default fetchWrapper
/**
 * Variant of the tracked fetch that first runs the request URL through
 * `raiseIfUserProvidedUrlUnsafe` and only then performs the actual request.
 * Both steps execute inside a single `fetch` span.
 *
 * NOTE(review): only the initial request URL is validated here. Redirect targets followed by
 * `fetch`, and the second DNS resolution `fetch` performs itself, are presumably not re-checked.
 * Confirm whether that matters for the threat model.
 *
 * @param url - Request target, passed through to `fetch` unchanged.
 * @param init - Optional request options, passed through to `fetch` unchanged.
 * @returns The response produced by `fetch`.
 * @throws Whatever `raiseIfUserProvidedUrlUnsafe` or `fetch` throws.
 */
export async function safeTrackedFetch(url: RequestInfo, init?: RequestInit): Promise<Response> {
    // The Request is built only to obtain a normalized method and URL; fetch() still gets the raw arguments
    const { method, url: requestUrl } = new Request(url, init)
    const spanOptions = {
        op: 'fetch',
        description: `${method} ${requestUrl}`,
    }
    const guardedFetch = async () => {
        await raiseIfUserProvidedUrlUnsafe(requestUrl)
        return await fetch(url, init)
    }
    return await runInSpan(spanOptions, guardedFetch)
}

safeTrackedFetch.isRedirect = fetch.isRedirect
safeTrackedFetch.FetchError = FetchError

/**
 * Raise if the provided URL seems unsafe, otherwise do nothing.
 *
 * Equivalent of Django raise_if_user_provided_url_unsafe.
 *
 * A URL is rejected when it cannot be parsed, has no hostname, uses a scheme other than HTTP(S),
 * has a hostname that cannot be resolved, or resolves to at least one address that ipaddr.js
 * does not classify as `unicast` (loopback, private, link-local, etc.).
 *
 * @param url - Absolute URL to validate.
 * @throws FetchError of type 'posthog-host-guard' whose message names the failed check.
 */
export async function raiseIfUserProvidedUrlUnsafe(url: string): Promise<void> {
    let parsedUrl: URL
    try {
        parsedUrl = new URL(url)
    } catch (err) {
        throw new FetchError('Invalid URL', 'posthog-host-guard')
    }
    if (!parsedUrl.hostname) {
        throw new FetchError('No hostname', 'posthog-host-guard')
    }
    if (parsedUrl.protocol !== 'http:' && parsedUrl.protocol !== 'https:') {
        throw new FetchError('Scheme must be either HTTP or HTTPS', 'posthog-host-guard')
    }
    // URL.hostname keeps the square brackets around IPv6 literals (e.g. "[::1]"), which dns.lookup
    // cannot resolve. Strip them so IPv6 literals are classified by range instead of always failing.
    const hostname = parsedUrl.hostname.replace(/^\[(.*)\]$/, '$1')
    let addrinfo: LookupAddress[]
    try {
        addrinfo = await dns.lookup(hostname, { all: true })
    } catch (err) {
        throw new FetchError('Invalid hostname', 'posthog-host-guard')
    }
    for (const { address } of addrinfo) {
        // Prevent addressing internal services: every resolved address must be publicly routable
        if (ipaddr.parse(address).range() !== 'unicast') {
            throw new FetchError('Internal hostname', 'posthog-host-guard')
        }
    }
}
13 changes: 10 additions & 3 deletions plugin-server/src/worker/ingestion/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { format } from 'util'

import { Action, Hook, PostIngestionEvent, Team } from '../../types'
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
import fetch from '../../utils/fetch'
import { isCloud } from '../../utils/env-utils'
import { safeTrackedFetch, trackedFetch } from '../../utils/fetch'
import { status } from '../../utils/status'
import { getPropertyValueByPath, stringify } from '../../utils/utils'
import { OrganizationManager } from './organization-manager'
Expand Down Expand Up @@ -256,6 +257,7 @@ export class HookCommander {
organizationManager: OrganizationManager
statsd: StatsD | undefined
siteUrl: string
fetchHostnameGuardTeams: Set<number>

/** Hook request timeout in ms. */
EXTERNAL_REQUEST_TIMEOUT = 10 * 1000
Expand All @@ -264,11 +266,13 @@ export class HookCommander {
postgres: PostgresRouter,
teamManager: TeamManager,
organizationManager: OrganizationManager,
fetchHostnameGuardTeams?: Set<number>,
statsd?: StatsD
) {
this.postgres = postgres
this.teamManager = teamManager
this.organizationManager = organizationManager
this.fetchHostnameGuardTeams = fetchHostnameGuardTeams || new Set()
if (process.env.SITE_URL) {
this.siteUrl = process.env.SITE_URL
} else {
Expand Down Expand Up @@ -358,9 +362,10 @@ export class HookCommander {
`⌛⌛⌛ Posting Webhook slow. Timeout warning after 5 sec! url=${webhookUrl} team_id=${team.id} event_id=${event.eventUuid}`
)
}, 5000)
const relevantFetch = isCloud() && this.fetchHostnameGuardTeams.has(team.id) ? safeTrackedFetch : trackedFetch
try {
await instrumentWebhookStep('fetch', async () => {
const request = await fetch(webhookUrl, {
const request = await relevantFetch(webhookUrl, {
method: 'POST',
body: JSON.stringify(message, undefined, 4),
headers: { 'Content-Type': 'application/json' },
Expand Down Expand Up @@ -399,8 +404,10 @@ export class HookCommander {
`⌛⌛⌛ Posting RestHook slow. Timeout warning after 5 sec! url=${hook.target} team_id=${event.teamId} event_id=${event.eventUuid}`
)
}, 5000)
const relevantFetch =
isCloud() && this.fetchHostnameGuardTeams.has(hook.team_id) ? safeTrackedFetch : trackedFetch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just calling out this line to any other reviewers as being the "opt in"

try {
const request = await fetch(hook.target, {
const request = await relevantFetch(hook.target, {
method: 'POST',
body: JSON.stringify(payload, undefined, 4),
headers: { 'Content-Type': 'application/json' },
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/plugins/mmdb.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Reader, ReaderModel } from '@maxmind/geoip2-node'
import { DateTime } from 'luxon'
import fetch from 'node-fetch'
import * as schedule from 'node-schedule'
import prettyBytes from 'pretty-bytes'
import { brotliDecompress } from 'zlib'
Expand All @@ -12,7 +13,6 @@ import {
} from '../../config/mmdb-constants'
import { Hub, PluginAttachmentDB } from '../../types'
import { PostgresUse } from '../../utils/db/postgres'
import fetch from '../../utils/fetch'
import { status } from '../../utils/status'
import { delay } from '../../utils/utils'

Expand Down
52 changes: 28 additions & 24 deletions plugin-server/src/worker/vm/imports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,37 @@ import * as jsonwebtoken from 'jsonwebtoken'
import * as pg from 'pg'
import snowflake from 'snowflake-sdk'
import { PassThrough } from 'stream'
import { Hub } from 'types'
import * as url from 'url'
import * as zlib from 'zlib'

import fetch from '../../utils/fetch'
import { isCloud, isTestEnv } from '../../utils/env-utils'
import { safeTrackedFetch, trackedFetch } from '../../utils/fetch'
import { writeToFile } from './extensions/test-utils'

export const imports = {
...(process.env.NODE_ENV === 'test'
? {
'test-utils/write-to-file': writeToFile,
}
: {}),
'@google-cloud/bigquery': bigquery,
'@google-cloud/pubsub': pubsub,
'@google-cloud/storage': gcs,
'@posthog/plugin-contrib': contrib,
'@posthog/plugin-scaffold': scaffold,
'aws-sdk': AWS,
ethers: ethers,
'generic-pool': genericPool,
'node-fetch': fetch,
'snowflake-sdk': snowflake,
crypto: crypto,
jsonwebtoken: jsonwebtoken,
faker: faker,
pg: pg,
stream: { PassThrough },
url: url,
zlib: zlib,
/**
 * Build the module-name → module map for a given team.
 *
 * The only team-dependent entry is `node-fetch`: on Cloud, teams listed in
 * `hub.fetchHostnameGuardTeams` get the hostname-guarded `safeTrackedFetch`, everyone else gets
 * `trackedFetch`. The `test-utils/write-to-file` helper is only included in the test environment.
 *
 * @param hub - Hub providing the set of team IDs opted into the fetch hostname guard.
 * @param teamId - ID of the team the imports are being determined for.
 * @returns Object keyed by import name.
 */
export function determineImports(hub: Hub, teamId: number) {
    const testOnlyImports = isTestEnv()
        ? {
              'test-utils/write-to-file': writeToFile,
          }
        : {}
    const isHostnameGuardActive = isCloud() && hub.fetchHostnameGuardTeams.has(teamId)
    const fetchImplementation = isHostnameGuardActive ? safeTrackedFetch : trackedFetch

    return {
        ...testOnlyImports,
        '@google-cloud/bigquery': bigquery,
        '@google-cloud/pubsub': pubsub,
        '@google-cloud/storage': gcs,
        '@posthog/plugin-contrib': contrib,
        '@posthog/plugin-scaffold': scaffold,
        'aws-sdk': AWS,
        ethers,
        'generic-pool': genericPool,
        'node-fetch': fetchImplementation,
        'snowflake-sdk': snowflake,
        crypto,
        jsonwebtoken,
        faker,
        pg,
        stream: { PassThrough },
        url,
        zlib,
    }
}
4 changes: 3 additions & 1 deletion plugin-server/src/worker/vm/vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { createJobs } from './extensions/jobs'
import { createPosthog } from './extensions/posthog'
import { createStorage } from './extensions/storage'
import { createUtils } from './extensions/utilities'
import { imports } from './imports'
import { determineImports } from './imports'
import { transformCode } from './transforms'
import { upgradeExportEvents } from './upgrades/export-events'
import { addHistoricalEventsExportCapability } from './upgrades/historical-export/export-historical-events'
Expand All @@ -34,6 +34,8 @@ export function createPluginConfigVM(
pluginConfig: PluginConfig, // NB! might have team_id = 0
indexJs: string
): PluginConfigVMResponse {
const imports = determineImports(hub, pluginConfig.team_id)

const timer = new Date()

const statsdTiming = (metric: string) => {
Expand Down
Loading