
Commit

added fix for snowflake operator to accept authenticator and make parameters optional based on load_type (#92)

* Making parameters like sf_cluster_keys and sf_grantee_roles optional, improving the OO code, and fixing the authenticator issue

* Reverting parameter name change to avoid compatibility issues
ckonuganti authored Feb 15, 2024
1 parent 000dc06 commit 5a4fdfc
Showing 7 changed files with 145 additions and 81 deletions.
1 change: 1 addition & 0 deletions brickflow/engine/task.py
@@ -821,6 +821,7 @@ def get_brickflow_libraries(enable_plugins: bool = False) -> List[TaskLibrary]:
return [
bf_lib,
PypiTaskLibrary("apache-airflow==2.6.3"),
PypiTaskLibrary("snowflake==0.5.1"),
MavenTaskLibrary("com.cronutils:cron-utils:9.2.0"),
]
else:
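For reference, a minimal sketch (not part of the commit) of how to observe this change, assuming a configured BrickflowProjectDeploymentSettings runtime version as the updated tests below set up:

```python
from brickflow.engine.task import get_brickflow_libraries

# With plugins enabled, the library list now carries four entries:
# brickflow itself, apache-airflow, the new snowflake package, and cron-utils.
libs = get_brickflow_libraries(enable_plugins=True)
assert len(libs) == 4
```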
176 changes: 107 additions & 69 deletions brickflow_plugins/databricks/uc_to_snowflake_operator.py

Large diffs are not rendered by default.
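Since the 176-line rewrite of uc_to_snowflake_operator.py is not rendered, here is a rough, hypothetical sketch of the two behaviors the commit message describes: forwarding an optional authenticator to the Snowflake connection, and requiring incremental-only parameters solely for incremental loads. `get_secret`, the key names, and the validation rules are assumptions, not the plugin's actual code.

```python
import snowflake.connector  # connector import assumed; the commit pins snowflake==0.5.1


def build_connection(get_secret, parameters: dict):
    # Hypothetical: assemble connection arguments from the Databricks secret scope.
    conn_args = {
        "account": get_secret("account"),
        "user": get_secret("username"),
        "password": get_secret("password"),
        "warehouse": get_secret("warehouse"),
        "database": parameters.get("sf_database") or get_secret("database"),
        "role": get_secret("role"),
    }
    authenticator = get_secret("authenticator")
    if authenticator:  # pass authenticator only when the secret scope defines one
        conn_args["authenticator"] = authenticator
    return snowflake.connector.connect(**conn_args)


def validate_parameters(parameters: dict):
    # Hypothetical: incremental_filter is mandatory only for incremental loads.
    required = ["load_type", "dbx_catalog", "dbx_database", "dbx_table",
                "sf_schema", "sf_table"]
    if parameters.get("load_type") == "incremental":
        required.append("incremental_filter")
    missing = [key for key in required if not parameters.get(key)]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")
```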

7 changes: 4 additions & 3 deletions docs/faq/faq.md
@@ -68,8 +68,9 @@ wf = Workflow(...)
@wf.task
def run_snowflake_queries(*args):
sf_query_run = SnowflakeOperator(
secret_cope = "your_databricks secrets scope name",
input_params = {'query':"comma_seprated_list_of_queries"}
secret_scope = "your_databricks secrets scope name",
query_string = "string of queries separated by semicolon(;)",
parameters={"key1":"value1", "key2":"value2"}
)
sf_query_run.execute()
```
@@ -85,7 +86,7 @@ wf = Workflow(...)
def copy_from_uc_sf(*args):
uc_to_sf_copy = UcToSnowflakeOperator(
secret_scope = "your_databricks secrets scope name",
-uc_parameters = {'load_type':'incremental','dbx_catalog':'sample_catalog','dbx_database':'sample_schema',
+parameters = {'load_type':'incremental','dbx_catalog':'sample_catalog','dbx_database':'sample_schema',
'dbx_table':'sf_operator_1', 'sf_schema':'stage','sf_table':'SF_OPERATOR_1',
'sf_grantee_roles':'downstream_read_role', 'incremental_filter':"dt='2023-10-22'",
'sf_cluster_keys':''}
26 changes: 22 additions & 4 deletions docs/tasks.md
@@ -409,6 +409,11 @@ As databricks secrets is a key value store, code expects the secret scope to con
    database : default database that we want to connect for ex: sample_database
    role : role to which the user has write access for ex: sample_write_role

+SnowflakeOperator can accept the following as inputs
+    secret_scope (required): databricks secret scope identifier
+    query_string (required): queries separated by semicolon
+    parameters (optional) : dictionary with variables that can be used to substitute in queries
+
```python title="snowflake_operator"
from brickflow_plugins import SnowflakeOperator

@@ -417,8 +422,9 @@ wf = Workflow(...)
@wf.task
def run_snowflake_queries(*args):
sf_query_run = SnowflakeOperator(
secret_cope = "your_databricks secrets scope name",
input_params = {'query':"comma_seprated_list_of_queries"}
secret_scope = "your_databricks secrets scope name",
query_string ="select * from database.$schema.$table where $filter_condition1; select * from sample_schema.test_table",
parameters = {"schema":"test_schema","table":"sample_table","filter_condition":"col='something'"}
)
sf_query_run.execute()
```
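The `$`-prefixed placeholders above suggest plain token substitution into the query string. A small sketch of that idea using Python's `string.Template` (the plugin's real substitution mechanism may differ):

```python
from string import Template

query_string = "select * from $schema.$table where $filter_condition; select 1"
parameters = {"schema": "test_schema", "table": "sample_table",
              "filter_condition": "col='something'"}

# Template.substitute performs straightforward $key replacement; the resulting
# string is then split on semicolons into individual statements.
for statement in Template(query_string).substitute(parameters).split(";"):
    print(statement.strip())
```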
@@ -436,6 +442,18 @@ As databricks secrets is a key value store, code expects the secret scope to con
    database : default database that we want to connect for ex: sample_database
    role : role to which the user has write access for ex: sample_write_role

+UcToSnowflakeOperator expects the following as inputs to copy data in parameters
+    load_type (required): type of data load, acceptable values full or incremental
+    dbx_catalog (required) : name of the databricks catalog in which object resides
+    dbx_database (required): name of the databricks schema in which object is available
+    dbx_table (required) : name of the databricks object we want to copy to snowflake
+    sf_database (optional) : name of the snowflake database if different from the one in secret_scope
+    sf_schema (required): name of the snowflake schema in which we want to copy the data
+    sf_table (required) : name of the snowflake object to which we want to copy from databricks
+    incremental_filter (required for incremental mode) : condition to manage data before writing to snowflake
+    dbx_data_filter (optional): filter condition on databricks source for full or incremental (if different from incremental_filter)
+    sf_grantee_roles (optional) : snowflake roles to which we want to grant select/read access
+    sf_cluster_keys (optional) : list of keys we want to cluster our snowflake table

```python title="uc_to_snowflake_operator"
from brickflow_plugins import UcToSnowflakeOperator
@@ -445,8 +463,8 @@ wf = Workflow(...)
@wf.task
def run_snowflake_queries(*args):
uc_to_sf_copy = UcToSnowflakeOperator(
secret_cope = "your_databricks secrets scope name",
uc_parameters = {'load_type':'incremental','dbx_catalog':'sample_catalog','dbx_database':'sample_schema',
secret_scope = "your_databricks secrets scope name",
parameters = {'load_type':'incremental','dbx_catalog':'sample_catalog','dbx_database':'sample_schema',
'dbx_table':'sf_operator_1', 'sf_schema':'stage','sf_table':'SF_OPERATOR_1',
'sf_grantee_roles':'downstream_read_role', 'incremental_filter':"dt='2023-10-22'",
'sf_cluster_keys':''}
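Since the commit makes parameters optional based on load_type, a full load can presumably drop the incremental and optional keys altogether. A hypothetical variant of the example above (unverified against the plugin's validation logic):

```python
from brickflow_plugins import UcToSnowflakeOperator

uc_to_sf_full = UcToSnowflakeOperator(
    secret_scope="your_databricks secrets scope name",
    parameters={
        "load_type": "full",          # full load, so incremental_filter is not needed
        "dbx_catalog": "sample_catalog",
        "dbx_database": "sample_schema",
        "dbx_table": "sf_operator_1",
        "sf_schema": "stage",
        "sf_table": "SF_OPERATOR_1",  # sf_grantee_roles / sf_cluster_keys omitted
    },
)
uc_to_sf_full.execute()
```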
7 changes: 5 additions & 2 deletions examples/brickflow_examples/workflows/demo_wf.py
@@ -273,7 +273,7 @@ def airflow_autosys_sensor():
def run_snowflake_queries(*args):
uc_to_sf_copy = UcToSnowflakeOperator(
secret_cope="sample_scope",
-uc_parameters={
+parameters={
"load_type": "incremental",
"dbx_catalog": "sample_catalog",
"dbx_database": "sample_schema",
@@ -282,6 +282,7 @@ def run_snowflake_queries(*args):
"sf_table": "SF_OPERATOR_1",
"sf_grantee_roles": "downstream_read_role",
"incremental_filter": "dt='2023-10-22'",
"dbx_data_filter": "run_dt='2023-10-21'",
"sf_cluster_keys": "",
},
)
@@ -291,7 +292,9 @@ def run_snowflake_queries(*args):
@wf.task
def run_snowflake_queries(*args):
sf_query_run = SnowflakeOperator(
secret_cope="sample_scope", input_params={"query": "select * from table"}
secret_cope="sample_scope",
query_string="select * from table; insert into table1 select * from $database.table2",
parameters={"database": "sample_db"},
)
sf_query_run.execute()

@@ -34,6 +34,9 @@ targets:
- pypi:
package: apache-airflow==2.6.3
repo: null
+- pypi:
+package: snowflake==0.5.1
+repo: null
- maven:
coordinates: com.cronutils:cron-utils:9.2.0
exclusions: null
6 changes: 3 additions & 3 deletions tests/engine/test_task.py
@@ -423,7 +423,7 @@ def test_get_brickflow_lib_version(self):
def test_get_brickflow_libraries(self):
settings = BrickflowProjectDeploymentSettings()
settings.brickflow_project_runtime_version = "1.0.0"
-assert len(get_brickflow_libraries(enable_plugins=True)) == 3
+assert len(get_brickflow_libraries(enable_plugins=True)) == 4
assert len(get_brickflow_libraries(enable_plugins=False)) == 1
lib = get_brickflow_libraries(enable_plugins=False)[0].dict
expected = {
@@ -439,7 +439,7 @@ def test_get_brickflow_libraries_semver_non_numeric(self):
settings = BrickflowProjectDeploymentSettings()
tag = "1.0.1rc1234"
settings.brickflow_project_runtime_version = tag
-assert len(get_brickflow_libraries(enable_plugins=True)) == 3
+assert len(get_brickflow_libraries(enable_plugins=True)) == 4
assert len(get_brickflow_libraries(enable_plugins=False)) == 1
lib = get_brickflow_libraries(enable_plugins=False)[0].dict
expected = {
@@ -455,7 +455,7 @@ def test_get_brickflow_libraries_non_semver(self):
settings = BrickflowProjectDeploymentSettings()
tag = "somebranch"
settings.brickflow_project_runtime_version = tag
-assert len(get_brickflow_libraries(enable_plugins=True)) == 3
+assert len(get_brickflow_libraries(enable_plugins=True)) == 4
assert len(get_brickflow_libraries(enable_plugins=False)) == 1
lib = get_brickflow_libraries(enable_plugins=False)[0].dict
expected = {
