Skip to content

Commit

Permalink
Merge branch 'main' into add_task_instance_ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
prabhusneha authored Jan 22, 2025
2 parents 3e2bac8 + 6832fc8 commit aeaa791
Show file tree
Hide file tree
Showing 115 changed files with 2,757 additions and 1,500 deletions.
11 changes: 8 additions & 3 deletions .github/boring-cyborg.yml
Original file line number Diff line number Diff line change
Expand Up @@ -620,11 +620,16 @@ labelPRBasedOnFilePath:
- docs/apache-airflow/authoring-and-scheduling/plugins.rst

area:Scheduler:
- airflow/jobs/**/*
- airflow/jobs/scheduler_job_runner.py
- airflow/task/standard_task_runner.py
- airflow/dag_processing/**/*
- docs/apache-airflow/administration-and-deployment/scheduler.rst
- tests/jobs/**/*
- tests/jobs/test_scheduler_job.py

area:DAG-processing:
- airflow/dag_processing/**/*
- airflow/jobs/dag_processor_job_runner.py
- docs/apache-airflow/administration-and-deployment/dagfile-processing.rst
- tests/dag_processing/**/*

area:Executors-core:
- airflow/executors/**/*
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ on: # yamllint disable-line rule:truthy
- v[0-9]+-[0-9]+-test
- v[0-9]+-[0-9]+-stable
- providers-[a-z]+-?[a-z]*/v[0-9]+-[0-9]+
types: [opened, reopened, synchronize, ready_for_review]
workflow_dispatch:
permissions:
# All other permissions are set to none by default
Expand Down
3 changes: 0 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,3 @@ The CI infrastructure for Apache Airflow has been sponsored by:

<a href="https://astronomer.io"><img src="https://assets2.astronomer.io/logos/logoForLIGHTbackground.png" alt="astronomer.io" width="250px"></a>
<a href="https://aws.amazon.com/opensource/"><img src="docs/integration-logos/aws/[email protected]" alt="AWS OpenSource" width="130px"></a>

