Skip to content

Commit

Permalink
Merge pull request #63 from DalgoT4D/minor_change
Browse files Browse the repository at this point in the history
dbt throws error if we have duplicate names
  • Loading branch information
fatchat authored Feb 17, 2024
2 parents f1239b1 + dd1e5f9 commit 9b932da
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 36 deletions.
3 changes: 3 additions & 0 deletions dbt_automation/assets/packages.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
packages:
- package: dbt-labs/dbt_utils
version: 1.1.1
2 changes: 1 addition & 1 deletion dbt_automation/utils/sourceschemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def mksourcedefinition(sourcename: str, input_schema: str, tables: list):
source = {"name": sourcename, "schema": input_schema, "tables": []}

for tablename in tables:
cleaned_name = tablename.replace(airbyte_prefix, "")
cleaned_name = tablename
source["tables"].append(
{
"name": cleaned_name,
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@
long_description=long_description,
python_requires=">=3.7",
install_requires=["PyYAML", "requests", "google-cloud-bigquery", "psycopg2-binary"],
package_data={"dbt_automation": ["*.sql"]},
package_data={"dbt_automation": ["assets/*.*"]},
include_package_data=True,
)
4 changes: 2 additions & 2 deletions tests/utils/test_sourceschemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ def test_mk_source_definition():
sourcedefinition = mksourcedefinition(sourcename, input_schema, tables)
assert sourcedefinition["sources"][0]["name"] == sourcename
assert sourcedefinition["sources"][0]["schema"] == input_schema
assert sourcedefinition["sources"][0]["tables"][0]["name"] == "Sheet1"
assert sourcedefinition["sources"][0]["tables"][1]["name"] == "Sheet2"
assert sourcedefinition["sources"][0]["tables"][0]["name"] == "_airbyte_raw_Sheet1"
assert sourcedefinition["sources"][0]["tables"][1]["name"] == "_airbyte_raw_Sheet2"


def test_get_source(sources_yaml, tmpdir):
Expand Down
40 changes: 24 additions & 16 deletions tests/warehouse/test_bigquery_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,16 @@ def test_flatten(self):
wc_client,
TestBigqueryOperations.test_project_dir,
)
TestBigqueryOperations.execute_dbt("run", "Sheet1")
TestBigqueryOperations.execute_dbt("run", "Sheet2")
TestBigqueryOperations.execute_dbt("run", "_airbyte_raw_Sheet1")
TestBigqueryOperations.execute_dbt("run", "_airbyte_raw_Sheet2")
logger.info("inside test flatten")
logger.info(
f"inside project directory : {TestBigqueryOperations.test_project_dir}"
)
assert "Sheet1" in TestBigqueryOperations.wc_client.get_tables(
assert "_airbyte_raw_Sheet1" in TestBigqueryOperations.wc_client.get_tables(
"pytest_intermediate"
)
assert "Sheet2" in TestBigqueryOperations.wc_client.get_tables(
assert "_airbyte_raw_Sheet1" in TestBigqueryOperations.wc_client.get_tables(
"pytest_intermediate"
)

Expand All @@ -129,7 +129,7 @@ def test_rename_columns(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand Down Expand Up @@ -159,7 +159,7 @@ def test_drop_columns(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand All @@ -186,7 +186,7 @@ def test_coalescecolumns(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand All @@ -213,7 +213,9 @@ def test_coalescecolumns(self):
cols = wc_client.get_table_columns("pytest_intermediate", output_name)
assert "ngo_spoc" in cols
col_data = wc_client.get_table_data("pytest_intermediate", output_name, 5)
col_data_original = wc_client.get_table_data("pytest_intermediate", "Sheet1", 5)
col_data_original = wc_client.get_table_data(
"pytest_intermediate", "_airbyte_raw_Sheet1", 5
)
col_data_original.sort(key=lambda x: x["_airbyte_ab_id"])
col_data.sort(key=lambda x: x["_airbyte_ab_id"])
# TODO: can do a stronger check here; by checking on rows in a loop
Expand All @@ -231,7 +233,7 @@ def test_concatcolumns(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand Down Expand Up @@ -277,7 +279,7 @@ def test_castdatatypes(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand Down Expand Up @@ -459,7 +461,7 @@ def test_regexextract(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand All @@ -477,7 +479,9 @@ def test_regexextract(self):

cols = wc_client.get_table_columns("pytest_intermediate", output_name)
assert "NGO" in cols
table_data_org = wc_client.get_table_data("pytest_intermediate", "Sheet1", 10)
table_data_org = wc_client.get_table_data(
"pytest_intermediate", "_airbyte_raw_Sheet1", 10
)
table_data_org.sort(key=lambda x: x["_airbyte_ab_id"])
table_data_regex = wc_client.get_table_data(
"pytest_intermediate", output_name, 10
Expand All @@ -501,12 +505,12 @@ def test_mergetables(self):
"input_arr": [
{
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
{
"input_type": "model",
"input_name": "Sheet2",
"input_name": "_airbyte_raw_Sheet2",
"source_name": None,
},
],
Expand All @@ -520,8 +524,12 @@ def test_mergetables(self):

TestBigqueryOperations.execute_dbt("run", output_name)

table_data1 = wc_client.get_table_data("pytest_intermediate", "Sheet1", 10)
table_data2 = wc_client.get_table_data("pytest_intermediate", "Sheet2", 10)
table_data1 = wc_client.get_table_data(
"pytest_intermediate", "_airbyte_raw_Sheet1", 10
)
table_data2 = wc_client.get_table_data(
"pytest_intermediate", "_airbyte_raw_Sheet2", 10
)
table_data_union = wc_client.get_table_data(
"pytest_intermediate", output_name, 10
)
Expand Down
40 changes: 24 additions & 16 deletions tests/warehouse/test_postgres_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,16 @@ def test_flatten(self):
wc_client,
TestPostgresOperations.test_project_dir,
)
TestPostgresOperations.execute_dbt("run", "Sheet1")
TestPostgresOperations.execute_dbt("run", "Sheet2")
TestPostgresOperations.execute_dbt("run", "_airbyte_raw_Sheet1")
TestPostgresOperations.execute_dbt("run", "_airbyte_raw_Sheet2")
logger.info("inside test flatten")
logger.info(
f"inside project directory : {TestPostgresOperations.test_project_dir}"
)
assert "Sheet1" in TestPostgresOperations.wc_client.get_tables(
assert "_airbyte_raw_Sheet1" in TestPostgresOperations.wc_client.get_tables(
"pytest_intermediate"
)
assert "Sheet2" in TestPostgresOperations.wc_client.get_tables(
assert "_airbyte_raw_Sheet2" in TestPostgresOperations.wc_client.get_tables(
"pytest_intermediate"
)

Expand All @@ -133,7 +133,7 @@ def test_rename_columns(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand Down Expand Up @@ -163,7 +163,7 @@ def test_drop_columns(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand All @@ -190,7 +190,7 @@ def test_coalescecolumns(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand Down Expand Up @@ -218,7 +218,9 @@ def test_coalescecolumns(self):
assert "ngo_spoc" in cols
col_data = wc_client.get_table_data("pytest_intermediate", output_name, 1)
col_data_original = wc_client.get_table_data(
"pytest_intermediate", quote_columnname("Sheet1", "postgres"), 1
"pytest_intermediate",
quote_columnname("_airbyte_raw_Sheet1", "postgres"),
1,
)
assert (
col_data[0]["ngo_spoc"] == col_data_original[0]["NGO"]
Expand All @@ -234,7 +236,7 @@ def test_concatcolumns(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand Down Expand Up @@ -280,7 +282,7 @@ def test_castdatatypes(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand Down Expand Up @@ -463,7 +465,7 @@ def test_regexextract(self):
config = {
"input": {
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
"dest_schema": "pytest_intermediate",
Expand All @@ -482,7 +484,9 @@ def test_regexextract(self):
cols = wc_client.get_table_columns("pytest_intermediate", output_name)
assert "NGO" in cols
table_data_org = wc_client.get_table_data(
"pytest_intermediate", quote_columnname("Sheet1", "postgres"), 10
"pytest_intermediate",
quote_columnname("_airbyte_raw_Sheet1", "postgres"),
10,
)
table_data_org.sort(key=lambda x: x["_airbyte_ab_id"])
table_data_regex = wc_client.get_table_data(
Expand All @@ -507,12 +511,12 @@ def test_mergetables(self):
"input_arr": [
{
"input_type": "model",
"input_name": "Sheet1",
"input_name": "_airbyte_raw_Sheet1",
"source_name": None,
},
{
"input_type": "model",
"input_name": "Sheet2",
"input_name": "_airbyte_raw_Sheet2",
"source_name": None,
},
],
Expand All @@ -527,10 +531,14 @@ def test_mergetables(self):
TestPostgresOperations.execute_dbt("run", output_name)

table_data1 = wc_client.get_table_data(
"pytest_intermediate", quote_columnname("Sheet1", "postgres"), 10
"pytest_intermediate",
quote_columnname("_airbyte_raw_Sheet1", "postgres"),
10,
)
table_data2 = wc_client.get_table_data(
"pytest_intermediate", quote_columnname("Sheet2", "postgres"), 10
"pytest_intermediate",
quote_columnname("_airbyte_raw_Sheet2", "postgres"),
10,
)
table_data_union = wc_client.get_table_data(
"pytest_intermediate", output_name, 10
Expand Down

0 comments on commit 9b932da

Please sign in to comment.