diff --git a/airflow_example/dags/tableau_datasource_update/configs/column_config.json b/airflow_example/dags/tableau_datasource_update/configs/column_config.json deleted file mode 100644 index 909b77d0..00000000 --- a/airflow_example/dags/tableau_datasource_update/configs/column_config.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "Sample Column Name": { - "description": "This is an example of a column description for a sample column.", - "folder": "Sample Folder Name", - "persona": "string_dimension", - "datasources": [ - { - "name": "First Datasource", - "local-name": "sample_column_name", - "sql_alias": "SAMPLE_COLUMN_NAME" - }, - { - "name": "Second Datasource", - "local-name": "SAMPLE_COLUMN_NAME", - "sql_alias": "SAMPLE_COLUMN_NAME" - } - ] - } -} \ No newline at end of file diff --git a/airflow_example/dags/tableau_datasource_update/configs/column_persona_config.json b/airflow_example/dags/tableau_datasource_update/configs/column_persona_config.json deleted file mode 100644 index 663f4aa8..00000000 --- a/airflow_example/dags/tableau_datasource_update/configs/column_persona_config.json +++ /dev/null @@ -1,72 +0,0 @@ -{ - "string_dimension": { - "role": "dimension", - "role_type": "nominal", - "datatype": "string" - }, - "date_dimension": { - "role": "dimension", - "role_type": "ordinal", - "datatype": "date" - }, - "datetime_dimension": { - "role": "dimension", - "role_type": "ordinal", - "datatype": "datetime" - }, - "date_measure": { - "role": "measure", - "role_type": "ordinal", - "datatype": "date" - }, - "datetime_measure": { - "role": "measure", - "role_type": "ordinal", - "datatype": "datetime" - }, - "discrete_number_dimension": { - "role": "dimension", - "role_type": "ordinal", - "datatype": "integer" - }, - "continuous_number_dimension": { - "role": "dimension", - "role_type": "quantitative", - "datatype": "integer" - }, - "discrete_number_measure": { - "role": "measure", - "role_type": "ordinal", - "datatype": "integer" - }, - "continuous_number_measure": { - "role": "measure", - "role_type": "quantitative", - "datatype": "integer" - }, - "discrete_decimal_dimension": { - "role": "dimension", - "role_type": "ordinal", - "datatype": "real" - }, - "continuous_decimal_dimension": { - "role": "dimension", - "role_type": "quantitative", - "datatype": "real" - }, - "discrete_decimal_measure": { - "role": "measure", - "role_type": "ordinal", - "datatype": "real" - }, - "continuous_decimal_measure": { - "role": "measure", - "role_type": "quantitative", - "datatype": "real" - }, - "boolean_dimension": { - "role": "dimension", - "role_type": "nominal", - "datatype": "boolean" - } -} \ No newline at end of file diff --git a/airflow_example/dags/tableau_datasource_update/configs/column_personas.yaml b/airflow_example/dags/tableau_datasource_update/configs/column_personas.yaml new file mode 100644 index 00000000..089dd406 --- /dev/null +++ b/airflow_example/dags/tableau_datasource_update/configs/column_personas.yaml @@ -0,0 +1,123 @@ +boolean_dimension: + datatype: boolean + role: dimension + role_type: nominal + metadata: + remote_type: 11 + aggregation: Count + +continuous_decimal_dimension: + datatype: real + role: dimension + role_type: quantitative + metadata: + remote_type: 5 + precision: 15 + aggregation: Sum + +continuous_decimal_measure: + datatype: real + role: measure + role_type: quantitative + metadata: + remote_type: 5 + precision: 15 + aggregation: Sum + +continuous_number_dimension: + datatype: integer + role: dimension + role_type: quantitative + metadata: + remote_type: 
131 + scale: 0 + precision: 38 + aggregation: Sum + +continuous_number_measure: + datatype: integer + role: measure + role_type: quantitative + metadata: + remote_type: 131 + scale: 0 + precision: 38 + aggregation: Sum + +date_dimension: + datatype: date + role: dimension + role_type: ordinal + metadata: + remote_type: 7 + aggregation: Year + +date_measure: + datatype: date + role: measure + role_type: ordinal + metadata: + remote_type: 7 + aggregation: Year + +datetime_dimension: + datatype: datetime + role: dimension + role_type: ordinal + metadata: + remote_type: 7 + aggregation: Year + +datetime_measure: + datatype: datetime + role: measure + role_type: ordinal + metadata: + remote_type: 7 + aggregation: Year + +discrete_decimal_dimension: + datatype: real + role: dimension + role_type: ordinal + metadata: + remote_type: 5 + precision: 15 + aggregation: Sum + +discrete_decimal_measure: + datatype: real + role: measure + role_type: ordinal + metadata: + remote_type: 5 + precision: 15 + aggregation: Sum + +discrete_number_dimension: + datatype: integer + role: dimension + role_type: ordinal + metadata: + remote_type: 131 + scale: 0 + precision: 38 + aggregation: Sum + +discrete_number_measure: + datatype: integer + role: measure + role_type: ordinal + metadata: + remote_type: 131 + scale: 0 + precision: 38 + aggregation: Sum + +string_dimension: + datatype: string + role: dimension + role_type: nominal + metadata: + remote_type: 129 + aggregation: Count diff --git a/airflow_example/dags/tableau_datasource_update/configs/configuration.py b/airflow_example/dags/tableau_datasource_update/configs/configuration.py new file mode 100644 index 00000000..14f55efe --- /dev/null +++ b/airflow_example/dags/tableau_datasource_update/configs/configuration.py @@ -0,0 +1,241 @@ +import logging +import re +import yaml +from github import Github, Auth +from dataclasses import dataclass, field, asdict + + +class CFGList(list): + """ Extends the functionality of a list to get and set items by equality """ + def __getitem__(self, item): + if isinstance(item, int): + return super().__getitem__(item) + return super().__getitem__(self.index(item)) + + def __setitem__(self, item, value): + if isinstance(item, int): + super().__setitem__(item, value) + else: + super().__setitem__(self.index(item), value) + + def get(self, item): + """ Gets the item from the list """ + for obj in self: + if obj == item: + return item + return None + + +@dataclass +class CFGColumn: + """ A Column in the columns and calculations YAML files """ + name: str + caption: str + role: str + type: str + datatype: str + remote_name: str = None + desc: str = None + calculation: str = None + folder_name: str = None + metadata: dict = None + + def __post_init__(self): + # Surround the column local-name with brackets + if not re.search(r'^\[.+?]', self.name): + self.name = f'[{self.name}]' + + def __eq__(self, other): + if isinstance(other, str): + return self.caption == other + if isinstance(other, dict): + return self.caption == other.get('caption') \ + and self.name == other.get('name') \ + and self.role == other.get('role') \ + and self.type == other.get('type') \ + and self.datatype == other.get('datatype') \ + and self.desc == other.get('desc') \ + and self.calculation == other.get('calculation') + if isinstance(other, (CFGColumn, object)): + return self.caption == other.caption \ + and self.name == other.name \ + and self.role == other.role \ + and self.type == other.type \ + and self.datatype == other.datatype \ + and self.desc == 
other.desc \
+               and self.calculation == other.calculation
+        return False
+
+    def dict(self):
+        """ Returns a dict of the class """
+        dictionary = asdict(self)
+        del dictionary['metadata']
+        return dictionary
+
+
+@dataclass
+class CFGFolder:
+    """ A Folder of Columns in the columns and calculations YAML files """
+    name: str
+    role: str = None
+
+    def __eq__(self, other):
+        if isinstance(other, str):
+            return self.name == other
+        if isinstance(other, dict):
+            return self.name == other.get('name') and self.role == other.get('role') \
+                or not other.get('role') and self.name == other.get('name')
+        if isinstance(other, (CFGFolder, object)):
+            return self.name == other.name and self.role == other.role \
+                or not other.role and self.name == other.name
+        return False
+
+    def dict(self):
+        """ Returns a dict of the class """
+        return asdict(self)
+
+
+@dataclass
+class CFGDatasource:
+    """ A Datasource, with its Columns and Folders, from the columns and calculations YAML files """
+    name: str
+    project_name: str
+    id: str = None
+    columns: CFGList[CFGColumn] = field(default_factory=CFGList)
+    folders: CFGList[CFGFolder] = field(default_factory=CFGList)
+
+    def __eq__(self, other):
+        if isinstance(other, str):
+            return self.name == other
+        if isinstance(other, dict):
+            return self.name == other.get('name') and self.project_name == other.get('project_name')
+        if isinstance(other, (CFGDatasource, object)):
+            return self.name == other.name and self.project_name == other.project_name
+        return False
+
+
+class Config:
+    """ Creates a configuration of the Tableau Datasource(s),
+    based on the corresponding DBT model YAML file(s) in a separate GitHub repo.
+
+    Args:
+        github_token (str): The GitHub token with scope to read from the DBT repo
+        repo_name (str): The name of the DBT GitHub repo
+        subfolder (str): The sub-folder in the GitHub repo which contains the Datasource yaml file(s)
+        repo_branch (str): The branch name of the GitHub repo (Default: 'main')
+    """
+    def __init__(self, github_token, repo_name, subfolder, repo_branch='main'):
+        self.__github_token = github_token
+        self._repo_name = repo_name
+        self._repo_branch = repo_branch
+        self._yaml_subfolder = subfolder
+        with open('dags/tableau_datasource_update/configs/column_personas.yaml') as read_config:
+            self._personas = yaml.safe_load(read_config)
+
+        # Set the datasources configuration based on the config files
+        self.datasources: CFGList[CFGDatasource] = CFGList()
+        self.__set_datasources()
+        self.__update_column_calcs()
+
+    def __set_datasources(self):
+        """
+        Reads the YAML files from DBT to form the datasources attribute.
+        Populates self.datasources with a CFGDatasource per DBT model YAML file.
+        """
+        # Read the files from GitHub
+        auth = Auth.Token(self.__github_token)
+        github = Github(auth=auth)
+        repo = github.get_repo(self._repo_name)
+        files = repo.get_contents(self._yaml_subfolder, ref=self._repo_branch)
+        for file in files:
+            # Parse YAML files
+            name, extension = file.name.rsplit('.', 1)
+            if extension.lower() not in ['yaml', 'yml']:
+                continue
+            model_cfg = yaml.safe_load(file.decoded_content.decode())
+            model_meta_tableau = model_cfg['models'][0].get('meta', {}).get('tableau', None)
+            if not model_meta_tableau:
+                logging.warning('(Skipping) Datasource model does not contain tableau metadata: %s', name)
+                continue
+            calculations = model_meta_tableau.pop('calculations', [])
+            columns = model_cfg['models'][0].pop('columns', [])
+            datasource = CFGDatasource(
+                name=model_meta_tableau.pop('datasource'),
+                project_name=model_meta_tableau.pop('project')
+            )
+            # Add metadata columns
+            for column in columns:
+                column_meta_tableau = column.get('meta', {}).get('tableau')
+                if not column_meta_tableau:
+                    logging.warning('(Skipping) Column does not contain tableau metadata: %s / %s -> %s',
+                                    datasource.project_name, datasource.name, column['name'])
+                    continue
+                persona = self._personas[column_meta_tableau['persona']]
+                desc = column['description'].strip() if column.get('description') else None
+                metadata = {
+                    'class_name': 'column',
+                    'remote_name': column['name'],
+                    'remote_alias': column['name'],
+                    'local_name': column_meta_tableau['local_name'],
+                    'local_type': persona['datatype'],
+                    'object_id': '[Migrated Data]',
+                    'contains_null': True
+                }
+                metadata.update(persona['metadata'])
+                cfg_column = CFGColumn(
+                    name=column_meta_tableau['local_name'],
+                    caption=column_meta_tableau['alias'],
+                    role=persona['role'],
+                    type=persona['role_type'],
+                    datatype=persona['datatype'],
+                    desc=desc,
+                    folder_name=column_meta_tableau['folder'],
+                    remote_name=column['name'],
+                    metadata=metadata
+                )
+                if cfg_column.folder_name not in datasource.folders:
+                    datasource.folders.append(CFGFolder(name=cfg_column.folder_name, role=cfg_column.role))
+                datasource.columns.append(cfg_column)
+            # Add tableau calculation columns
+            for column in calculations:
+                desc = column['description'].strip() if column.get('description') else None
+                cfg_column = CFGColumn(
+                    name=column['local_name'],
+                    caption=column['alias'],
+                    role=self._personas[column['persona']]['role'],
+                    type=self._personas[column['persona']]['role_type'],
+                    datatype=self._personas[column['persona']]['datatype'],
+                    calculation=column['calculation'],
+                    desc=desc,
+                    folder_name=column['folder']
+                )
+                if cfg_column.folder_name not in datasource.folders:
+                    datasource.folders.append(CFGFolder(name=cfg_column.folder_name, role=cfg_column.role))
+                datasource.columns.append(cfg_column)
+            self.datasources.append(datasource)
+
+    def __replace_caption_references(self, datasource_name: str, column: CFGColumn):
+        """
+        Replaces caption references in a column calculation with the
+        corresponding column local-name, when a calculation is provided
+        """
+        # Get all distinct captions referenced in the calc
+        calc_captions = set(re.findall(r'\[.+?]', column.calculation))
+        # Replace each caption with the corresponding local-name defined in the config
+        for calc_caption in calc_captions:
+            caption = re.sub(r'[\[\]]+', '', calc_caption)
+            local_name = self.datasources[datasource_name].columns[caption].name
+            column.calculation = column.calculation.replace(calc_caption, local_name)
+        return column.calculation
+
+    def __update_column_calcs(self):
+ """ + Updates the calculation for each column to reference the local-name of a column, + rather than caption + """ + for datasource in self.datasources: + for column in datasource.columns: + if not column.calculation: + continue + new_calc = self.__replace_caption_references(datasource.name, column) + self.datasources[datasource.name].columns[column.caption].calculation = new_calc diff --git a/airflow_example/dags/tableau_datasource_update/configs/datasource_project_config.json b/airflow_example/dags/tableau_datasource_update/configs/datasource_project_config.json deleted file mode 100644 index 943dc8da..00000000 --- a/airflow_example/dags/tableau_datasource_update/configs/datasource_project_config.json +++ /dev/null @@ -1,4 +0,0 @@ -{ - "First Datasource": "Project Name", - "Second Datasource": "Project Name" -} \ No newline at end of file diff --git a/airflow_example/dags/tableau_datasource_update/configs/tableau_calc_config.json b/airflow_example/dags/tableau_datasource_update/configs/tableau_calc_config.json deleted file mode 100644 index c092e07d..00000000 --- a/airflow_example/dags/tableau_datasource_update/configs/tableau_calc_config.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "Sample Custom Calculation Column": { - "description": "This is an example of a column description for a sample custom calculation column.", - "calculation": "COUNT(1)", - "folder": "Folder Name", - "persona": "continuous_number_measure", - "datasources": [ - { - "name": "First Datasource", - "local-name": "Calculation_1", - "sql_alias": "SAMPLE_CUSTOM_CALCULATION_COLUMN" - }, - { - "name": "Second Datasource", - "local-name": "Calculation_1", - "sql_alias": "SAMPLE_CUSTOM_CALCULATION_COLUMN" - } - ] - } -} \ No newline at end of file diff --git a/airflow_example/dags/tableau_datasource_update/dbt_model_sample.yaml b/airflow_example/dags/tableau_datasource_update/dbt_model_sample.yaml new file mode 100644 index 00000000..704a6ce4 --- /dev/null +++ b/airflow_example/dags/tableau_datasource_update/dbt_model_sample.yaml @@ -0,0 +1,33 @@ +version: 2 +models: + - name: my_datasource_model + docs: + show: True + meta: + tableau: + # Required + datasource: My Datasource Name + project: My Datasource Project Name + # Optional + calculations: + - local_name: Calculation_1 + alias: My Custom Calc + # Format the calculation the same as you would in Tableau; i.e. Column references should use the alias + calculation: 'COUNT([Unique ID])' + description: A count of unique IDs + persona: continuous_number_measure + folder: My Custom Calcs Folder + columns: + # (CASE SENSITIVE) Name casing should match the output casing of the SQL query + - name: UNIQUE_ID + description: A unique ID for each row in the datasource + tests: + - unique + meta: + tableau: + # (CASE SENSITIVE) local_name casing should match the underlying metadata local-name in the Datasource XML + # For new columns, this should match the SQL column name + local_name: UNIQUE_ID + alias: Unique ID + persona: discrete_number_dimension + folder: My Datasource Folder diff --git a/airflow_example/dags/tableau_datasource_update/tableau_datasource_update.md b/airflow_example/dags/tableau_datasource_update/tableau_datasource_update.md new file mode 100644 index 00000000..235d63c3 --- /dev/null +++ b/airflow_example/dags/tableau_datasource_update/tableau_datasource_update.md @@ -0,0 +1,23 @@ +## Overview +> The `tableau_datasource_update` DAG maintains our Tableau Datasources in Tableau Online. 
+> This DAG will update Columns, Folders, and Connections of a Datasource, based on the config YAML files.
+
+## DAG Tasks
+### `gather_datasource_update_tasks`
+> - Downloads each Datasource from Tableau Online (without the extract)
+> - Parses each Datasource and compares attributes of the Datasource against the YAML config files
+> - Returns a dictionary of actions (tasks) to update for each Datasource
+
+### `update_datasources`
+> - Downloads each Datasource from Tableau Online (with the extract) that needs to be updated,
+>   based on the tasks from the previous task
+> - Updates each downloaded Datasource locally, based on the actions (tasks) from the previous task
+> - Publishes the Datasources that were updated
+
+- _Will skip datasources if they are specified in the `EXCLUDED_DATASOURCES` variable_
+- _Will kick off `refresh_datasources` if there is a failure during this task_
+
+### `refresh_datasources`
+> - Refreshes all Datasources in Tableau Online
+
+- _Will skip datasources if they are specified in the `NO_REFRESH_DATASOURCES` variable_
\ No newline at end of file
diff --git a/airflow_example/dags/tableau_datasource_update/tableau_datasource_update.py b/airflow_example/dags/tableau_datasource_update/tableau_datasource_update.py
index c33adbf6..71b2df2e 100644
--- a/airflow_example/dags/tableau_datasource_update/tableau_datasource_update.py
+++ b/airflow_example/dags/tableau_datasource_update/tableau_datasource_update.py
@@ -1,451 +1,354 @@
 """
-    Updates each Tableau datasource's columns/connection/etc, according to the config files.
+Updates each Tableau datasource's columns/connection/etc, according to the config files.
 """
 import datetime
-import time
 import logging
-import json
 import os
 import shutil
-import re
 import ast
-from copy import deepcopy
-from tdscc import TDS, TableauServer, extract_tds, update_tdsx, TDSCCError
+from tableau_utilities import Datasource, TableauServer
+from tableau_utilities.tableau_file.tableau_file_objects import Folder, Column, MetadataRecord
+import dags.tableau_datasource_update.configs.configuration as cfg
 from airflow import DAG, models
-from airflow.operators.python_operator import PythonOperator
-from airflow.hooks.base_hook import BaseHook
-
-
-CFG_PATH = 'dags/tableau_datasource_update/configs'
-with open(os.path.join(CFG_PATH, 'column_persona_config.json')) as read_config:
-    PERSONA_CFG = json.load(read_config)
-with open(os.path.join(CFG_PATH, 'datasource_project_config.json')) as read_config:
-    DS_PROJECT_CFG = json.load(read_config)
-with open(os.path.join(CFG_PATH, 'column_config.json')) as read_config:
-    COLUMN_CFG = json.load(read_config)
-with open(os.path.join(CFG_PATH, 'tableau_calc_config.json')) as read_config:
-    CALC_CFG = json.load(read_config)
-
-
-def invert_config(iterator, config):
-    """ Helper function to invert the column config and calc config.
-    Output -> {datasource: {column: info}}
-
-    Args:
-        iterator (dict): The iterator to append invert data to.
-        config (dict): The config to invert.
- """ - for column, i in config.items(): - for datasource in i['datasources']: - new_info = deepcopy(i) - del new_info['datasources'] - new_info['local-name'] = datasource['local-name'] - new_info['remote_name'] = datasource['sql_alias'] if 'sql_alias' in datasource else None - iterator.setdefault(datasource['name'], {column: new_info}) - iterator[datasource['name']].setdefault(column, new_info) - - -# Setup the column_cfg for the rest of the DAG -temp = dict() -invert_config(temp, COLUMN_CFG) -invert_config(temp, CALC_CFG) -COLUMN_CFG = deepcopy(temp) -del temp +from airflow.operators.python import PythonOperator +from airflow.hooks.base import BaseHook +# This is our custom SnowflakeHook - Your code will need to be adapted +from plugins.snowflake_connection.snowflake_operator_manual_update import SnowflakeHook + +UPDATE_ACTIONS = [ + 'add_metadata', + 'add_column', + 'modify_column', + 'add_folder', + 'delete_folder', + 'update_connection' +] + + +def get_tableau_server(tableau_conn_id: str): + """ Returns a TableauServer object """ + conn = BaseHook.get_connection(tableau_conn_id) + api_version = conn.extra_dejson.get('api_version') + if api_version: + api_version = float(api_version) + return TableauServer( + host=conn.host + , site=conn.extra_dejson.get('site') + , api_version=api_version + , personal_access_token_name=conn.extra_dejson.get('personal_access_token_name') + , personal_access_token_secret=conn.extra_dejson.get('personal_access_token_secret') + ) -def refresh_datasource(tasks, tableau_conn_id='tableau_default', snowflake_conn_id='snowflake_default'): +def refresh_datasources(tasks, tableau_conn_id='tableau_default'): """ Refresh a datasource extract. Args: - tasks: A dictionary with the columns to add or modify. + tasks (str|dict): A dictionary of the actions for updating the datasource. tableau_conn_id (str): The Tableau connection ID - snowflake_conn_id (str): The connection ID for Snowflake. 
""" - if isinstance(tasks, str): - tasks = ast.literal_eval(tasks) + tasks: dict = ast.literal_eval(tasks) + no_refresh = ast.literal_eval(models.Variable.get('NO_REFRESH_DATASOURCES')) + ts = get_tableau_server(tableau_conn_id) - conn = BaseHook.get_connection(tableau_conn_id) - ts = TableauServer(user=conn.login, password=conn.password, url=conn.host, - site=conn.extra_dejson['site']) + for datasource_id in tasks: + datasource_name = tasks[datasource_id]['datasource_name'] + # All listed datasources in this variable won't be refreshed + # Common use-case for not refreshing a datasource, is because it has a live connection + if datasource_name in no_refresh: + logging.info('(Marked to not refresh) Skipping Refresh: %s %s', datasource_id, datasource_name) + continue - snowflake_conn = BaseHook.get_connection(snowflake_conn_id) - embeded_credentials_attempts = 0 - embed_tries = 10 - while embeded_credentials_attempts < embed_tries: try: - ts.embed_credentials(tasks["dsid"], connection_type='snowflake', - credentials={'username': snowflake_conn.login, - 'password': snowflake_conn.password}) - logging.info('Successfully embedded credentials') - embeded_credentials_attempts = embed_tries - except AttributeError as err: - if embeded_credentials_attempts < embed_tries - 1: - embeded_credentials_attempts += 1 - time.sleep(10) - logging.warning('Embedding credentials failed: %s', err) - logging.info('Retrying embedding credentials: %s / %s attempts', - embeded_credentials_attempts, embed_tries) + ts.refresh_datasource(datasource_id) + logging.info('Refreshed: %s %s', datasource_id, datasource_name) + except Exception as error: + if 'Not queuing a duplicate.' in str(error): + logging.info(error) + logging.info('(Refresh already running) Skipping Refresh: %s %s', + datasource_id, datasource_name) else: - raise Exception(err) from err - - # All listed datasources in this variable won't be refreshed - # Common use-case for not refreshing a datasource, is because it has a live connection - no_refresh = ast.literal_eval(models.Variable.get('NO_REFRESH_DATASOURCES')) - if tasks['datasource_name'] in no_refresh: - logging.info('No refresh required - skipping refresh of %s %s', - tasks["dsid"], tasks['datasource_name']) - return None - - try: - ts.refresh_datasource(tasks["dsid"]) - logging.info('Refreshed %s %s', tasks["dsid"], tasks['datasource_name']) - except Exception as error: - if isinstance(error, TDSCCError) or 'Not queuing a duplicate.' in str(error): - logging.info(error) - logging.info('Skipping Refresh %s %s ... Already running', - tasks["dsid"], tasks['datasource_name']) - else: - raise Exception(error) from error + raise Exception(error) from error class TableauDatasourceTasks(models.BaseOperator): """ Compares config files to the published datasource, to get a dictionary of tasks needing to be updated. - Args: - snowflake_conn_id (str): The Snowflake connection ID, used for the datasource connection info - tableau_conn_id (str): The Tableau connection ID - datasource_name (str): The name of the datasource - project (str): The project of the datasource in Tableau Online - column_cfg (dict): The config information for the datasource. + Keyword Args: + snowflake_conn_id (str): The connection ID for Snowflake, used for the datasource connection info + tableau_conn_id (str): The connection ID for Tableau + github_conn_id (str): The connection ID for GitHub Returns: A dict of tasks to be updated for the datasource. 
""" def __init__(self, *args, **kwargs): + self.snowflake_conn_id = kwargs.pop('snowflake_conn_id', 'gcp_snowflake_default') + self.tableau_conn_id = kwargs.pop('tableau_conn_id', None) + self.github_conn_id = kwargs.pop('github_conn_id', None) super().__init__(*args, **kwargs) - self.snowflake_conn_id = kwargs.get('snowflake_conn_id', 'snowflake_default') - self.tableau_conn_id = kwargs.get('tableau_conn_id', 'tableau_default') - self.actions = ['add_column', 'modify_column', 'add_folder', 'delete_folder', 'update_connection'] - self.tasks = {a: [] for a in self.actions} - self.tasks['datasource_name'] = kwargs.get('datasource_name') - self.tasks['project'] = kwargs.get('project') - self.column_cfg = kwargs.get('column_cfg', {}).get(self.tasks['datasource_name']) - self.persona_cfg = kwargs.get('persona_cfg') - self.tds = None + # Set on execution + self.tasks = dict() def __set_connection_attributes(self): """ Sets attributes of the datasource connection. """ - snowflake_conn = BaseHook.get_connection(self.snowflake_conn_id) - - # In the event that we switch to a different data warehouse, - # we will need to update the host specification here - if snowflake_conn.conn_type.lower() != 'snowflake': - raise Exception('Connection must be of type: Snowflake') - - return { - 'conn_type': 'snowflake', - 'conn_db': snowflake_conn.extra_dejson['database'], - 'conn_schema': snowflake_conn.extra_dejson['schema'], - 'conn_host': f'{snowflake_conn.host}.snowflakecomputing.com', - 'conn_role': snowflake_conn.extra_dejson['role'], - 'conn_user': snowflake_conn.login, - 'conn_warehouse': snowflake_conn.extra_dejson['warehouse'] - } - - @staticmethod - def __set_column_attributes(caption, col_config, persona_cfg): - """ Sets attributes as a function of caption and cfg. + snowflake_hook = SnowflakeHook(self.snowflake_conn_id) - :param caption: The top-level key of the cfg. - :param col_config: A dictionary of the column info from the config file - :param persona_cfg: A dictionary of the persona config file - - """ return { - 'caption': caption, - 'column_name': col_config['local-name'], - 'role': persona_cfg['role'], - 'role_type': persona_cfg['role_type'], - 'datatype': persona_cfg['datatype'], - 'desc': col_config['description'], - 'calculation': col_config['calculation'], - 'folder_name': col_config['folder'], - 'remote_name': col_config['remote_name'] + 'class_name': 'snowflake', + 'dbname': snowflake_hook.database, + 'schema': snowflake_hook.schema, + 'server': f'{snowflake_hook.account}.snowflakecomputing.com', + 'service': snowflake_hook.role, + 'username': snowflake_hook.user, + 'warehouse': snowflake_hook.warehouse } - def __is_column_in_folder(self, col_attributes): - """ Determine if the column is in the given folder. - - Args: - col_attributes: Dict of column attributes. - - Returns: True, False, or None if the folder doesn't exist. 
- """ - - folder = TDS(tds=self.td).get('folder', **col_attributes) - if not folder: - return None - if folder.get('folder-item') is None: - return False - folder_items = folder.get('folder-item') - if isinstance(folder_items, list): - for folder_item in folder_items: - if folder_item['@name'] == f'[{col_attributes["column_name"]}]': - return True - else: - if folder_items['@name'] == f'[{col_attributes["column_name"]}]': - return True - return False - - def __add_task(self, action, attributes): + def __add_task(self, datasource_id, action, cfg_attrs, tds_attrs=None): """ Add a task to the dictionary of tasks: - add_column, modify_column, add_folder, or delete_folder + add_column, modify_column, add_folder, delete_folder, or update_connection - Sample: - { - "dsid": "abc123def456", - "datasource_name": "Datasource Name Here", - "project": "General", + Sample: { + "abc123def456": { + "datasource_name": "Datasource Name", + "project": "Project Name", "add_column": [attrib, attrib], "modify_column": [attrib, attrib] "add_folder": [attrib, attrib] "delete_folder": [attrib, attrib] "update_connection": [attrib, attrib] } + } Args: - action: The name of action to do. - attributes: Dict of attributes for the action to use. + datasource_id (str): The ID of the datasource + action (str): The name of action to do. + cfg_attrs (dict): Dict of attributes for the action to use, from the config. + tds_attrs (dict): (Optional) Dict of the attributes from the Datasource, for log comparison. """ - if action and action not in self.actions: + if action and action not in UPDATE_ACTIONS: raise Exception(f'Invalid action {action}') if action: - self.tasks[action].append(attributes) - logging.info('Adding to task table action: %s dsid: %s Datasource Name: %s Attributes: %s', - action, self.tasks['dsid'], self.tasks['datasource_name'], attributes) - - def __append_folders_from_tds(self, folders_from_tds): - """ Appends the folders_from_tds with folders found in the datasource - - :param folders_from_tds: The table to append information to - :return: None - """ - if self.tasks['dsid'] not in folders_from_tds: - folders = TDS(tds=self.tds).list('folder') - if isinstance(folders, list): - folders = [{'name': f['@name'], - 'role': f['@role'][:-1] if '@role' in f else None} - for f in folders] - elif folders: - folders = [{'name': folders['@name'], - 'role': folders['@role'][:-1] if '@role' in folders else None}] - else: - folders = [] - folders_from_tds[self.tasks['dsid']] = {'name': self.tasks['datasource_name'], - 'folders': folders} - - def __append_folders_from_config(self, col_attributes, folders_from_cfg): - """ Appends the folders_from_cfg with folders found in the config - - Args: - col_attributes: Dict of attributes about the column - folders_from_cfg: The table to append information to - """ - folder_info = {'name': col_attributes['folder_name'], 'role': col_attributes['role']} - if self.tasks['dsid'] not in folders_from_cfg: - folders_from_cfg[self.tasks['dsid']] = {'name': self.tasks['datasource_name'], - 'folders': [folder_info]} - elif folder_info not in folders_from_cfg[self.tasks['dsid']]['folders']: - folders_from_cfg[self.tasks['dsid']]['folders'].append(folder_info) + self.tasks[datasource_id][action].append(cfg_attrs) + datasource_name = self.tasks[datasource_id]['datasource_name'] + logging.info( + ' > (Adding task) %s: %s %s\nAttributes:\n\t%s\n\t%s', + action, datasource_id, datasource_name, cfg_attrs, tds_attrs + ) @staticmethod - def __different_column(tds_col, attributes): + def 
__get_column_diffs(tds_col, cfg_column): """ Compare the column from the tds to attributes we expect. Args: - tds_col: The OrderedDict column from the tds. - attributes: The column attributes generated from the config file entry. + tds_col (Column): The Tableau Column object from the datasource. + cfg_column (cfg.CFGColumn): The column from the Config. - Returns: bool + Returns: A dict of differences """ - diff = False - # If there is no column - either in the config or the source - then return false - if not tds_col or not attributes: - return diff - - if '@caption' in tds_col and tds_col['@caption'] != attributes.get('caption'): - diff = True - if 'desc' in tds_col and tds_col['desc']['formatted-text']['run'] != attributes.get('desc'): - diff = True - if 'calculation' in tds_col and tds_col['calculation']['@formula'] != attributes.get('calculation'): - diff = True - if '@role' in tds_col and tds_col['@role'] != attributes.get('role'): - diff = True - if '@type' in tds_col and tds_col['@type'] != attributes.get('role_type'): - diff = True - if '@datatype' in tds_col and tds_col['@datatype'] != attributes.get('datatype'): - diff = True - return diff + different_value_attrs = dict() + # If there is no column, either in the Datasource.columns or the config, then return False + if not tds_col or not cfg_column: + return different_value_attrs + # Get a list of attributes that have different values in the Datasource Column vs the config + cfg_attrs = cfg_column.dict() + cfg_attrs.pop('folder_name', None) + cfg_attrs.pop('remote_name', None) + for attr, value in cfg_attrs.items(): + tds_value = getattr(tds_col, attr) + if tds_value != value: + different_value_attrs[attr] = tds_value + # Return the different attributes + if different_value_attrs: + logging.info(' > (Column diffs) %s: %s', cfg_column.caption, different_value_attrs) + return different_value_attrs + + def __compare_column_metadata(self, datasource_id: str, tds: Datasource, column: cfg.CFGColumn): + """ Compares the metadata of the column """ + if not column.remote_name: + return False + # Return true, If the metadata exists, but is different + metadata: MetadataRecord = tds.connection.metadata_records.get(column.remote_name) + if metadata and metadata.local_name != column.name: + return True + # Add task to add the metadata if it doesn't exist + if not metadata: + logging.warning('Column metadata does not exist - may be missing in the SQL: %s', + column.remote_name) + metadata_attrs = { + 'conn': { + 'parent_name': f'[{tds.connection.relation.name}]', + 'ordinal': len(tds.connection.metadata_records) + + len(self.tasks[datasource_id]['add_metadata']), + }, + 'extract': { + 'parent_name': f'[{tds.extract.connection.relation.name}]', + 'ordinal': len(tds.extract.connection.metadata_records) + + len(self.tasks[datasource_id]['add_metadata']), + 'family': tds.connection.relation.name + }, + } + metadata_attrs['conn'].update(column.metadata) + metadata_attrs['extract'].update(column.metadata) + self.__add_task(datasource_id, 'add_metadata', metadata_attrs) - @staticmethod - def __different_connection(tds_conn, credentials): - """ Compare the connection from the tds to attributes we expect. + return False - Args: - tds_conn (dict): The OrderedDict connection from the tds. 
-            credentials (dict): The credentials that should be attributed to the connection
+        return False
 
-        Returns: True if there is a difference between the connection in the source and the credentials provided
-        """
-        diff = False
-        if not tds_conn or not credentials:
-            return diff
-
-        if '@class' in tds_conn and tds_conn['@class'] != credentials.get('conn_type'):
-            diff = True
-        if '@dbname' in tds_conn and tds_conn['@dbname'] != credentials.get('conn_db'):
-            diff = True
-        if '@schema' in tds_conn and tds_conn['@schema'] != credentials.get('conn_schema'):
-            diff = True
-        if '@server' in tds_conn and tds_conn['@server'] != credentials.get('conn_host'):
-            diff = True
-        if '@service' in tds_conn and tds_conn['@service'] != credentials.get('conn_role'):
-            diff = True
-        if '@username' in tds_conn and tds_conn['@username'] != credentials.get('conn_user'):
-            diff = True
-        if '@warehouse' in tds_conn and tds_conn['@warehouse'] != credentials.get('conn_warehouse'):
-            diff = True
-        return diff
-
-    def __add_folder_task_exists(self, col_attributes):
-        """ Checks if the add_folder task has already been added as a task for the datasource.
-
-        Args:
-            col_attributes (dict): A dict of attributes about the column
-
-        Returns: True if the task exists
+    @staticmethod
+    def __compare_tds_metadata(tds: Datasource, config: cfg.CFGDatasource):
+        """ Compares the metadata of the Datasource against the config columns """
+        for metadata in tds.connection.metadata_records:
+            remote_name_exists = False
+            for column in config.columns:
+                if metadata.remote_name == column.remote_name:
+                    remote_name_exists = True
+            if remote_name_exists:
+                continue
+            logging.warning('Column is not defined in the config: %s / %s',
+                            config.name, metadata.remote_name)
+
+    @staticmethod
+    def __compare_column_mapping(tds: Datasource, column: cfg.CFGColumn):
+        """ Compares the expected column mapping to the mapping in the Datasource """
+        # Get the column metadata
+        metadata = None
+        if column.remote_name:
+            metadata = tds.connection.metadata_records.get(column.remote_name)
+        # Mapping is not required when there is no metadata,
+        # or if there is no cols section and the local_name is the same as the remote name
+        mapping_not_required = (
+            not metadata
+            or not tds.connection.cols and column.name[1:-1] == column.remote_name
+        )
+        # Return True if mapping is needed
+        return not (
+            # If mapping is not required, or the column is already mapped in the cols section
+            mapping_not_required or {
+                'key': column.name,
+                'value': f'{metadata.parent_name}.[{column.remote_name}]'
+            } in tds.connection.cols
+        )
+
+    def __compare_connection(self, dsid, ds_name, tds_connection, expected_attrs):
+        """ Compare the connection from the Datasource to attributes we expect.
+        If there is a difference, add a task to update the connection.
+
+        Args:
+            dsid (str): The Datasource ID.
+            ds_name (str): The Datasource name.
+            tds_connection (Datasource.connection): The Datasource.connection object.
+            expected_attrs (dict): The dict of expected connection attributes.
""" - try: - add_folder_task_folder_role_exists = [ - a for a in self.tasks['add_folder'] - if a['folder_name'] == col_attributes['folder_name'] and a['role'] == col_attributes['role']] - return add_folder_task_folder_role_exists != [] - except KeyError: - return False + named_conn = tds_connection.named_connections[expected_attrs['class_name']] + tds_conn = tds_connection[expected_attrs['class_name']] + if not tds_conn: + logging.warning('Datasource does not have a %s connection: %s', + expected_attrs['class_name'], ds_name) + # Check for a difference between the Datasource connection and the expected connection information + connection_diff = False + if expected_attrs['server'] != named_conn.caption: + connection_diff = True + for attr, value in expected_attrs.items(): + tds_attr_value = getattr(tds_conn, attr) + if tds_attr_value and tds_attr_value.lower() != value.lower(): + connection_diff = True + # Add a task if there is a difference + if connection_diff: + self.__add_task(dsid, 'update_connection', expected_attrs, tds_conn.dict()) + else: + logging.info(' > (No changes needed) Connection: %s', ds_name) - def __compare_folders(self, folders_from_tds, folders_from_cfg): + def __compare_folders(self, datasource_id, tds_folders, cfg_folders): """ Compares folders found in the datasource and in the config. - If there are folders in the source that are not in the config, - a task will be added to delete the folder. + - If there are folders in the source that are not in the config, + a task will be added to delete the folder. + - If there are folders in the config that are not in the datasource, + a task will be added to add the folder. Args: - folders_from_tds (dict): The table of folders from the datasource's tds - folders_from_cfg (dict): The table of folders from the config + tds_folders (Datasource.folders_common): The dict of folders from the Datasource + cfg_folders (cfg.CFGList[cfg.CFGFolder]): The dict of folders from the Config """ - - def exists(tds_f, cfg_folders): - for cfg_f in cfg_folders: - if tds_f['name'] == cfg_f['name']: - if not tds_f['role'] or tds_f['role'] == cfg_f['role']: - return True - return False - - for dsid, ds_info in folders_from_tds.items(): - for tds_f in ds_info['folders']: - cfg_folders = folders_from_cfg[dsid]['folders'] - if not exists(tds_f, cfg_folders): - self.__add_task( - action='delete_folder', - attributes={'folder_name': tds_f['name'], 'role': tds_f['role']} - ) + for tds_folder in tds_folders: + if not cfg_folders.get(tds_folder): + self.__add_task(datasource_id, 'delete_folder', {'name': tds_folder.name}) + for cfg_folder in cfg_folders: + if not tds_folders.get(cfg_folder): + self.__add_task(datasource_id, 'add_folder', {'name': cfg_folder.name}) def execute(self, context): """ Update Tableau datasource according to config. 
""" - conn = BaseHook.get_connection(self.tableau_conn_id) - ts = TableauServer(user=conn.login, password=conn.password, url=conn.host, - site=conn.extra_dejson['site']) - - dsid_tbl = ts.list_datasources(print_info=False) - self.tasks['dsid'] = dsid_tbl.get((self.tasks['project'], self.tasks['datasource_name'])) - - folders_from_tds = {} - folders_from_cfg = {} - - if self.tasks['dsid'] is None: - logging.error('No datasource %s in %s.', self.tasks['datasource_name'], self.tasks['project']) - return None - - dl_path = f"downloads/{self.tasks['dsid']}/" - os.makedirs(dl_path, exist_ok=True) - tdsx = ts.download_datasource(self.tasks['dsid'], filepath=dl_path, include_extract=False) - self.tds = extract_tds(tdsx) - # Clean up downloaded and extracted files - shutil.rmtree(dl_path, ignore_errors=True) - - # Add connection task - conn_attribs = self.__set_connection_attributes() - tds_conn = TDS(tds=self.tds).list('connection') - if isinstance(tds_conn, list): - tds_conn = tds_conn[0]['connection'] - else: - tds_conn = tds_conn['connection'] - if self.__different_connection(tds_conn, conn_attribs): - self.__add_task(action='update_connection', attributes=conn_attribs) - else: - logging.info('No changes needed for connection in %s', self.tasks['datasource_name']) - - # Add Column and Folder tasks - for caption, col_info in self.column_cfg.items(): - # Replace full column names with their local-name in the calculation - if 'calculation' in col_info and col_info['calculation']: - captions = set(re.findall(r'\[.+?\]', col_info['calculation'])) - for full_name in captions: - key = re.sub(r'[\[\]]+', '', full_name) - if key in self.column_cfg: - col_info['calculation'] = col_info['calculation'].replace( - full_name, f"[{self.column_cfg[key]['local-name']}]") - else: - col_info['calculation'] = None - - column_attribs = self.__set_column_attributes( - caption, col_info, self.persona_cfg[col_info['persona']]) - - column_from_tds = TDS(tds=self.tds).get('column', **column_attribs) - - metadata_diff = False - if column_attribs['remote_name']: - metadata = TDS(tds=self.tds).get('datasource-metadata', **column_attribs) - metadata_local_name = re.sub(r'^\[|]$', '', metadata['local-name']) if metadata else None - if metadata_local_name and metadata_local_name != column_attribs['column_name']: - metadata_diff = True - - if column_from_tds: - column_from_tds = dict(column_from_tds) - column_from_tds['@name'] = re.sub(r'^\[|]$', '', column_from_tds['@name']) - folder_check = self.__is_column_in_folder(column_attribs) - different_column = self.__different_column(column_from_tds, column_attribs) - # If the folder is missing and there is not already a task to add this folder/role, - # then add the task - if folder_check is None and not self.__add_folder_task_exists(column_attribs): - self.__add_task(action='add_folder', attributes=column_attribs) - if not column_from_tds: - self.__add_task(action='add_column', attributes=column_attribs) - elif different_column or not folder_check or metadata_diff: - self.__add_task(action='modify_column', attributes=column_attribs) - else: - logging.info('No changes needed for %s in %s', caption, self.tasks['datasource_name']) - - # Get the table of folders from the datasource - self.__append_folders_from_tds(folders_from_tds) - - # Get the table of folders from the config - self.__append_folders_from_config(column_attribs, folders_from_cfg) - # Add tasks to delete folders that are not in the config for each datasource - self.__compare_folders(folders_from_tds, folders_from_cfg) 
+        github_conn = BaseHook.get_connection(self.github_conn_id)
+        config = cfg.Config(
+            github_token=github_conn.password,
+            repo_name=github_conn.extra_dejson.get('repo_name'),
+            repo_branch=github_conn.extra_dejson.get('repo_branch'),
+            subfolder=github_conn.extra_dejson.get('subfolder')
+        )
+        ts = get_tableau_server(self.tableau_conn_id)
+        expected_conn_attrs = self.__set_connection_attributes()
+
+        # Get the ID for each datasource in the config
+        for ds in ts.get_datasources():
+            if ds not in config.datasources:
+                continue
+            config.datasources[ds].id = ds.id
+
+        for datasource in config.datasources:
+            logging.info('Checking Datasource: %s', datasource.name)
+            if not datasource.id:
+                logging.error('!! Datasource not found in Tableau Online: %s / %s',
+                              datasource.project_name, datasource.name)
+                continue
+            dsid = datasource.id
+            # Set default dict attributes for tasks, for each datasource
+            self.tasks[dsid] = {a: [] for a in UPDATE_ACTIONS}
+            self.tasks[dsid]['project'] = datasource.project_name
+            self.tasks[dsid]['datasource_name'] = datasource.name
+            # Download the Datasource for comparison
+            dl_path = f"downloads/{dsid}/"
+            os.makedirs(dl_path, exist_ok=True)
+            ds_path = ts.download_datasource(dsid, file_dir=dl_path, include_extract=False)
+            tds = Datasource(ds_path)
+            # Cleanup downloaded file after assigning the Datasource
+            shutil.rmtree(dl_path, ignore_errors=True)
+            # Compare the Datasource metadata to the config datasource columns
+            self.__compare_tds_metadata(tds, datasource)
+            # Add connection task, if there is a difference
+            self.__compare_connection(dsid, datasource.name, tds.connection, expected_conn_attrs)
+            # Add folder tasks, if folders need to be added/deleted
+            self.__compare_folders(dsid, tds.folders_common, datasource.folders)
+            # Add Column tasks, if there are missing columns, or columns need to be updated
+            for column in datasource.columns:
+                # Check the column metadata for differences
+                metadata_update_needed = self.__compare_column_metadata(dsid, tds, column)
+                # Check if the column needs mapping
+                column_needs_mapping = self.__compare_column_mapping(tds, column)
+                # Check the column for updates
+                tds_column: Column = tds.columns.get(column.name)
+                column_diffs: dict = self.__get_column_diffs(tds_column, column)
+                tds_folder: Folder = tds.folders_common.get(column.folder_name)
+                not_in_folder: bool = tds_folder is None or tds_folder.folder_item.get(column.name) is None
+                if not tds_column:
+                    self.__add_task(dsid, action='add_column', cfg_attrs=column.dict())
+                elif column_diffs or not_in_folder or metadata_update_needed or column_needs_mapping:
+                    self.__add_task(
+                        dsid,
+                        action='modify_column',
+                        cfg_attrs=column.dict(),
+                        tds_attrs={
+                            'column_diffs': column_diffs,
+                            'not_in_folder': not_in_folder,
+                            'column_needs_mapping': column_needs_mapping,
+                            'metadata_update_needed': metadata_update_needed
+                        }
+                    )
+                else:
+                    logging.info(' > (No changes needed) Column: %s / %s', datasource.name, column.caption)
 
         return self.tasks
 
 
@@ -455,111 +358,117 @@ class TableauDatasourceUpdate(models.BaseOperator):
     Makes all necessary updates to the datasource.
     Publishes the datasource.
 
-    Args:
+    Keyword Args:
         tasks_task_id (str): The task_id of the task that ran the TableauDatasourceTasks operator.
         snowflake_conn_id (str): The connection ID for Snowflake.
tableau_conn_id (str): The Tableau connection ID """ def __init__(self, *args, **kwargs): + self.tasks_task_id = kwargs.pop('tasks_task_id') + self.tableau_conn_id = kwargs.pop('tableau_conn_id', 'tableau_default') + self.snowflake_conn_id = kwargs.pop('snowflake_conn_id', 'gcp_snowflake_default') super().__init__(*args, **kwargs) - self.tasks_task_id = kwargs.get('tasks_task_id') - self.tableau_conn_id = kwargs.get('tableau_conn_id', 'tableau_default') - self.snowflake_conn_id = kwargs.get('snowflake_conn_id', 'snowflake_default') - # Set on execute - self.tasks = None - self.tds = None - - def __has_any_task(self): - """ Check if there are any tasks to be done """ - for attributes in self.tasks.values(): + + @staticmethod + def __has_tasks_to_do(tasks): + """ Check if there are any tasks to be done + + Args: + tasks (dict): The tasks to be done + """ + for attributes in tasks.values(): if isinstance(attributes, list) and attributes: return True return False - def __do_action(self, tdsx, action_name, action, item_type): - """ Executes all of the action items + @staticmethod + def __do_action(tasks, tds, action): + """ Executes the action, for each item to do for that action Args: - tdsx (str): The path to the TDSX file - action_name (str): The name of the action to be done - action (str): The action to be done - item_type (str): The type of item + tasks (dict): The dict of tasks to be done + tds (Datasource): The Tableau Datasource object + action (str): The name of the action to be done """ - for item in self.tasks[action_name]: - try: - logging.info('Going to %s %s: %s -- %s', item_type, tdsx, item) - TDS(tds=self.tds).__getattribute__(action)(item_type, **item) - except TDSCCError as err: - # If a source is updated again before it has refreshed in tableau, - # it will not detect the folders in the source, and try to add them all again - if item_type == 'folder' and action == 'add': - logging.info('Skipping Adding Folder: %s -- Already Exists', item['folder_name']) - - # TEMP -- THIS SHOULD BE REMOVED EVENTUALLY - # THIS IS HERE SO THAT COLUMNS THAT DO NOT EXIST WILL NOT ERROR ON MISSING METADATA - elif item_type == 'column' and action == 'add' and 'Metadata does not exist' in str(err): - logging.warning('Missing SQL metadata for column: %s', item['remote_name']) - temp_item = deepcopy(item) - del temp_item['remote_name'] - TDS(tds=self.tds).__getattribute__(action)(item_type, **temp_item) + for attrs in tasks[action]: + logging.info(' > (Update) %s: %s', action, attrs) + if action == 'add_metadata': + tds.connection.metadata_records.add(MetadataRecord(**attrs['conn'])) + if tds.extract: + tds.extract.connection.metadata_records.add(MetadataRecord(**attrs['extract'])) + if action in ['modify_column', 'add_column']: + folder_name: str = attrs.pop('folder_name', None) + remote_name: str = attrs.pop('remote_name', None) + tds.enforce_column(Column(**attrs), folder_name, remote_name) + if action == 'add_folder': + tds.folders_common.add(Folder(**attrs)) + if action == 'delete_folder': + tds.folders_common.delete(attrs['name']) + if action == 'update_connection': + # Only update the attributes of the connection we specify. + # There are some attributes of a connection we do not need to update, + # but are provided in the existing connection. 
+ connection = tds.connection[attrs['class_name']] + for attr, value in attrs.items(): + setattr(connection, attr, value) + tds.connection.update(connection) def execute(self, context): - self.tasks = context['ti'].xcom_pull(task_ids=self.tasks_task_id) - - conn = BaseHook.get_connection(self.tableau_conn_id) - ts = TableauServer(user=conn.login, password=conn.password, - url=conn.host, site=conn.extra_dejson['site']) - - dl_path = f'downloads/{self.tasks["dsid"]}/' - try: - # Skip updating these datasources until we optimize them well enough - # to downloaded & published without timing out + all_tasks = context['ti'].xcom_pull(task_ids=self.tasks_task_id) + ts = get_tableau_server(self.tableau_conn_id) + snowflake_conn = BaseHook.get_connection(self.snowflake_conn_id) + snowflake_creds = {'username': snowflake_conn.login, 'password': snowflake_conn.password} + errors = list() + # Attempt to update each Datasource + for dsid, tasks in all_tasks.items(): + dl_path = f'downloads/{dsid}/' + datasource_name = tasks['datasource_name'] + project = tasks['project'] + # Skipping datasources if they cannot be downloaded / published without timing out excluded = ast.literal_eval(models.Variable.get('EXCLUDED_DATASOURCES')) - if self.tasks['datasource_name'] in excluded: - logging.info('Marked as excluded - Skipping Updating Datasource: %s', - self.tasks['datasource_name']) - return None - - if self.__has_any_task(): + if datasource_name in excluded: + logging.info('(Marked to exclude) Skipping Datasource: %s', datasource_name) + continue + # Skipping datasources if they have no tasks that need to be updated + if not self.__has_tasks_to_do(tasks): + logging.info('(No tasks to update) Skipping Datasource: %s', datasource_name) + continue + # Update the Datasource + try: # Download os.makedirs(dl_path, exist_ok=True) - logging.info('Downloading Datasource: %s -> %s', - self.tasks['project'], self.tasks['datasource_name']) - tdsx = ts.download_datasource(dsid=self.tasks['dsid'], filepath=dl_path, include_extract=True) + logging.info('Downloading Datasource: %s / %s', project, datasource_name) + ds_path = ts.download_datasource(dsid, file_dir=dl_path, include_extract=True) # Update - logging.info('Extracting tds from %s -> %s: %s', - self.tasks['project'], self.tasks['datasource_name'], tdsx) - self.tds = extract_tds(tdsx) - self.__do_action(tdsx, 'update_connection', 'update', 'connection') - self.__do_action(tdsx, 'add_folder', 'add', 'folder') - self.__do_action(tdsx, 'add_column', 'add', 'column') - self.__do_action(tdsx, 'modify_column', 'update', 'column') - self.__do_action(tdsx, 'delete_folder', 'delete', 'folder') - logging.info('Updating tdsx %s %s', - self.tasks["dsid"], self.tasks['datasource_name']) - update_tdsx(tdsx_path=tdsx, tds=self.tds) + logging.info('Updating Datasource: %s / %s', project, datasource_name) + datasource = Datasource(ds_path) + self.__do_action(tasks, datasource, 'add_metadata') + self.__do_action(tasks, datasource, 'update_connection') + self.__do_action(tasks, datasource, 'add_folder') + self.__do_action(tasks, datasource, 'add_column') + self.__do_action(tasks, datasource, 'modify_column') + self.__do_action(tasks, datasource, 'delete_folder') + datasource.save() # Publish - logging.info('About to publish %s %s %s', - self.tasks["dsid"], self.tasks['datasource_name'], tdsx) - ts.publish_datasource(tdsx, self.tasks["dsid"], keep_tdsx=False) - logging.info('Published %s %s', self.tasks["dsid"], self.tasks['datasource_name']) - else: - logging.info('No tasks to update 
- Skipping Updating Datasource: %s', - self.tasks['datasource_name']) - - except Exception as e: - refresh_datasource(self.tasks, self.tableau_conn_id, self.snowflake_conn_id) - raise Exception(e) - finally: - # Clean up downloaded and extracted files - shutil.rmtree(dl_path, ignore_errors=True) - - return None + logging.info('Publishing Datasource: %s / %s -> %s', project, datasource_name, dsid) + ts.publish_datasource(ds_path, dsid, connection=snowflake_creds) + logging.info('Published Successfully: %s / %s -> %s', project, datasource_name, dsid) + os.remove(ds_path) + except Exception as e: + # Log the error, but wait to fail the task until all Datasources have been attempted + logging.error(e) + errors.append(e) + finally: + # Clean up downloaded and extracted files + shutil.rmtree(dl_path, ignore_errors=True) + # Fail task if there were errors updating any Datasources + if errors: + refresh_datasources(all_tasks, self.tableau_conn_id) + raise Exception(f'Some datasources had errors when updating.\n{errors}') default_dag_args = { - 'start_date': datetime.datetime(2020, 8, 1), - 'retries': 2 + 'start_date': datetime.datetime(2020, 8, 1) } dag = DAG( @@ -570,35 +479,33 @@ def execute(self, context): max_active_runs=1, default_args=default_dag_args) -for ds, project in DS_PROJECT_CFG.items(): - task_name = re.sub(r'[^a-zA-Z0-9]+', '_', ds).lower() - add_tasks = TableauDatasourceTasks( - dag=dag, - task_id=f'add_tasks_{task_name}', - snowflake_conn_id='snowflake_tableau_datasource', - tableau_conn_id='tableau_default', - datasource_name=ds, - project=project, - column_cfg=COLUMN_CFG, - persona_cfg=PERSONA_CFG - ) - - update = TableauDatasourceUpdate( - dag=dag, - task_id=f'update_{task_name}', - snowflake_conn_id='snowflake_tableau_datasource', - tableau_conn_id='tableau_default', - tasks_task_id=f'add_tasks_{task_name}' - ) - - refresh = PythonOperator( - dag=dag, - task_id=f'refresh_{task_name}', - python_callable=refresh_datasource, - op_kwargs={'snowflake_conn_id': 'snowflake_tableau_datasource', - 'tableau_conn_id': 'tableau_default', - 'tasks': "{{task_instance.xcom_pull(task_ids='%s')}}" % - f'add_tasks_{task_name}'} - ) - - add_tasks >> update >> refresh +with open('dags/tableau_datasource_update/tableau_datasource_update.md') as doc_md: + dag.doc_md = doc_md.read() + +add_tasks = TableauDatasourceTasks( + dag=dag, + task_id='gather_datasource_update_tasks', + snowflake_conn_id='snowflake_tableau_datasource', + tableau_conn_id='tableau_update_datasources', + github_conn_id='github_dbt_repo' +) + +update = TableauDatasourceUpdate( + dag=dag, + task_id='update_datasources', + snowflake_conn_id='snowflake_tableau_datasource', + tableau_conn_id='tableau_update_datasources', + tasks_task_id='gather_datasource_update_tasks' +) + +refresh = PythonOperator( + dag=dag, + task_id='refresh_datasources', + python_callable=refresh_datasources, + op_kwargs={ + 'tableau_conn_id': 'tableau_update_datasources', + 'tasks': "{{task_instance.xcom_pull(task_ids='gather_datasource_update_tasks')}}" + } +) + +add_tasks >> update >> refresh diff --git a/airflow_example/requirements.txt b/airflow_example/requirements.txt new file mode 100644 index 00000000..0bdebfcd --- /dev/null +++ b/airflow_example/requirements.txt @@ -0,0 +1 @@ +PyGithub==1.59.0
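
A note on `configuration.py`: `CFGList` leans on the dataclasses' `__eq__` overloads so that lists of datasources and columns can be indexed by name or caption instead of by position. This is the mechanism `__replace_caption_references` uses to resolve a `[Caption]` reference in a calculation to the column's local-name. A minimal usage sketch follows; all names in it are illustrative, not from a real config:

```python
# Sketch of the CFGList equality-lookup pattern from configuration.py.
# All names here are made up for illustration.
from dags.tableau_datasource_update.configs.configuration import (
    CFGColumn, CFGDatasource, CFGList
)

datasources = CFGList()
datasources.append(CFGDatasource(name='My Datasource', project_name='My Project'))

# CFGDatasource.__eq__ matches plain strings against .name,
# so the list can be indexed by datasource name:
datasource = datasources['My Datasource']

datasource.columns.append(CFGColumn(
    name='UNIQUE_ID',  # __post_init__ wraps the local-name as [UNIQUE_ID]
    caption='Unique ID',
    role='dimension',
    type='ordinal',
    datatype='integer',
))

# CFGColumn.__eq__ matches plain strings against .caption, which is how
# __replace_caption_references turns '[Unique ID]' into '[UNIQUE_ID]'.
print(datasources['My Datasource'].columns['Unique ID'].name)  # -> [UNIQUE_ID]
```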
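With the JSON configs gone, everything environment-specific now lives in Airflow Connections and Variables. Below is a minimal bootstrap sketch of the metadata the DAG reads: the connection IDs, extras keys, and Variable names come from the DAG code above, while every value is a placeholder. The `snowflake_tableau_datasource` connection is consumed through the custom `SnowflakeHook` imported at the top of the DAG, so its shape depends on that hook and is omitted here.

```python
# Bootstrap sketch for the Airflow metadata this DAG expects.
# Run inside an initialized Airflow environment; all values are placeholders.
import json

from airflow import settings
from airflow.models import Connection, Variable

session = settings.Session()

# Tableau connection, read by get_tableau_server():
# host, plus site / api_version / personal access token in the extras.
session.add(Connection(
    conn_id='tableau_update_datasources',
    conn_type='http',  # conn_type is arbitrary; only host and extras are read
    host='https://my-org.online.tableau.com',
    extra=json.dumps({
        'site': 'mysite',
        'api_version': '3.19',  # cast to float by get_tableau_server when set
        'personal_access_token_name': 'airflow',
        'personal_access_token_secret': 'REDACTED',
    }),
))

# GitHub connection, read in TableauDatasourceTasks.execute():
# password carries the token handed to cfg.Config().
session.add(Connection(
    conn_id='github_dbt_repo',
    conn_type='http',
    password='ghp_REDACTED',
    extra=json.dumps({
        'repo_name': 'my-org/dbt-models',
        'repo_branch': 'main',
        'subfolder': 'models/reporting',
    }),
))
session.commit()

# Both Variables are parsed with ast.literal_eval(), so they are stored as the
# string form of a Python list of datasource names.
Variable.set('EXCLUDED_DATASOURCES', "['Some Huge Datasource']")
Variable.set('NO_REFRESH_DATASOURCES', "['Some Live Connection Datasource']")
```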