<!-- telemetry/analytics pixel: -->
<img referrerpolicy="no-referrer-when-downgrade" src="https://static.scarf.sh/a.png?x-pxid=1b5a5e3c-da81-42f5-befa-42d836bf1b54" alt="Tracking Pixel" />
1 change: 0 additions & 1 deletion RELEASE_NOTES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ Scarf based telemetry: Airflow now collect telemetry data (#39510)
""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""
Airflow integrates Scarf to collect basic usage data during operation. Deployments can opt out of data collection by
setting the ``[usage_data_collection]enabled`` option to ``False``, or by setting the ``SCARF_ANALYTICS=false`` environment variable.
See :ref:`Usage data collection FAQ <usage-data-collection>` for more information.

Datasets no longer trigger inactive DAGs (#38891)
"""""""""""""""""""""""""""""""""""""""""""""""""
Expand Down
83 changes: 64 additions & 19 deletions airflow/api_fastapi/common/db/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,76 @@

from __future__ import annotations

from typing import TYPE_CHECKING

from sqlalchemy import func, select

if TYPE_CHECKING:
from sqlalchemy.sql import Select

from airflow.models.dag import DagModel
from airflow.models.dagrun import DagRun

latest_dag_run_per_dag_id_cte = (
select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
.where()
.group_by(DagRun.dag_id)
.cte()
)

def generate_dag_with_latest_run_query(dag_runs_cte: Select | None = None) -> Select:
    """
    Build the query selecting DAGs joined to their latest DAG run.

    "Latest" is the run with the greatest ``start_date`` for each ``dag_id``. The joins
    to the run data are outer joins, so DAGs without any run are still returned.

    :param dag_runs_cte: Optional pre-filtered DAG runs. When given, only DAGs whose
        ``dag_id`` appears in it are returned. NOTE(review): annotated as ``Select`` but
        accessed through ``.c`` -- presumably a CTE/subquery; confirm with callers.
    :return: ``select(DagModel)`` ordered by ``DagModel.dag_id``.
    """
    latest_dag_run_per_dag_id_cte = (
        select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
        .group_by(DagRun.dag_id)
        .cte()
    )

    query = select(DagModel)

    if dag_runs_cte is not None:
        # Collapse the filtered runs to one row per DAG so the inner join below
        # restricts the result without multiplying DAG rows.
        dag_run_filters_cte = (
            select(DagModel.dag_id)
            .join(
                dag_runs_cte,
                DagModel.dag_id == dag_runs_cte.c.dag_id,
            )
            .join(
                DagRun,
                DagRun.dag_id == dag_runs_cte.c.dag_id,
            )
            .group_by(DagModel.dag_id)
            .cte()
        )
        query = query.join(
            dag_run_filters_cte,
            dag_run_filters_cte.c.dag_id == DagModel.dag_id,
        )

    return (
        query.join(
            latest_dag_run_per_dag_id_cte,
            DagModel.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id,
            isouter=True,
        )
        .join(
            DagRun,
            # Must be SQL AND (``&``), not Python ``and``: ``and`` evaluates the first
            # comparison's truthiness and would silently drop the dag_id condition,
            # matching runs of *other* DAGs that share the same start_date.
            (DagRun.start_date == latest_dag_run_per_dag_id_cte.c.start_date)
            & (DagRun.dag_id == latest_dag_run_per_dag_id_cte.c.dag_id),
            isouter=True,
        )
        .order_by(DagModel.dag_id)
    )
37 changes: 30 additions & 7 deletions airflow/api_fastapi/common/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
Any,
Callable,
Generic,
Literal,
Optional,
TypeVar,
Union,
Expand All @@ -36,7 +37,7 @@
from fastapi import Depends, HTTPException, Query, status
from pendulum.parsing.exceptions import ParserError
from pydantic import AfterValidator, BaseModel, NonNegativeInt
from sqlalchemy import Column, case, or_
from sqlalchemy import Column, and_, case, or_
from sqlalchemy.inspection import inspect

from airflow.models import Base
Expand Down Expand Up @@ -233,6 +234,7 @@ class FilterOptionEnum(Enum):
IN = "in"
NOT_IN = "not_in"
ANY_EQUAL = "any_eq"
ALL_EQUAL = "all_eq"
IS_NONE = "is_none"


Expand Down Expand Up @@ -265,6 +267,9 @@ def to_orm(self, select: Select) -> Select:
if self.filter_option == FilterOptionEnum.ANY_EQUAL:
conditions = [self.attribute == val for val in self.value]
return select.where(or_(*conditions))
if self.filter_option == FilterOptionEnum.ALL_EQUAL:
conditions = [self.attribute == val for val in self.value]
return select.where(and_(*conditions))
raise HTTPException(
400, f"Invalid filter option {self.filter_option} for list value {self.value}"
)
Expand Down Expand Up @@ -324,21 +329,33 @@ def depends_filter(value: T | None = query) -> FilterParam[T | None]:
return depends_filter


class _TagsFilter(BaseParam[list[str]]):
class _TagFilterModel(BaseModel):
    """Tag Filter Model with a match mode parameter."""

    # Tag names to filter on; _TagsFilter.to_orm applies no filter when this list is empty.
    tags: list[str]
    # "any" -> conditions are OR-ed, "all" -> AND-ed; None is handled like "any" by
    # _TagsFilter.to_orm. No default is declared here.
    tags_match_mode: Literal["any", "all"] | None


class _TagsFilter(BaseParam[_TagFilterModel]):
    """Filter on tags."""

    def to_orm(self, select: Select) -> Select:
        """
        Restrict ``select`` to DAGs carrying the requested tags.

        :param select: Statement to filter.
        :return: ``select`` unchanged when no tags were supplied, otherwise ``select``
            with one ``DagModel.tags.any(...)`` condition per tag, combined with OR
            (match mode ``"any"`` or unset) or AND (any other match mode, i.e. ``"all"``).
        :raises ValueError: If ``skip_none`` has been set to ``False``.
        """
        if self.skip_none is False:
            raise ValueError(f"Cannot set 'skip_none' to False on a {type(self)}")

        if not self.value or not self.value.tags:
            return select

        conditions = [DagModel.tags.any(DagTag.name == tag) for tag in self.value.tags]
        match_mode = self.value.tags_match_mode
        # Named ``combine`` rather than ``operator`` to avoid shadowing the stdlib module name.
        combine = or_ if not match_mode or match_mode == "any" else and_
        return select.where(combine(*conditions))

    def depends(
        self,
        tags: list[str] = Query(default_factory=list),
        tags_match_mode: Literal["any", "all"] | None = None,
    ) -> _TagsFilter:
        """Collect the ``tags`` and ``tags_match_mode`` query parameters into this filter's value."""
        return self.set_value(_TagFilterModel(tags=tags, tags_match_mode=tags_match_mode))


class _OwnersFilter(BaseParam[list[str]]):
Expand Down Expand Up @@ -443,6 +460,12 @@ def to_orm(self, select: Select) -> Select:
def depends(self, *args: Any, **kwargs: Any) -> Self:
raise NotImplementedError("Use the `range_filter_factory` function to create the dependency")

def is_active(self) -> bool:
    """Return True when a value is set and at least one of its bounds is not None."""
    bounds = self.value
    if bounds is None:
        return False
    return bounds.lower_bound is not None or bounds.upper_bound is not None


def datetime_range_filter_factory(
filter_name: str, model: Base, attribute_name: str | None = None
Expand Down
52 changes: 52 additions & 0 deletions airflow/api_fastapi/core_api/datamodels/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Common Data Models for Airflow REST API.
:meta private:
"""

from __future__ import annotations

import enum

from airflow.api_fastapi.core_api.base import BaseModel


# Common Bulk Data Models
class BulkAction(enum.Enum):
    """Bulk Action to be performed on the used model."""

    # The member values are the strings carried by the ``action`` field of bulk action models.
    CREATE = "create"
    DELETE = "delete"
    UPDATE = "update"


class BulkActionOnExistence(enum.Enum):
    """Bulk Action to be taken if the entity already exists or not."""

    # NOTE(review): the code acting on these values is not visible here. Presumably FAIL
    # reports an error, SKIP leaves the entity untouched and OVERWRITE replaces it -- confirm
    # against the bulk endpoint implementation.
    FAIL = "fail"
    SKIP = "skip"
    OVERWRITE = "overwrite"


# TODO: Unify All Bulk Operation Related Base Data Models
class BulkBaseAction(BaseModel):
    """Base class for bulk actions."""

    # Requested operation; declared without a default on the base class.
    action: BulkAction
    # Policy selected by whether the target entity exists; FAIL unless the request overrides it.
    action_on_existence: BulkActionOnExistence = BulkActionOnExistence.FAIL
73 changes: 69 additions & 4 deletions airflow/api_fastapi/core_api/datamodels/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
from __future__ import annotations

import json
from typing import Any

from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo

from airflow.api_fastapi.core_api.base import BaseModel
from airflow.api_fastapi.core_api.datamodels.common import BulkAction, BulkBaseAction
from airflow.utils.log.secrets_masker import redact


Expand Down Expand Up @@ -90,8 +92,71 @@ class ConnectionBody(BaseModel):
extra: str | None = Field(default=None)


class ConnectionBulkBody(BaseModel):
"""Connections Serializer for requests body."""
class ConnectionBulkCreateAction(BulkBaseAction):
    """Bulk Create Connection serializer for request bodies."""

    # Narrows the inherited required ``action`` field by defaulting it to CREATE.
    action: BulkAction = BulkAction.CREATE
    connections: list[ConnectionBody] = Field(..., description="A list of connections to be created.")


class ConnectionBulkUpdateAction(BulkBaseAction):
    """Bulk Update Connection serializer for request bodies."""

    # Defaults the inherited ``action`` field to UPDATE.
    action: BulkAction = BulkAction.UPDATE
    # Required (``...``): the connections to update.
    connections: list[ConnectionBody] = Field(..., description="A list of connections to be updated.")

connections: list[ConnectionBody]
overwrite: bool | None = Field(default=False)

class ConnectionBulkDeleteAction(BulkBaseAction):
    """Bulk Delete Connection serializer for request bodies."""

    # Defaults the inherited ``action`` field to DELETE.
    action: BulkAction = BulkAction.DELETE
    # Required (``...``): deletion takes identifiers only, not full connection bodies.
    connection_ids: list[str] = Field(..., description="A list of connection IDs to be deleted.")


class ConnectionBulkBody(BaseModel):
    """Request body for bulk Connection operations (create, update, delete)."""

    # NOTE(review): the create and update action models declare the same fields
    # (``action: BulkAction`` plus ``connections``), so the union member an entry validates
    # as may not by itself identify the requested operation -- consumers should presumably
    # dispatch on the entry's ``action`` value; confirm in the endpoint implementation.
    actions: list[ConnectionBulkCreateAction | ConnectionBulkUpdateAction | ConnectionBulkDeleteAction] = (
        Field(..., description="A list of Connection actions to perform.")
    )


class ConnectionBulkActionResponse(BaseModel):
    """
    Serializer for individual bulk action responses.
    Represents the outcome of a single bulk operation (create, update, or delete).
    The response includes a list of successful connection_ids and any errors encountered during the operation.
    This structure helps users understand which key actions succeeded and which failed.
    """

    # Both fields use ``default_factory=list`` so each response instance gets its own empty list.
    success: list[str] = Field(
        default_factory=list, description="A list of connection_ids representing successful operations."
    )
    # Error entries are free-form dicts; their keys are not constrained by this model.
    errors: list[dict[str, Any]] = Field(
        default_factory=list,
        description="A list of errors encountered during the operation, each containing details about the issue.",
    )


class ConnectionBulkResponse(BaseModel):
    """
    Serializer for responses to bulk connection operations.
    This represents the results of create, update, and delete actions performed on connections in bulk.
    Each action (if requested) is represented as a field containing details about successful connection_ids and any encountered errors.
    Fields are populated in the response only if the respective action was part of the request, else are set None.
    """

    # One optional result slot per BulkAction value; each defaults to None.
    create: ConnectionBulkActionResponse | None = Field(
        default=None,
        description="Details of the bulk create operation, including successful connection_ids and errors.",
    )
    update: ConnectionBulkActionResponse | None = Field(
        default=None,
        description="Details of the bulk update operation, including successful connection_ids and errors.",
    )
    delete: ConnectionBulkActionResponse | None = Field(
        default=None,
        description="Details of the bulk delete operation, including successful connection_ids and errors.",
    )
Loading

0 comments on commit aeaa791

Please sign in to comment.