
Commit

add config and docs
JamieDeMaria committed Mar 22, 2023
1 parent 95b9646 commit e438836
Showing 4 changed files with 107 additions and 33 deletions.
34 changes: 34 additions & 0 deletions docs/content/integrations/snowflake/reference.mdx
@@ -11,6 +11,7 @@ This reference page provides information for working with [`dagster-snowflake`](
- [Selecting specific columns in a downstream asset](#selecting-specific-columns-in-a-downstream-asset)
- [Storing partitioned assets](#storing-partitioned-assets)
- [Storing tables in multiple schemas](#storing-tables-in-multiple-schemas)
- [Storing timestamp data in Pandas DataFrames](#storing-timestamp-data-in-pandas-dataframes)
- [Using the Snowflake I/O manager with other I/O managers](#using-the-snowflake-io-manager-with-other-io-managers)
- [Storing and loading PySpark DataFrames in Snowflake](#storing-and-loading-pyspark-dataframes-in-snowflake)
- [Using Pandas and PySpark DataFrames with Snowflake](#using-pandas-and-pyspark-dataframes-with-snowflake)
@@ -330,6 +331,39 @@ In this example, the `iris_dataset` asset will be stored in the `IRIS` schema, a

---

## Storing timestamp data in Pandas DataFrames

Due to a longstanding bug in the Snowflake Pandas connector, loading timestamp data from a Pandas DataFrame into Snowflake sometimes corrupts the data. Prior to `dagster-snowflake` version `0.19.0`, we worked around this issue by converting all timestamp data to strings before loading it into Snowflake, and performing the reverse conversion when fetching a DataFrame from Snowflake. However, the issue can also be avoided by ensuring that all timestamp data has a timezone, which allows the data to be stored as the TIMESTAMP_NTZ(9) type in Snowflake.
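
For illustration, here is a minimal pandas sketch of attaching a timezone to naive timestamp data (the column names and values are placeholders); the I/O manager can also do this conversion for you, as described below:

```python
import pandas as pd

df = pd.DataFrame(
    {
        "time": pd.to_datetime(["2023-03-22 09:00:00", "2023-03-22 10:00:00"]),
        "count": [5, 7],
    }
)

# Attach a timezone to the naive timestamps; timezone-aware values can be stored
# in Snowflake as TIMESTAMP_NTZ(9) without triggering the connector bug
df["time"] = df["time"].dt.tz_localize("UTC")
```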

To specify how timestamp data should be handled, use the `time_data_to_string` configuration value for the Snowflake I/O manager. If `True`, the I/O manager will convert timestamp data to strings before loading it into Snowflake. If `False`, the I/O manager will ensure the data has a timezone (attaching the UTC timezone if necessary) before loading it into Snowflake.
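
For example, the I/O manager configuration might look like the following sketch. This assumes the `snowflake_pandas_io_manager` from the `dagster-snowflake-pandas` library; the account, credential, database, and warehouse values are placeholders:

```python
from dagster_snowflake_pandas import snowflake_pandas_io_manager

snowflake_io_manager = snowflake_pandas_io_manager.configured(
    {
        "account": "abc1234.us-east-1",  # placeholder account identifier
        "user": {"env": "SNOWFLAKE_USER"},
        "password": {"env": "SNOWFLAKE_PASSWORD"},
        "database": "FLOWERS",
        "warehouse": "PLANTS",
        # False (the default): keep timestamps as timestamps, attaching UTC to naive
        # values. True: convert timestamps to strings before storing them.
        "time_data_to_string": False,
    }
)
```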

If you would like to migrate a table created prior to `0.19.0` to one that uses the TIMESTAMP_NTZ(9) type, you can run the following SQL queries in Snowflake. In this example, the table is located at `database.schema.table` and the column to migrate is called `time`:

```sql

// Add a column of type TIMESTAMP_NTZ(9)
ALTER TABLE database.schema.table
ADD COLUMN time_copy TIMESTAMP_NTZ(9);

// Copy the data from the time column, converting it to timestamp data
UPDATE database.schema.table
SET time_copy = to_timestamp_ntz(time);

// Drop the original time column
ALTER TABLE database.schema.table
DROP COLUMN time;

// Rename the time_copy column to time
ALTER TABLE database.schema.table
RENAME COLUMN time_copy TO time;

```

<Note>
The <code>time_data_to_string</code> configuration value is deprecated and will be removed in version 2.0.0 of the <code>dagster-snowflake</code> library.
</Note>

---

## Using the Snowflake I/O manager with other I/O managers

You may have assets that you don't want to store in Snowflake. You can provide an I/O manager to each asset using the `io_manager_key` parameter in the `asset` decorator:
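
The rest of this example is collapsed in the diff above. A minimal sketch of the pattern, with hypothetical asset names, I/O manager keys, and file paths, might look like this:

```python
import pandas as pd

from dagster import asset


@asset(io_manager_key="warehouse_io_manager")
def iris_dataset() -> pd.DataFrame:
    # Stored in Snowflake by the I/O manager bound to "warehouse_io_manager"
    return pd.read_csv("iris.csv")  # placeholder path


@asset(io_manager_key="blob_io_manager")
def iris_summary(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    # Stored by a different I/O manager, for example one that writes to object storage
    return iris_dataset.describe()
```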
@@ -2,39 +2,48 @@

import pandas as pd
import pandas.core.dtypes.common as pd_core_dtypes_common
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake.snowflake_io_manager import SnowflakeDbClient
from snowflake.connector.pandas_tools import pd_writer

from dagster import InputContext, MetadataValue, OutputContext, TableColumn, TableSchema
from dagster._core.definitions.metadata import RawMetadataValue
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.storage.db_io_manager import DbTypeHandler, TableSlice
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake.snowflake_io_manager import SnowflakeDbClient
from snowflake.connector.pandas_tools import pd_writer


def _table_exists(table_slice: TableSlice, connection):
tables = connection.execute(
f"SHOW TABLES LIKE '{table_slice.table}' IN DATABASE '{table_slice.database}' SCHEMA"
f" '{table_slice.schema}'"
f"SHOW TABLES LIKE '{table_slice.table}' IN SCHEMA"
f" {table_slice.database}.{table_slice.schema}"
).fetchall()
return len(tables) > 0

def _get_table_schema(table_slice: TableSlice, connection):

def _get_table_column_types(table_slice: TableSlice, connection):
if _table_exists(table_slice, connection):
return connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall()
schema_list = connection.execute(f"DESCRIBE TABLE {table_slice.table}").fetchall()
return {item[0]: item[1] for item in schema_list}


def _convert_timestamp_to_string(s: pd.Series, table_schema) -> pd.Series:
def _convert_timestamp_to_string(s: pd.Series, column_types) -> pd.Series:
"""Converts columns of data of type pd.Timestamp to string so that it can be stored in
snowflake.
"""
if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s): # type: ignore # (bad stubs)
if table_schema:
pass
if column_types:
if "VARCHAR" not in column_types[s.name]:
raise DagsterInvariantViolationError(
"Snowflake I/O manager configured to convert time data to strings, but the"
" corresponding column is not of type VARCHAR, it is of type"
f" {column_types[s.name]}. Please set time_data_to_string=False in the"
" Snowflake I/O manager configuration to store time data as TIMESTAMP types."
)
return s.dt.strftime("%Y-%m-%d %H:%M:%S.%f %z")
else:
return s


def _convert_string_to_timestamp(s: pd.Series, table_schema) -> pd.Series:
def _convert_string_to_timestamp(s: pd.Series) -> pd.Series:
"""Converts columns of strings in Timestamp format to pd.Timestamp to undo the conversion in
_convert_timestamp_to_string.
@@ -50,8 +59,16 @@ def _convert_string_to_timestamp(s: pd.Series, table_schema) -> pd.Series:
return s


def _add_missing_timezone(s: pd.Series) -> pd.Series:
def _add_missing_timezone(s: pd.Series, column_types) -> pd.Series:
if pd_core_dtypes_common.is_datetime_or_timedelta_dtype(s):
if column_types:
if "VARCHAR" in column_types[s.name]:
raise DagsterInvariantViolationError(
f"Snowflake I/O manager: The Snowflake column for {s.name} is of type"
f" {column_types[s.name]} and should be of type TIMESTAMP to store time data."
" Please migrate this column to be of type TIMESTAMP_NTZ(9) to store time"
" data."
)
return s.dt.tz_localize("UTC")
return s

@@ -79,14 +96,15 @@ def handle_output(

connector.paramstyle = "pyformat"
with_uppercase_cols = obj.rename(str.upper, copy=False, axis="columns")
table_schema = _get_table_schema(table_slice, connection)
if context.config["time_data_to_string"]:
column_types = _get_table_column_types(table_slice, connection)
if context.resource_config["time_data_to_string"]:
with_uppercase_cols = with_uppercase_cols.apply(
_convert_timestamp_to_string, axis="index", args=(table_schema)
lambda x: _convert_timestamp_to_string(x, column_types),
axis="index",
)
else:
with_uppercase_cols = with_uppercase_cols.apply(
_add_missing_timezone, axis="index", args=(table_schema)
lambda x: _add_missing_timezone(x, column_types), axis="index"
)
with_uppercase_cols.to_sql(
table_slice.table,
@@ -116,7 +134,7 @@ def load_input(
result = pd.read_sql(
sql=SnowflakeDbClient.get_select_statement(table_slice), con=connection
)
if context.config["time_data_to_string"]:
if context.resource_config["time_data_to_string"]:
result = result.apply(_convert_string_to_timestamp, axis="index")
result.columns = map(str.lower, result.columns) # type: ignore # (bad stubs)
return result
@@ -30,6 +30,7 @@
materialize,
op,
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.storage.db_io_manager import TableSlice
from dagster_snowflake import build_snowflake_io_manager
from dagster_snowflake.resources import SnowflakeConnection
@@ -201,9 +202,11 @@ def io_manager_test_pipeline():

@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
def test_io_manager_with_snowflake_pandas_timestamp_data():
schema_name = "SNOWFLAKE_IO_MANAGER_SCHEMA"
db_name = "TEST_SNOWFLAKE_IO_MANAGER"
with temporary_snowflake_table(
schema_name="SNOWFLAKE_IO_MANAGER_SCHEMA",
db_name="TEST_SNOWFLAKE_IO_MANAGER",
schema_name=schema_name,
db_name=db_name,
) as table_name:
time_df = pandas.DataFrame(
{
@@ -215,13 +218,7 @@ def test_io_manager_with_snowflake_pandas_timestamp_data():
}
)

@op(
out={
table_name: Out(
io_manager_key="snowflake", metadata={"schema": "SNOWFLAKE_IO_MANAGER_SCHEMA"}
)
}
)
@op(out={table_name: Out(io_manager_key="snowflake", metadata={"schema": schema_name})})
def emit_time_df(_):
return time_df

@@ -237,7 +234,7 @@ def read_time_df(df: pandas.DataFrame):
"snowflake": {
"config": {
**SHARED_BUILDKITE_SNOWFLAKE_CONF,
"database": "TEST_SNOWFLAKE_IO_MANAGER",
"database": db_name,
}
}
}
@@ -249,6 +246,26 @@
res = io_manager_timestamp_test_job.execute_in_process()
assert res.success

@job(
resource_defs={"snowflake": snowflake_pandas_io_manager},
config={
"resources": {
"snowflake": {
"config": {
**SHARED_BUILDKITE_SNOWFLAKE_CONF,
"database": db_name,
"time_data_as_string": True,
}
}
}
},
)
def io_manager_timestamp_as_string_test_job():
read_time_df(emit_time_df())

with pytest.raises(DagsterInvariantViolationError):
io_manager_timestamp_as_string_test_job.execute_in_process()


@pytest.mark.skipif(not IS_BUILDKITE, reason="Requires access to the BUILDKITE snowflake DB")
def test_time_window_partitioned_asset():
@@ -1,8 +1,6 @@
from contextlib import contextmanager
from typing import Mapping, Optional, Sequence, Type, cast

from sqlalchemy.exc import ProgrammingError

from dagster import Field, IOManagerDefinition, OutputContext, StringSource, io_manager
from dagster._core.definitions.time_window_partitions import TimeWindow
from dagster._core.storage.db_io_manager import (
@@ -13,6 +11,7 @@
TableSlice,
)
from dagster._utils.backcompat import deprecation_warning
from sqlalchemy.exc import ProgrammingError

from .resources import SnowflakeConnection

@@ -137,15 +136,21 @@ def my_table_a(my_table: pd.DataFrame) -> pd.DataFrame:
"If using Pandas DataFrames, whether to convert time data to strings. If True,"
" time data will be converted to strings when storing the dataframe and"
" converted back to time data when loading the dataframe. If False, time data"
" without a timezone will be set to UTC timezone to avoid a Snowflake bug. Defaults to False."
" without a timezone will be set to UTC timezone to avoid a Snowflake bug."
" Defaults to False."
),
),
}
)
def snowflake_io_manager(init_context):
if init_context.config["time_data_to_string"]:
if init_context.resource_config["time_data_to_string"]:
deprecation_warning(
"Snowflake I/O manager config time_data_to_string", "2.0.0", "Convert existing tables to use timestamps and remove time_data_to_string configuration instead."
"Snowflake I/O manager config time_data_to_string",
"2.0.0",
(
"Convert existing tables to use timestamps and remove time_data_to_string"
" configuration instead."
),
)
return DbIOManager(
type_handlers=type_handlers,
