Running Dbt test command using DbtDag in Airflow #1330

Open
sbutol01 opened this issue Nov 18, 2024 · 6 comments
Labels
dbt:test Primarily related to dbt test command or functionality

Comments

@sbutol01

Hello experts,

I'm pretty new to Cosmos and trying to run my dbt commands using Cosmos in Airflow.

I'm trying to run the dbt test command using Cosmos's DbtDag. To my surprise, it doesn't run dbt test; instead it uses the dbt run command. Is there a way to pass an argument to force it to run the command I want? I have the code snippet below. I tried using operator_args, but it looks like it doesn't accept a dbt_command parameter.

dbt_dag = DbtDag(
    project_config=ProjectConfig(local_dir),
    profile_config=profile_config,
    execution_config=execution_config,
    operator_args={
        "dbt_command": "test",
        "dbt_args": ["--select", "model_name"],
    },
    dag_id="dbt_dag",
)
dbt_dag.test()

This gives me the error below:
airflow.exceptions.AirflowException: Invalid arguments were passed to DbtRunLocalOperator (task_id: run). Invalid arguments were:
[2024-11-18, 15:43:54 ] {{process_utils.py:190}} INFO - kwargs: {'dbt_command': 'test', 'dbt_args': ['--select', 'model_name']}

Earlier I used the code further below. It did run, but with the command shown in this log. It looks like the test() function isn't doing anything.
INFO - Trying to run dbtRunner with:
[2024-11-15, 13:55:44 ] {{process_utils.py:190}} INFO - ['run', '--models', 'model_name'.....]

dbt_dag = DbtDag(
    project_config=ProjectConfig(local_dir),
    profile_config=profile_config,
    execution_config=execution_config,
    dag_id="dbt_dag",
)
dbt_dag.test()

I know there is DbtTestLocalOperator, but I want to achieve test command execution using DbtDag.

Any help would be highly appreciated.

@dosubot dosubot bot added the dbt:test Primarily related to dbt test command or functionality label Nov 18, 2024

dosubot bot commented Nov 18, 2024

Hey @sbutol01! I'm here to help you with your Cosmos issue. Let's tackle this together!

To run the dbt test command with Cosmos, you need to use the operator designed for that command. DbtDag itself does not support changing the dbt command through arguments like dbt_command. Instead, use the DbtTestLocalOperator if you want to execute the dbt test command specifically.

However, if you want to achieve this within the context of a DAG setup similar to DbtDag, you might need to create a custom DAG or task that uses the DbtTestLocalOperator to run the dbt test command. Here's an example of how you can use the DbtTestLocalOperator:

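from cosmos.operators.local import DbtTestLocalOperator

dbt_test_task = DbtTestLocalOperator(
    task_id='dbt_test',
    # project_dir and profile_config as defined in the question's code
    project_dir=local_dir,
    profile_config=profile_config,
    # Add any additional parameters you need here
)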

If you are set on using DbtDag and want to customize it to run dbt test, you might need to look into customizing the DAG creation process to include tasks that specifically run the dbt test command using the appropriate operator. Unfortunately, DbtDag does not natively support switching commands like this through its configuration alone [1].
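
For context, dbt_dag.test() in the question's snippets is Airflow's own DAG.test() helper, which executes the DAG once locally for debugging; it does not select the dbt test command. Where test tasks appear in a DbtDag is controlled through the render configuration instead. The following is a minimal sketch, assuming a Cosmos release that provides RenderConfig and TestBehavior and reusing the config objects from the question. The function name build_dbt_dag is hypothetical. Note that this still runs the models; it only changes where the tests are placed.

```python
def build_dbt_dag(project_dir, profile_config, execution_config):
    """Build a DbtDag whose dbt tests run once, after all models.

    Cosmos is imported lazily so this function can be defined even
    where Cosmos is not installed.
    """
    from cosmos import DbtDag, ProjectConfig, RenderConfig
    from cosmos.constants import TestBehavior

    return DbtDag(
        project_config=ProjectConfig(project_dir),
        profile_config=profile_config,
        execution_config=execution_config,
        # AFTER_ALL adds a single test task at the end of the DAG,
        # AFTER_EACH adds tests after each model, NONE renders no tests.
        render_config=RenderConfig(test_behavior=TestBehavior.AFTER_ALL),
        dag_id="dbt_dag",
    )
```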


@sbutol01
Author

sbutol01 commented Nov 18, 2024

Thanks @dosu for your quick response. I actually also tried DbtTestLocalOperator. It does work, but at the end it fails my DAG with this error: ERROR - Error during DBT run: Operator <Task(DbtTestLocalOperator): test> has not been assigned to a DAG yet

I get this error after the tests have run successfully. At first I thought this would be a quick fix, but unfortunately I couldn't get it fixed. Below is the skeleton code I'm using. Please also note that I'm doing all this within my virtualenv using @task.virtualenv.

The code below does run the DAG, but fails with the error mentioned above. I tried modifying the code, but every time I get an error saying the operator has not been assigned to a DAG yet. Could there be a quick fix? Thanks.

@dag(
    dag_id=Path(__file__).stem,
    start_date=pendulum.datetime(2024, 9, 12, tz="Pacific/Auckland"),
    tags=["test-tag"],
    # schedule="0 * * * *",
    max_active_runs=1,  # Prevent multiple runs of DAG, if previous run did not complete
    catchup=False,
)
def dbt_test_process():
    """
    The Airflow Dag to invoke dbt tests
    """
    @task.virtualenv(
        task_id="dbt_test_using_operator",
        requirements=[
            "dbt-snowflake==1.7.0",
            "astronomer-cosmos==1.7.0",
        ],
    )
    def run_dbt_using_operators():
        from cosmos import DbtTestLocalOperator, ProfileConfig
     
        test_operator = DbtTestLocalOperator(
            profile_config=profile_config,
            project_dir=local_dir,
            task_id="test",
            dbt_cmd_flags=["--models", "model_name"],
            # install_deps=True,
            append_env=True,
            # dag=dbt_test_process,
        )

        try:
            logger.info("Executing DBT test operator...")
            test_operator.execute(context={})
            logger.info("DBT run completed successfully.")
        except Exception as e:
            logger.error(f"Error during DBT run: {e}")
            raise

        return "DBT task completed successfully"
    run_dbt_using_operators()


dbt_test_process()


dosubot bot commented Nov 18, 2024

To resolve the error "Operator <Task(DbtTestLocalOperator): test> has not been assigned to a DAG yet," ensure that the DbtTestLocalOperator is properly associated with a DAG. This error typically occurs when an operator is instantiated without being added to a DAG context.

In your code, the DbtTestLocalOperator should be instantiated within the DAG context. Here's how you can modify your code to ensure the operator is correctly assigned to the DAG:

@dag(
    dag_id=Path(__file__).stem,
    start_date=pendulum.datetime(2024, 9, 12, tz="Pacific/Auckland"),
    tags=["test-tag"],
    max_active_runs=1,
    catchup=False,
)
def dbt_test_process():
    """
    The Airflow Dag to invoke dbt tests
    """
    from cosmos import DbtTestLocalOperator, ProfileConfig

    test_operator = DbtTestLocalOperator(
        profile_config=profile_config,
        project_dir=local_dir,
        task_id="test",
        dbt_cmd_flags=["--models", "model_name"],
        append_env=True,
    )

    @task.virtualenv(
        task_id="dbt_test_using_operator",
        requirements=[
            "dbt-snowflake==1.7.0",
            "astronomer-cosmos==1.7.0",
        ],
    )
    def run_dbt_using_operators():
        try:
            logger.info("Executing DBT test operator...")
            test_operator.execute(context={})
            logger.info("DBT run completed successfully.")
        except Exception as e:
            logger.error(f"Error during DBT run: {e}")
            raise

        return "DBT task completed successfully"

    run_dbt_using_operators()

dbt_test_process()

Ensure that the DbtTestLocalOperator is instantiated directly within the DAG function, not inside the @task.virtualenv function. This way, it is automatically associated with the DAG context [1].
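
The association described here can be seen in a small sketch, assuming Airflow 2.4 or later. EmptyOperator and the dag id demo are illustrative stand-ins, not part of the thread's code. An operator created inside an active DAG context is attached to that DAG, while one created outside is not. Keep in mind that the body of a @task.virtualenv function runs in a separate Python process, so no DAG context is active there.

```python
def dag_assignment_demo():
    """Return (standalone_has_dag, attached_has_dag) for two operators.

    Airflow is imported lazily so the function can be defined without it.
    """
    import pendulum
    from airflow import DAG
    from airflow.operators.empty import EmptyOperator

    # Created with no active DAG context: not attached to any DAG.
    standalone = EmptyOperator(task_id="standalone")

    with DAG(dag_id="demo", start_date=pendulum.datetime(2024, 1, 1), schedule=None):
        # Created inside the context manager: attached automatically.
        attached = EmptyOperator(task_id="attached")

    return standalone.has_dag(), attached.has_dag()
```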


@sbutol01
Author

sbutol01 commented Nov 18, 2024

This gives me an import error: ModuleNotFoundError: No module named 'cosmos'.
The modules can't be found because I only install them in the virtualenv at a later stage.
@dosu, @tatiana


dosubot bot commented Nov 18, 2024

To resolve the ModuleNotFoundError: No module named 'cosmos', ensure that the Cosmos library is installed in the environment where your Airflow tasks are running. Since you're using a virtual environment, you need to make sure that Cosmos is included in the requirements list of your @task.virtualenv decorator. Here's how you can adjust your code:

  1. Include Cosmos in the Virtual Environment Requirements:

    Make sure that astronomer-cosmos is listed in the requirements parameter of the @task.virtualenv decorator:

    @task.virtualenv(
        task_id="dbt_test_using_operator",
        requirements=[
            "dbt-snowflake==1.7.0",
            "astronomer-cosmos==1.7.0",
        ],
    )
  2. Ensure Proper Import:

    Import the necessary modules inside the function decorated with @task.virtualenv to ensure they are available in the virtual environment:

    @task.virtualenv(
        task_id="dbt_test_using_operator",
        requirements=[
            "dbt-snowflake==1.7.0",
            "astronomer-cosmos==1.7.0",
        ],
    )
    def run_dbt_using_operators():
        from cosmos import DbtTestLocalOperator, ProfileConfig
        # Rest of your code...

By ensuring that astronomer-cosmos is included in the requirements list, it will be installed in the virtual environment, making the cosmos module available for import during task execution.
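
Importing inside the task brings back the earlier situation, where the operator is created with no DAG attached. One possible way around both errors, offered as a sketch rather than a confirmed fix, is to skip the operator inside the virtualenv and call dbt's programmatic entry point, dbtRunner (available in dbt-core 1.5 and later), directly. The helper names build_dbt_test_args and run_dbt_test and the directory arguments are hypothetical.

```python
from typing import List, Optional


def build_dbt_test_args(
    select: str, project_dir: str, profiles_dir: Optional[str] = None
) -> List[str]:
    """Assemble the CLI arguments for `dbt test` limited to one selector."""
    args = ["test", "--select", select, "--project-dir", project_dir]
    if profiles_dir is not None:
        args += ["--profiles-dir", profiles_dir]
    return args


def run_dbt_test(args: List[str]) -> None:
    """Invoke dbt in-process and raise if the command did not succeed."""
    # Imported lazily: dbt only needs to exist where the task actually runs,
    # for example inside the @task.virtualenv environment.
    from dbt.cli.main import dbtRunner

    result = dbtRunner().invoke(args)
    if not result.success:
        raise RuntimeError(f"dbt test failed: {result.exception}")
```

Inside a @task.virtualenv function, both helpers would need to be defined or imported within the function body, since names from the DAG file's module scope are not available in the new environment. This approach bypasses Cosmos's ProfileConfig, so it assumes a profiles.yml that dbt can find, for example through profiles_dir.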


@pankajastro
Contributor

Hey @sbutol01, how are you installing astronomer-cosmos when you try to use DbtDag?
