diff --git a/airflow/cli/cli_config.py b/airflow/cli/cli_config.py index 30e71302832c..a68de35b86df 100644 --- a/airflow/cli/cli_config.py +++ b/airflow/cli/cli_config.py @@ -843,7 +843,7 @@ def string_lower_type(val): ("-c", "--concurrency"), type=int, help="The number of worker processes", - default=conf.getint("celery", "worker_concurrency", fallback=16), + default=conf.getint("celery", "worker_concurrency"), ) ARG_CELERY_HOSTNAME = Arg( ("-H", "--celery-hostname"), @@ -870,24 +870,24 @@ def string_lower_type(val): ARG_BROKER_API = Arg(("-a", "--broker-api"), help="Broker API") ARG_FLOWER_HOSTNAME = Arg( ("-H", "--hostname"), - default=conf.get("celery", "FLOWER_HOST", fallback="0.0.0.0"), + default=conf.get("celery", "FLOWER_HOST"), help="Set the hostname on which to run the server", ) ARG_FLOWER_PORT = Arg( ("-p", "--port"), - default=conf.getint("celery", "FLOWER_PORT", fallback=5555), + default=conf.getint("celery", "FLOWER_PORT"), type=int, help="The port on which to run the server", ) ARG_FLOWER_CONF = Arg(("-c", "--flower-conf"), help="Configuration file for flower") ARG_FLOWER_URL_PREFIX = Arg( ("-u", "--url-prefix"), - default=conf.get("celery", "FLOWER_URL_PREFIX", fallback=""), + default=conf.get("celery", "FLOWER_URL_PREFIX"), help="URL prefix for Flower", ) ARG_FLOWER_BASIC_AUTH = Arg( ("-A", "--basic-auth"), - default=conf.get("celery", "FLOWER_BASIC_AUTH", fallback=""), + default=conf.get("celery", "FLOWER_BASIC_AUTH"), help=( "Securing Flower with Basic Authentication. " "Accepts user:password pairs separated by a comma. " diff --git a/airflow/config_templates/pre_2_7_defaults.cfg b/airflow/config_templates/pre_2_7_defaults.cfg new file mode 100644 index 000000000000..abdef2ee078e --- /dev/null +++ b/airflow/config_templates/pre_2_7_defaults.cfg @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# This file contains the pre-Airflow 2.7 provider defaults for Airflow configuration. +# They are provided as a fallback option for older versions of the +# providers that might expect them to be present. +# +# NOTE! Please DO NOT modify the values in this file, even if they change in the corresponding +# providers. The values here should be treated as "read-only" and should not be modified +# even if the defaults in newer versions of the corresponding providers change. +# They are only here so that backwards-compatible behaviour for old provider +# versions can be maintained. 
+ +[atlas] +sasl_enabled = False +host = +port = 21000 +username = +password = + +[hive] +default_hive_mapred_queue = + +[local_kubernetes_executor] +kubernetes_queue = kubernetes + +[celery_kubernetes_executor] +kubernetes_queue = kubernetes + +[celery] +celery_app_name = airflow.executors.celery_executor +worker_concurrency = 16 +worker_prefetch_multiplier = 1 +worker_enable_remote_control = true +broker_url = redis://redis:6379/0 +result_backend_sqlalchemy_engine_options = +flower_host = 0.0.0.0 +flower_url_prefix = +flower_port = 5555 +flower_basic_auth = +sync_parallelism = 0 +celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG +ssl_active = False +ssl_key = +ssl_cert = +ssl_cacert = +pool = prefork +operation_timeout = 1.0 +task_track_started = True +task_publish_max_retries = 3 +worker_precheck = False + +[elasticsearch_configs] +use_ssl = False +verify_certs = True + +[kubernetes_executor] +api_client_retry_configuration = +logs_task_metadata = False +pod_template_file = +worker_container_repository = +worker_container_tag = +namespace = default +delete_worker_pods = True +delete_worker_pods_on_failure = False +worker_pods_creation_batch_size = 1 +multi_namespace_mode = False +multi_namespace_mode_namespace_list = +in_cluster = True +kube_client_request_args = +delete_option_kwargs = +enable_tcp_keepalive = True +tcp_keep_idle = 120 +tcp_keep_intvl = 30 +tcp_keep_cnt = 6 +verify_ssl = True +worker_pods_queued_check_interval = 60 +ssl_ca_cert = diff --git a/airflow/configuration.py b/airflow/configuration.py index d6492891d3dd..54ce39225dd8 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -163,7 +163,6 @@ def retrieve_configuration_description( :param include_airflow: Include Airflow configs :param include_providers: Include provider configs :param selected_provider: If specified, include selected provider only - :param config_file_name: name of the file in "config_templates" directory to read default 
config from :return: Python dictionary containing configs & their info """ base_configuration_description: dict[str, dict[str, Any]] = {} @@ -208,6 +207,7 @@ def __init__( # interpolation placeholders. The _default_values config parser will interpolate them # properly when we call get() on it. self._default_values = create_default_config_parser(self.configuration_description) + self._pre_2_7_default_values = create_pre_2_7_defaults() if default_config is not None: self._update_defaults_from_string(default_config) self._update_logging_deprecated_template_to_one_from_defaults() @@ -287,6 +287,10 @@ def get_default_value(self, section: str, key: str, fallback: Any = None, raw=Fa return value.replace("%", "%%") return value + def get_default_pre_2_7_value(self, section: str, key: str, **kwargs) -> Any: + """Get the pre-2.7 default value; returns None (or a passed ``fallback``) when unset.""" + return self._pre_2_7_default_values.get(section, key, **{"fallback": None, **kwargs}) + # These configuration elements can be fetched as the stdout of commands # following the "{section}__{name}_cmd" pattern, the idea behind this # is to not store password on boxes in text files. 
@@ -1043,6 +1047,11 @@ def get( # type: ignore[override,misc] # ...then the default config if self.get_default_value(section, key) is not None or "fallback" in kwargs: return expand_env_var(self.get_default_value(section, key, **kwargs)) + + if self.get_default_pre_2_7_value(section, key) is not None: + # no expansion needed + return self.get_default_pre_2_7_value(section, key, **kwargs) + if not suppress_warnings: log.warning("section/key [%s/%s] not found in config", section, key) @@ -1402,7 +1411,10 @@ def as_dict( ) config_sources: ConfigSourcesType = {} + + # We check sequentially all those sources and the last one we saw it in will "win" configs: Iterable[tuple[str, ConfigParser]] = [ + ("default-pre-2-7", self._pre_2_7_default_values), ("default", self._default_values), ("airflow.cfg", self), ] @@ -1735,6 +1747,7 @@ def _replace_section_config_with_display_sources( if display_source: updated_source_name = source_name if source_name == "default": + # defaults can come from other sources (default-) that should be used here source_description_section = configuration_description.get(section, {}) source_description_key = source_description_section.get("options", {}).get(k, {}) if source_description_key is not None: @@ -1929,6 +1942,18 @@ def create_default_config_parser(configuration_description: dict[str, dict[str, return parser +def create_pre_2_7_defaults() -> ConfigParser: + """ + Create a parser with the old defaults from Airflow < 2.7.0, in order to be able to fall back to those + defaults when an old version of a provider, not supporting "config contribution", is installed with + Airflow 2.7.0+. This "default" configuration does not support variable expansion; those are pretty + much hard-coded defaults we want to fall back to in such a case. 
+ """ + config_parser = ConfigParser() + config_parser.read(_default_config_file_path("pre_2_7_defaults.cfg"), encoding="utf-8") + return config_parser + + def write_default_airflow_configuration_if_needed() -> AirflowConfigParser: if not os.path.isfile(AIRFLOW_CONFIG): log.debug("Creating new Airflow config file in: %s", AIRFLOW_CONFIG) diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index c1f8861f895f..d525417a2715 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -379,7 +379,7 @@ def _get_pod_namespace(ti: TaskInstance): namespace = None with suppress(Exception): namespace = pod_override.metadata.namespace - return namespace or conf.get("kubernetes_executor", "namespace", fallback="default") + return namespace or conf.get("kubernetes_executor", "namespace") def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]: messages = [] diff --git a/airflow/kubernetes/kubernetes_helper_functions.py b/airflow/kubernetes/kubernetes_helper_functions.py index 4cd3422cb646..27762421ca51 100644 --- a/airflow/kubernetes/kubernetes_helper_functions.py +++ b/airflow/kubernetes/kubernetes_helper_functions.py @@ -128,7 +128,7 @@ def annotations_to_key(annotations: dict[str, str]) -> TaskInstanceKey: @cache def get_logs_task_metadata() -> bool: - return conf.getboolean("kubernetes_executor", "logs_task_metadata", fallback=False) + return conf.getboolean("kubernetes_executor", "logs_task_metadata") def annotations_for_logging_task_metadata(annotation_set): diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py index 0786c9553853..d587be44db03 100644 --- a/airflow/providers/celery/executors/celery_executor.py +++ b/airflow/providers/celery/executors/celery_executor.py @@ -104,7 +104,7 @@ def __init__(self): self.bulk_state_fetcher = BulkStateFetcher(self._sync_parallelism) self.tasks = {} 
self.task_publish_retries: Counter[TaskInstanceKey] = Counter() - self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries", fallback=3) + self.task_publish_max_retries = conf.getint("celery", "task_publish_max_retries") def start(self) -> None: self.log.debug("Starting Celery Executor using %s processes for syncing", self._sync_parallelism) diff --git a/airflow/providers/celery/executors/celery_executor_utils.py b/airflow/providers/celery/executors/celery_executor_utils.py index e970174a658c..c414a88b0c22 100644 --- a/airflow/providers/celery/executors/celery_executor_utils.py +++ b/airflow/providers/celery/executors/celery_executor_utils.py @@ -57,36 +57,18 @@ TaskInstanceInCelery = Tuple[TaskInstanceKey, CommandType, Optional[str], Task] -# IMPORTANT NOTE! Celery Executor has initialization done dynamically and it performs initialization when -# it is imported, so we need fallbacks here in order to be able to import the class directly without -# having configuration initialized before. Do not remove those fallbacks! -# -# This is not strictly needed for production: -# -# * for Airflow 2.6 and before the defaults will come from the core defaults -# * for Airflow 2.7+ the defaults will be loaded via ProvidersManager -# -# But it helps in our tests to import the executor class and validate if the celery code can be imported -# in the current and older versions of Airflow. - -OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout", fallback=1.0) +OPERATION_TIMEOUT = conf.getfloat("celery", "operation_timeout") # Make it constant for unit test. 
CELERY_FETCH_ERR_MSG_HEADER = "Error fetching Celery task state" if conf.has_option("celery", "celery_config_options"): - celery_configuration = conf.getimport( - "celery", - "celery_config_options", - fallback="airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG", - ) + celery_configuration = conf.getimport("celery", "celery_config_options") else: celery_configuration = DEFAULT_CELERY_CONFIG -celery_app_name = conf.get( - "celery", "CELERY_APP_NAME", fallback="airflow.providers.celery.executors.celery_executor" -) +celery_app_name = conf.get("celery", "CELERY_APP_NAME") if celery_app_name == "airflow.executors.celery_executor": warnings.warn( "The celery.CELERY_APP_NAME configuration uses deprecated package name: " diff --git a/airflow/settings.py b/airflow/settings.py index f3242def491d..e51cd208d9f7 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -396,7 +396,7 @@ def validate_session(): """Validate ORM Session.""" global engine - worker_precheck = conf.getboolean("celery", "worker_precheck", fallback=False) + worker_precheck = conf.getboolean("celery", "worker_precheck") if not worker_precheck: return True else: diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index fb540ed9ecb0..50938f96cb0d 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -356,7 +356,7 @@ def _get_pod_namespace(ti: TaskInstance): namespace = None with suppress(Exception): namespace = pod_override.metadata.namespace - return namespace or conf.get("kubernetes_executor", "namespace", fallback="default") + return namespace or conf.get("kubernetes_executor", "namespace") def _get_log_retrieval_url( self, ti: TaskInstance, log_relative_path: str, log_type: LogType | None = None diff --git a/tests/core/test_configuration.py b/tests/core/test_configuration.py index 6eb50d919f10..91102003a57b 100644 --- a/tests/core/test_configuration.py +++ b/tests/core/test_configuration.py 
@@ -1629,8 +1629,8 @@ def test_restore_and_reload_provider_configuration(): assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor" conf.restore_core_default_configuration() assert conf.providers_configuration_loaded is False - with pytest.raises(AirflowConfigException, match="not found"): - conf.get("celery", "celery_app_name") + # built-in pre-2-7 celery executor + assert conf.get("celery", "celery_app_name") == "airflow.executors.celery_executor" conf.load_providers_configuration() assert conf.providers_configuration_loaded is True assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor" @@ -1639,7 +1639,7 @@ def test_restore_and_reload_provider_configuration(): def test_error_when_contributing_to_existing_section(): from airflow.settings import conf - try: + with conf.make_sure_configuration_loaded(with_providers=True): assert conf.providers_configuration_loaded is True assert conf.get("celery", "celery_app_name") == "airflow.providers.celery.executors.celery_executor" conf.restore_core_default_configuration() @@ -1665,6 +1665,3 @@ def test_error_when_contributing_to_existing_section(): ): conf.load_providers_configuration() assert conf.get("celery", "celery_app_name") == "test" - finally: - conf.restore_core_default_configuration() - conf.load_providers_configuration()