Skip to content

Commit

Permalink
Add Azure SQL support to clouddb_extractor (#69)
Browse files Browse the repository at this point in the history
  • Loading branch information
FrancoisZim authored Jul 25, 2022
1 parent a0935cb commit 0f9062b
Show file tree
Hide file tree
Showing 5 changed files with 259 additions and 7 deletions.
17 changes: 17 additions & 0 deletions Community-Supported/clouddb-extractor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ following methods:
For a full list of methods and args see the docstrings in the BaseExtractor class.

## Contents
* __azuresql_extractor.py__ - Azure SQL Database implementation of Base Hyper Extractor ABC
* __base_extractor.py__ - provides an Abstract Base Class with some utility methods to extract from cloud databases to "live to hyper" Tableau Datasources. Database-specific Extractor classes extend this to manage connections and schema discovery,
and may override the generic query processing methods (which are based on DBAPIv2 standards) with database-specific optimizations.
* __bigquery_extractor.py__ - Google BigQuery implementation of Base Hyper Extractor ABC
Expand Down Expand Up @@ -61,6 +62,18 @@ $ python3 extractor_cli.py --help
- delete: Delete rows from a Tableau datasource that match key columns in a changeset from a query
```

### Sample Usage

```console
# Load a sample (default=1000 lines) from test_table to sample_extract in test_project
python3 extractor_cli.py load_sample --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token --source_table_id test_table --tableau_project test_project --tableau_datasource sample_extract

# Load a full extract from test_table to full_extract in test_project
python3 extractor_cli.py export_load --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token --source_table_id test_table --tableau_project test_project --tableau_datasource full_extract

# Execute updated_rows.sql to retrieve a changeset and update full_extract where ROW_ID in changeset matches
python3 extractor_cli.py update --tableau_token_name hyperapitest --tableau_token_secretfile hyperapitest.token --sqlfile updated_rows.sql --tableau_project test_project --tableau_datasource full_extract --match_columns ROW_ID ROW_ID
```

# Installation

Expand Down Expand Up @@ -103,6 +116,10 @@ cd hyper-api-samples/Community-Supported/clouddb-extractor
pip install -r requirements.txt
```

## Azure SQL Database Configuration
The following steps are required if using azuresql_extractor.
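- Install the Microsoft ODBC Driver 17 for SQL Server (azuresql_extractor.py names "ODBC Driver 17 for SQL Server" in its connection string) and the Azure utilities for your platform using the following instructions: https://github.com/Azure-Samples/AzureSqlGettingStartedSamples/tree/master/python/Unix-based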

## Google BigQuery Configuration
The following steps are required if using bigquery_extractor.

Expand Down
209 changes: 209 additions & 0 deletions Community-Supported/clouddb-extractor/azuresql_extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
"""Azure SQL Database implementation of Base Hyper Extractor ABC
Tableau Community supported Hyper API sample
-----------------------------------------------------------------------------
This file is the copyrighted property of Tableau Software and is protected
by registered patents and other applicable U.S. and international laws and
regulations.
You may adapt this file and modify it to fit into your context and use it
as a template to start your own projects.
-----------------------------------------------------------------------------
"""
import logging
from typing import Any, Optional, Dict

import pyodbc
from tableauhyperapi import Nullability, SqlType, TableDefinition, TableName

from base_extractor import DEFAULT_SITE_ID, BaseExtractor, HyperSQLTypeMappingError

# Module-level logger; the name identifies this (Azure SQL) extractor in log output.
logger = logging.getLogger("hyper_samples.extractor.azuresql")

class QuerySizeLimitError(Exception):
    """Exception type for query size limit errors.

    Declared for callers of this module; nothing in this module raises it.
    """

class AzureSQLExtractor(BaseExtractor):
    """Azure SQL Database Implementation of Extractor Interface

    Authentication to Tableau Server can be either by Personal Access Token or
    Username and Password.

    Constructor Args:
    - source_database_config (dict): Source database parameters
    - tableau_hostname (string): URL for Tableau Server, e.g. "http://localhost"
    - tableau_site_id (string): Tableau site identifier - if default use ""
    - tableau_project (string): Tableau project identifier
    - tableau_token_name (string): PAT name
    - tableau_token_secret (string): PAT secret
    - tableau_username (string): Tableau username
    - tableau_password (string): Tableau password

    NOTE: Authentication to Tableau Server can be either by Personal Access Token or
    Username and Password. If both are specified then token takes precedence.

    Keys read from source_database_config["connection"]:
    - host, port, database, username (required)
    - connect_str_suffix (required): extra ODBC keywords appended to the connection string
    - key_vault_url + secret_name: read the password from Azure Key Vault (recommended), or
    - password: plain text password, used only when key_vault_url is not set
    - driver (optional): ODBC driver name, default "ODBC Driver 17 for SQL Server"
    """

    def __init__(
        self,
        source_database_config: dict,
        tableau_hostname: str,
        tableau_project: str,
        tableau_site_id: str = DEFAULT_SITE_ID,
        tableau_token_name: Optional[str] = None,
        tableau_token_secret: Optional[str] = None,
        tableau_username: Optional[str] = None,
        tableau_password: Optional[str] = None,
    ) -> None:
        super().__init__(
            source_database_config=source_database_config,
            tableau_hostname=tableau_hostname,
            tableau_project=tableau_project,
            tableau_site_id=tableau_site_id,
            tableau_token_name=tableau_token_name,
            tableau_token_secret=tableau_token_secret,
            tableau_username=tableau_username,
            tableau_password=tableau_password,
        )
        # The pyodbc connection is opened lazily by source_database_cursor()
        self._source_database_connection = None
        # Identifiers are emitted without a quote character for this source
        self.sql_identifier_quote = ""

    def source_database_cursor(self) -> Any:
        """
        Returns a DBAPI Cursor to the source database

        The connection is opened on first use and cached on the instance;
        every call returns a new cursor on that shared connection.

        Raises ValueError if key_vault_url is configured without secret_name.
        """
        assert self.source_database_config is not None
        if self._source_database_connection is None:
            logger.info("Connecting to source Azure SQL Database Instance...")

            db_connection_args = self.source_database_config.get("connection")
            assert isinstance(db_connection_args, dict)

            key_vault_url = db_connection_args.get("key_vault_url")
            if key_vault_url is not None:
                # Recommended: Read password from keyvault
                secret_name = db_connection_args.get("secret_name")
                if secret_name is None:
                    raise ValueError("secret_name must be specified in the connection config when key_vault_url is set")

                # Imported on demand so the azure packages are only needed when a key vault is used
                from azure.identity import DefaultAzureCredential
                from azure.keyvault.secrets import SecretClient

                credential = DefaultAzureCredential()
                secret_client = SecretClient(vault_url=key_vault_url, credential=credential)
                this_password = secret_client.get_secret(secret_name).value
            else:
                # Password is stored as plain text
                this_password = db_connection_args["password"]

            # NOTE(review): values are interpolated verbatim; a password containing ";" would
            # presumably break the ODBC connection string - confirm whether escaping is needed.
            connection_str = "Driver={{{driver}}};Server={host},{port};Database={database};Uid={username};Pwd={password};{connect_str_suffix}".format(
                driver=db_connection_args.get("driver", "ODBC Driver 17 for SQL Server"),
                host=db_connection_args["host"],
                port=db_connection_args["port"],
                database=db_connection_args["database"],
                username=db_connection_args["username"],
                password=this_password,
                connect_str_suffix=db_connection_args["connect_str_suffix"],
            )
            self._source_database_connection = pyodbc.connect(connection_str)

        return self._source_database_connection.cursor()

    def hyper_sql_type(self, source_column: Any) -> SqlType:
        """
        Finds the corresponding Hyper column type for source_column

        source_column (obj): Instance of DBAPI Column description tuple

        Returns a tableauhyperapi.SqlType Object
        Raises HyperSQLTypeMappingError if no mapping exists for the source type
        """
        # Note: pyodbc returns a description which contains a tuple per column with the following fields
        #   0 column name (or alias, if specified in the SQL)
        #   1 type object
        #   2 display size (pyodbc does not set this value)
        #   3 internal size (in bytes)
        #   4 precision
        #   5 scale
        #   6 nullable (True/False)
        # e.g. ('schema_id', <class 'int'>, None, 10, 10, 0, False)
        # The mapping from SQL types to python types is defined in pyodbc.SQL_data_type_dict
        source_column_type = source_column[1].__name__
        source_column_precision = source_column[4]
        source_column_scale = source_column[5]

        # Python type name -> zero-argument SqlType factory
        type_lookup = {
            "str": SqlType.text,
            "unicode": SqlType.text,
            "bytearray": SqlType.text,
            "bool": SqlType.bool,
            "int": SqlType.int,
            "float": SqlType.double,
            "long": SqlType.big_int,
            "date": SqlType.date,
            "time": SqlType.time,
            "datetime": SqlType.timestamp_tz,
        }

        if source_column_type == "Decimal":
            # numeric needs precision and scale, so it is built directly rather than
            # through the zero-argument factories in type_lookup
            return_sql_type = SqlType.numeric(source_column_precision, source_column_scale)
        else:
            type_factory = type_lookup.get(source_column_type)
            if type_factory is None:
                error_message = "No Hyper SqlType defined for Azure SQL source type: {}".format(source_column_type)
                logger.error(error_message)
                raise HyperSQLTypeMappingError(error_message)
            return_sql_type = type_factory()

        logger.debug("Translated source column type {} to Hyper SqlType {}".format(source_column_type, return_sql_type))
        return return_sql_type

    def hyper_table_definition(self, source_table: Any, hyper_table_name: str = "Extract") -> TableDefinition:
        """
        Build a hyper table definition from source_schema

        source_table (obj): Source table (Instance of DBAPI Cursor Description)
        hyper_table_name (string): Name of the target Hyper table, default="Extract"

        Returns a tableauhyperapi.TableDefinition Object
        """
        logger.debug("Building Hyper TableDefinition for table {}".format(source_table))
        target_cols = []
        for source_column in source_table:
            this_name = source_column[0]
            this_type = self.hyper_sql_type(source_column)
            # Only an explicit False marks the column NOT NULL; anything else stays nullable
            if source_column[6] is False:
                this_nullability = Nullability.NOT_NULLABLE
            else:
                this_nullability = Nullability.NULLABLE
            target_cols.append(TableDefinition.Column(name=this_name, type=this_type, nullability=this_nullability))
            logger.info("..Column {} - Type {}".format(this_name, this_type))

        # Create the target schema for our Hyper File
        return TableDefinition(table_name=TableName("Extract", hyper_table_name), columns=target_cols)

    def load_sample(
        self,
        tab_ds_name: str,
        source_table: Optional[str] = None,
        sql_query: Optional[str] = None,
        sample_rows: int = 0,
        publish_mode: Any = None,
    ) -> None:
        """
        Not supported for this extractor: always raises NotImplementedError

        The arguments are accepted but not used.
        """
        error_message = "METHOD load_sample is not implemented for SQL Server (Transact-SQL does not support the LIMIT statement)"
        logger.error(error_message)
        raise NotImplementedError(error_message)

def main():
    """Entry point placeholder: performs no work and returns None."""
    return None


if __name__ == "__main__":
    main()
8 changes: 4 additions & 4 deletions Community-Supported/clouddb-extractor/base_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ class BaseExtractor(ABC):

def __init__(
self,
source_database_config: Dict,
source_database_config: Dict[str, Any],
tableau_hostname: str,
tableau_project: str,
tableau_site_id: str = DEFAULT_SITE_ID,
Expand Down Expand Up @@ -272,7 +272,7 @@ def quoted_sql_identifier(self, sql_identifier: str) -> str:
raise Exception("Invalid SQL identifier: {} - exceeded max allowed length: {}".format(sql_identifier, maxlength))

# char_whitelist = re.compile("^[A-Za-z0-9_-.]*$")
char_whitelist = re.compile("\A[\w\.\-]*\Z")
char_whitelist = re.compile(r"\A[\w\.\-]*\Z")
if char_whitelist.match(sql_identifier) is None:
raise Exception("Invalid SQL identifier: {} - found invalid characters".format(sql_identifier))

Expand Down Expand Up @@ -346,7 +346,7 @@ def query_result_to_hyper_file(
self,
target_table_def: Optional[TableDefinition] = None,
cursor: Any = None,
query_result_iter: Iterable[Iterable[object]] = None,
query_result_iter: Optional[Iterable[Iterable[object]]] = None,
hyper_table_name: str = "Extract",
) -> Path:
"""
Expand Down Expand Up @@ -379,9 +379,9 @@ def query_result_to_hyper_file(
if cursor.description is None:
raise Exception("DBAPI Cursor did not return any schema description for query:{}".format(cursor.query))
target_table_def = self.hyper_table_definition(source_table=cursor.description, hyper_table_name=hyper_table_name)
assert target_table_def is not None

path_to_database = Path(tempfile_name(prefix="temp_", suffix=".hyper"))

with HyperProcess(telemetry=TELEMETRY) as hyper:

# Creates new Hyper extract file
Expand Down
14 changes: 13 additions & 1 deletion Community-Supported/clouddb-extractor/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,23 @@ mysql: #Mysql configuration defaults
database : "dev"
port : 3306
username : "test"
#Recommended: use key vault instead of password
#key_vault_url : "https://<your_keyvault_name>.vault.azure.net"
#secret_name : my-password-secret
password : "password"
raise_on_warnings : True
azuresql: #Azure SQL Database configuration defaults
connection:
host : "mydbserver.test"
port : 1433
database : "test"
username : "test"
password : "password"
connect_str_suffix : "Encrypt=yes;TrustServerCertificate=no;Connection Timeout=30"
raise_on_warnings : True
redshift: #Redshift configuration defaults
connection:
host : 'redshift-cluster-1.XXX.eu-west-1.redshift.amazonaws.com'
database : 'dev'
user : 'test'
password : 'password'
password : 'password'
18 changes: 16 additions & 2 deletions Community-Supported/clouddb-extractor/extractor_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@
import json
import yaml

import tableauserverclient as TSC

# Globals
EXTRACTORS = {
"bigquery": "bigquery_extractor.BigQueryExtractor",
"redshift": "redshift_extractor.RedshiftExtractor",
"mysql": "mysql_extractor.MySQLExtractor",
"postgres": "postgres_extractor.PostgresExtractor",
"azuresql": "azuresql_extractor.AzureSQLExtractor",
}
CONFIGURATION_FILE = "config.yml"

Expand Down Expand Up @@ -60,7 +63,7 @@ def _exclusive_args(args, *arg_names, required=True, message=None):
if required:
if count_args != 1:
if message is None:
raise IllegalArgumentError(message="Must specify one of {}".format(",".join(arg_names)))
raise IllegalArgumentError("Must specify one of {}".format(",".join(arg_names)))
else:
raise IllegalArgumentError(message)
else:
Expand Down Expand Up @@ -185,7 +188,11 @@ def main():
"--match_conditions_json",
help="Json file defining conditions for matching rows when command=[update|delete].",
)

parser.add_argument(
"--overwrite",
action='store_true',
help="Overwrite published datasource when command=[load_sample|export_load] - default behaviour returns error if target datasource exists",
)
# Tableau Server / Tableau Online options
_add_arg_with_default(parser, config, "tableau_env.server_address", "Tableau connection string", True, "--tableau_hostname", "-H")
_add_arg_with_default(parser, config, "tableau_env.site_id", "Tableau site id", True, "--tableau_site_id", "-S")
Expand Down Expand Up @@ -229,6 +236,7 @@ def main():
# These are loaded on demand so that you don't have to install
# client libraries for all source database implementations
extractor_class_str = EXTRACTORS.get(selected_extractor)
assert extractor_class_str is not None
extractor_module_str = extractor_class_str.split(".")[0]
extractor_class_str = extractor_class_str.split(".")[1]
extractor_module = importlib.import_module(extractor_module_str)
Expand Down Expand Up @@ -263,6 +271,10 @@ def main():
tableau_username=args.tableau_username,
tableau_password=tableau_password,
)

publishmode=TSC.Server.PublishMode.CreateNew
if args.overwrite:
publishmode=TSC.Server.PublishMode.Overwrite

if selected_command == "load_sample":
_required_arg(
Expand All @@ -275,13 +287,15 @@ def main():
source_table=args.source_table_id,
tab_ds_name=args.tableau_datasource,
sample_rows=_get_int_from_arg(args.sample_rows, "sample_rows", True),
publish_mode=publishmode,
)

if selected_command == "export_load":
extractor.export_load(
sql_query=sql_string,
source_table=args.source_table_id,
tab_ds_name=args.tableau_datasource,
publish_mode=publishmode,
)

if selected_command == "append":
Expand Down

0 comments on commit 0f9062b

Please sign in to comment.