diff --git a/.github/workflows/replay-capture.yml b/.github/workflows/replay-capture.yml new file mode 100644 index 0000000000000..20f5df785763e --- /dev/null +++ b/.github/workflows/replay-capture.yml @@ -0,0 +1,35 @@ +name: Vector Replay Capture Tests + +on: + workflow_dispatch: + pull_request: + paths: + - vector/** + - .github/workflows/replay-capture.yml + + workflow_call: + +jobs: + vector-test: + name: Vector test + runs-on: ubuntu-20.04 + env: + QUOTA_LIMITED_TEAMS_PATH: vector/replay-capture/tests/quota_limited_teams.csv + OVERFLOW_SESSIONS_PATH: vector/replay-capture/tests/overflow_sessions.csv + KAFKA_BOOSTRAP_SERVERS: dummy:9092 + KAFKA_EVENTS_TOPIC: session_recording_snapshot_item_events + KAFKA_OVERFLOW_TOPIC: session_recording_snapshot_item_overflow + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Vector + run: | + wget https://github.com/vectordotdev/vector/releases/download/v0.40.0/vector-0.40.0-x86_64-unknown-linux-gnu.tar.gz + tar xzvf vector-0.40.0-x86_64-unknown-linux-gnu.tar.gz ./vector-x86_64-unknown-linux-gnu/bin/vector + sudo mv ./vector-x86_64-unknown-linux-gnu/bin/vector /usr/bin/vector + + - name: Run vector tests + run: | + yq -i e 'explode(.)' vector/replay-capture/vector.yaml + vector test vector/replay-capture/*.yaml diff --git a/.github/workflows/vector-docker-build-deploy.yml b/.github/workflows/vector-docker-build-deploy.yml new file mode 100644 index 0000000000000..bcd7b6c2f00aa --- /dev/null +++ b/.github/workflows/vector-docker-build-deploy.yml @@ -0,0 +1,105 @@ +name: Build and deploy replay capture container images + +on: + workflow_dispatch: + push: + paths: + - 'vector/**' + - '.github/workflows/vector-docker-build-deploy.yml' + branches: + - 'master' + +jobs: + build: + name: Build and publish container image + runs-on: depot-ubuntu-22.04-4 + permissions: + id-token: write # allow issuing OIDC tokens for this workflow run + contents: read # allow reading the repo contents + packages: write # allow push to ghcr.io + + defaults: + run: + working-directory: vector/ + + steps: + - name: Check Out Repo + # Checkout project code + # Use sparse checkout to only select files in vector directory + # Turning off cone mode ensures that files in the project root are not included during checkout + uses: actions/checkout@v4 + with: + sparse-checkout: 'vector/' + sparse-checkout-cone-mode: false + + - name: Set up Depot CLI + uses: depot/setup-action@v1 + + - name: Login to DockerHub + uses: docker/login-action@v2 + with: + username: posthog + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Login to ghcr.io + uses: docker/login-action@v2 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + logout: false + + - name: Set up QEMU + uses: docker/setup-qemu-action@v3 + + - name: Docker meta + id: meta + uses: docker/metadata-action@v5 + with: + images: ghcr.io/posthog/posthog/replay-capture + tags: | + type=ref,event=pr + type=ref,event=branch + type=semver,pattern={{version}} + type=semver,pattern={{major}}.{{minor}} + type=sha + + - name: Set up Docker Buildx + id: buildx + uses: docker/setup-buildx-action@v2 + + - name: Build and push image + id: docker_build + uses: depot/build-push-action@v1 + with: + context: ./vector/replay-capture/ + file: ./vector/replay-capture/Dockerfile + push: true + tags: ${{ steps.meta.outputs.tags }} + labels: ${{ steps.meta.outputs.labels }} + platforms: linux/arm64 + cache-from: type=gha + cache-to: type=gha,mode=max + build-args: BIN=${{ matrix.image }} + + - name: Container image digest + run: echo ${{ steps.docker_build.outputs.digest }} + + - name: Trigger replay capture deployment + uses: peter-evans/repository-dispatch@v3 + with: + token: ${{ steps.deployer.outputs.token }} + repository: PostHog/charts + event-type: commit_state_update + client-payload: | + { + "values": { + "image": { + "sha": "${{ steps.docker_build.outputs.digest }}" + } + }, + "release": "replay-capture-vector", + "commit": ${{ toJson(github.event.head_commit) }}, + "repository": ${{ toJson(github.repository) }}, + "labels": [] + } diff --git a/docker-compose.base.yml b/docker-compose.base.yml index 4303d86a91350..0f75bc58ec078 100644 --- a/docker-compose.base.yml +++ b/docker-compose.base.yml @@ -105,7 +105,7 @@ services: restart: on-failure capture: - image: ghcr.io/posthog/capture:main + image: ghcr.io/posthog/posthog/capture:master restart: on-failure environment: ADDRESS: '0.0.0.0:3000' @@ -113,6 +113,26 @@ services: KAFKA_HOSTS: 'kafka:9092' REDIS_URL: 'redis://redis:6379/' + replay-capture: + image: ghcr.io/posthog/posthog/replay-capture:master + build: + context: vector/replay-capture + restart: on-failure + entrypoint: ['sh', '-c'] + command: + - | + set -x + # seed empty required data files + mkdir -p /etc/vector/data + echo "token" > /etc/vector/data/quota_limited_teams.csv + echo "session_id" > /etc/vector/data/overflow_sessions.csv + exec vector -v --watch-config + environment: + KAFKA_EVENTS_TOPIC: session_recording_snapshot_item_events + KAFKA_OVERFLOW_TOPIC: session_recording_snapshot_item_overflow + KAFKA_BOOSTRAP_SERVERS: 'kafka:9092' + REDIS_URL: 'redis://redis:6379/' + plugins: command: ./bin/plugin-server --no-restart-loop restart: on-failure diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml index f79697c73fbfd..510570ce2ee90 100644 --- a/docker-compose.dev.yml +++ b/docker-compose.dev.yml @@ -117,6 +117,17 @@ services: - redis - kafka + # Optional capture + replay-capture: + extends: + file: docker-compose.base.yml + service: replay-capture + ports: + - 3001:8000 + depends_on: + - redis + - kafka + livestream: extends: file: docker-compose.base.yml diff --git a/vector/replay-capture/Dockerfile b/vector/replay-capture/Dockerfile new file mode 100644 index 0000000000000..e32ab326aadd7 --- /dev/null +++ b/vector/replay-capture/Dockerfile @@ -0,0 +1,12 @@ +FROM alpine as config-builder + +RUN apk add -U yq + +WORKDIR /config +COPY vector.yaml . +# evaluate with yq, basically to expand anchors (which vector doesn't support) +RUN yq -i e 'explode(.)' vector.yaml + +FROM timberio/vector:0.40.X-alpine + +COPY --from=config-builder /config/vector.yaml /etc/vector/vector.yaml diff --git a/vector/replay-capture/tests.yaml b/vector/replay-capture/tests.yaml new file mode 100644 index 0000000000000..1c30b243c480c --- /dev/null +++ b/vector/replay-capture/tests.yaml @@ -0,0 +1,76 @@ +tests: + - name: Basic Test + inputs: + - insert_at: quota_check + type: vrl + source: | + .message = [{}] + .message[0].properties = {} + .message[0].properties."$$session_id" = "123" + .message[0].properties."$$window_id" = "123" + .message[0].properties."token" = "123" + .message[0].properties."distinct_id" = "123" + .message[0].properties."$$snapshot_data" = [{"offset": 123}] + + .ip = "0.0.0.0" + .timestamp = now() + ."_" = "123456789" + %token = "123" + + outputs: + - conditions: + - source: | + assert!(is_string(.uuid)) + assert!(is_string(%headers.token)) + assert!(is_string(parse_json!(.data).uuid)) + assert!(parse_json!(.data).properties."$$snapshot_items"[0].offset == 123) + type: vrl + extract_from: overflow_check._unmatched + - name: Quota limited + inputs: + - insert_at: quota_check + type: vrl + source: | + .message = [{}] + .message[0].properties = {} + .message[0].properties."$$session_id" = "123" + .message[0].properties."$$window_id" = "123" + .message[0].properties."token" = "limited_token" + .message[0].properties."distinct_id" = "123" + .message[0].properties."$$snapshot_data" = [{"offset": 123}] + + .ip = "0.0.0.0" + .timestamp = now() + ."_" = "123456789" + %token = "limited_token" + + outputs: + - conditions: + - source: | + true + type: vrl + extract_from: metric_quota_dropped + - name: Overflow + inputs: + - insert_at: quota_check + type: vrl + source: | + .message = [{}] + .message[0].properties = {} + .message[0].properties."$$session_id" = "overflow_session" + .message[0].properties."$$window_id" = "123" + .message[0].properties."token" = "123" + .message[0].properties."distinct_id" = "123" + .message[0].properties."$$snapshot_data" = [{"offset": 123}] + + .ip = "0.0.0.0" + .timestamp = now() + ."_" = "123456789" + %token = "123" + + outputs: + - conditions: + - source: | + true + type: vrl + extract_from: overflow_check.overflow diff --git a/vector/replay-capture/tests/overflow_sessions.csv b/vector/replay-capture/tests/overflow_sessions.csv new file mode 100644 index 0000000000000..0d825115e2944 --- /dev/null +++ b/vector/replay-capture/tests/overflow_sessions.csv @@ -0,0 +1,2 @@ +session_id +overflow_session \ No newline at end of file diff --git a/vector/replay-capture/tests/quota_limited_teams.csv b/vector/replay-capture/tests/quota_limited_teams.csv new file mode 100644 index 0000000000000..cc5b54d345ddd --- /dev/null +++ b/vector/replay-capture/tests/quota_limited_teams.csv @@ -0,0 +1,2 @@ +token +limited_token \ No newline at end of file diff --git a/vector/replay-capture/vector.yaml b/vector/replay-capture/vector.yaml new file mode 100644 index 0000000000000..6428d2c2b9890 --- /dev/null +++ b/vector/replay-capture/vector.yaml @@ -0,0 +1,236 @@ +acknowledgements: + enabled: true +api: + enabled: true + address: 0.0.0.0:8686 + playground: true +enrichment_tables: + quota_limited_teams: + type: file + file: + path: '${QUOTA_LIMITED_TEAMS_PATH:-/etc/vector/data/quota_limited_teams.csv}' + encoding: + type: csv + schema: + token: string + overflow_sessions: + type: file + file: + path: '${OVERFLOW_SESSIONS_PATH:-/etc/vector/data/overflow_sessions.csv}' + encoding: + type: csv + schema: + token: string +sources: + capture_server: + type: http_server + address: 0.0.0.0:8000 + strict_path: false + query_parameters: + - _ + host_key: ip + decoding: + codec: vrl + vrl: + source: | + ._message, err = decode_gzip(.message) + if err == null { + .message = parse_json!(del(._message)) + } else { + # if we failed to decode gzip then ._message is empty + .message = parse_json!(.message) + del(._message) + } + .message[0].distinct_id = .message[0]."$$distinct_id" || .message[0].properties.distinct_id || .message[0].distinct_id + if is_integer(.message[0].distinct_id) { + .message[0].distinct_id, _ = to_string(.message[0].distinct_id) + } + + %token = .message[0].properties.token || .message[0].api_key + + if !is_string(%token) || !is_string(.message[0].distinct_id) { + for_each(array(.message)) -> |_index, value| { + del(value.properties."$$snapshot_data") + } + log(truncate(encode_json(.), 1000), rate_limit_secs: 0) + } + + assert!(is_string(.message[0].distinct_id), "distinct_id is required") + assert!(is_string(.message[0].properties."$$session_id"), "$$session_id is required") + assert!(is_string(%token), "token is required") +transforms: + quota_check: + type: route + inputs: + - capture_server + route: + quota_limited: + type: vrl + source: | + _, err = get_enrichment_table_record("quota_limited_teams", { "token": %token }) + err == null # err is not null if row not found, we want to drop where the row _is_ found + events_parsed: + type: remap + inputs: + - quota_check._unmatched + drop_on_abort: true + drop_on_error: true + reroute_dropped: true + source: | + event = { + "ip": .ip, + "uuid": uuid_v7(), + "distinct_id": .message[0].distinct_id, + "session_id": .message[0].properties."$$session_id", + "now": format_timestamp!(.timestamp, "%+", "UTC"), + "token": %token, + } + + event.sent_at, err = from_unix_timestamp(to_int!(."_"), "milliseconds") + if err != null { + event.sent_at = event.now + } else { + event.sent_at = format_timestamp!(event.sent_at, "%+", "UTC") + } + + snapshot_items = flatten(map_values(array!(.message)) -> |value| { + if is_array(value.properties."$$snapshot_data") { + array!(value.properties."$$snapshot_data") + } else { + [value.properties."$$snapshot_data"] + } + }) + + data = { + "uuid": event.uuid, + "event": "$$snapshot_items", + "properties": { + "distinct_id": event.distinct_id, + "$$session_id": .message[0].properties."$$session_id", + "$$window_id": .message[0].properties."$$window_id", + "$$snapshot_source": .message[0].properties."$$snapshot_source" || "web", + "$$snapshot_items": snapshot_items + } + } + + event.data = encode_json(data) + . = event + + %headers = { + "token": .token + } + overflow_check: + type: route + inputs: + - events_parsed + route: + overflow: + type: vrl + source: | + _, err = get_enrichment_table_record("overflow_sessions", { "session_id": .session_id }) + err == null # err is not null if row not found, we want to drop where the row _is_ found + log_errors: + type: remap + inputs: + - events_parsed.dropped + source: | + log({ + "event": "error", + "reason": "events_parsed.dropped", + "token": %token, + "session_id": .message[0].properties."$$session_id", + "distinct_id": .message[0].distinct_id + }, rate_limit_secs: 0) + metric_quota_dropped: + type: log_to_metric + inputs: + - quota_check.quota_limited + metrics: + - type: counter + field: message + kind: incremental + name: vector_capture_quota_dropped_count + tags: + token: '{{%token}}' +sinks: + # invalid sink to catch and raise errors + # without this vector drops them silently + error: + type: file + path: '' + encoding: + codec: json + acknowledgements: + enabled: true + inputs: + - log_errors + dropped: + type: blackhole + acknowledgements: + enabled: true + inputs: + - metric_quota_dropped + kafka: + type: kafka + inputs: + - overflow_check._unmatched + buffer: + - type: memory + max_events: 10000 + when_full: block + bootstrap_servers: $KAFKA_BOOSTRAP_SERVERS + topic: $KAFKA_EVENTS_TOPIC + compression: gzip + key_field: .session_id + headers_key: '%headers' + tls: + enabled: false + encoding: + codec: json + librdkafka_options: + client.id: ${CLIENT_ID:-$HOSTNAME} + linger.ms: '0' + topic.metadata.refresh.interval.ms: '20000' + queue.buffering.max.kbytes: '1048576' + queue.buffering.max.messages: '100' + message.max.bytes: '64000000' + batch.size: '1600000' + batch.num.messages: '100' + sticky.partitioning.linger.ms: '25' + enable.idempotence: 'false' + max.in.flight.requests.per.connection: '1000000' + partitioner: 'consistent_random' + message_timeout_ms: 10000 + socket_timeout_ms: 5000 + kafka_overflow: + type: kafka + buffer: + - type: memory + max_events: 10000 + when_full: block + bootstrap_servers: $KAFKA_BOOSTRAP_SERVERS + compression: gzip + key_field: .session_id + headers_key: '%headers' + tls: + enabled: false + encoding: + codec: json + librdkafka_options: + client.id: ${CLIENT_ID:-$HOSTNAME} + linger.ms: '0' + topic.metadata.refresh.interval.ms: '20000' + queue.buffering.max.kbytes: '1048576' + queue.buffering.max.messages: '100' + message.max.bytes: '64000000' + batch.size: '1600000' + batch.num.messages: '100' + sticky.partitioning.linger.ms: '25' + enable.idempotence: 'false' + max.in.flight.requests.per.connection: '1000000' + partitioner: 'consistent_random' + message_timeout_ms: 10000 + socket_timeout_ms: 5000 + inputs: + - overflow_check.overflow + topic: $KAFKA_OVERFLOW_TOPIC