配置参考

此页面包含 apache-airflow-providers-celery provider 的所有可用 Airflow 配置列表,这些配置可以在 airflow.cfg 文件中或使用环境变量进行设置。

注意

provider 中嵌入的配置从 Airflow 2.7.0 版本开始使用。之前,配置是在 Airflow core package 中描述和配置的——因此,如果您使用的是低于 2.7.0 的 Airflow 版本,请查阅 Airflow 文档以获取 Airflow core 中可用的配置选项列表。

注意

更多信息请参阅 设置配置选项

[celery]

本节仅当您在上面的 [core] 小节中使用 CeleryExecutor 时适用

broker_url

Celery broker URL。Celery 支持 RabbitMQ、Redis 和实验性的 sqlalchemy 数据库。有关更多信息,请参阅 Celery 文档。

类型:

字符串

默认值:

redis://redis:6379/0

环境变量:

AIRFLOW__CELERY__BROKER_URL

AIRFLOW__CELERY__BROKER_URL_CMD

AIRFLOW__CELERY__BROKER_URL_SECRET

celery_app_name

Celery 将使用的应用程序名称

类型:

字符串

默认值:

airflow.providers.celery.executors.celery_executor

环境变量:

AIRFLOW__CELERY__CELERY_APP_NAME

celery_config_options

Celery 配置选项的导入路径

类型:

字符串

默认值:

airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG

环境变量:

AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS

extra_celery_config

要包含在 Celery worker 中的额外 Celery 配置。任何 Celery 配置都可以添加到此配置中,并在启动 Celery worker 时应用。例如:{"worker_max_tasks_per_child": 10} 另请参阅:https://docs.celeryq.dev/en/stable/userguide/configuration.html#configuration-and-defaults

类型:

字符串

默认值:

{}

环境变量:

AIRFLOW__CELERY__EXTRA_CELERY_CONFIG

flower_basic_auth

使用基本认证保护 Flower 接受以逗号分隔的 user:password 对

类型:

字符串

默认值:

''

环境变量:

AIRFLOW__CELERY__FLOWER_BASIC_AUTH

AIRFLOW__CELERY__FLOWER_BASIC_AUTH_CMD

AIRFLOW__CELERY__FLOWER_BASIC_AUTH_SECRET

示例:

user1:password1,user2:password2

flower_host

Celery Flower 是一个美观的 Celery UI。Airflow 有一个快捷方式来启动它:airflow celery flower。这定义了 Celery Flower 运行的 IP 地址

类型:

字符串

默认值:

0.0.0.0

环境变量:

AIRFLOW__CELERY__FLOWER_HOST

flower_port

这定义了 Celery Flower 运行的端口

类型:

字符串

默认值:

5555

环境变量:

AIRFLOW__CELERY__FLOWER_PORT

flower_url_prefix

Flower 的根 URL

类型:

字符串

默认值:

''

环境变量:

AIRFLOW__CELERY__FLOWER_URL_PREFIX

示例:

/flower

operation_timeout

send_task_to_executorfetch_celery_task_state 操作超时之前等待的秒数。

类型:

浮点数

默认值:

1.0

环境变量:

AIRFLOW__CELERY__OPERATION_TIMEOUT

pool

Celery Pool 实现。选项包括:prefork(默认)、eventletgeventsolo。请参阅:https://docs.celeryq.dev/en/latest/userguide/workers.html#concurrency https://docs.celeryq.dev/en/latest/userguide/concurrency/eventlet.html

类型:

字符串

默认值:

prefork

环境变量:

AIRFLOW__CELERY__POOL

result_backend

Celery 的 result_backend。当一个作业完成时,它需要更新作业的元数据。因此,它会将消息发布到消息总线,或将其插入数据库(取决于后端)。调度程序使用此状态来更新任务的状态。强烈建议使用数据库。未指定时,将使用带有 db+ 方案前缀的 sql_alchemy_conn。请参阅:https://docs.celeryq.dev/en/latest/userguide/configuration.html#task-result-backend-settings

类型:

字符串

默认值:

None

环境变量:

AIRFLOW__CELERY__RESULT_BACKEND

AIRFLOW__CELERY__RESULT_BACKEND_CMD

AIRFLOW__CELERY__RESULT_BACKEND_SECRET

示例:

db+postgresql://postgres:airflow@postgres/airflow

result_backend_sqlalchemy_engine_options

传递给 Celery result backend SQLAlchemy 引擎的可选配置字典。

类型:

字符串

默认值:

''

环境变量:

AIRFLOW__CELERY__RESULT_BACKEND_SQLALCHEMY_ENGINE_OPTIONS

示例:

{"pool_recycle": 1800}

ssl_active

类型:

字符串

默认值:

False

环境变量:

AIRFLOW__CELERY__SSL_ACTIVE

ssl_cacert

CA 证书的路径。

类型:

字符串

默认值:

''

环境变量:

AIRFLOW__CELERY__SSL_CACERT

ssl_cert

客户端证书的路径。

类型:

字符串

默认值:

''

环境变量:

AIRFLOW__CELERY__SSL_CERT

ssl_key

客户端密钥的路径。

类型:

字符串

默认值:

''

环境变量:

AIRFLOW__CELERY__SSL_KEY

