Add pre-Airflow-2-7 hardcoded defaults for config for older providers (apache#32775)

* Quarantine test_backfill_integration in dask executor

The test has recently been failing with a deadlock (see apache#32778) and
needs a thorough look if we want to find the root cause and a remedy.

In the meantime it looks like a niche case connected with the Dask
Executor that is rather obscure and that we have no expertise in
diagnosing and solving, so it might be a long time until the problem is
diagnosed (and maybe we will decide not to care about it and let the
Dask community take a look and either fix or ignore it).

We aim to keep the number of quarantined tests very low (currently we
have one, and we have not run it for a while because it was a MySQL
test run on Postgres), but we now have the opportunity to also improve
the quarantined-tests framework.

This test will be run together with the other quarantined test, and:

* they will not be run in our regular tests
* they are run sequentially not in parallel with all other tests
* they are run for all 4 backends but only for the default
  versions of those backends
* failure of the quarantined tests will not fail the whole job or
  prevent constraints from being generated and updated
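As a sketch, quarantined tests can be gated so that regular runs skip them. The real Airflow CI selects them via pytest markers and dedicated jobs; the `RUN_QUARANTINED_TESTS` variable and the class below are assumptions for illustration only:

```python
import os
import unittest

# Hypothetical gate: only the dedicated sequential quarantined-tests job
# would set RUN_QUARANTINED_TESTS=true; everywhere else the test is skipped.
quarantined = unittest.skipUnless(
    os.environ.get("RUN_QUARANTINED_TESTS") == "true",
    "quarantined: runs only in the dedicated sequential job",
)


class TestDaskExecutor(unittest.TestCase):
    @quarantined
    def test_backfill_integration(self):
        # Placeholder body; the real test exercises dask executor backfills.
        self.assertTrue(True)
```

In a regular run (variable unset) the test is reported as skipped, so its failure can never fail the whole job or block constraint generation.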

* Add pre-Airflow-2-7 hardcoded defaults for config for older providers

During thorough testing and review of moving configuration to providers
I realised that one case was not handled properly. Some providers and
DAGs could rely on certain default values being available, but once we
moved those values out of core and used an older version of a provider,
the defaults were no longer available:

* they were removed as defaults in core
* the old providers did not have a "config" section to contribute the
  defaults

This would be a breaking change, and old providers (Celery, K8s) could
fail, as happened in some tests.

This PR implements a clean solution to that, which also allows removing
some manual fallbacks in the Celery and Kubernetes executor code.

The solution is to add a hard-coded "pre-2.7" configuration that
contains only the pre-2.7 provider defaults, and to use it as a
fallback when a value is neither set explicitly nor available as a
default contributed by the providers.

We do not have to maintain those values - they are effectively "frozen"
at the defaults available just before 2.7. A nice side effect is that
we can remove a number of manual fallbacks, because this hard-coded
configuration becomes the fallback automatically.
That entirely solves the case where you install older providers on 2.7,
where config.yml does not contain those provider values.
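The lookup order described above can be sketched with plain `ConfigParser` objects. This is a simplification: the real `AirflowConfigParser` also consults environment variables, secrets backends, and `_cmd` options, and the section content below is a trimmed excerpt of `pre_2_7_defaults.cfg`:

```python
from configparser import ConfigParser

# Frozen pre-2.7 provider defaults (trimmed excerpt of pre_2_7_defaults.cfg).
PRE_2_7_DEFAULTS = """\
[celery]
worker_concurrency = 16
flower_port = 5555
"""

pre_2_7 = ConfigParser()
pre_2_7.read_string(PRE_2_7_DEFAULTS)


def get_with_fallback(user_cfg: ConfigParser, defaults: ConfigParser,
                      section: str, key: str):
    """Sketch of the lookup order: user-set value, then (provider-contributed)
    defaults, then the frozen pre-2.7 defaults, else None."""
    for source in (user_cfg, defaults):
        value = source.get(section, key, fallback=None)
        if value is not None:
            return value
    return pre_2_7.get(section, key, fallback=None)


user = ConfigParser()      # nothing set by the user
defaults = ConfigParser()  # old provider installed: no defaults contributed
print(get_with_fallback(user, defaults, "celery", "worker_concurrency"))  # prints 16
```

With an old provider installed on 2.7+, the first two sources yield nothing and the frozen pre-2.7 value wins; as soon as the user sets the option or a newer provider contributes a default, that value takes precedence instead.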

* Update airflow/configuration.py

Co-authored-by: Jed Cunningham <[email protected]>

---------

Co-authored-by: Jed Cunningham <[email protected]>
potiuk and jedcunningham authored Jul 24, 2023
1 parent 031e394 commit 2ac237b
Showing 10 changed files with 135 additions and 38 deletions.
10 changes: 5 additions & 5 deletions airflow/cli/cli_config.py
@@ -843,7 +843,7 @@ def string_lower_type(val):
("-c", "--concurrency"),
type=int,
help="The number of worker processes",
default=conf.getint("celery", "worker_concurrency", fallback=16),
default=conf.getint("celery", "worker_concurrency"),
)
ARG_CELERY_HOSTNAME = Arg(
("-H", "--celery-hostname"),
@@ -870,24 +870,24 @@ def string_lower_type(val):
ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API")
ARG_FLOWER_HOSTNAME = Arg(
("-H", "--hostname"),
default=conf.get("celery", "FLOWER_HOST", fallback="0.0.0.0"),
default=conf.get("celery", "FLOWER_HOST"),
help="Set the hostname on which to run the server",
)
ARG_FLOWER_PORT = Arg(
("-p", "--port"),
default=conf.getint("celery", "FLOWER_PORT", fallback=5555),
default=conf.getint("celery", "FLOWER_PORT"),
type=int,
help="The port on which to run the server",
)
ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower")
ARG_FLOWER_URL_PREFIX = Arg(
("-u", "--url-prefix"),
default=conf.get("celery", "FLOWER_URL_PREFIX", fallback=""),
default=conf.get("celery", "FLOWER_URL_PREFIX"),
help="URL prefix for Flower",
)
ARG_FLOWER_BASIC_AUTH = Arg(
("-A", "--basic-auth"),
default=conf.get("celery", "FLOWER_BASIC_AUTH", fallback=""),
default=conf.get("celery", "FLOWER_BASIC_AUTH"),
help=(
"Securing Flower with Basic Authentication. "
"Accepts user:password pairs separated by a comma. "
93 changes: 93 additions & 0 deletions airflow/config_templates/pre_2_7_defaults.cfg
@@ -0,0 +1,93 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# This file contains pre Airflow 2.7, provider defaults for Airflow configuration.
# They are provided as fallback option to older version of the
# providers that might expect them to be present.
#
# NOTE !!!! Please DO NOT modify values in the file even if they change in corresponding
# providers. The values here should be treated as "read only" and should not be modified
# even if defaults in newer versions of corresponding Providers change.
# They are only here so that backwards compatible behaviour for old provider
# versions can be maintained.

[atlas]
sasl_enabled = False
host =
port = 21000
username =
password =

[hive]
default_hive_mapred_queue =

[local_kubernetes_executor]
kubernetes_queue = kubernetes

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
worker_prefetch_multiplier = 1
worker_enable_remote_control = true
broker_url = redis://redis:6379/0
result_backend_sqlalchemy_engine_options =
flower_host = 0.0.0.0
flower_url_prefix =
flower_port = 5555
flower_basic_auth =
sync_parallelism = 0
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
ssl_active = False
ssl_key =
ssl_cert =
ssl_cacert =
pool = prefork
operation_timeout = 1.0
task_track_started = True
task_publish_max_retries = 3
worker_precheck = False

[elasticsearch_configs]
use_ssl = False
verify_certs = True

[kubernetes_executor]
api_client_retry_configuration =
logs_task_metadata = False
pod_template_file =
worker_container_repository =
worker_container_tag =
namespace = default
delete_worker_pods = True
delete_worker_pods_on_failure = False
worker_pods_creation_batch_size = 1
multi_namespace_mode = False
multi_namespace_mode_namespace_list =
in_cluster = True
kube_client_request_args =
delete_option_kwargs =
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6
verify_ssl = True
worker_pods_queued_check_interval = 60
ssl_ca_cert =
27 changes: 26 additions & 1 deletion airflow/configuration.py
@@ -163,7 +163,6 @@ def retrieve_configuration_description(
:param include_airflow: Include Airflow configs
:param include_providers: Include provider configs
:param selected_provider: If specified, include selected provider only
:param config_file_name: name of the file in "config_templates" directory to read default config from
:return: Python dictionary containing configs & their info
"""
base_configuration_description: dict[str, dict[str, Any]] = {}
@@ -208,6 +207,7 @@ def __init__(
# interpolation placeholders. The _default_values config parser will interpolate them
# properly when we call get() on it.
self._default_values = create_default_config_parser(self.configuration_description)
self._pre_2_7_default_values = create_pre_2_7_defaults()
if default_config is not None:
self._update_defaults_from_string(default_config)
self._update_logging_deprecated_template_to_one_from_defaults()
@@ -287,6 +287,10 @@ def get_default_value(self, section: str, key: str, fallback: Any = None, raw=Fa
return value.replace("%", "%%")
return value

def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any:
"""Get pre 2.7 default config values."""
return self._pre_2_7_default_values.get(section, key, fallback=None, **kwargs)

# These configuration elements can be fetched as the stdout of commands
# following the "{section}__{name}_cmd" pattern, the idea behind this
# is to not store password on boxes in text files.
@@ -1043,6 +1047,11 @@ def get(  # type: ignore[override,misc]
# ...then the default config
if self.get_default_value(section, key) is not None or "fallback" in kwargs:
return expand_env_var(self.get_default_value(section, key, **kwargs))

if self.get_default_pre_2_7_value(section, key) is not None:
# no expansion needed
return self.get_default_pre_2_7_value(section, key, **kwargs)

if not suppress_warnings:
log.warning("section/key [%s/%s] not found in config", section, key)

@@ -1402,7 +1411,10 @@ def as_dict(
)

config_sources: ConfigSourcesType = {}

# We check sequentially all those sources and the last one we saw it in will "win"
configs: Iterable[tuple[str, ConfigParser]] = [
("default-pre-2-7", self._pre_2_7_default_values),
("default", self._default_values),
("airflow.cfg", self),
]
@@ -1735,6 +1747,7 @@ def _replace_section_config_with_display_sources(
if display_source:
updated_source_name = source_name
if source_name == "default":
# defaults can come from other sources (default-<PROVIDER>) that should be used here
source_description_section = configuration_description.get(section, {})
source_description_key = source_description_section.get("options", {}).get(k, {})
if source_description_key is not None:
@@ -1929,6 +1942,18 @@ def create_default_config_parser(configuration_description: dict[str, dict[str,
return parser


def create_pre_2_7_defaults() -> ConfigParser:
"""
Creates parser using the old defaults from Airflow < 2.7.0, in order to be able to fall-back to those
defaults when old version of provider, not supporting "config contribution" is installed with Airflow
2.7.0+. This "default" configuration does not support variable expansion, those are pretty much
hard-coded defaults we want to fall-back to in such case.
"""
config_parser = ConfigParser()
config_parser.read(_default_config_file_path("pre_2_7_defaults.cfg"))
return config_parser


def write_default_airflow_configuration_if_needed() -> AirflowConfigParser:
if not os.path.isfile(AIRFLOW_CONFIG):
log.debug("Creating new Airflow config file in: %s", AIRFLOW_CONFIG)
2 changes: 1 addition & 1 deletion airflow/executors/kubernetes_executor.py
@@ -379,7 +379,7 @@ def _get_pod_namespace(ti: TaskInstance):
namespace = None
with suppress(Exception):
namespace = pod_override.metadata.namespace
return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
return namespace or conf.get("kubernetes_executor", "namespace")

def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
messages = []
2 changes: 1 addition & 1 deletion airflow/kubernetes/kubernetes_helper_functions.py
@@ -128,7 +128,7 @@ def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey:

@cache
def get_logs_task_metadata() -> bool:
return conf.getboolean("kubernetes_executor", "logs_task_metadata", fallback=False)
return conf.getboolean("kubernetes_executor", "logs_task_metadata")


def annotations_for_logging_task_metadata(annotation_set):
2 changes: 1 addition & 1 deletion airflow/providers/celery/executors/celery_executor.py
@@ -104,7 +104,7 @@ def __init__(self):
self.bulk_state_fetcher = BulkStateFetcher(self._sync_parallelism)
self.tasks = {}
self.task_publish_retries: Counter[TaskInstanceKey] = Counter()
self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries", fallback=3)
self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries")

def start(self) -> None:
self.log.debug("Starting Celery Executor using %s processes for syncing", self._sync_parallelism)
24 changes: 3 additions & 21 deletions airflow/providers/celery/executors/celery_executor_utils.py
@@ -57,36 +57,18 @@

TaskInstanceInCelery = Tuple[TaskInstanceKey, CommandType, Optional[str], Task]

# IMPORTANT NOTE! Celery Executor has initialization done dynamically and it performs initialization when
# it is imported, so we need fallbacks here in order to be able to import the class directly without
# having configuration initialized before. Do not remove those fallbacks!
#
# This is not strictly needed for production:
#
# * for Airflow 2.6 and before the defaults will come from the core defaults
# * for Airflow 2.7+ the defaults will be loaded via ProvidersManager
#
# But it helps in our tests to import the executor class and validate if the celery code can be imported
# in the current and older versions of Airflow.

OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout", fallback=1.0)
OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout")

# Make it constant for unit test.
CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state"

if conf.has_option("celery", "celery_config_options"):
celery_configuration = conf.getimport(
"celery",
"celery_config_options",
fallback="airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG",
)
celery_configuration = conf.getimport("celery", "celery_config_options")

else:
celery_configuration = DEFAULT_CELERY_CONFIG

celery_app_name = conf.get(
"celery", "CELERY_APP_NAME", fallback="airflow.providers.celery.executors.celery_executor"
)
celery_app_name = conf.get("celery", "CELERY_APP_NAME")
if celery_app_name == "airflow.executors.celery_executor":
warnings.warn(
"The celery.CELERY_APP_NAME configuration uses deprecated package name: "
2 changes: 1 addition & 1 deletion airflow/settings.py
@@ -396,7 +396,7 @@ def validate_session():
"""Validate ORM Session."""
global engine

worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False)
worker_precheck = conf.getboolean("celery", "worker_precheck")
if not worker_precheck:
return True
else:
2 changes: 1 addition & 1 deletion airflow/utils/log/file_task_handler.py
@@ -356,7 +356,7 @@ def _get_pod_namespace(ti: TaskInstance):
namespace = None
with suppress(Exception):
namespace = pod_override.metadata.namespace
return namespace or conf.get("kubernetes_executor", "namespace", fallback="default")
return namespace or conf.get("kubernetes_executor", "namespace")

def _get_log_retrieval_url(
self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None
9 changes: 3 additions & 6 deletions tests/core/test_configuration.py
@@ -1629,8 +1629,8 @@ def test_restore_and_reload_provider_configuration():
assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor"
conf.restore_core_default_configuration()
assert conf.providers_configuration_loaded is False
with pytest.raises(AirflowConfigException, match="not found"):
conf.get("celery", "celery_app_name")
# built-in pre-2-7 celery executor
assert conf.get("celery", "celery_app_name") == "airflow.executors.celery_executor"
conf.load_providers_configuration()
assert conf.providers_configuration_loaded is True
assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor"
@@ -1639,7 +1639,7 @@ def test_error_when_contributing_to_existing_section():
def test_error_when_contributing_to_existing_section():
from airflow.settings import conf

try:
with conf.make_sure_configuration_loaded(with_providers=True):
assert conf.providers_configuration_loaded is True
assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor"
conf.restore_core_default_configuration()
@@ -1665,6 +1665,3 @@ def test_error_when_contributing_to_existing_section():
):
conf.load_providers_configuration()
assert conf.get("celery", "celery_app_name") == "test"
finally:
conf.restore_core_default_configuration()
conf.load_providers_configuration()
