配置参考¶
此页面包含 apache-airflow-providers-celery
provider 的所有可用 Airflow 配置列表,这些配置可以在 airflow.cfg
文件中或使用环境变量进行设置。
注意
provider 中嵌入的配置从 Airflow 2.7.0 版本开始使用。之前,配置是在 Airflow core package 中描述和配置的——因此,如果您使用的是低于 2.7.0 的 Airflow 版本,请查阅 Airflow 文档以获取 Airflow core 中可用的配置选项列表。
注意
更多信息请参阅 设置配置选项。
[celery]¶
本节仅当您在上面的 [core]
小节中使用 CeleryExecutor 时适用
broker_url¶
Celery broker URL。Celery 支持 RabbitMQ、Redis 和实验性的 sqlalchemy 数据库。有关更多信息,请参阅 Celery 文档。
- 类型:
字符串
- 默认值:
redis://redis:6379/0
- 环境变量:
AIRFLOW__CELERY__BROKER_URL
AIRFLOW__CELERY__BROKER_URL_CMD
AIRFLOW__CELERY__BROKER_URL_SECRET
celery_app_name¶
Celery 将使用的应用程序名称
- 类型:
字符串
- 默认值:
airflow.providers.celery.executors.celery_executor
- 环境变量:
AIRFLOW__CELERY__CELERY_APP_NAME
celery_config_options¶
Celery 配置选项的导入路径
- 类型:
字符串
- 默认值:
airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG
- 环境变量:
AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS
extra_celery_config¶
要包含在 Celery worker 中的额外 Celery 配置。任何 Celery 配置都可以添加到此配置中,并在启动 Celery worker 时应用。例如:{"worker_max_tasks_per_child": 10}
另请参阅:https://docs.celeryq.dev/en/stable/userguide/configuration.html#configuration-and-defaults
- 类型:
字符串
- 默认值:
{}
- 环境变量:
AIRFLOW__CELERY__EXTRA_CELERY_CONFIG
flower_basic_auth¶
使用基本认证保护 Flower 接受以逗号分隔的 user:password 对
- 类型:
字符串
- 默认值:
''
- 环境变量:
AIRFLOW__CELERY__FLOWER_BASIC_AUTH
AIRFLOW__CELERY__FLOWER_BASIC_AUTH_CMD
AIRFLOW__CELERY__FLOWER_BASIC_AUTH_SECRET
- 示例:
user1:password1,user2:password2
flower_host¶
Celery Flower 是一个美观的 Celery UI。Airflow 有一个快捷方式来启动它:airflow celery flower
。这定义了 Celery Flower 运行的 IP 地址
- 类型:
字符串
- 默认值:
0.0.0.0
- 环境变量:
AIRFLOW__CELERY__FLOWER_HOST
flower_port¶
这定义了 Celery Flower 运行的端口
- 类型:
字符串
- 默认值:
5555
- 环境变量:
AIRFLOW__CELERY__FLOWER_PORT
flower_url_prefix¶
Flower 的根 URL
- 类型:
字符串
- 默认值:
''
- 环境变量:
AIRFLOW__CELERY__FLOWER_URL_PREFIX
- 示例:
/flower
operation_timeout¶
在 send_task_to_executor
或 fetch_celery_task_state
操作超时之前等待的秒数。
- 类型:
浮点数
- 默认值:
1.0
- 环境变量:
AIRFLOW__CELERY__OPERATION_TIMEOUT
pool¶
Celery Pool 实现。选项包括:prefork
(默认)、eventlet
、gevent
或 solo
。请参阅:https://docs.celeryq.dev/en/latest/userguide/workers.html#concurrency https://docs.celeryq.dev/en/latest/userguide/concurrency/eventlet.html
- 类型:
字符串
- 默认值:
prefork
- 环境变量:
AIRFLOW__CELERY__POOL
result_backend¶
Celery 的 result_backend。当一个作业完成时,它需要更新作业的元数据。因此,它会将消息发布到消息总线,或将其插入数据库(取决于后端)。调度程序使用此状态来更新任务的状态。强烈建议使用数据库。未指定时,将使用带有 db+ 方案前缀的 sql_alchemy_conn。请参阅:https://docs.celeryq.dev/en/latest/userguide/configuration.html#task-result-backend-settings
- 类型:
字符串
- 默认值:
None
- 环境变量:
AIRFLOW__CELERY__RESULT_BACKEND
AIRFLOW__CELERY__RESULT_BACKEND_CMD
AIRFLOW__CELERY__RESULT_BACKEND_SECRET
- 示例:
db+postgresql://postgres:airflow@postgres/airflow
result_backend_sqlalchemy_engine_options¶
传递给 Celery result backend SQLAlchemy 引擎的可选配置字典。
- 类型:
字符串
- 默认值:
''
- 环境变量:
AIRFLOW__CELERY__RESULT_BACKEND_SQLALCHEMY_ENGINE_OPTIONS
- 示例:
{"pool_recycle": 1800}
ssl_active¶
- 类型:
字符串
- 默认值:
False
- 环境变量:
AIRFLOW__CELERY__SSL_ACTIVE
ssl_cacert¶
CA 证书的路径。
- 类型:
字符串
- 默认值:
''
- 环境变量:
AIRFLOW__CELERY__SSL_CACERT
ssl_cert¶
客户端证书的路径。
- 类型:
字符串
- 默认值:
''
- 环境变量:
AIRFLOW__CELERY__SSL_CERT
ssl_key¶
客户端密钥的路径。
- 类型:
字符串
- 默认值:
''
- 环境变量:
AIRFLOW__CELERY__SSL_KEY
sync_parallelism¶
CeleryExecutor 用于同步任务状态的进程数。0 表示使用 max(1, number of cores - 1) 个进程。
- 类型:
字符串
- 默认值:
0
- 环境变量:
AIRFLOW__CELERY__SYNC_PARALLELISM
task_acks_late¶
添加于版本 3.6.0。
如果 Airflow 任务的执行时间超过 visibility_timeout,Celery 将重新分配该任务给另一个 Celery worker,即使原始任务仍在成功运行。新的任务实例将与原始任务同时运行,Airflow UI 和日志只会显示错误消息:“Task Instance Not Running” FAILED: Task is in the running state”。将 task_acks_late 设置为 True 将强制 Celery 等待任务完成后才分配新的任务实例。这有效地覆盖了 visibility timeout。另请参阅:https://docs.celeryq.dev/en/stable/reference/celery.app.task.html#celery.app.task.Task.acks_late
- 类型:
布尔值
- 默认值:
True
- 环境变量:
AIRFLOW__CELERY__TASK_ACKS_LATE
- 示例:
True
task_publish_max_retries¶
由于 AirflowTaskTimeout
错误导致向 broker 发布任务消息失败时,在放弃并将任务标记为失败之前,尝试重新发布的次数上限。
- 类型:
整数
- 默认值:
3
- 环境变量:
AIRFLOW__CELERY__TASK_PUBLISH_MAX_RETRIES
task_track_started¶
Celery 任务在由 worker 执行时,会将其状态报告为“started”。这在 Airflow 中用于跟踪正在运行的任务,并且如果 Scheduler 重启或在 HA 模式下运行,它可以接管先前 SchedulerJob 启动的孤立任务。
- 类型:
布尔值
- 默认值:
True
- 环境变量:
AIRFLOW__CELERY__TASK_TRACK_STARTED
worker_autoscale¶
将用于根据负载动态调整进程池大小的最大和最小进程数。通过 airflow celery worker
命令提供 max_concurrency,min_concurrency 来启用自动扩缩(始终保持最小进程数,必要时增长到最大)。根据 worker box 的资源和任务的性质选择这些数字。如果 autoscale 选项可用,worker_concurrency 将被忽略。请参阅:https://docs.celeryq.dev/en/latest/reference/celery.bin.worker.html#cmdoption-celery-worker-autoscale
- 类型:
字符串
- 默认值:
None
- 环境变量:
AIRFLOW__CELERY__WORKER_AUTOSCALE
- 示例:
16,12
worker_concurrency¶
使用 airflow celery worker
命令启动 worker 时将使用的并发度。这定义了一个 worker 将处理的任务实例数量,因此根据您的 worker box 资源和任务性质来调整您的 worker 大小
- 类型:
字符串
- 默认值:
16
- 环境变量:
AIRFLOW__CELERY__WORKER_CONCURRENCY
worker_enable_remote_control¶
指定是否启用对 worker 的远程控制。在某些 broker 不支持远程控制的情况下,Celery 会创建大量的 .*reply-celery-pidbox
队列。您可以通过将此设置为 false 来防止这种情况。但是,禁用此选项后 Flower 将无法工作。请参阅:https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/index.html#broker-overview
- 类型:
布尔值
- 默认值:
true
- 环境变量:
AIRFLOW__CELERY__WORKER_ENABLE_REMOTE_CONTROL
worker_prefetch_multiplier¶
用于增加 worker 预取任务的数量,从而提高性能。进程数乘以 worker_prefetch_multiplier 就是 worker 预取的任务数量。如果存在多个 worker,并且一个 worker 预取了位于长时间运行任务之后的任务,而另一个 worker 的进程未被充分利用,无法处理已认领的被阻塞任务,那么大于 1 的值可能会导致任务不必要地被阻塞。请参阅:https://docs.celeryq.dev/en/stable/userguide/optimizing.html#prefetch-limits
- 类型:
整数
- 默认值:
1
- 环境变量:
AIRFLOW__CELERY__WORKER_PREFETCH_MULTIPLIER
[celery_broker_transport_options]¶
本节用于指定可以传递给底层 celery broker transport 的选项。请参阅:https://docs.celeryq.dev/en/latest/userguide/configuration.html#std:setting-broker_transport_options
sentinel_kwargs¶
sentinel_kwargs 参数允许将额外选项传递给 Sentinel 客户端。在 Redis Sentinel 用作 broker 且 Redis 服务器受密码保护的典型场景中,需要通过此参数传递密码。虽然其类型为字符串,但要求传递一个符合字典格式的字符串。请参阅:https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#configuration
- 类型:
字符串
- 默认值:
None
- 环境变量:
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS_CMD
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__SENTINEL_KWARGS_SECRET
- 示例:
{"password": "password_for_redis_server"}
visibility_timeout¶
visibility timeout 定义了等待 worker 确认任务的秒数,在此之前消息将被重新发送给另一个 worker。请确保增加 visibility timeout 以匹配您计划使用的最长 ETA 时间。visibility_timeout 仅支持 Redis 和 SQS celery broker。请参阅:https://docs.celeryq.dev/en/stable/getting-started/backends-and-brokers/redis.html#visibility-timeout
- 类型:
字符串
- 默认值:
None
- 环境变量:
AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT
- 示例:
21600
[celery_kubernetes_executor]¶
本节仅当您在上面的 [core]
小节中使用 CeleryKubernetesExecutor
时适用
kubernetes_queue¶
定义使用 CeleryKubernetesExecutor
时何时将任务发送到 KubernetesExecutor
。当任务的队列是 kubernetes_queue
的值(默认为 kubernetes
)时,任务通过 KubernetesExecutor
执行,否则通过 CeleryExecutor
执行
- 类型:
字符串
- 默认值:
kubernetes
- 环境变量:
AIRFLOW__CELERY_KUBERNETES_EXECUTOR__KUBERNETES_QUEUE