sync_parallelism

CeleryExecutor 用于同步任务状态的进程数。0 表示使用 max(1, number of cores - 1) 个进程。

类型:

字符串

默认值:

0

环境变量:

AIRFLOW__CELERY__SYNC_PARALLELISM

task_acks_late

添加于版本 3.6.0。

如果 Airflow 任务的执行时间超过 visibility_timeout,Celery 将重新分配该任务给另一个 Celery worker,即使原始任务仍在成功运行。新的任务实例将与原始任务同时运行,Airflow UI 和日志只会显示错误消息:“Task Instance Not Running” FAILED: Task is in the running state”。将 task_acks_late 设置为 True 将强制 Celery 等待任务完成后才分配新的任务实例。这有效地覆盖了 visibility timeout。另请参阅:https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late

类型:

布尔值

默认值:

True

环境变量:

AIRFLOW__CELERY__TASK_ACKS_LATE

示例:

True

task_publish_max_retries

由于 AirflowTaskTimeout 错误导致向 broker 发布任务消息失败时,在放弃并将任务标记为失败之前,尝试重新发布的次数上限。

类型:

整数

默认值:

3

环境变量:

AIRFLOW__CELERY__TASK_PUBLISH_MAX_RETRIES

task_track_started

Celery 任务在由 worker 执行时,会将其状态报告为“started”。这在 Airflow 中用于跟踪正在运行的任务,并且如果 Scheduler 重启或在 HA 模式下运行,它可以接管先前 SchedulerJob 启动的孤立任务。

类型:

布尔值

默认值:

True

环境变量:

AIRFLOW__CELERY__TASK_TRACK_STARTED

worker_autoscale

将用于根据负载动态调整进程池大小的最大和最小进程数。通过 airflow celery worker 命令提供 max_concurrency,min_concurrency 来启用自动扩缩(始终保持最小进程数,必要时增长到最大)。根据 worker box 的资源和任务的性质选择这些数字。如果 autoscale 选项可用,worker_concurrency 将被忽略。请参阅:https://docs.celeryq.dev/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale

类型:

字符串

默认值:

None

环境变量:

AIRFLOW__CELERY__WORKER_AUTOSCALE

示例:

16,12

worker_concurrency

使用 airflow celery worker 命令启动 worker 时将使用的并发度。这定义了一个 worker 将处理的任务实例数量,因此根据您的 worker box 资源和任务性质来调整您的 worker 大小

类型:

字符串

默认值:

16

环境变量:

AIRFLOW__CELERY__WORKER_CONCURRENCY

worker_enable_remote_control

指定是否启用对 worker 的远程控制。在某些 broker 不支持远程控制的情况下,Celery 会创建大量的 .*reply-celery-pidbox 队列。您可以通过将此设置为 false 来防止这种情况。但是,禁用此选项后 Flower 将无法工作。请参阅:https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html#broker-overview

类型:

布尔值

默认值:

true

环境变量:

AIRFLOW__CELERY__WORKER_ENABLE_REMOTE_CONTROL

worker_prefetch_multiplier

用于增加 worker 预取任务的数量,从而提高性能。进程数乘以 worker_prefetch_multiplier 就是 worker 预取的任务数量。如果存在多个 worker,并且一个 worker 预取了位于长时间运行任务之后的任务,而另一个 worker 的进程未被充分利用,无法处理已认领的被阻塞任务,那么大于 1 的值可能会导致任务不必要地被阻塞。请参阅:https://docs.celeryq.dev/en/stable/userguide/optimizing.html#prefetch-limits

类型:

整数

默认值:

1

环境变量:

AIRFLOW__CELERY__WORKER_PREFETCH_MULTIPLIER

[celery_broker_transport_options]

本节用于指定可以传递给底层 celery broker transport 的选项。请参阅:https://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-broker_transport_options

sentinel_kwargs

sentinel_kwargs 参数允许将额外选项传递给 Sentinel 客户端。在 Redis Sentinel 用作 broker 且 Redis 服务器受密码保护的典型场景中,需要通过此参数传递密码。虽然其类型为字符串,但要求传递一个符合字典格式的字符串。请参阅:https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration

类型:

字符串

默认值:

None

环境变量:

AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS

AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS_CMD

AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS_SECRET

示例:

{"password": "password_for_redis_server"}

visibility_timeout

visibility timeout 定义了等待 worker 确认任务的秒数,在此之前消息将被重新发送给另一个 worker。请确保增加 visibility timeout 以匹配您计划使用的最长 ETA 时间。visibility_timeout 仅支持 Redis 和 SQS celery broker。请参阅:https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout

类型:

字符串

默认值:

None

环境变量:

AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT

示例:

21600

[celery_kubernetes_executor]

本节仅当您在上面的 [core] 小节中使用 CeleryKubernetesExecutor 时适用

kubernetes_queue

定义使用 CeleryKubernetesExecutor 时何时将任务发送到 KubernetesExecutor。当任务的队列是 kubernetes_queue 的值(默认为 kubernetes)时,任务通过 KubernetesExecutor 执行,否则通过 CeleryExecutor 执行

类型:

字符串

默认值:

kubernetes

环境变量:

AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE

本条目是否有帮助?