Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] sc-31711 PipeRider run: support running without a database connection #842

Merged
1 change: 1 addition & 0 deletions piperider_cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ def run(**kwargs):
if dbt_list or dbt_resources or select:
Guide().show('No resources was profiled. Please check given "--select", "--dbt-list" option or '
'environment variable "PIPERIDER_DBT_RESOURCES" to choose the resources to profile.')
ret = 0

if CloudConnector.is_login() and is_cloud_view:
ret = CloudConnector.upload_latest_report(report_dir=kwargs.get('report_dir'), debug=kwargs.get('debug'),
Expand Down
18 changes: 13 additions & 5 deletions piperider_cli/cloud_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ def upload_report(report_path=None, report_dir=None, datasource=None, debug=Fals

if enable_share:
run_id = response.get('run_id')
CloudConnector.share_run_report(run_id, debug)
CloudConnector.share_run_report(run_id, debug, project_name=project_name)

return rc

Expand Down Expand Up @@ -513,12 +513,16 @@ def compare_reports(base=None, target=None, tables_from='all', summary_file=None
f.write(response.get('summary'))

@staticmethod
def share_run_report(run_id=None, debug=False):
def share_run_report(run_id=None, debug=False, project_name=None):
if piperider_cloud.available is False:
console.rule('Please login PipeRider Cloud first', style='red')
return 1

workspace_name, project_name = piperider_cloud.get_default_workspace_and_project()
if project_name is None:
workspace_name, project_name = piperider_cloud.get_default_workspace_and_project()
else:
workspace_name, project_name = project_name.split('/')

if workspace_name is None or project_name is None:
console.rule('Please select a workspace and a project first', style='red')
return 1
Expand All @@ -532,12 +536,16 @@ def share_run_report(run_id=None, debug=False):
)

@staticmethod
def share_compare_report(base_id=None, target_id=None, debug=False):
def share_compare_report(base_id=None, target_id=None, debug=False, project_name=None):
if piperider_cloud.available is False:
console.rule('Please login PipeRider Cloud first', style='red')
return 1

workspace_name, project_name = piperider_cloud.get_default_workspace_and_project()
if project_name is None:
workspace_name, project_name = piperider_cloud.get_default_workspace_and_project()
else:
workspace_name, project_name = project_name.split('/')

if workspace_name is None or project_name is None:
console.rule('Please select a workspace and a project first', style='red')
return 1
Expand Down
9 changes: 4 additions & 5 deletions piperider_cli/compare_report.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,22 @@
import io
import json
import os
import shutil
import sys
from datetime import date, datetime
from datetime import datetime
from typing import Dict, List, Optional

import inquirer
import readchar
from rich.console import Console
from piperider_cli.githubutil import fetch_pr_metadata

import piperider_cli.hack.inquirer as inquirer_hack
from piperider_cli import clone_directory, datetime_to_str, open_report_in_browser, \
raise_exception_when_directory_not_writable, str_to_datetime
from piperider_cli.configuration import Configuration, ReportDirectory
from piperider_cli.generate_report import setup_report_variables
from piperider_cli.dbt.changeset import SummaryChangeSet
from piperider_cli.dbt.utils import ChangeType
from piperider_cli.generate_report import setup_report_variables
from piperider_cli.githubutil import fetch_pr_metadata
from piperider_cli.utils import create_link, remove_link


Expand Down Expand Up @@ -501,7 +500,7 @@ def output_summary(directory, summary_data):
from piperider_cli.cloud_connector import CloudConnector
base = str(report.a.cloud.get('run_id'))
target = str(report.b.cloud.get('run_id'))
CloudConnector.share_compare_report(base, target)
CloudConnector.share_compare_report(base, target, project_name=project_name)

if output:
clone_directory(default_report_directory, output)
Expand Down
6 changes: 5 additions & 1 deletion piperider_cli/datasource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,13 @@ def get_engine_by_database(self, database=None):

@property
def threads(self):
    """Return the number of worker threads the profiler may use."""
    # Probe for an engine first; when no database connection is available
    # (e.g. a run without credentials) we simply fall back to None.
    try:
        engine = self.get_engine_by_database()
    except Exception:
        engine = None

    # An explicit `threads` value in the credential always takes precedence.
    configured = self.credential.get('threads')
    if configured:
        return configured

    # SingletonThreadPool engines (e.g. SQLite) must not be shared across
    # threads, so only multi-thread when the pool type allows it.
    if engine and not isinstance(engine.pool, SingletonThreadPool):
        return 5
    return 1
Expand Down
52 changes: 48 additions & 4 deletions piperider_cli/profiler/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,10 +133,10 @@ class Profiler:
"""

def __init__(
self,
data_source: DataSource,
event_handler: ProfilerEventHandler = DefaultProfilerEventHandler(),
config: Configuration = None
self,
data_source: DataSource,
event_handler: ProfilerEventHandler = DefaultProfilerEventHandler(),
config: Configuration = None
):
self.data_source = data_source
self.event_handler = event_handler
Expand Down Expand Up @@ -267,6 +267,50 @@ async def _collect_metadata(self, subjects: List[ProfileSubject], metadata_subje
def collect_metadata(self, metadata_subjects: List[ProfileSubject], subjects: List[ProfileSubject]):
    """Synchronous wrapper around the async metadata collection.

    NOTE: the async counterpart takes ``(subjects, metadata_subjects)`` —
    the argument order is intentionally swapped relative to this signature.
    """
    coroutine = self._collect_metadata(subjects, metadata_subjects)
    return asyncio.run(coroutine)

async def _collect_metadata_from_dbt_manifest(self, dbt_manifest, metadata_subjects, subjects):
profiled_tables = {}
if subjects is None:
subjects = []
# table_nodes = filter(
# lambda node: node.startswith('model.') or node.startswith('seed.') or node.startswith('source.'),
# dbt_manifest['nodes'].keys())
# print(table_nodes)

if metadata_subjects is None:
metadata_subjects = subjects

for subject in metadata_subjects:
table_name = subject.name
ref_id = subject.ref_id
dbt_node = dbt_manifest.get('nodes', {}).get(ref_id, {})
dbt_source = dbt_manifest.get('sources', {}).get(ref_id, {})
dbt_columns = {}
columns = {}
if dbt_node:
# Read columns from dbt manifest `columns` field
dbt_columns = dbt_node.get('columns', {})
elif dbt_source:
# Read columns from dbt manifest `sources` field
dbt_columns = dbt_source.get('columns', {})

# Fill the columns with name and description
for key, val in dbt_columns.items():
name = val.get('name')
description = val.get('description')
columns[key] = dict(
name=name,
type='other',
schema_type='other',
description=description)

profiled_tables[ref_id] = dict(name=table_name, columns=columns, ref_id=ref_id)

return dict(tables=profiled_tables)

def collect_metadata_from_dbt_manifest(self, dbt_manifest, metadata_subjects: List[ProfileSubject],
                                       subjects: List[ProfileSubject]):
    """Synchronously collect table metadata from a dbt manifest (no DB connection needed)."""
    task = self._collect_metadata_from_dbt_manifest(dbt_manifest, metadata_subjects, subjects)
    return asyncio.run(task)

def profile(self, subjects: List[ProfileSubject] = None, *, metadata_subjects: List[ProfileSubject] = None) -> dict:
def job():
return asyncio.run(self._profile(subjects, metadata_subjects=metadata_subjects))
Expand Down
113 changes: 74 additions & 39 deletions piperider_cli/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import uuid
from datetime import datetime
from enum import Enum
from typing import List, Optional

from rich import box
Expand Down Expand Up @@ -317,6 +318,35 @@ def _transform_assertion_result(table: str, results):
return dict(tests=tests, columns=columns)


class PreRunValidatingResult(Enum):
    """Outcome of the pre-run validation checks."""

    OK = 0              # connector and connection verified; proceed with profiling
    ERROR = 1           # fatal problem (e.g. assertion syntax errors); abort the run
    SKIP_PROFILING = 2  # connector/connection unavailable; continue without profiling


def _pre_run_validating(ds: DataSource) -> PreRunValidatingResult:
    """Validate connector, connection and assertion syntax before a run.

    Returns ``OK`` when everything checks out, ``SKIP_PROFILING`` when the
    data source cannot be loaded or reached (the run continues without
    profiling), and ``ERROR`` for fatal assertion syntax problems.
    """
    console = Console()

    # A missing or broken connector is non-fatal: profiling is skipped.
    if ds.verify_connector():
        console.print(
            f'[[bold yellow]WARNING[/bold yellow]] Failed to load the \'{ds.type_name}\' connector.')
        return PreRunValidatingResult.SKIP_PROFILING

    # Likewise, an unreachable data source only downgrades the run.
    try:
        ds.verify_connection()
    except Exception:
        console.print(
            f'[[bold yellow]WARNING[/bold yellow]] Failed to connect the \'{ds.name}\' data source.')
        return PreRunValidatingResult.SKIP_PROFILING

    # Broken assertion files are fatal — the user must fix them first.
    if _validate_assertions(console):
        console.print('\n\n[bold red]ERROR:[/bold red] Stop profiling, please fix the syntax errors above.')
        return PreRunValidatingResult.ERROR

    return PreRunValidatingResult.OK


def _validate_assertions(console: Console):
assertion_engine = AssertionEngine(None)
assertion_engine.load_all_assertions_for_validation()
Expand Down Expand Up @@ -533,7 +563,7 @@ def get_dbt_profile_subjects(dbt_state_dir, options, filter_fn):
return tagged_subjects


def get_dbt_state_dir(target_path, dbt_config, ds):
def get_dbt_state_dir(target_path, dbt_config, ds, skip_datasource_connection=False):
if target_path is None or os.path.exists(target_path) is False:
project_dir = dbt_config.get('projectDir')
dbt_project = dbtutil.load_dbt_project(project_dir)
Expand All @@ -546,7 +576,7 @@ def get_dbt_state_dir(target_path, dbt_config, ds):
if not dbtutil.is_dbt_state_ready(target_path):
return None, f"[bold red]Error:[/bold red] No available 'manifest.json' under '{target_path}'"

if os.environ.get('PIPERIDER_SKIP_TARGET_CHECK', None) != '1':
if os.environ.get('PIPERIDER_SKIP_TARGET_CHECK', None) != '1' and skip_datasource_connection is False:
if not check_dbt_manifest_compatibility(ds, target_path):
return None, f"[bold red]Error:[/bold red] Target mismatched. Please run 'dbt compile -t {dbt_config.get('target')}' to generate the new manifest, or set the environment variable 'PIPERIDER_SKIP_TARGET_CHECK=1' to skip the check."

Expand Down Expand Up @@ -597,6 +627,7 @@ def exec(datasource=None, table=None, output=None, skip_report=False, dbt_target
dbt_resources: Optional[dict] = None, dbt_select: tuple = None, dbt_state: str = None,
report_dir: str = None):
console = Console()
skip_datasource_connection = False

raise_exception_when_directory_not_writable(output)

Expand Down Expand Up @@ -641,22 +672,12 @@ def exec(datasource=None, table=None, output=None, skip_report=False, dbt_target

console.print(f'[bold dark_orange]DataSource:[/bold dark_orange] {ds.name}')
console.rule('Validating')
err = ds.verify_connector()
if err:
console.print(
f'[[bold red]FAILED[/bold red]] Failed to load the \'{ds.type_name}\' connector.')
raise err

try:
ds.verify_connection()
except Exception as err:
console.print(
f'[[bold red]FAILED[/bold red]] Failed to connect the \'{ds.name}\' data source.')
raise err
stop_runner = _validate_assertions(console)
if stop_runner:
console.print('\n\n[bold red]ERROR:[/bold red] Stop profiling, please fix the syntax errors above.')
result = _pre_run_validating(ds)
if result is PreRunValidatingResult.ERROR:
return 1
elif result is PreRunValidatingResult.SKIP_PROFILING:
console.print('[[bold yellow]WARNING[/bold yellow]] [bold dark_orange]Skip profiling[/bold dark_orange]')
skip_datasource_connection = True

dbt_config = ds.args.get('dbt')
dbt_manifest = None
Expand All @@ -667,7 +688,7 @@ def exec(datasource=None, table=None, output=None, skip_report=False, dbt_target
console.log(
'[bold red]ERROR:[/bold red] DBT configuration is not completed, please check the config.yml')
return sys.exit(1)
dbt_target_path, err_msg = get_dbt_state_dir(dbt_target_path, dbt_config, ds)
dbt_target_path, err_msg = get_dbt_state_dir(dbt_target_path, dbt_config, ds, skip_datasource_connection)
if err_msg:
console.print(err_msg)
return sys.exit(1)
Expand All @@ -679,11 +700,13 @@ def exec(datasource=None, table=None, output=None, skip_report=False, dbt_target
select=dbt_select,
state=dbt_state)
console.print('everything is OK.')

console.rule('Collect metadata')
run_id = uuid.uuid4().hex
created_at = datetime.utcnow()
engine = ds.get_engine_by_database()
if skip_datasource_connection:
engine = None
else:
engine = ds.get_engine_by_database()

subjects: List[ProfileSubject]
dbt_metadata_subjects: List[ProfileSubject] = None
Expand Down Expand Up @@ -720,38 +743,50 @@ def filter_fn(subject: ProfileSubject):
subjects = list(filter(filter_fn, subjects))

run_result = {}
profiler_result = {}

statistics = Statistics()
# Profile the datasource
profiler = Profiler(ds, RichProfilerEventHandler([subject.name for subject in subjects]), configuration)
try:
profiler.collect_metadata(dbt_metadata_subjects, subjects)

console.rule('Profile statistics')
profiler_result = profiler.profile(subjects, metadata_subjects=dbt_metadata_subjects)
if skip_datasource_connection:
# Generate run result from dbt manifest
profiler_result = profiler.collect_metadata_from_dbt_manifest(dbt_manifest, dbt_metadata_subjects, subjects)
run_result.update(profiler_result)
except NoSuchTableError as e:
console.print(f"[bold red]Error:[/bold red] No such table '{str(e)}'")
return 1
except Exception as e:
raise Exception(f'Profiler Exception: {type(e).__name__}(\'{e}\')')
else:
try:
profiler.collect_metadata(dbt_metadata_subjects, subjects)

console.rule('Profile statistics')
profiler_result = profiler.profile(subjects, metadata_subjects=dbt_metadata_subjects)
run_result.update(profiler_result)
except NoSuchTableError as e:
console.print(f"[bold red]Error:[/bold red] No such table '{str(e)}'")
return 1
except Exception as e:
raise Exception(f'Profiler Exception: {type(e).__name__}(\'{e}\')')

statistics.reset()
metrics = []
if dbt_config:
metrics = dbtutil.get_dbt_state_metrics(dbt_target_path, dbt_config.get('tag'), dbt_resources)

console.rule('Query metrics')
statistics.display_statistic('query', 'metric')
if metrics:
run_result['metrics'] = MetricEngine(
ds,
metrics,
RichMetricEventHandler([m.label for m in metrics])
).execute()
if skip_datasource_connection is False:
console.rule('Query metrics')
statistics.display_statistic('query', 'metric')
if metrics:
run_result['metrics'] = MetricEngine(
ds,
metrics,
RichMetricEventHandler([m.label for m in metrics])
).execute()

# TODO: refactor input unused arguments
assertion_results, assertion_exceptions = _execute_assertions(console, engine, ds.name, output,
profiler_result, created_at)
if skip_datasource_connection:
assertion_results, assertion_exceptions = [], []
else:
assertion_results, assertion_exceptions = _execute_assertions(console, engine, ds.name, output,
profiler_result, created_at)

run_result['tests'] = []
if assertion_results or dbt_test_results:
Expand Down