Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
frankh committed Aug 27, 2024
1 parent 88003f8 commit 4a19e68
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 137 deletions.
263 changes: 132 additions & 131 deletions .github/workflows/container-images-cd.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ on:
push:
branches:
- master
- frank/test-flush
paths-ignore:
- 'rust/**'
- 'livestream/**'
Expand Down Expand Up @@ -70,134 +71,134 @@ jobs:
platforms: linux/arm64,linux/amd64
build-args: COMMIT_HASH=${{ github.sha }}

- name: get deployer token
id: deployer
uses: getsentry/action-github-app-token@v3
with:
app_id: ${{ secrets.DEPLOYER_APP_ID }}
private_key: ${{ secrets.DEPLOYER_APP_PRIVATE_KEY }}

- name: get PR labels
id: labels
uses: ./.github/actions/get-pr-labels
with:
token: ${{ secrets.GITHUB_TOKEN }}

- name: Trigger PostHog Cloud deployment from Charts
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ steps.deployer.outputs.token }}
repository: PostHog/charts
event-type: commit_state_update
client-payload: |
{
"values": {
"image": {
"sha": "${{ steps.build.outputs.digest }}"
}
},
"release": "posthog",
"commit": ${{ toJson(github.event.head_commit) }},
"repository": ${{ toJson(github.repository) }},
"labels": ${{ steps.labels.outputs.labels }}
}
- name: Check for changes in plugins directory
id: check_changes_plugins
run: |
echo "changed=$((git diff --name-only HEAD^ HEAD | grep -q '^plugin-server/' && echo true) || echo false)" >> $GITHUB_OUTPUT
- name: Trigger Ingestion Cloud deployment
if: steps.check_changes_plugins.outputs.changed == 'true'
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ steps.deployer.outputs.token }}
repository: PostHog/charts
event-type: commit_state_update
client-payload: |
{
"values": {
"image": {
"sha": "${{ steps.build.outputs.digest }}"
}
},
"release": "ingestion",
"commit": ${{ toJson(github.event.head_commit) }},
"repository": ${{ toJson(github.repository) }},
"labels": ${{ toJson(steps.labels.outputs.labels) }}
}
- name: Check for changes that affect batch exports temporal worker
id: check_changes_batch_exports_temporal_worker
run: |
echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/batch_exports|^posthog/batch_exports/|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT
- name: Trigger Batch Exports Temporal Worker Cloud deployment
if: steps.check_changes_batch_exports_temporal_worker.outputs.changed == 'true'
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ steps.deployer.outputs.token }}
repository: PostHog/charts
event-type: commit_state_update
client-payload: |
{
"values": {
"image": {
"sha": "${{ steps.build.outputs.digest }}"
}
},
"release": "temporal-worker",
"commit": ${{ toJson(github.event.head_commit) }},
"repository": ${{ toJson(github.repository) }},
"labels": ${{ steps.labels.outputs.labels }}
}
- name: Check for changes that affect general purpose temporal worker
id: check_changes_general_purpose_temporal_worker
run: |
echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/proxy_service|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT
- name: Trigger General Purpose Temporal Worker Cloud deployment
if: steps.check_changes_general_purpose_temporal_worker.outputs.changed == 'true'
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ steps.deployer.outputs.token }}
repository: PostHog/charts
event-type: commit_state_update
client-payload: |
{
"values": {
"image": {
"sha": "${{ steps.build.outputs.digest }}"
}
},
"release": "temporal-worker-general-purpose",
"commit": ${{ toJson(github.event.head_commit) }},
"repository": ${{ toJson(github.repository) }},
"labels": ${{ steps.labels.outputs.labels }}
}
- name: Check for changes that affect data warehouse temporal worker
id: check_changes_data_warehouse_temporal_worker
run: |
echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/data_imports|^posthog/warehouse/|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT
- name: Trigger Data Warehouse Temporal Worker Cloud deployment
if: steps.check_changes_data_warehouse_temporal_worker.outputs.changed == 'true'
uses: peter-evans/repository-dispatch@v3
with:
token: ${{ steps.deployer.outputs.token }}
repository: PostHog/charts
event-type: commit_state_update
client-payload: |
{
"values": {
"image": {
"sha": "${{ steps.build.outputs.digest }}"
}
},
"release": "temporal-worker-data-warehouse",
"commit": ${{ toJson(github.event.head_commit) }},
"repository": ${{ toJson(github.repository) }},
"labels": ${{ steps.labels.outputs.labels }}
}
# - name: get deployer token
# id: deployer
# uses: getsentry/action-github-app-token@v3
# with:
# app_id: ${{ secrets.DEPLOYER_APP_ID }}
# private_key: ${{ secrets.DEPLOYER_APP_PRIVATE_KEY }}

# - name: get PR labels
# id: labels
# uses: ./.github/actions/get-pr-labels
# with:
# token: ${{ secrets.GITHUB_TOKEN }}

# - name: Trigger PostHog Cloud deployment from Charts
# uses: peter-evans/repository-dispatch@v3
# with:
# token: ${{ steps.deployer.outputs.token }}
# repository: PostHog/charts
# event-type: commit_state_update
# client-payload: |
# {
# "values": {
# "image": {
# "sha": "${{ steps.build.outputs.digest }}"
# }
# },
# "release": "posthog",
# "commit": ${{ toJson(github.event.head_commit) }},
# "repository": ${{ toJson(github.repository) }},
# "labels": ${{ steps.labels.outputs.labels }}
# }

