Skip to content

Commit

Permalink
Merge pull request #896 from InfuseAI/bug/sc-32360/bug-skip-datasourc…
Browse files Browse the repository at this point in the history
…e-compare-procedure

[Bug] Do dbt parse when given skip datasource option
  • Loading branch information
wcchang1115 authored Oct 12, 2023
2 parents 6e36aa6 + f1052bc commit 75d7314
Showing 1 changed file with 61 additions and 10 deletions.
71 changes: 61 additions & 10 deletions piperider_cli/recipes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import piperider_cli.dbtutil as dbtutil
from piperider_cli import get_run_json_path, load_jinja_template, load_json
from piperider_cli.configuration import Configuration, FileSystem
from piperider_cli.dbt import dbt_version
from piperider_cli.error import RecipeConfigException
from piperider_cli.event import CompareEventPayload
from piperider_cli.recipes.utils import InteractiveStopException
Expand Down Expand Up @@ -274,6 +275,14 @@ def replace_commands_dbt_state_path(commands: List[str], dbt_state_path: str):
return [command.replace('<DBT_STATE_PATH>', dbt_state_path) for command in commands]


def prepare_dbt_manifest(cfg: RecipeConfiguration, options: Dict, recipe_type: str = 'base'):
if options.get('skip_datasource_connection') and dbt_version >= '1.5':
# dbt parse will write or return a manifest from 1.5
execute_dbt_parse_archive(cfg, recipe_type=recipe_type)
else:
execute_dbt_compile_archive(cfg, recipe_type=recipe_type)


def prepare_dbt_resources_candidate(cfg: RecipeConfiguration, options: Dict):
config = Configuration.instance()
state = None
Expand All @@ -282,15 +291,18 @@ def prepare_dbt_resources_candidate(cfg: RecipeConfiguration, options: Dict):
select = update_select_with_cli_option(options)

if require_base_state(cfg):
execute_dbt_compile_archive(cfg, recipe_type='base')
prepare_dbt_manifest(cfg, options, recipe_type='base')
state = cfg.base.state_path

if cfg.target.ref is not None:
execute_dbt_compile_archive(cfg, recipe_type='target')
prepare_dbt_manifest(cfg, options, recipe_type='target')
target_path = 'state'
else:
execute_dbt_deps(cfg.target)
execute_dbt_compile(cfg.target)
if options.get('skip_datasource_connection') and dbt_version >= '1.5':
execute_dbt_parse(cfg.target)
else:
execute_dbt_compile(cfg.target)
dbt_project = dbtutil.load_dbt_project(config.dbt.get('projectDir'))
target_path = dbt_project.get('target-path') if dbt_project.get('target-path') else 'target'

Expand Down Expand Up @@ -351,20 +363,42 @@ def execute_recipe(cfg: RecipeConfiguration, recipe_type='base', debug=False, ev
console.print()


def execute_dbt_compile_archive(cfg: RecipeConfiguration, recipe_type='base'):
if recipe_type == 'target':
model = cfg.target
else:
model = cfg.base

def calculate_git_ref(cfg: RecipeConfiguration, recipe_type='base'):
if recipe_type == 'target':
branch_or_commit = tool().git_rev_parse(cfg.target.ref)
else:
diff_target = cfg.target.ref if cfg.target.ref else 'HEAD'
branch_or_commit = tool().git_merge_base(cfg.base.ref, diff_target) or cfg.base.ref

if not branch_or_commit:
raise RecipeConfigException("Branch is not specified")
raise RecipeConfigException("Git reference is not specified")

return branch_or_commit


def execute_dbt_parse_archive(cfg: RecipeConfiguration, recipe_type='base'):
branch_or_commit = calculate_git_ref(cfg, recipe_type)

if recipe_type == 'target':
model = cfg.target
else:
model = cfg.base

if model.tmp_dir_path is None:
model.tmp_dir_path = tool().git_archive(branch_or_commit)
model.state_path = Path(os.path.join(model.tmp_dir_path, 'state')).as_posix()

execute_dbt_deps(model, model.tmp_dir_path, Path(FileSystem.DBT_PROFILES_DIR).as_posix())
execute_dbt_parse(model, model.tmp_dir_path, Path(FileSystem.DBT_PROFILES_DIR).as_posix(), model.state_path)


def execute_dbt_compile_archive(cfg: RecipeConfiguration, recipe_type='base'):
branch_or_commit = calculate_git_ref(cfg, recipe_type)

if recipe_type == 'target':
model = cfg.target
else:
model = cfg.base

if model.tmp_dir_path is None:
model.tmp_dir_path = tool().git_archive(branch_or_commit)
Expand Down Expand Up @@ -407,6 +441,23 @@ def execute_dbt_compile(model: RecipeModel, project_dir: str = None, profiles_di
console.print()


def execute_dbt_parse(model: RecipeModel, project_dir: str = None, profiles_dir: str = None, target_path: str = None):
console.print("Run: \[dbt parse]")
cmd = 'dbt parse'
if project_dir:
cmd += f' --project-dir {project_dir}'
if target_path:
cmd += f' --target-path {target_path}'
if profiles_dir:
cmd += f' --profiles-dir {profiles_dir}'
exit_code = tool().execute_command_with_showing_output(cmd.strip(), model.dbt.envs())
if exit_code != 0:
console.print(
f"[bold yellow]Warning: [/bold yellow] Dbt command failed: '{cmd}' with exit code: {exit_code}")
sys.exit(exit_code)
console.print()


def execute_recipe_archive(cfg: RecipeConfiguration, recipe_type='base', debug=False, event_payload=CompareEventPayload()):
"""
We execute a recipe in the following steps:
Expand Down

0 comments on commit 75d7314

Please sign in to comment.