Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
JamieDeMaria committed Feb 8, 2023
1 parent 073f463 commit 6338da8
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def handle_output(
method=pd_writer,
)
except InterfaceError as e:
if "out of range" in e.__cause__:
if "out of range" in e.orig.msg:
raise DagsterInvalidInvocationError(
f"Could not store output {context.name} of step {context.step_key}. If the"
" DataFrame includes pandas Timestamp values, ensure that they have"
Expand All @@ -86,9 +86,11 @@ def handle_output(
def load_input(self, context: InputContext, table_slice: TableSlice) -> pd.DataFrame:
with _connect_snowflake(context, table_slice) as con:
try:
result = pd.read_sql(sql=SnowflakeDbClient.get_select_statement(table_slice), con=con)
result = pd.read_sql(
sql=SnowflakeDbClient.get_select_statement(table_slice), con=con
)
except InterfaceError as e:
if "out of range" in e.__cause__.msg:
if "out of range" in e.orig.msg:
raise DagsterInvalidInvocationError(
f"Could not load input {context.name} of {context.op_def.name}. If the"
" DataFrame includes pandas Timestamp values, ensure that they have"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@
"password": os.getenv("SNOWFLAKE_BUILDKITE_PASSWORD", ""),
}

# SHARED_BUILDKITE_SNOWFLAKE_CONF = {
# "account": os.getenv("SNOWFLAKE_ACCOUNT", ""),
# "user": "[email protected]",
# "password": os.getenv("SNOWFLAKE_PASSWORD", ""),
# }



@contextmanager
def temporary_snowflake_table(schema_name: str, db_name: str, column_str: str) -> Iterator[str]:
Expand All @@ -65,7 +58,6 @@ def temporary_snowflake_table(schema_name: str, db_name: str, column_str: str) -
try:
yield table_name
finally:
# pass
conn.cursor().execute(f"drop table {schema_name}.{table_name}")


Expand Down Expand Up @@ -177,11 +169,6 @@ def test_io_manager_with_snowflake_pandas_timestamp_data():
db_name="TEST_SNOWFLAKE_IO_MANAGER",
column_str="foo string, date TIMESTAMP_NTZ(9)",
) as table_name:
# with temporary_snowflake_table(
# schema_name="JAMIE",
# db_name="SANDBOX",
# column_str="foo string, date TIMESTAMP_NTZ(9)",
# ) as table_name:
time_df = pandas.DataFrame(
{
"foo": ["bar", "baz"],
Expand Down Expand Up @@ -235,11 +222,6 @@ def test_io_manager_with_snowflake_pandas_timestamp_data_error():
db_name="TEST_SNOWFLAKE_IO_MANAGER",
column_str="foo string, date TIMESTAMP_NTZ(9)",
) as table_name:
# with temporary_snowflake_table(
# schema_name="JAMIE",
# db_name="SANDBOX",
# column_str="foo string, date TIMESTAMP_NTZ(9)",
# ) as table_name:
time_df = pandas.DataFrame(
{
"foo": ["bar", "baz"],
Expand Down Expand Up @@ -293,11 +275,6 @@ def test_time_window_partitioned_asset(tmp_path):
db_name="TEST_SNOWFLAKE_IO_MANAGER",
column_str="TIME TIMESTAMP_NTZ(9), A string, B int",
) as table_name:
# with temporary_snowflake_table(
# schema_name="JAMIE",
# db_name="SANDBOX",
# column_str="TIME TIMESTAMP_NTZ(9), A string, B int",
# ) as table_name:

@asset(
partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"),
Expand All @@ -320,8 +297,6 @@ def daily_partitioned(context):

asset_full_name = f"SNOWFLAKE_IO_MANAGER_SCHEMA__{table_name}"
snowflake_table_path = f"SNOWFLAKE_IO_MANAGER_SCHEMA.{table_name}"
# asset_full_name = f"JAMIE__{table_name}"
# snowflake_table_path = f"JAMIE.{table_name}"

snowflake_config = {
**SHARED_BUILDKITE_SNOWFLAKE_CONF,
Expand Down

0 comments on commit 6338da8

Please sign in to comment.