From 2ac237b3eba93ed0c5fa15bced690f42d7444897 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Mon, 24 Jul 2023 20:32:04 +0200 Subject: [PATCH] Add pre-Airflow-2-7 hardcoded defaults for config for older providers (#32775) * Quarantine test_backfill_integration in dask executor The test has been recently failing with deadlock (see #32778) and needs thorough looking at if we want to find the root cause/remedy. In the meantime it looks like a niche case connected with Dask Executor that is rather obscure and we have no expertise in solving problems with and diagnosing, so until the problem is diagnosed it might be a long time (and maybe even we decide not to care about it and let Dask community take a look and either fix or ignore it). We aim to have a very low number of those Quarantined tests (currently we have 1 and we have not run it for a while as this was a mysql test run on Postgres) but we have now the opportunity to also improve the quarantined tests framework. This test will be run together with other (1) quarantined test and: * they will not be run in our regular tests * they are run sequentially not in parallel with all other tests * they are run for all 4 backends but only for the default versions of those backends * failure of the quarantined tests will not cause failure of the whole job or limit constraints from being generated and updated * Add pre-Airflow-2-7 hardcoded defaults for config for older providers During thorough testing and review of moving configuration to providers I realised that there was a case that was not handled properly. 
In some cases some providers and DAGs could rely on some default values being available as default, but when we move them from core, and use older version of provider those defaults were not available: * they were removed as defaults in core * the old providers did not have "config" section to contribute the defaults This would be a breaking change and old providers (Celery, K8s) could fail - as it happened in some tests. This PR implements a nice solution to that, also allowing to remove some manual fallbacks in Celery and Kubernetes executor code. The solution is to add a hard-coded "pre-2.7" configuration which would only contain "provider" pre-2.7 hard-coded defaults and make it a fallback option if the values are neither set nor defaults contributed by the providers. We do not have to maintain those - the defaults are "frozen" effectively at the values available just before 2.7. The nice side effect is that we can remove a number of fallbacks, because this hard-coded configuration becomes the fallback automatically. That entirely solves the case where you want to install older providers on 2.7 where config.yml does not contain those provider values. 
* Update airflow/configuration.py Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --------- Co-authored-by: Jed Cunningham <66968678+jedcunningham@users.noreply.github.com> --- airflow/cli/cli_config.py | 10 +- airflow/config_templates/pre_2_7_defaults.cfg | 93 +++++++++++++++++++ airflow/configuration.py | 27 +++++- airflow/executors/kubernetes_executor.py | 2 +- .../kubernetes/kubernetes_helper_functions.py | 2 +- .../celery/executors/celery_executor.py | 2 +- .../celery/executors/celery_executor_utils.py | 24 +---- airflow/settings.py | 2 +- airflow/utils/log/file_task_handler.py | 2 +- tests/core/test_configuration.py | 9 +- 10 files changed, 135 insertions(+), 38 deletions(-) create mode 100644 airflow/config_templates/pre_2_7_defaults.cfg diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 30e71302832c..a68de35b86df 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -843,7 +843,7 @@ def string_lower_type(val): ("-c", "--concurrency"), type=int, help="The number of worker processes", - default=conf.getint("celery", "worker_concurrency", fallback=16), + default=conf.getint("celery", "worker_concurrency"), ) ARG_CELERY_HOSTNAME = Arg( ("-H", "--celery-hostname"), @@ -870,24 +870,24 @@ def string_lower_type(val): ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API") ARG_FLOWER_HOSTNAME = Arg( ("-H", "--hostname"), - default=conf.get("celery", "FLOWER_HOST", fallback="0.0.0.0"), + default=conf.get("celery", "FLOWER_HOST"), help="Set the hostname on which to run the server", ) ARG_FLOWER_PORT = Arg( ("-p", "--port"), - default=conf.getint("celery", "FLOWER_PORT", fallback=5555), + default=conf.getint("celery", "FLOWER_PORT"), type=int, help="The port on which to run the server", ) ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower") ARG_FLOWER_URL_PREFIX = Arg( ("-u", "--url-prefix"), - default=conf.get("celery", "FLOWER_URL_PREFIX", fallback=""), 
+ default=conf.get("celery", "FLOWER_URL_PREFIX"), help="URL prefix for Flower", ) ARG_FLOWER_BASIC_AUTH = Arg( ("-A", "--basic-auth"), - default=conf.get("celery", "FLOWER_BASIC_AUTH", fallback=""), + default=conf.get("celery", "FLOWER_BASIC_AUTH"), help=( "Securing Flower with Basic Authentication. " "Accepts user:password pairs separated by a comma. " diff --git a/airflow/config_templates/pre_2_7_defaults.cfg b/airflow/config_templates/pre_2_7_defaults.cfg new file mode 100644 index 000000000000..abdef2ee078e --- /dev/null +++ b/airflow/config_templates/pre_2_7_defaults.cfg @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This file contains pre Airflow 2.7, provider defaults for Airflow configuration. +# They are provided as fallback option to older version of the +# providers that might expect them to be present. +# +# NOTE !!!! Please DO NOT modify values in the file even if they change in corresponding +# providers. The values here should be treated as "read only" and should not be modified +# even if defaults in newer versions of corresponding Providers change. +# They are only here so that backwards compatible behaviour for old provider +# versions can be maintained. 
+ +[atlas] +sasl_enabled = False +host = +port = 21000 +username = +password = + +[hive] +default_hive_mapred_queue = + +[local_kubernetes_executor] +kubernetes_queue = kubernetes + +[celery_kubernetes_executor] +kubernetes_queue = kubernetes + +[celery] +celery_app_name = airflow.executors.celery_executor +worker_concurrency = 16 +worker_prefetch_multiplier = 1 +worker_enable_remote_control = true +broker_url = redis://redis:6379/0 +result_backend_sqlalchemy_engine_options = +flower_host = 0.0.0.0 +flower_url_prefix = +flower_port = 5555 +flower_basic_auth = +sync_parallelism = 0 +celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG +ssl_active = False +ssl_key = +ssl_cert = +ssl_cacert = +pool = prefork +operation_timeout = 1.0 +task_track_started = True +task_publish_max_retries = 3 +worker_precheck = False + +[elasticsearch_configs] +use_ssl = False +verify_certs = True + +[kubernetes_executor] +api_client_retry_configuration = +logs_task_metadata = False +pod_template_file = +worker_container_repository = +worker_container_tag = +namespace = default +delete_worker_pods = True +delete_worker_pods_on_failure = False +worker_pods_creation_batch_size = 1 +multi_namespace_mode = False +multi_namespace_mode_namespace_list = +in_cluster = True +kube_client_request_args = +delete_option_kwargs = +enable_tcp_keepalive = True +tcp_keep_idle = 120 +tcp_keep_intvl = 30 +tcp_keep_cnt = 6 +verify_ssl = True +worker_pods_queued_check_interval = 60 +ssl_ca_cert = diff --git a/airflow/configuration.py b/airflow/configuration.py index d6492891d3dd..54ce39225dd8 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -163,7 +163,6 @@ def retrieve_configuration_description( :param include_airflow: Include Airflow configs :param include_providers: Include provider configs :param selected_provider: If specified, include selected provider only - :param config_file_name: name of the file in "config_templates" directory to read default 
config from :return: Python dictionary containing configs & their info """ base_configuration_description: dict[str, dict[str, Any]] = {} @@ -208,6 +207,7 @@ def __init__( # interpolation placeholders. The _default_values config parser will interpolate them # properly when we call get() on it. self._default_values = create_default_config_parser(self.configuration_description) + self._pre_2_7_default_values = create_pre_2_7_defaults() if default_config is not None: self._update_defaults_from_string(default_config) self._update_logging_deprecated_template_to_one_from_defaults() @@ -287,6 +287,10 @@ def get_default_value(self, section: str, key: str, fallback: Any = None, raw=Fa return value.replace("%", "%%") return value + def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any: + """Get pre 2.7 default config values.""" + return self._pre_2_7_default_values.get(section, key, fallback=None, **kwargs) + # These configuration elements can be fetched as the stdout of commands # following the "{section}__{name}_cmd" pattern, the idea behind this # is to not store password on boxes in text files. 
@@ -1043,6 +1047,11 @@ def get( # type: ignore[override,misc] # ...then the default config if self.get_default_value(section, key) is not None or "fallback" in kwargs: return expand_env_var(self.get_default_value(section, key, **kwargs)) + + if self.get_default_pre_2_7_value(section, key) is not None: + # no expansion needed + return self.get_default_pre_2_7_value(section, key, **kwargs) + if not suppress_warnings: log.warning("section/key [%s/%s] not found in config", section, key) @@ -1402,7 +1411,10 @@ def as_dict( ) config_sources: ConfigSourcesType = {} + + # We check sequentially all those sources and the last one we saw it in will "win" configs: Iterable[tuple[str, ConfigParser]] = [ + ("default-pre-2-7", self._pre_2_7_default_values), ("default", self._default_values), ("airflow.cfg", self), ] @@ -1735,6 +1747,7 @@ def _replace_section_config_with_display_sources( if display_source: updated_source_name = source_name if source_name == "default": + # defaults can come from other sources (default-) that should be used here source_description_section = configuration_description.get(section, {}) source_description_key = source_description_section.get("options", {}).get(k, {}) if source_description_key is not None: @@ -1929,6 +1942,18 @@ def create_default_config_parser(configuration_description: dict[str, dict[str, return parser +def create_pre_2_7_defaults() -> ConfigParser: + """ + Creates parser using the old defaults from Airflow < 2.7.0, in order to be able to fall-back to those + defaults when old version of provider, not supporting "config contribution" is installed with Airflow + 2.7.0+. This "default" configuration does not support variable expansion, those are pretty much + hard-coded defaults we want to fall-back to in such case. 
+ """ + config_parser = ConfigParser() + config_parser.read(_default_config_file_path("pre_2_7_defaults.cfg")) + return config_parser + + def write_default_airflow_configuration_if_needed() -> AirflowConfigParser: if not os.path.isfile(AIRFLOW_CONFIG): log.debug("Creating new Airflow config file in: %s", AIRFLOW_CONFIG) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index c1f8861f895f..d525417a2715 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -379,7 +379,7 @@ def _get_pod_namespace(ti: TaskInstance): namespace = None with suppress(Exception): namespace = pod_override.metadata.namespace - return namespace or conf.get("kubernetes_executor", "namespace", fallback="default") + return namespace or conf.get("kubernetes_executor", "namespace") def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: messages = [] diff --git a/airflow/kubernetes/kubernetes_helper_functions.py b/airflow/kubernetes/kubernetes_helper_functions.py index 4cd3422cb646..27762421ca51 100644 --- a/airflow/kubernetes/kubernetes_helper_functions.py +++ b/airflow/kubernetes/kubernetes_helper_functions.py @@ -128,7 +128,7 @@ def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey: @cache def get_logs_task_metadata() -> bool: - return conf.getboolean("kubernetes_executor", "logs_task_metadata", fallback=False) + return conf.getboolean("kubernetes_executor", "logs_task_metadata") def annotations_for_logging_task_metadata(annotation_set): diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py index 0786c9553853..d587be44db03 100644 --- a/airflow/providers/celery/executors/celery_executor.py +++ b/airflow/providers/celery/executors/celery_executor.py @@ -104,7 +104,7 @@ def __init__(self): self.bulk_state_fetcher = BulkStateFetcher(self._sync_parallelism) self.tasks = {} 
self.task_publish_retries: Counter[TaskInstanceKey] = Counter() - self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries", fallback=3) + self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries") def start(self) -> None: self.log.debug("Starting Celery Executor using %s processes for syncing", self._sync_parallelism) diff --git a/airflow/providers/celery/executors/celery_executor_utils.py b/airflow/providers/celery/executors/celery_executor_utils.py index e970174a658c..c414a88b0c22 100644 --- a/airflow/providers/celery/executors/celery_executor_utils.py +++ b/airflow/providers/celery/executors/celery_executor_utils.py @@ -57,36 +57,18 @@ TaskInstanceInCelery = Tuple[TaskInstanceKey, CommandType, Optional[str], Task] -# IMPORTANT NOTE! Celery Executor has initialization done dynamically and it performs initialization when -# it is imported, so we need fallbacks here in order to be able to import the class directly without -# having configuration initialized before. Do not remove those fallbacks! -# -# This is not strictly needed for production: -# -# * for Airflow 2.6 and before the defaults will come from the core defaults -# * for Airflow 2.7+ the defaults will be loaded via ProvidersManager -# -# But it helps in our tests to import the executor class and validate if the celery code can be imported -# in the current and older versions of Airflow. - -OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout", fallback=1.0) +OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout") # Make it constant for unit test. 
CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state" if conf.has_option("celery", "celery_config_options"): - celery_configuration = conf.getimport( - "celery", - "celery_config_options", - fallback="airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG", - ) + celery_configuration = conf.getimport("celery", "celery_config_options") else: celery_configuration = DEFAULT_CELERY_CONFIG -celery_app_name = conf.get( - "celery", "CELERY_APP_NAME", fallback="airflow.providers.celery.executors.celery_executor" -) +celery_app_name = conf.get("celery", "CELERY_APP_NAME") if celery_app_name == "airflow.executors.celery_executor": warnings.warn( "The celery.CELERY_APP_NAME configuration uses deprecated package name: " diff --git a/airflow/settings.py b/airflow/settings.py index f3242def491d..e51cd208d9f7 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -396,7 +396,7 @@ def validate_session(): """Validate ORM Session.""" global engine - worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False) + worker_precheck = conf.getboolean("celery", "worker_precheck") if not worker_precheck: return True else: diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index fb540ed9ecb0..50938f96cb0d 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -356,7 +356,7 @@ def _get_pod_namespace(ti: TaskInstance): namespace = None with suppress(Exception): namespace = pod_override.metadata.namespace - return namespace or conf.get("kubernetes_executor", "namespace", fallback="default") + return namespace or conf.get("kubernetes_executor", "namespace") def _get_log_retrieval_url( self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py index 6eb50d919f10..91102003a57b 100644 --- a/tests/core/test_configuration.py +++ b/tests/core/test_configuration.py 
@@ -1629,8 +1629,8 @@ def test_restore_and_reload_provider_configuration(): assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor" conf.restore_core_default_configuration() assert conf.providers_configuration_loaded is False - with pytest.raises(AirflowConfigException, match="not found"): - conf.get("celery", "celery_app_name") + # built-in pre-2-7 celery executor + assert conf.get("celery", "celery_app_name") == "airflow.executors.celery_executor" conf.load_providers_configuration() assert conf.providers_configuration_loaded is True assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor" @@ -1639,7 +1639,7 @@ def test_restore_and_reload_provider_configuration(): def test_error_when_contributing_to_existing_section(): from airflow.settings import conf - try: + with conf.make_sure_configuration_loaded(with_providers=True): assert conf.providers_configuration_loaded is True assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor" conf.restore_core_default_configuration() @@ -1665,6 +1665,3 @@ def test_error_when_contributing_to_existing_section(): ): conf.load_providers_configuration() assert conf.get("celery", "celery_app_name") == "test" - finally: - conf.restore_core_default_configuration() - conf.load_providers_configuration()