refactor: Snowflake batch export #18427

Merged
35 commits merged into master on Nov 20, 2023

Conversation

tomasfarias
Contributor

@tomasfarias tomasfarias commented Nov 6, 2023

Problem

Snowflake was the first destination we supported for PostHog batch exports. As such, the code has been missing some best practices and features we introduced in destinations implemented later but never ported back to Snowflake.

Changes

  • Wrap the Snowflake API to make it async.
    • Which allows us to heartbeat...
  • Implement Heartbeat on progress to allow resuming from partial exports.
    • And now that we heartbeat we can...
  • Implement resume based on last heartbeat progress values.
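Roughly, the first two changes can be sketched as below. This is a minimal illustration, not the actual PostHog code: `execute_async_query`, `export_rows`, `upload_part`, and the `heartbeat` callable are hypothetical names, and the real activity would report progress via Temporal's `temporalio.activity.heartbeat()` rather than a plain callback.

```python
import asyncio

async def execute_async_query(cursor, query: str):
    # Run a blocking Snowflake cursor call in a worker thread so the event
    # loop (and therefore heartbeating) is never blocked by the query.
    return await asyncio.to_thread(cursor.execute, query)

async def export_rows(chunks, upload_part, heartbeat):
    # Upload chunks of rows, reporting progress after each one. In the real
    # activity, `heartbeat` would call temporalio.activity.heartbeat() with
    # the last exported inserted_at so a retried run can resume from there.
    last_inserted_at = None
    for chunk in chunks:
        await asyncio.to_thread(upload_part, chunk)
        last_inserted_at = chunk[-1]["inserted_at"]
        heartbeat(last_inserted_at)
    return last_inserted_at
```

On resume, the activity would read the last heartbeat detail and restrict its ClickHouse query to rows after that `inserted_at`.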

This heartbeat API is going to be rolled out to all destinations, which is why I've dropped it in a utilities module. Long term, I would like to shuffle modules around and end up with a file structure like:

. batch_exports
├── __main__.py (start_worker() with all workflows and activities in this package)
├── activities (common activities shared by all workflows)
│   ├── create_batch_export.py
│   └── ...
├── workflows
│   ├── base.py (PostHogWorkflow base class)
│   ├── bigquery_batch_export_workflow.py
│   ├── snowflake_batch_export_workflow.py
│   └── ...
├── models (or maybe just a models.py)
│   ├── batch_export_model.py
│   ├── backfill_batch_export_model.py
│   └── ...
├── utilities (if any of these gets too big, we can move them to their own separate module)
│   ├── heartbeat.py
│   ├── temporary_file.py
│   ├── clickhouse.py
│   ├── service.py
│   └── ...
├── temporal
│   ├── client.py
│   ├── codec.py
│   ├── worker.py
│   └── ...
└── tests
    ├── workflows
    │   ├── test_bigquery_batch_export_workflow.py
    │   └── ...
    ├── temporal
    │   ├── test_codec.py
    │   └── ...
    └── utilities
        ├── test_clickhouse.py
        ├── test_heartbeat.py
        └── ...

The ideas behind a refactoring like this are:

  1. Separate batch exports from the rest of the PostHog monolith.
    a. If we eventually want batch exports to be separate from PostHog, for starters, it has to live in its own directory.
    b. This also makes it easy to eventually refactor the models to avoid any dependencies to PostHog.
  2. Temporal is just the tool we use for the job, batch exports is the actual application. So, batch exports should be the root dir.
    a. Currently, things being separated between batch_exports/ and temporal/ makes no sense.
    b. But it also doesn't make sense to fully integrate into PostHog if, long term, we want batch exports to be a separate package.
    c. The only exception is the batch_exports/http.py module which will remain part of PostHog.

This PR also has some other non-heartbeat related changes:

  • Use the BatchExportTemporaryFile class to manage file writing.
  • Add unit tests that optionally run against real Snowflake, like those implemented for BigQuery/Redshift.


How did you test this code?

New unit tests ran against real Snowflake:

$  SNOWFLAKE_USERNAME="username" SNOWFLAKE_ACCOUNT="account" SNOWFLAKE_PASSWORD="password" SNOWFLAKE_WAREHOUSE="warehouse" DEBUG=1 pytest posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py::test_snowflake_export_workflow -vv
===================================================== test session starts =====================================================
platform linux -- Python 3.10.10, pytest-7.4.0, pluggy-0.13.1 -- src/github.com/PostHog/posthog/.direnv/python-3.10.10/bin/python
cachedir: .pytest_cache
django: settings: posthog.settings (from ini)
rootdir: src/github.com/PostHog/posthog
configfile: pytest.ini
plugins: asyncio-0.21.1, icdiff-0.6, flaky-3.7.0, env-0.8.2, Faker-17.5.0, syrupy-1.7.4, mock-3.11.1, split-0.8.1, django-4.5.2, cov-4.1.0
asyncio: mode=strict
collected 4 items                                                                                                             

posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py::test_snowflake_export_workflow[None-hour] PASSED [ 25%]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py::test_snowflake_export_workflow[None-day] PASSED [ 50%]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py::test_snowflake_export_workflow[exclude_events1-hour] PASSED [ 75%]
posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py::test_snowflake_export_workflow[exclude_events1-day] PASSED [100%]

--------------------------------------------------- snapshot report summary ---------------------------------------------------

===================================================== 4 passed in 35.77s ======================================================

@tomasfarias tomasfarias marked this pull request as draft November 6, 2023 17:39
@tomasfarias tomasfarias changed the title from "refactor: Snowflake batch export is now async" to "refactor: Snowflake batch export" Nov 6, 2023
Base automatically changed from refactor/batch-exports-tests-simplification to master November 7, 2023 10:07
@tomasfarias tomasfarias force-pushed the refactor/snowflake-batch-exports-and-real-tests branch from 1e80500 to 8d9f615 Compare November 7, 2023 10:09
@tomasfarias tomasfarias marked this pull request as ready for review November 9, 2023 14:26
@tomasfarias tomasfarias requested a review from a team November 9, 2023 14:26
@tomasfarias
Contributor Author

We should hold off on merging this one until #18467 is merged due to potential conflicts.

Contributor

@bretthoerner bretthoerner left a comment


There's a conflict from my merge, but LGTM.

I like the plans to extract batch exports to their own directory/app/service/image, no real comments there at this point.

posthog/temporal/utils.py
# Ideally, any new exceptions should be added to the previous blocks after the first time and we will never land here.
heartbeat_details = None
received = False
logger.warning("Did not receive details from previous activity Execution due to an unexpected error")
Contributor


Should this include the exception itself?

Contributor Author


Ah, yeah, this log should be promoted to ERROR level and use logger.exception.
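For illustration, the promoted version might look something like this. A sketch only: `parse_heartbeat_details` and the fallback shape are assumptions, not the actual PR code.

```python
import logging

logger = logging.getLogger(__name__)

def parse_heartbeat_details(details):
    # Try to read progress values from the previous activity execution,
    # falling back to None. Unlike the original logger.warning call,
    # logger.exception logs at ERROR level and attaches the active traceback.
    try:
        return details[0]
    except Exception:
        logger.exception(
            "Did not receive details from previous activity Execution due to an unexpected error"
        )
        return None
```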


except snowflake.connector.ProgrammingError:
# TODO: logging? Other handling?
raise
Contributor


Will this exception end up in some existing log at least?

Contributor Author


It will bubble up and the activity will fail, so we will see it in the temporal error logs.

Contributor Author

@tomasfarias tomasfarias Nov 10, 2023


The TODO was just thinking if we should do anything more, maybe log ourselves to add some context. I'll do that at least once we merge the structlog PR and I can rebase this one.

"""Executes a PUT query using the provided cursor to the provided table_name.

Sadly, Snowflake's execute_async does not work with PUT statements. So, we pass the execute
Contributor


Totally aside, it's wild to me that their execute fn does or doesn't work only with certain statements. Weird!

Contributor Author

@tomasfarias tomasfarias Nov 10, 2023


There is some (very little) explanation of why here: snowflakedb/snowflake-connector-python#1227 (comment).


We add a file_no to the file_name when executing PUT as Snowflake will reject any files with the same
name. Since batch exports re-use the same file, our name does not change, but we don't want Snowflake
to reject or overwrite our new data.
Contributor


Is the unique filename thing true for the lifetime of the table? Do we need to worry at all about a temporary file name (which is I think what we're using here) being reused?

Contributor Author

@tomasfarias tomasfarias Nov 10, 2023


If COPY is successful after we are done uploading everything, the files will be purged, so their names become available again.

That being said, the purge can fail (even without COPY failing), or we could fail somewhere before COPY. In that case, we are hoping that Python will not generate the same name for our temporary file again. First, Python generates an infinite sequence of random names (with this: https://github.com/python/cpython/blob/3.10/Lib/tempfile.py#L132), and every time it needs a name for a temp file, it calls next to get the next randomly generated name (here: https://github.com/python/cpython/blob/3.10/Lib/tempfile.py#L252), and uses that as the file name (here: https://github.com/python/cpython/blob/3.10/Lib/tempfile.py#L556).

So, in the worst possible scenario, our protection from collisions boils down to how likely it is, or under what circumstances, that the Python RNG will choose the same sequence of 8 characters. We are not manually seeding it for any reason, so I think we should be safe, and if it ever happens it will make for a good anecdote.

EDIT: I ran through the name generation process in this comment as I was keeping notes as I looked up how it worked just now. Not my intention to imply I know everything or sound pedantic!
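The naming behaviour described above is easy to observe directly against the stdlib; this only demonstrates `tempfile`'s random suffixes, not the batch export code.

```python
import os
import tempfile

# Each NamedTemporaryFile draws its name from tempfile's shared random
# name sequence: the default "tmp" prefix plus a random suffix.
with tempfile.NamedTemporaryFile() as first, tempfile.NamedTemporaryFile() as second:
    name_a = os.path.basename(first.name)
    name_b = os.path.basename(second.name)

# Two live temporary files never share a name; reuse after deletion is only
# as likely as the unseeded RNG repeating the same random draw.
```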

Contributor


Thanks for all the detail!

@tomasfarias tomasfarias force-pushed the refactor/snowflake-batch-exports-and-real-tests branch from 7f46d0e to 4dcf261 Compare November 17, 2023 14:46
Contributor

github-actions bot commented Nov 17, 2023

Size Change: -2.64 kB (0%)

Total Size: 2.01 MB

Filename Size Change
frontend/dist/toolbar.js 2.01 MB -2.64 kB (0%)

compressed-size-action

@tomasfarias tomasfarias merged commit 1c6ec08 into master Nov 20, 2023
67 checks passed
@tomasfarias tomasfarias deleted the refactor/snowflake-batch-exports-and-real-tests branch November 20, 2023 10:10