# - name: Check for changes in plugins directory
# id: check_changes_plugins
# run: |
# echo "changed=$((git diff --name-only HEAD^ HEAD | grep -q '^plugin-server/' && echo true) || echo false)" >> $GITHUB_OUTPUT

# - name: Trigger Ingestion Cloud deployment
# if: steps.check_changes_plugins.outputs.changed == 'true'
# uses: peter-evans/repository-dispatch@v3
# with:
# token: ${{ steps.deployer.outputs.token }}
# repository: PostHog/charts
# event-type: commit_state_update
# client-payload: |
# {
# "values": {
# "image": {
# "sha": "${{ steps.build.outputs.digest }}"
# }
# },
# "release": "ingestion",
# "commit": ${{ toJson(github.event.head_commit) }},
# "repository": ${{ toJson(github.repository) }},
# "labels": ${{ toJson(steps.labels.outputs.labels) }}
# }

# - name: Check for changes that affect batch exports temporal worker
# id: check_changes_batch_exports_temporal_worker
# run: |
# echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/batch_exports|^posthog/batch_exports/|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT

# - name: Trigger Batch Exports Temporal Worker Cloud deployment
# if: steps.check_changes_batch_exports_temporal_worker.outputs.changed == 'true'
# uses: peter-evans/repository-dispatch@v3
# with:
# token: ${{ steps.deployer.outputs.token }}
# repository: PostHog/charts
# event-type: commit_state_update
# client-payload: |
# {
# "values": {
# "image": {
# "sha": "${{ steps.build.outputs.digest }}"
# }
# },
# "release": "temporal-worker",
# "commit": ${{ toJson(github.event.head_commit) }},
# "repository": ${{ toJson(github.repository) }},
# "labels": ${{ steps.labels.outputs.labels }}
# }

# - name: Check for changes that affect general purpose temporal worker
# id: check_changes_general_purpose_temporal_worker
# run: |
# echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/proxy_service|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT

# - name: Trigger General Purpose Temporal Worker Cloud deployment
# if: steps.check_changes_general_purpose_temporal_worker.outputs.changed == 'true'
# uses: peter-evans/repository-dispatch@v3
# with:
# token: ${{ steps.deployer.outputs.token }}
# repository: PostHog/charts
# event-type: commit_state_update
# client-payload: |
# {
# "values": {
# "image": {
# "sha": "${{ steps.build.outputs.digest }}"
# }
# },
# "release": "temporal-worker-general-purpose",
# "commit": ${{ toJson(github.event.head_commit) }},
# "repository": ${{ toJson(github.repository) }},
# "labels": ${{ steps.labels.outputs.labels }}
# }

# - name: Check for changes that affect data warehouse temporal worker
# id: check_changes_data_warehouse_temporal_worker
# run: |
# echo "changed=$((git diff --name-only HEAD^ HEAD | grep -qE '^posthog/temporal/common|^posthog/temporal/data_imports|^posthog/warehouse/|^posthog/management/commands/start_temporal_worker.py$' && echo true) || echo false)" >> $GITHUB_OUTPUT

# - name: Trigger Data Warehouse Temporal Worker Cloud deployment
# if: steps.check_changes_data_warehouse_temporal_worker.outputs.changed == 'true'
# uses: peter-evans/repository-dispatch@v3
# with:
# token: ${{ steps.deployer.outputs.token }}
# repository: PostHog/charts
# event-type: commit_state_update
# client-payload: |
# {
# "values": {
# "image": {
# "sha": "${{ steps.build.outputs.digest }}"
# }
# },
# "release": "temporal-worker-data-warehouse",
# "commit": ${{ toJson(github.event.head_commit) }},
# "repository": ${{ toJson(github.repository) }},
# "labels": ${{ steps.labels.outputs.labels }}
# }
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,10 @@ export class SessionRecordingIngester {

// NOTE: We have to get the partitions before we stop the consumer as it throws if disconnected
const assignedPartitions = this.assignedTopicPartitions

// flush everything we can
await this.flushAllReadySessions(() => {})

// Mark as stopping so that we don't actually process any more incoming messages, but still keep the process alive
await this.batchConsumer?.stop()

Expand Down Expand Up @@ -713,17 +717,16 @@ export class SessionRecordingIngester {
sessions,
async ([key, sessionManager], ctx) => {

Check failure on line 718 in plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts

View workflow job for this annotation

GitHub Actions / Code quality

'ctx' is defined but never used. Allowed unused args must match /^_/u
heartbeat()

if (this.isStopping) {
// We can end up with a large number of flushes. We want to stop early if we hit shutdown
return ctx.break()
}

if (!this.assignedPartitions.includes(sessionManager.partition)) {
// We are no longer in charge of this partition, so we should not flush it
return
}

if (this.isStopping) {
await sessionManager.flush('partition_shutdown')
return
}

// in practice, we will always have a values for latestKafkaMessageTimestamp,
const { lastMessageTimestamp, offsetLag } = this.partitionMetrics[sessionManager.partition] || {}
if (!lastMessageTimestamp) {
Expand Down

0 comments on commit 4a19e68

Please sign in to comment.