diff --git a/.github/workflows/container-images-cd.yml b/.github/workflows/container-images-cd.yml index 18888e9e3b301..c620c46ac274d 100644 --- a/.github/workflows/container-images-cd.yml +++ b/.github/workflows/container-images-cd.yml @@ -13,6 +13,7 @@ on: push: branches: - master + - frank/test-flush paths-ignore: - 'rust/**' - 'livestream/**' @@ -70,134 +71,134 @@ jobs: platforms: linux/arm64,linux/amd64 build-args: COMMIT_HASH=${{ github.sha }} - - name: get deployer token - id: deployer - uses: getsentry/action-github-app-token@v3 - with: - app_id: ${{ secrets.DEPLOYER_APP_ID }} - private_key: ${{ secrets.DEPLOYER_APP_PRIVATE_KEY }} - - - name: get PR labels - id: labels - uses: ./.github/actions/get-pr-labels - with: - token: ${{ secrets.GITHUB_TOKEN }} - - - name: Trigger PostHog Cloud deployment from Charts - uses: peter-evans/repository-dispatch@v3 - with: - token: ${{ steps.deployer.outputs.token }} - repository: PostHog/charts - event-type: commit_state_update - client-payload: | - { - "values": { - "image": { - "sha": "${{ steps.build.outputs.digest }}" - } - }, - "release": "posthog", - "commit": ${{ toJson(github.event.head_commit) }}, - "repository": ${{ toJson(github.repository) }}, - "labels": ${{ steps.labels.outputs.labels }} - } - - - name: Check for changes in plugins directory - id: check_changes_plugins - run: | - echo "changed=$((git diff --name-only HEAD^ HEAD | grep -q '^plugin-server/' && echo true) || echo false)" >> $GITHUB_OUTPUT - - - name: Trigger Ingestion Cloud deployment - if: steps.check_changes_plugins.outputs.changed == 'true' - uses: peter-evans/repository-dispatch@v3 - with: - token: ${{ steps.deployer.outputs.token }} - repository: PostHog/charts - event-type: commit_state_update - client-payload: | - { - "values": { - "image": { - "sha": "${{ steps.build.outputs.digest }}" - } - }, - "release": "ingestion", - "commit": ${{ toJson(github.event.head_commit) }}, - "repository": ${{ toJson(github.repository) }}, - "labels": ${{ toJson(steps.labels.outputs.labels) }} - } - - - name: Check for changes that affect batch exports temporal worker - id: check_changes_batch_exports_temporal_worker - run: | - echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/batch_exports|^posthog/batch_exports/|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT - - - name: Trigger Batch Exports Temporal Worker Cloud deployment - if: steps.check_changes_batch_exports_temporal_worker.outputs.changed == 'true' - uses: peter-evans/repository-dispatch@v3 - with: - token: ${{ steps.deployer.outputs.token }} - repository: PostHog/charts - event-type: commit_state_update - client-payload: | - { - "values": { - "image": { - "sha": "${{ steps.build.outputs.digest }}" - } - }, - "release": "temporal-worker", - "commit": ${{ toJson(github.event.head_commit) }}, - "repository": ${{ toJson(github.repository) }}, - "labels": ${{ steps.labels.outputs.labels }} - } - - - name: Check for changes that affect general purpose temporal worker - id: check_changes_general_purpose_temporal_worker - run: | - echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/proxy_service|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT - - - name: Trigger General Purpose Temporal Worker Cloud deployment - if: steps.check_changes_general_purpose_temporal_worker.outputs.changed == 'true' - uses: peter-evans/repository-dispatch@v3 - with: - token: ${{ steps.deployer.outputs.token }} - repository: PostHog/charts - event-type: commit_state_update - client-payload: | - { - "values": { - "image": { - "sha": "${{ steps.build.outputs.digest }}" - } - }, - "release": "temporal-worker-general-purpose", - "commit": ${{ toJson(github.event.head_commit) }}, - "repository": ${{ toJson(github.repository) }}, - "labels": ${{ steps.labels.outputs.labels }} - } - - - name: Check for changes that affect data warehouse temporal worker - id: check_changes_data_warehouse_temporal_worker - run: | - echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/data_imports|^posthog/warehouse/|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT - - - name: Trigger Data Warehouse Temporal Worker Cloud deployment - if: steps.check_changes_data_warehouse_temporal_worker.outputs.changed == 'true' - uses: peter-evans/repository-dispatch@v3 - with: - token: ${{ steps.deployer.outputs.token }} - repository: PostHog/charts - event-type: commit_state_update - client-payload: | - { - "values": { - "image": { - "sha": "${{ steps.build.outputs.digest }}" - } - }, - "release": "temporal-worker-data-warehouse", - "commit": ${{ toJson(github.event.head_commit) }}, - "repository": ${{ toJson(github.repository) }}, - "labels": ${{ steps.labels.outputs.labels }} - } + # - name: get deployer token + # id: deployer + # uses: getsentry/action-github-app-token@v3 + # with: + # app_id: ${{ secrets.DEPLOYER_APP_ID }} + # private_key: ${{ secrets.DEPLOYER_APP_PRIVATE_KEY }} + + # - name: get PR labels + # id: labels + # uses: ./.github/actions/get-pr-labels + # with: + # token: ${{ secrets.GITHUB_TOKEN }} + + # - name: Trigger PostHog Cloud deployment from Charts + # uses: peter-evans/repository-dispatch@v3 + # with: + # token: ${{ steps.deployer.outputs.token }} + # repository: PostHog/charts + # event-type: commit_state_update + # client-payload: | + # { + # "values": { + # "image": { + # "sha": "${{ steps.build.outputs.digest }}" + # } + # }, + # "release": "posthog", + # "commit": ${{ toJson(github.event.head_commit) }}, + # "repository": ${{ toJson(github.repository) }}, + # "labels": ${{ steps.labels.outputs.labels }} + # } + + # - name: Check for changes in plugins directory + # id: check_changes_plugins + # run: | + # echo "changed=$((git diff --name-only HEAD^ HEAD | grep -q '^plugin-server/' && echo true) || echo false)" >> $GITHUB_OUTPUT + + # - name: Trigger Ingestion Cloud deployment + # if: steps.check_changes_plugins.outputs.changed == 'true' + # uses: peter-evans/repository-dispatch@v3 + # with: + # token: ${{ steps.deployer.outputs.token }} + # repository: PostHog/charts + # event-type: commit_state_update + # client-payload: | + # { + # "values": { + # "image": { + # "sha": "${{ steps.build.outputs.digest }}" + # } + # }, + # "release": "ingestion", + # "commit": ${{ toJson(github.event.head_commit) }}, + # "repository": ${{ toJson(github.repository) }}, + # "labels": ${{ toJson(steps.labels.outputs.labels) }} + # } + + # - name: Check for changes that affect batch exports temporal worker + # id: check_changes_batch_exports_temporal_worker + # run: | + # echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/batch_exports|^posthog/batch_exports/|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT + + # - name: Trigger Batch Exports Temporal Worker Cloud deployment + # if: steps.check_changes_batch_exports_temporal_worker.outputs.changed == 'true' + # uses: peter-evans/repository-dispatch@v3 + # with: + # token: ${{ steps.deployer.outputs.token }} + # repository: PostHog/charts + # event-type: commit_state_update + # client-payload: | + # { + # "values": { + # "image": { + # "sha": "${{ steps.build.outputs.digest }}" + # } + # }, + # "release": "temporal-worker", + # "commit": ${{ toJson(github.event.head_commit) }}, + # "repository": ${{ toJson(github.repository) }}, + # "labels": ${{ steps.labels.outputs.labels }} + # } + + # - name: Check for changes that affect general purpose temporal worker + # id: check_changes_general_purpose_temporal_worker + # run: | + # echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/proxy_service|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT + + # - name: Trigger General Purpose Temporal Worker Cloud deployment + # if: steps.check_changes_general_purpose_temporal_worker.outputs.changed == 'true' + # uses: peter-evans/repository-dispatch@v3 + # with: + # token: ${{ steps.deployer.outputs.token }} + # repository: PostHog/charts + # event-type: commit_state_update + # client-payload: | + # { + # "values": { + # "image": { + # "sha": "${{ steps.build.outputs.digest }}" + # } + # }, + # "release": "temporal-worker-general-purpose", + # "commit": ${{ toJson(github.event.head_commit) }}, + # "repository": ${{ toJson(github.repository) }}, + # "labels": ${{ steps.labels.outputs.labels }} + # } + + # - name: Check for changes that affect data warehouse temporal worker + # id: check_changes_data_warehouse_temporal_worker + # run: | + # echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/data_imports|^posthog/warehouse/|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT + + # - name: Trigger Data Warehouse Temporal Worker Cloud deployment + # if: steps.check_changes_data_warehouse_temporal_worker.outputs.changed == 'true' + # uses: peter-evans/repository-dispatch@v3 + # with: + # token: ${{ steps.deployer.outputs.token }} + # repository: PostHog/charts + # event-type: commit_state_update + # client-payload: | + # { + # "values": { + # "image": { + # "sha": "${{ steps.build.outputs.digest }}" + # } + # }, + # "release": "temporal-worker-data-warehouse", + # "commit": ${{ toJson(github.event.head_commit) }}, + # "repository": ${{ toJson(github.repository) }}, + # "labels": ${{ steps.labels.outputs.labels }} + # } diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index feed88b570d5f..cd4c2d5ff23d7 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -566,6 +566,10 @@ export class SessionRecordingIngester { // NOTE: We have to get the partitions before we stop the consumer as it throws if disconnected const assignedPartitions = this.assignedTopicPartitions + + // flush everything we can + await this.flushAllReadySessions(() => {}) + // Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive await this.batchConsumer?.stop() @@ -713,17 +717,16 @@ export class SessionRecordingIngester { sessions, async ([key, sessionManager], ctx) => { heartbeat() - - if (this.isStopping) { - // We can end up with a large number of flushes. We want to stop early if we hit shutdown - return ctx.break() - } - if (!this.assignedPartitions.includes(sessionManager.partition)) { // We are no longer in charge of this partition, so we should not flush it return } + if (this.isStopping) { + await sessionManager.flush('partition_shutdown') + return + } + // in practice, we will always have a values for latestKafkaMessageTimestamp, const { lastMessageTimestamp, offsetLag } = this.partitionMetrics[sessionManager.partition] || {} if (!lastMessageTimestamp) {