Skip to content

Commit

Permalink
chore(plugin-server): Validate fetch hostnames (#17183)
Browse files Browse the repository at this point in the history
* chore(plugin-server): Validate fetch hostnames

* Only apply Python host check on Cloud

* Update tests to use valid hook URLs

* Only apply plugin server host check in prod

* Update URLs in a couple more tests

* Only check hostnames on Cloud and remove port check

* Fix fetch mocking

* Roll out hostname guard per project

* Fix fetch call assertions

* Make `fetchHostnameGuardTeams` optional
  • Loading branch information
Twixes authored Sep 18, 2023
1 parent a87b247 commit b7fe004
Show file tree
Hide file tree
Showing 24 changed files with 279 additions and 100 deletions.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
8 changes: 6 additions & 2 deletions plugin-server/jest.setup.fetch-mock.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import fetch from 'node-fetch'

import { status } from './src/utils/status'

jest.mock('node-fetch')
// Module mock for node-fetch: the factory spreads the real module's exports (via
// jest.requireActual) and then overrides only the default export with a bare jest.fn(),
// so tests can stub fetch() while the other exports stay the real implementations.
jest.mock('node-fetch', () => ({
// NOTE(review): presumably required so that `default` below is picked up as the module's
// default export by ES-module interop — confirm against the Jest module-factory docs.
__esModule: true,
...jest.requireActual('node-fetch'), // Only mock fetch(), leave Request, Response, FetchError, etc. alone
default: jest.fn(),
}))

beforeEach(() => {
const responsesToUrls = {
Expand All @@ -21,7 +25,7 @@ beforeEach(() => {
]),
}

fetch.mockImplementation(
jest.mocked(fetch).mockImplementation(
(url, options) =>
new Promise((resolve) =>
resolve({
Expand Down
1 change: 1 addition & 0 deletions plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@
"eslint-plugin-prettier": "^4.2.1",
"eslint-plugin-promise": "^6.0.0",
"eslint-plugin-simple-import-sort": "^7.0.0",
"ipaddr.js": "^2.1.0",
"jest": "^28.1.1",
"nodemon": "^2.0.22",
"parse-prometheus-text-format": "^1.1.1",
Expand Down
8 changes: 8 additions & 0 deletions plugin-server/pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion plugin-server/src/config/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ export function getDefaultConfig(): PluginsServerConfig {
CONVERSION_BUFFER_ENABLED_TEAMS: '',
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: '',
BUFFER_CONVERSION_SECONDS: isDevEnv() ? 2 : 60, // KEEP IN SYNC WITH posthog/settings/ingestion.py
FETCH_HOSTNAME_GUARD_TEAMS: '',
PERSON_INFO_CACHE_TTL: 5 * 60, // 5 min
KAFKA_HEALTHCHECK_SECONDS: 20,
OBJECT_STORAGE_ENABLED: true,
Expand All @@ -129,7 +130,7 @@ export function getDefaultConfig(): PluginsServerConfig {
APP_METRICS_GATHERED_FOR_ALL: isDevEnv() ? true : false,
MAX_TEAM_ID_TO_BUFFER_ANONYMOUS_EVENTS_FOR: 0,
USE_KAFKA_FOR_SCHEDULED_TASKS: true,
CLOUD_DEPLOYMENT: 'default', // Used as a Sentry tag
CLOUD_DEPLOYMENT: null,

STARTUP_PROFILE_DURATION_SECONDS: 300, // 5 minutes
STARTUP_PROFILE_CPU: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,13 @@ export const startAsyncWebhooksHandlerConsumer = async ({
const actionManager = new ActionManager(postgres)
await actionManager.prepare()
const actionMatcher = new ActionMatcher(postgres, actionManager, statsd)
const hookCannon = new HookCommander(postgres, teamManager, organizationManager, statsd)
const hookCannon = new HookCommander(
postgres,
teamManager,
organizationManager,
new Set(serverConfig.FETCH_HOSTNAME_GUARD_TEAMS.split(',').filter(String).map(Number)),
statsd
)
const concurrency = serverConfig.TASKS_PER_WORKER || 20

const pubSub = new PubSub(serverConfig, {
Expand Down
5 changes: 4 additions & 1 deletion plugin-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ export interface PluginsServerConfig {
CONVERSION_BUFFER_ENABLED_TEAMS: string
CONVERSION_BUFFER_TOPIC_ENABLED_TEAMS: string
BUFFER_CONVERSION_SECONDS: number
FETCH_HOSTNAME_GUARD_TEAMS: string
PERSON_INFO_CACHE_TTL: number
KAFKA_HEALTHCHECK_SECONDS: number
OBJECT_STORAGE_ENABLED: boolean // Disables or enables the use of object storage. It will become mandatory to use object storage
Expand All @@ -201,7 +202,8 @@ export interface PluginsServerConfig {
USE_KAFKA_FOR_SCHEDULED_TASKS: boolean // distribute scheduled tasks across the scheduler workers
EVENT_OVERFLOW_BUCKET_CAPACITY: number
EVENT_OVERFLOW_BUCKET_REPLENISH_RATE: number
CLOUD_DEPLOYMENT: string
/** Label of the PostHog Cloud environment. Null if not running PostHog Cloud. @example 'US' */
CLOUD_DEPLOYMENT: string | null

// dump profiles to disk, covering the first N seconds of runtime
STARTUP_PROFILE_DURATION_SECONDS: number
Expand Down Expand Up @@ -266,6 +268,7 @@ export interface Hub extends PluginsServerConfig {
lastActivityType: string
statelessVms: StatelessVmMap
conversionBufferEnabledTeams: Set<number>
fetchHostnameGuardTeams: Set<number>
// functions
enqueuePluginJob: (job: EnqueuedPluginJob) => Promise<void>
// ValueMatchers used for various opt-in/out features
Expand Down
4 changes: 4 additions & 0 deletions plugin-server/src/utils/db/hub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ export async function createHub(
const conversionBufferEnabledTeams = new Set(
serverConfig.CONVERSION_BUFFER_ENABLED_TEAMS.split(',').filter(String).map(Number)
)
const fetchHostnameGuardTeams = new Set(
serverConfig.FETCH_HOSTNAME_GUARD_TEAMS.split(',').filter(String).map(Number)
)

const statsd: StatsD | undefined = createStatsdClient(serverConfig, threadId)

Expand Down Expand Up @@ -181,6 +184,7 @@ export async function createHub(
rootAccessManager,
promiseManager,
conversionBufferEnabledTeams,
fetchHostnameGuardTeams,
pluginConfigsToSkipElementsParsing: buildIntegerMatcher(process.env.SKIP_ELEMENTS_PARSING_PLUGINS, true),
}

Expand Down
2 changes: 2 additions & 0 deletions plugin-server/src/utils/env-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ export const isTestEnv = (): boolean => determineNodeEnv() === NodeEnv.Test
export const isDevEnv = (): boolean => determineNodeEnv() === NodeEnv.Development
export const isProdEnv = (): boolean => determineNodeEnv() === NodeEnv.Production

/** True when the CLOUD_DEPLOYMENT environment variable is set to a non-empty value. */
export const isCloud = (): boolean => Boolean(process.env.CLOUD_DEPLOYMENT)

export function isIngestionOverflowEnabled(): boolean {
const ingestionOverflowEnabled = process.env.INGESTION_OVERFLOW_ENABLED
return stringToBoolean(ingestionOverflowEnabled)
Expand Down
68 changes: 60 additions & 8 deletions plugin-server/src/utils/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,73 @@
// This module wraps node-fetch with a sentry tracing-aware extension

import fetch, { FetchError, Request, Response } from 'node-fetch'
import { LookupAddress } from 'dns'
import dns from 'dns/promises'
import * as ipaddr from 'ipaddr.js'
import fetch, { type RequestInfo, type RequestInit, type Response, FetchError, Request } from 'node-fetch'
import { URL } from 'url'

import { runInSpan } from '../sentry'

function fetchWrapper(...args: Parameters<typeof fetch>): Promise<Response> {
const request = new Request(...args)
return runInSpan(
export async function trackedFetch(url: RequestInfo, init?: RequestInit): Promise<Response> {
const request = new Request(url, init)
return await runInSpan(
{
op: 'fetch',
description: `${request.method} ${request.url}`,
},
() => fetch(...args)
async () => await fetch(url, init)
)
}

fetchWrapper.isRedirect = fetch.isRedirect
fetchWrapper.FetchError = FetchError
trackedFetch.isRedirect = fetch.isRedirect
trackedFetch.FetchError = FetchError

export default fetchWrapper
/**
 * Fetch wrapper that runs the hostname guard before issuing the request.
 *
 * Both the guard check and the request run inside a single `fetch` span, described by the
 * request's method and URL.
 *
 * NOTE(review): only the initial URL is validated here. Whether redirects followed by the
 * underlying fetch can reach other hosts depends on node-fetch's redirect handling — confirm.
 *
 * @param url - Request target, passed through to fetch unchanged.
 * @param init - Optional request options, passed through to fetch unchanged.
 * @returns The response of the underlying fetch call.
 * @throws Whatever `raiseIfUserProvidedUrlUnsafe` throws when the URL fails the guard.
 */
export async function safeTrackedFetch(url: RequestInfo, init?: RequestInit): Promise<Response> {
    // The Request is built only to derive the method and URL; the actual call below
    // still receives the caller's original arguments.
    const { method, url: targetUrl } = new Request(url, init)
    const spanOptions = {
        op: 'fetch',
        description: `${method} ${targetUrl}`,
    }
    const guardedFetch = async () => {
        await raiseIfUserProvidedUrlUnsafe(targetUrl)
        return await fetch(url, init)
    }
    return await runInSpan(spanOptions, guardedFetch)
}

// Expose the same static members on the wrapper that callers read off node-fetch's default export.
safeTrackedFetch.FetchError = FetchError
safeTrackedFetch.isRedirect = fetch.isRedirect

/**
 * Raise if the provided URL seems unsafe, otherwise do nothing.
 *
 * Equivalent of Django raise_if_user_provided_url_unsafe.
 *
 * A URL is rejected when it cannot be parsed, has no hostname, uses a scheme other than
 * HTTP or HTTPS, cannot be resolved, or resolves to at least one address whose range is
 * not `unicast`.
 *
 * NOTE(review): the addresses checked here come from this function's own DNS lookup. The
 * request made afterwards presumably resolves the hostname again, so a changing DNS answer
 * could bypass the check — confirm whether that needs handling at the call site.
 *
 * @param url - Absolute URL that is about to be requested.
 * @throws FetchError of type `posthog-host-guard` naming the check that failed.
 */
export async function raiseIfUserProvidedUrlUnsafe(url: string): Promise<void> {
    let parsedUrl: URL
    try {
        parsedUrl = new URL(url)
    } catch (err) {
        throw new FetchError('Invalid URL', 'posthog-host-guard')
    }
    if (!parsedUrl.hostname) {
        throw new FetchError('No hostname', 'posthog-host-guard')
    }
    if (parsedUrl.protocol !== 'http:' && parsedUrl.protocol !== 'https:') {
        throw new FetchError('Scheme must be either HTTP or HTTPS', 'posthog-host-guard')
    }
    // URL#hostname keeps the square brackets around IPv6 literals (e.g. "[::1]"), which
    // dns.lookup() cannot resolve. Strip them so IPv6-literal URLs reach the range check
    // below instead of always failing as "Invalid hostname".
    const lookupHostname = parsedUrl.hostname.replace(/^\[(.*)\]$/, '$1')
    let addrinfo: LookupAddress[]
    try {
        addrinfo = await dns.lookup(lookupHostname, { all: true })
    } catch (err) {
        throw new FetchError('Invalid hostname', 'posthog-host-guard')
    }
    for (const { address } of addrinfo) {
        // Prevent addressing internal services
        if (ipaddr.parse(address).range() !== 'unicast') {
            throw new FetchError('Internal hostname', 'posthog-host-guard')
        }
    }
}
13 changes: 10 additions & 3 deletions plugin-server/src/worker/ingestion/hooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { format } from 'util'

import { Action, Hook, PostIngestionEvent, Team } from '../../types'
import { PostgresRouter, PostgresUse } from '../../utils/db/postgres'
import fetch from '../../utils/fetch'
import { isCloud } from '../../utils/env-utils'
import { safeTrackedFetch, trackedFetch } from '../../utils/fetch'
import { status } from '../../utils/status'
import { getPropertyValueByPath, stringify } from '../../utils/utils'
import { OrganizationManager } from './organization-manager'
Expand Down Expand Up @@ -256,6 +257,7 @@ export class HookCommander {
organizationManager: OrganizationManager
statsd: StatsD | undefined
siteUrl: string
fetchHostnameGuardTeams: Set<number>

/** Hook request timeout in ms. */
EXTERNAL_REQUEST_TIMEOUT = 10 * 1000
Expand All @@ -264,11 +266,13 @@ export class HookCommander {
postgres: PostgresRouter,
teamManager: TeamManager,
organizationManager: OrganizationManager,
fetchHostnameGuardTeams?: Set<number>,
statsd?: StatsD
) {
this.postgres = postgres
this.teamManager = teamManager
this.organizationManager = organizationManager
this.fetchHostnameGuardTeams = fetchHostnameGuardTeams || new Set()
if (process.env.SITE_URL) {
this.siteUrl = process.env.SITE_URL
} else {
Expand Down Expand Up @@ -358,9 +362,10 @@ export class HookCommander {
`⌛⌛⌛ Posting Webhook slow. Timeout warning after 5 sec! url=${webhookUrl} team_id=${team.id} event_id=${event.eventUuid}`
)
}, 5000)
const relevantFetch = isCloud() && this.fetchHostnameGuardTeams.has(team.id) ? safeTrackedFetch : trackedFetch
try {
await instrumentWebhookStep('fetch', async () => {
const request = await fetch(webhookUrl, {
const request = await relevantFetch(webhookUrl, {
method: 'POST',
body: JSON.stringify(message, undefined, 4),
headers: { 'Content-Type': 'application/json' },
Expand Down Expand Up @@ -399,8 +404,10 @@ export class HookCommander {
`⌛⌛⌛ Posting RestHook slow. Timeout warning after 5 sec! url=${hook.target} team_id=${event.teamId} event_id=${event.eventUuid}`
)
}, 5000)
const relevantFetch =
isCloud() && this.fetchHostnameGuardTeams.has(hook.team_id) ? safeTrackedFetch : trackedFetch
try {
const request = await fetch(hook.target, {
const request = await relevantFetch(hook.target, {
method: 'POST',
body: JSON.stringify(payload, undefined, 4),
headers: { 'Content-Type': 'application/json' },
Expand Down
2 changes: 1 addition & 1 deletion plugin-server/src/worker/plugins/mmdb.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Reader, ReaderModel } from '@maxmind/geoip2-node'
import { DateTime } from 'luxon'
import fetch from 'node-fetch'
import * as schedule from 'node-schedule'
import prettyBytes from 'pretty-bytes'
import { brotliDecompress } from 'zlib'
Expand All @@ -12,7 +13,6 @@ import {
} from '../../config/mmdb-constants'
import { Hub, PluginAttachmentDB } from '../../types'
import { PostgresUse } from '../../utils/db/postgres'
import fetch from '../../utils/fetch'
import { status } from '../../utils/status'
import { delay } from '../../utils/utils'

Expand Down
52 changes: 28 additions & 24 deletions plugin-server/src/worker/vm/imports.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,33 +12,37 @@ import * as jsonwebtoken from 'jsonwebtoken'
import * as pg from 'pg'
import snowflake from 'snowflake-sdk'
import { PassThrough } from 'stream'
import { Hub } from 'types'
import * as url from 'url'
import * as zlib from 'zlib'

import fetch from '../../utils/fetch'
import { isCloud, isTestEnv } from '../../utils/env-utils'
import { safeTrackedFetch, trackedFetch } from '../../utils/fetch'
import { writeToFile } from './extensions/test-utils'

export const imports = {
...(process.env.NODE_ENV === 'test'
? {
'test-utils/write-to-file': writeToFile,
}
: {}),
'@google-cloud/bigquery': bigquery,
'@google-cloud/pubsub': pubsub,
'@google-cloud/storage': gcs,
'@posthog/plugin-contrib': contrib,
'@posthog/plugin-scaffold': scaffold,
'aws-sdk': AWS,
ethers: ethers,
'generic-pool': genericPool,
'node-fetch': fetch,
'snowflake-sdk': snowflake,
crypto: crypto,
jsonwebtoken: jsonwebtoken,
faker: faker,
pg: pg,
stream: { PassThrough },
url: url,
zlib: zlib,
/**
 * Build the table of modules made available as imports to a plugin config's VM.
 *
 * The table is fixed except for two entries:
 * - `test-utils/write-to-file` is only present in the test environment;
 * - `node-fetch` is the hostname-guarded fetch when running on Cloud and the team is in
 *   `hub.fetchHostnameGuardTeams`, and the plain tracked fetch otherwise.
 *
 * @param hub - Hub providing the set of team IDs for which the hostname guard is enabled.
 * @param teamId - ID of the team the imports are determined for.
 * @returns Object mapping module names to the module exposed under that name.
 */
export function determineImports(hub: Hub, teamId: number) {
    const testOnlyImports = isTestEnv() ? { 'test-utils/write-to-file': writeToFile } : {}
    const guardHostnames = isCloud() && hub.fetchHostnameGuardTeams.has(teamId)

    return {
        ...testOnlyImports,
        '@google-cloud/bigquery': bigquery,
        '@google-cloud/pubsub': pubsub,
        '@google-cloud/storage': gcs,
        '@posthog/plugin-contrib': contrib,
        '@posthog/plugin-scaffold': scaffold,
        'aws-sdk': AWS,
        ethers,
        'generic-pool': genericPool,
        'node-fetch': guardHostnames ? safeTrackedFetch : trackedFetch,
        'snowflake-sdk': snowflake,
        crypto,
        jsonwebtoken,
        faker,
        pg,
        stream: { PassThrough },
        url,
        zlib,
    }
}
4 changes: 3 additions & 1 deletion plugin-server/src/worker/vm/vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { createJobs } from './extensions/jobs'
import { createPosthog } from './extensions/posthog'
import { createStorage } from './extensions/storage'
import { createUtils } from './extensions/utilities'
import { imports } from './imports'
import { determineImports } from './imports'
import { transformCode } from './transforms'
import { upgradeExportEvents } from './upgrades/export-events'
import { addHistoricalEventsExportCapability } from './upgrades/historical-export/export-historical-events'
Expand All @@ -34,6 +34,8 @@ export function createPluginConfigVM(
pluginConfig: PluginConfig, // NB! might have team_id = 0
indexJs: string
): PluginConfigVMResponse {
const imports = determineImports(hub, pluginConfig.team_id)

const timer = new Date()

const statsdTiming = (metric: string) => {
Expand Down
Loading

0 comments on commit b7fe004

Please sign in to comment.