diff --git a/dbt_automation/assets/operations.yaml.template b/dbt_automation/assets/operations.template.yml similarity index 100% rename from dbt_automation/assets/operations.yaml.template rename to dbt_automation/assets/operations.template.yml diff --git a/dbt_automation/assets/packages.yml b/dbt_automation/assets/packages.yml new file mode 100644 index 0000000..6152b33 --- /dev/null +++ b/dbt_automation/assets/packages.yml @@ -0,0 +1,3 @@ +packages: + - package: dbt-labs/dbt_utils + version: 1.1.1 \ No newline at end of file diff --git a/dbt_automation/utils/sourceschemas.py b/dbt_automation/utils/sourceschemas.py index f8a4c0d..402f188 100644 --- a/dbt_automation/utils/sourceschemas.py +++ b/dbt_automation/utils/sourceschemas.py @@ -11,7 +11,7 @@ def mksourcedefinition(sourcename: str, input_schema: str, tables: list): source = {"name": sourcename, "schema": input_schema, "tables": []} for tablename in tables: - cleaned_name = tablename.replace(airbyte_prefix, "") + cleaned_name = tablename source["tables"].append( { "name": cleaned_name, diff --git a/setup.py b/setup.py index ecebf41..ac62555 100644 --- a/setup.py +++ b/setup.py @@ -21,6 +21,6 @@ long_description=long_description, python_requires=">=3.7", install_requires=["PyYAML", "requests", "google-cloud-bigquery", "psycopg2-binary"], - package_data={"dbt_automation": ["*.sql"]}, + package_data={"dbt_automation": ["assets/*.*"]}, include_package_data=True, ) diff --git a/tests/utils/test_sourceschemas.py b/tests/utils/test_sourceschemas.py index 70c865d..2f7d23c 100644 --- a/tests/utils/test_sourceschemas.py +++ b/tests/utils/test_sourceschemas.py @@ -39,8 +39,8 @@ def test_mk_source_definition(): sourcedefinition = mksourcedefinition(sourcename, input_schema, tables) assert sourcedefinition["sources"][0]["name"] == sourcename assert sourcedefinition["sources"][0]["schema"] == input_schema - assert sourcedefinition["sources"][0]["tables"][0]["name"] == "Sheet1" - assert 
sourcedefinition["sources"][0]["tables"][1]["name"] == "Sheet2" + assert sourcedefinition["sources"][0]["tables"][0]["name"] == "_airbyte_raw_Sheet1" + assert sourcedefinition["sources"][0]["tables"][1]["name"] == "_airbyte_raw_Sheet2" def test_get_source(sources_yaml, tmpdir): diff --git a/tests/warehouse/test_bigquery_ops.py b/tests/warehouse/test_bigquery_ops.py index 3302039..513d44b 100644 --- a/tests/warehouse/test_bigquery_ops.py +++ b/tests/warehouse/test_bigquery_ops.py @@ -109,16 +109,16 @@ def test_flatten(self): wc_client, TestBigqueryOperations.test_project_dir, ) - TestBigqueryOperations.execute_dbt("run", "Sheet1") - TestBigqueryOperations.execute_dbt("run", "Sheet2") + TestBigqueryOperations.execute_dbt("run", "_airbyte_raw_Sheet1") + TestBigqueryOperations.execute_dbt("run", "_airbyte_raw_Sheet2") logger.info("inside test flatten") logger.info( f"inside project directory : {TestBigqueryOperations.test_project_dir}" ) - assert "Sheet1" in TestBigqueryOperations.wc_client.get_tables( + assert "_airbyte_raw_Sheet1" in TestBigqueryOperations.wc_client.get_tables( "pytest_intermediate" ) - assert "Sheet2" in TestBigqueryOperations.wc_client.get_tables( + assert "_airbyte_raw_Sheet2" in TestBigqueryOperations.wc_client.get_tables( "pytest_intermediate" ) @@ -129,7 +129,7 @@ def test_rename_columns(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -159,7 +159,7 @@ def test_drop_columns(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -186,7 +186,7 @@ def test_coalescecolumns(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -213,7 +213,9 @@ def 
test_coalescecolumns(self): cols = wc_client.get_table_columns("pytest_intermediate", output_name) assert "ngo_spoc" in cols col_data = wc_client.get_table_data("pytest_intermediate", output_name, 5) - col_data_original = wc_client.get_table_data("pytest_intermediate", "Sheet1", 5) + col_data_original = wc_client.get_table_data( + "pytest_intermediate", "_airbyte_raw_Sheet1", 5 + ) col_data_original.sort(key=lambda x: x["_airbyte_ab_id"]) col_data.sort(key=lambda x: x["_airbyte_ab_id"]) # TODO: can do a stronger check here; by checking on rows in a loop @@ -231,7 +233,7 @@ def test_concatcolumns(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -277,7 +279,7 @@ def test_castdatatypes(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -459,7 +461,7 @@ def test_regexextract(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -477,7 +479,9 @@ def test_regexextract(self): cols = wc_client.get_table_columns("pytest_intermediate", output_name) assert "NGO" in cols - table_data_org = wc_client.get_table_data("pytest_intermediate", "Sheet1", 10) + table_data_org = wc_client.get_table_data( + "pytest_intermediate", "_airbyte_raw_Sheet1", 10 + ) table_data_org.sort(key=lambda x: x["_airbyte_ab_id"]) table_data_regex = wc_client.get_table_data( "pytest_intermediate", output_name, 10 @@ -501,12 +505,12 @@ def test_mergetables(self): "input_arr": [ { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, { "input_type": "model", - "input_name": "Sheet2", + "input_name": "_airbyte_raw_Sheet2", "source_name": None, }, ], @@ -520,8 +524,12 
@@ def test_mergetables(self): TestBigqueryOperations.execute_dbt("run", output_name) - table_data1 = wc_client.get_table_data("pytest_intermediate", "Sheet1", 10) - table_data2 = wc_client.get_table_data("pytest_intermediate", "Sheet2", 10) + table_data1 = wc_client.get_table_data( + "pytest_intermediate", "_airbyte_raw_Sheet1", 10 + ) + table_data2 = wc_client.get_table_data( + "pytest_intermediate", "_airbyte_raw_Sheet2", 10 + ) table_data_union = wc_client.get_table_data( "pytest_intermediate", output_name, 10 ) diff --git a/tests/warehouse/test_postgres_ops.py b/tests/warehouse/test_postgres_ops.py index c2b0c22..254d78c 100644 --- a/tests/warehouse/test_postgres_ops.py +++ b/tests/warehouse/test_postgres_ops.py @@ -113,16 +113,16 @@ def test_flatten(self): wc_client, TestPostgresOperations.test_project_dir, ) - TestPostgresOperations.execute_dbt("run", "Sheet1") - TestPostgresOperations.execute_dbt("run", "Sheet2") + TestPostgresOperations.execute_dbt("run", "_airbyte_raw_Sheet1") + TestPostgresOperations.execute_dbt("run", "_airbyte_raw_Sheet2") logger.info("inside test flatten") logger.info( f"inside project directory : {TestPostgresOperations.test_project_dir}" ) - assert "Sheet1" in TestPostgresOperations.wc_client.get_tables( + assert "_airbyte_raw_Sheet1" in TestPostgresOperations.wc_client.get_tables( "pytest_intermediate" ) - assert "Sheet2" in TestPostgresOperations.wc_client.get_tables( + assert "_airbyte_raw_Sheet2" in TestPostgresOperations.wc_client.get_tables( "pytest_intermediate" ) @@ -133,7 +133,7 @@ def test_rename_columns(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -163,7 +163,7 @@ def test_drop_columns(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -190,7 +190,7 @@ def 
test_coalescecolumns(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -218,7 +218,9 @@ def test_coalescecolumns(self): assert "ngo_spoc" in cols col_data = wc_client.get_table_data("pytest_intermediate", output_name, 1) col_data_original = wc_client.get_table_data( - "pytest_intermediate", quote_columnname("Sheet1", "postgres"), 1 + "pytest_intermediate", + quote_columnname("_airbyte_raw_Sheet1", "postgres"), + 1, ) assert ( col_data[0]["ngo_spoc"] == col_data_original[0]["NGO"] @@ -234,7 +236,7 @@ def test_concatcolumns(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -280,7 +282,7 @@ def test_castdatatypes(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -463,7 +465,7 @@ def test_regexextract(self): config = { "input": { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, "dest_schema": "pytest_intermediate", @@ -482,7 +484,9 @@ def test_regexextract(self): cols = wc_client.get_table_columns("pytest_intermediate", output_name) assert "NGO" in cols table_data_org = wc_client.get_table_data( - "pytest_intermediate", quote_columnname("Sheet1", "postgres"), 10 + "pytest_intermediate", + quote_columnname("_airbyte_raw_Sheet1", "postgres"), + 10, ) table_data_org.sort(key=lambda x: x["_airbyte_ab_id"]) table_data_regex = wc_client.get_table_data( @@ -507,12 +511,12 @@ def test_mergetables(self): "input_arr": [ { "input_type": "model", - "input_name": "Sheet1", + "input_name": "_airbyte_raw_Sheet1", "source_name": None, }, { "input_type": "model", - "input_name": "Sheet2", + "input_name": "_airbyte_raw_Sheet2", 
"source_name": None, }, ], @@ -527,10 +531,14 @@ def test_mergetables(self): TestPostgresOperations.execute_dbt("run", output_name) table_data1 = wc_client.get_table_data( - "pytest_intermediate", quote_columnname("Sheet1", "postgres"), 10 + "pytest_intermediate", + quote_columnname("_airbyte_raw_Sheet1", "postgres"), + 10, ) table_data2 = wc_client.get_table_data( - "pytest_intermediate", quote_columnname("Sheet2", "postgres"), 10 + "pytest_intermediate", + quote_columnname("_airbyte_raw_Sheet2", "postgres"), + 10, ) table_data_union = wc_client.get_table_data( "pytest_intermediate", output_name, 10