# Git-format unified diff whose original line breaks have been collapsed into
# spaces. It touches two files:
#
#   posthog/tasks/test/test_warehouse.py
#     - widens the posthog.warehouse.models import to also bring in
#       DataWarehouseTable.
#     - adds a module-level AIRBYTE_MOCK_SETTINGS dict holding placeholder
#       AIRBYTE_BUCKET_KEY / AIRBYTE_BUCKET_SECRET values; both new tests pass
#       it to self.settings(...) around the sync_resource call.
#     - adds test_sync_resource: patches DataWarehouseTable.get_columns,
#       posthog.tasks.warehouse.retrieve_sync and
#       posthog.tasks.warehouse.get_active_connection_streams_by_id, creates an
#       ExternalDataSource with status "inactive", calls sync_resource(pk) and
#       asserts that a "customers" DataWarehouseTable exists for the team with
#       the mocked columns, that the source status is "succeeded" and that
#       are_tables_created is True.
#     - adds test_sync_resource_table_changed: same patches, but the source
#       starts as "succeeded" and a "customers" table with columns
#       {"test_field": "String"} already exists; after sync_resource(pk) the
#       table's columns are asserted to equal the mocked
#       {"another_field": "String"}.
#
#   posthog/tasks/warehouse.py
#     - sync_resources() now iterates ExternalDataSource.objects.all() instead
#       of filtering on status__in=["running", "error"], so sync_resource.delay
#       is dispatched for every ExternalDataSource row.
#
# NOTE(review): in both new tests the parameter named send_request_mock
#   receives the mock for posthog.tasks.warehouse.retrieve_sync (patch
#   decorators are applied bottom-up), so the name is misleading.
# NOTE(review): "fake connectino_id" looks like a typo for "connection_id"; it
#   appears to be only an opaque fixture value, so it is left untouched here.
# NOTE(review): the hunk headers fix the line counts of this patch; do not add
#   or remove lines inside a hunk without regenerating the diff.
diff --git a/posthog/tasks/test/test_warehouse.py b/posthog/tasks/test/test_warehouse.py index 20b669b754995..9e5151e4e4fc3 100644 --- a/posthog/tasks/test/test_warehouse.py +++ b/posthog/tasks/test/test_warehouse.py @@ -6,9 +6,14 @@ capture_workspace_rows_synced_by_team, check_external_data_source_billing_limit_by_team, ) -from posthog.warehouse.models import ExternalDataSource +from posthog.warehouse.models import ExternalDataSource, DataWarehouseTable from freezegun import freeze_time +AIRBYTE_MOCK_SETTINGS = { + "AIRBYTE_BUCKET_KEY": "fake", + "AIRBYTE_BUCKET_SECRET": "fake", +} + class TestWarehouse(APIBaseTest): @patch("posthog.tasks.warehouse.send_request") @@ -165,3 +170,83 @@ def test_external_data_source_billing_limit_activate( external_source.refresh_from_db() self.assertEqual(external_source.status, "running") + + @patch("posthog.warehouse.models.table.DataWarehouseTable.get_columns") + @patch("posthog.tasks.warehouse.retrieve_sync") + @patch("posthog.tasks.warehouse.get_active_connection_streams_by_id") + def test_sync_resource(self, get_connection_streams_mock, send_request_mock, get_columns_mock): + send_request_mock.return_value = { + "jobId": 5827835, + "status": "succeeded", + "jobType": "sync", + "startTime": "2023-11-07T16:50:49Z", + "connectionId": "fake", + "lastUpdatedAt": "2023-11-07T16:52:54Z", + "duration": "PT2M5S", + "rowsSynced": 93353, + } + get_connection_streams_mock.return_value = [{"name": "customers"}] + get_columns_mock.return_value = {"test_field": "String"} + + external_source = ExternalDataSource.objects.create( + source_id="test_id", + connection_id="fake connectino_id", + destination_id="fake destination_id", + team=self.team, + status="inactive", + source_type="Stripe", + ) + from posthog.tasks.warehouse import sync_resource + + with self.settings(**AIRBYTE_MOCK_SETTINGS): + sync_resource(external_source.pk) + + external_source.refresh_from_db() + new_table = DataWarehouseTable.objects.get(name="customers", 
team_id=self.team.pk) + + self.assertIsNotNone(new_table) + self.assertEqual(new_table.columns, {"test_field": "String"}) + self.assertEqual(external_source.status, "succeeded") + self.assertEqual(external_source.are_tables_created, True) + + @patch("posthog.warehouse.models.table.DataWarehouseTable.get_columns") + @patch("posthog.tasks.warehouse.retrieve_sync") + @patch("posthog.tasks.warehouse.get_active_connection_streams_by_id") + def test_sync_resource_table_changed(self, get_connection_streams_mock, send_request_mock, get_columns_mock): + send_request_mock.return_value = { + "jobId": 5827835, + "status": "succeeded", + "jobType": "sync", + "startTime": "2023-11-07T16:50:49Z", + "connectionId": "fake", + "lastUpdatedAt": "2023-11-07T16:52:54Z", + "duration": "PT2M5S", + "rowsSynced": 93353, + } + get_connection_streams_mock.return_value = [{"name": "customers"}] + get_columns_mock.return_value = {"another_field": "String"} + + external_source = ExternalDataSource.objects.create( + source_id="test_id", + connection_id="fake connectino_id", + destination_id="fake destination_id", + team=self.team, + status="succeeded", + source_type="Stripe", + ) + existing_table = DataWarehouseTable.objects.create( + name="customers", team_id=self.team.pk, columns={"test_field": "String"} + ) + + from posthog.tasks.warehouse import sync_resource + + with self.settings(**AIRBYTE_MOCK_SETTINGS): + sync_resource(external_source.pk) + + external_source.refresh_from_db() + existing_table.refresh_from_db() + + self.assertIsNotNone(existing_table) + self.assertEqual(existing_table.columns, {"another_field": "String"}) + self.assertEqual(external_source.status, "succeeded") + self.assertEqual(external_source.are_tables_created, True) diff --git a/posthog/tasks/warehouse.py b/posthog/tasks/warehouse.py index 434eb758f232b..d21de379ba0d2 100644 --- a/posthog/tasks/warehouse.py +++ b/posthog/tasks/warehouse.py @@ -22,7 +22,7 @@ def sync_resources() -> None: - resources = 
ExternalDataSource.objects.filter(status__in=["running", "error"]) + resources = ExternalDataSource.objects.all() for resource in resources: sync_resource.delay(resource.pk)