Executor¶
Executor 是运行 任务实例 的机制。它们拥有一个通用 API 并且是“可插拔的”,意味着你可以根据你的安装需求更换 Executor。
Executor 由配置文件 中 [core]
部分的 executor
选项设置。
内置 Executor 通过名称引用,例如
[core]
executor = KubernetesExecutor
自定义或第三方 Executor 可以通过提供 Executor Python 类的模块路径来配置,例如
[core]
executor = my.custom.executor.module.ExecutorClass
注意
有关 Airflow 配置的更多信息,请参阅 设置配置选项。
如果你想检查当前设置的是哪个 Executor,你可以使用 airflow config get-value core executor
命令
$ airflow config get-value core executor
LocalExecutor
Executor 类型¶
在代码仓库中,只有一种类型的 Executor 在*本地*(调度器进程内部)运行任务,但可以编写自定义 Executor 来实现类似的结果,也有些 Executor 在*远程*运行任务(通常通过工作进程池)。Airflow 默认配置为 LocalExecutor
,这是一个本地 Executor,也是最简单的执行选项。然而,由于 LocalExecutor
在调度器进程中运行进程,这可能会影响调度器的性能。你可以将 LocalExecutor
用于小型、单机生产安装,或将远程 Executor 之一用于多机/云端安装。
本地 Executor¶
Airflow 任务在调度器进程内部本地运行。
优点: 非常易于使用,速度快,延迟极低,设置要求少。
缺点: 功能有限,并与 Airflow 调度器共享资源。
示例:
远程 Executor¶
远程 Executor 可以进一步分为两类
队列/批处理 Executor
Airflow 任务被发送到一个中心队列,远程工作进程从队列中拉取任务执行。工作进程通常是持久的,并同时运行多个任务。
优点: 更健壮,因为它将工作进程与调度器进程解耦。工作进程可以是大型主机,可以处理许多任务(通常并行),这具有成本效益。延迟可以相对较低,因为可以随时配置工作进程运行,以便立即从队列中获取任务。
缺点: 共享工作进程存在“吵闹邻居”问题,即任务在共享主机上竞争资源或竞争环境/系统的配置方式。如果你的工作负载不恒定,它们也可能很昂贵,你可能会有空闲的工作进程,资源过度扩展,或者你必须管理它们的伸缩。
示例:
EdgeExecutor (实验性预发布)
容器化 Executor
Airflow 任务在容器/Pod 内即时执行。每个任务都在其自己的容器化环境中隔离,该环境在 Airflow 任务排队时部署。
优点: 每个 Airflow 任务都隔离到一个容器中,因此没有“吵闹邻居”问题。执行环境可以针对特定任务进行定制(系统库、二进制文件、依赖项、资源量等)。具有成本效益,因为工作进程仅在任务持续期间存活。
缺点: 启动时存在延迟,因为容器或 Pod 需要在任务开始前部署。如果你运行许多短/小任务,可能会很昂贵。没有工作进程需要管理,但你必须管理像 Kubernetes 集群这样的东西。
示例:
注意
新的 Airflow 用户可能认为他们需要使用本地或远程 Executor 之一运行单独的 Executor 进程。这是不正确的。Executor 逻辑运行在调度器进程*内部*,并根据选择的 Executor 本地或不本地运行任务。
并行使用多个 Executor¶
从版本 2.10.0 开始,Airflow 现在可以采用多 Executor 配置运行。每个 Executor 都有自己的一组优点和缺点,它们通常是延迟、隔离和计算效率等属性之间的权衡(比较 Executor 请参见此处)。运行多个 Executor 使你能更好地利用所有可用 Executor 的优势并避免其劣势。换句话说,你可以针对特定的任务集使用特定的 Executor,在该用例中,该 Executor 的特定优点和益处最能发挥作用。
配置¶
配置多个 Executor 使用与单个 Executor 用例相同的配置选项(如此处所述),利用逗号分隔列表表示法来指定多个 Executor。
注意
列表中的第一个 Executor(无论是单独使用还是与其他 Executor 一起使用)的行为将与 2.10.0 之前版本中的行为相同。换句话说,这将是环境的默认 Executor。任何未指定特定 Executor 的 Airflow Task 或 DAG 都将使用此环境级 Executor。列表中的所有其他 Executor 都将被初始化,并在 Airflow Task 或 DAG 上指定时准备好运行任务。如果在此配置列表中未指定 Executor,则不能用它来运行任务。
一些有效的多 Executor 配置示例
[core]
executor = LocalExecutor
[core]
executor = LocalExecutor,CeleryExecutor
[core]
executor = KubernetesExecutor,my.custom.module.ExecutorClass
注意
目前不支持使用 _同一_ Executor 类的两个实例。
为了更容易在任务和 DAGs 上指定 Executor,Executor 配置现在支持别名。然后你可以使用此别名在 DAGs 中引用 Executor(参见下文)。
[core]
executor = LocalExecutor,ShortName:my.custom.module.ExecutorClass
注意
如果 DAG 指定任务使用未配置的 Executor,则 DAG 将解析失败,并在 Airflow UI 中显示警告对话框。请确保所有您希望使用的 Executor 都已在运行任何 Airflow 组件(调度器、工作进程等)的主机/容器上的 Airflow 配置中指定。
编写 DAGs 和任务¶
要为任务指定 Executor,请使用 Airflow Operator 上的 executor
参数
BashOperator(
task_id="hello_world",
executor="LocalExecutor",
bash_command="echo 'hello world!'",
)
@task(executor="LocalExecutor")
def hello_world():
print("hello world!")
要为整个 DAG 指定 Executor,请使用 Airflow 现有的默认参数机制。然后,DAG 中的所有任务都将使用指定的 Executor(除非由特定任务明确覆盖)。
def hello_world():
print("hello world!")
def hello_world_again():
print("hello world again!")
with DAG(
dag_id="hello_worlds",
default_args={"executor": "LocalExecutor"}, # Applies to all tasks in the DAG
) as dag:
# All tasks will use the executor from default args automatically
hw = hello_world()
hw_again = hello_world_again()
注意
任务将其配置为运行的 Executor 存储在 Airflow 数据库中。更改在每次解析 DAG 后反映。
监控¶
使用单个 Executor 时,Airflow 指标的行为将与 <2.9 版本相同。但如果配置了多个 Executor,则每个配置的 Executor 都将发布 Executor 指标(executor.open_slots
、executor.queued_slots
和 executor.running_tasks
),并在指标名称后附加 Executor 名称(例如 executor.open_slots.<executor class name>
)。
日志记录的工作方式与单个 Executor 用例相同。
静态编码混合 Executor¶
目前有两个“静态编码”的 Executor,这些 Executor 是两个不同 Executor 的混合体:LocalKubernetesExecutor 和 CeleryKubernetesExecutor。它们的实现不是 Airflow 核心原生的。这些混合 Executor 转而利用任务实例上的 queue
字段来指示并持久化应在其上运行的子 Executor。这是对 queue
字段的滥用,并且在使用这些混合 Executor 时无法将其用于其预期目的。
像这样的 Executor 也需要手工编写新的“具体”类来创建 Executor 可能组合的每种排列。随着更多 Executor 的创建,这是不可持续的,并导致更多的维护开销。使用任何组合的 Executor 都不应需要定制编码工作。
因此,不再推荐使用这些类型的 Executor。
编写自己的 Executor¶
所有 Airflow Executor 都实现了一个通用接口,使其可插拔,并且任何 Executor 都可以访问 Airflow 中的所有能力和集成。主要地,Airflow 调度器使用此接口与 Executor 交互,但其他组件如日志记录和 CLI 也这样做。公共接口是 BaseExecutor
。你可以查阅代码以获取最详细和最新的接口,但一些重要亮点如下所述。
注意
有关 Airflow 公共接口的更多信息,请参阅 Airflow 公共接口。
你可能想要编写自定义 Executor 的一些原因包括
没有现有 Executor 适合你的特定用例,例如特定的计算工具或服务。
你想使用一个利用你首选云提供商的计算服务的 Executor。
你有一个仅供你或你的组织使用的任务执行专用工具/服务。
工作负载¶
在 Executor 的上下文中,工作负载 (workload) 是 Executor 的基本执行单元。它代表了 Executor 在工作进程上运行的离散操作或作业。例如,它可以在工作进程上运行封装在 Airflow 任务中的用户代码。
示例
ExecuteTask(
token="mock",
ti=TaskInstance(
id=UUID("4d828a62-a417-4936-a7a6-2b3fabacecab"),
task_id="mock",
dag_id="mock",
run_id="mock",
try_number=1,
map_index=-1,
pool_slots=1,
queue="default",
priority_weight=1,
executor_config=None,
parent_context_carrier=None,
context_carrier=None,
queued_dttm=None,
),
dag_rel_path=PurePosixPath("mock.py"),
bundle_info=BundleInfo(name="n/a", version="no matter"),
log_path="mock.log",
type="ExecuteTask",
)
重要 BaseExecutor 方法¶
这些方法不需要覆盖即可实现自己的 Executor,但了解它们很有用
heartbeat
: Airflow 调度器 Job 循环会定期在 Executor 上调用 heartbeat。这是 Airflow 调度器与 Executor 之间的主要交互点之一。此方法会更新一些指标,触发新排队的任务执行,并更新正在运行/已完成任务的状态。queue_workload
: Airflow Executor 会调用 BaseExecutor 的此方法来提供由 Executor 运行的任务。BaseExecutor 只需将工作负载(参阅上文了解)添加到 Executor 内部的待运行工作负载列表中。代码仓库中存在的所有 Executor 都使用此方法。get_event_buffer
: Airflow 调度器调用此方法来检索 Executor 正在执行的任务实例 (TaskInstances) 的当前状态。has_task
: 调度器使用此 BaseExecutor 方法来确定 Executor 是否已将某个特定任务实例排队或正在运行。send_callback
: 将任何回调发送到 Executor 上配置的接收端 (sink)。
必须实现的方法¶
你的 Executor 至少必须覆盖以下方法才能得到 Airflow 的支持
sync
: Sync 会在 Executor heartbeat 期间定期被调用。实现此方法以更新 Executor 知道的任务的状态。可选地,尝试执行从调度器接收到的已排队任务。execute_async
: 异步执行一个工作负载 (workload)。此方法在 Executor heartbeat 期间被调用(经过几层调用),而 heartbeat 是由调度器定期运行的。实际上,此方法通常只是将任务排入 Executor 的内部或外部任务队列中等待运行(例如KubernetesExecutor
)。但它也可以直接执行任务(例如LocalExecutor
)。这取决于具体的 Executor。
可选实现接口方法¶
不需要覆盖以下方法即可拥有功能正常的 Airflow Executor。但是,实现它们可以带来一些强大的功能和稳定性
start
: Airflow 调度器 Job 在初始化 Executor 对象后会调用此方法。Executor 所需的任何额外设置都可以在此处完成。end
: Airflow 调度器 Job 在关闭时会调用此方法。任何需要完成运行中 Job 的同步清理工作都应在此处完成。terminate
: 更强制地停止 Executor,甚至杀死/停止正在进行的任务,而不是同步等待完成。try_adopt_task_instances
: 通过此方法,将已放弃的任务(例如来自死掉的调度器 Job)提供给 Executor 来接管或以其他方式处理。任何无法接管的任务(默认情况下 BaseExecutor 假设所有任务都无法接管)都应返回。get_cli_commands
: Executor 可以通过实现此方法向用户提供 CLI 命令,更多详细信息请参阅下文的 CLI 部分。get_task_log
: Executor 可以通过实现此方法向 Airflow 任务日志提供日志消息,更多详细信息请参阅下文的 日志记录 部分。
兼容性属性¶
BaseExecutor
类接口包含一组属性,Airflow 核心代码使用这些属性来检查你的 Executor 兼容的功能。在编写自己的 Airflow Executor 时,请确保针对你的用例正确设置这些属性。每个属性只是一个布尔值,用于启用/禁用某个功能或指示 Executor 是否支持/不支持某个功能
supports_pickling
: Executor 是否支持在执行前从数据库读取 pickled DAGs(而不是从文件系统读取 DAG 定义)。supports_sentry
: Executor 是否支持 Sentry。is_local
: Executor 是远程还是本地。参见上文的 Executor 类型 部分。is_single_threaded
: Executor 是否是单线程的。这与支持哪些数据库后端特别相关。单线程 Executor 可以与任何后端一起运行,包括 SQLite。is_production
: Executor 是否应用于生产目的。当用户使用非生产就绪的 Executor 时,会在 UI 中显示消息。serve_logs
: Executor 是否支持提供日志,参见 任务日志记录。
CLI¶
Executor 可以通过实现 get_cli_commands
方法提供 CLI 命令,这些命令将包含在 airflow
命令行工具中。例如,CeleryExecutor
和 KubernetesExecutor
等 Executor 利用了此机制。这些命令可用于设置所需的工作进程、初始化环境或设置其他配置。只为当前配置的 Executor 提供命令。从 Executor 提供 CLI 命令的伪代码示例如下所示
@staticmethod
def get_cli_commands() -> list[GroupCommand]:
sub_commands = [
ActionCommand(
name="command_name",
help="Description of what this specific command does",
func=lazy_load_command("path.to.python.function.for.command"),
args=(),
),
]
return [
GroupCommand(
name="my_cool_executor",
help="Description of what this group of commands do",
subcommands=sub_commands,
),
]
注意
目前,Airflow 命令命名空间没有严格的规则。开发者应为其 CLI 命令使用足够独特的名称,以免与其他 Airflow Executor 或组件发生冲突。
注意
创建新 Executor 或更新现有 Executor 时,请务必不要在模块级别导入或执行任何昂贵的操作/代码。Executor 类在多个地方被导入,如果导入速度慢,将对你的 Airflow 环境的性能产生负面影响,特别是对于 CLI 命令。
日志记录¶
Executor 可以通过实现 get_task_logs
方法提供日志消息,这些消息将包含在 Airflow 任务日志中。如果执行环境在任务失败时包含额外上下文,这可能会有所帮助,因为失败可能归因于执行环境本身而非 Airflow 任务代码。包含来自执行环境的设置/拆卸日志记录也可能有所帮助。KubernetesExecutor
利用此功能包含运行特定 Airflow 任务的 pod 的日志,并在该 Airflow 任务的日志中显示它们。从 Executor 提供任务日志的伪代码示例如下所示
def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
messages = []
log = []
try:
res = helper_function_to_fetch_logs_from_execution_env(ti, try_number)
for line in res:
log.append(remove_escape_codes(line.decode()))
if log:
messages.append("Found logs from execution environment!")
except Exception as e: # No exception should cause task logs to fail
messages.append(f"Failed to find logs from execution environment: {e}")
return messages, ["\n".join(log)]
下一步¶
创建实现 BaseExecutor
接口的新 Executor 类后,你可以通过将 core.executor
配置值设置为你的 Executor 的模块路径来配置 Airflow 使用它
[core]
executor = my_company.executors.MyCustomExecutor