执行器¶
执行器是 任务实例 运行的机制。它们具有通用的 API 并且是“可插拔的”,这意味着您可以根据安装需要交换执行器。
执行器通过 配置文件 的 [core]
部分中的 executor
选项进行设置。
内置执行器通过名称引用,例如
[core]
executor = KubernetesExecutor
可以通过提供执行器 Python 类的模块路径来配置自定义或第三方执行器,例如
[core]
executor = my.custom.executor.module.ExecutorClass
注意
有关 Airflow 配置的更多信息,请参阅 设置配置选项。
如果要检查当前设置的执行器,可以使用 airflow config get-value core executor
命令
$ airflow config get-value core executor
SequentialExecutor
执行器类型¶
执行器有两种类型 - 在 调度器
进程内部本地运行任务的执行器,以及远程运行任务(通常通过 worker 池)的执行器。默认情况下,Airflow 配置了 SequentialExecutor
,这是一个本地执行器,也是最简单的执行选项。但是,SequentialExecutor
不适用于生产环境,因为它不允许并行任务运行,并且因此,某些 Airflow 功能(例如,运行传感器)将无法正常工作。对于小型单机生产安装,您应该使用 LocalExecutor
,或者对于多机/云安装,则使用远程执行器之一。
远程执行器¶
远程执行器可以进一步分为两类
排队/批量执行器
Airflow 任务被发送到中央队列,远程 worker 从中拉取任务以执行。通常,worker 是持久的,并且一次运行多个任务。
优点:由于您将 worker 与调度器进程解耦,因此更加健壮。worker 可以是大型主机,可以处理许多任务(通常是并行处理),这具有成本效益。由于可以配置 worker 始终运行以立即从队列中获取任务,因此延迟可能相对较低。
缺点:共享 worker 存在嘈杂的邻居问题,即任务在共享主机上争夺资源或争夺环境/系统的配置方式。如果您的工作负载不是恒定的,它们也可能很昂贵,您可能会有 worker 处于空闲状态,资源过度扩展,或者您必须管理它们的向上和向下扩展。
示例:
容器化执行器
Airflow 任务在容器/Pod 中临时执行。每个任务都隔离在其自己的容器化环境中,该环境在 Airflow 任务排队时部署。
优点:每个 Airflow 任务都隔离到一个容器中,因此没有嘈杂的邻居问题。可以为特定任务自定义执行环境(系统库、二进制文件、依赖项、资源量等)。具有成本效益,因为 worker 仅在任务持续时间内处于活动状态。
缺点:启动时存在延迟,因为容器或 Pod 需要在任务开始之前部署。如果您运行许多短/小任务,可能会很昂贵。没有要管理的 worker,但是您必须管理类似 Kubernetes 集群的内容。
示例:
注意
新的 Airflow 用户可能会认为他们需要使用本地或远程执行器之一运行单独的执行器进程。这是不正确的。执行器逻辑在调度器进程内部运行,并且将根据选择的执行器在本地运行任务或不运行任务。
同时使用多个执行器¶
警告
多执行器配置目前是一个 alpha/实验性功能,可能会在没有警告的情况下发生更改。
从 2.10.0 版本开始,Airflow 现在可以使用多执行器配置运行。每个执行器都有自己的一组优点和缺点,通常它们是延迟、隔离和计算效率等其他属性之间的权衡(有关执行器的比较,请参阅 此处)。运行多个执行器可以让您更好地利用所有可用执行器的优势,并避免它们的弱点。换句话说,您可以针对特定的一组任务使用特定的执行器,其中它的特定优点和好处对于该用例最有意义。
配置¶
配置多个执行器使用与单个执行器用例相同的配置选项(如 此处 所述),利用逗号分隔列表表示法来指定多个执行器。
注意
列表中的第一个执行器(无论是单独使用还是与其他执行器一起使用)的行为方式与 2.10.0 之前的版本相同。换句话说,这将是环境的默认执行器。任何未指定特定执行器的 Airflow 任务或 DAG 都将使用此环境级别执行器。如果 Airflow 任务或 DAG 中指定,列表中的所有其他执行器都将初始化并准备好运行任务。如果未在此配置列表中指定执行器,则不能使用它来运行任务。
一些有效的多执行器配置示例
[core]
executor = 'LocalExecutor'
[core]
executor = 'LocalExecutor,CeleryExecutor'
[core]
executor = 'KubernetesExecutor,my.custom.module.ExecutorClass'
注意
目前不支持使用 _相同_ 执行器类的两个实例。
为了更容易在任务和 DAG 上指定执行器,执行器配置现在支持别名。然后,您可以使用此别名在 DAG 中引用执行器(请参阅下文)。
[core]
executor = 'LocalExecutor,my.custom.module.ExecutorClass:ShortName'
注意
如果 DAG 指定某个任务使用未配置的执行器,则 DAG 将无法解析,并且 Airflow UI 中将显示警告对话框。请确保在运行 Airflow 组件(调度器、worker 等)的任何主机/容器上的 Airflow 配置中指定所有要使用的执行器。
编写 DAG 和任务¶
要为任务指定执行器,请使用 Airflow 操作符上的执行器参数
BashOperator(
task_id="hello_world",
executor="LocalExecutor",
bash_command="echo 'hello world!'",
)
@task(executor="LocalExecutor")
def hello_world():
print("hello world!")
要为整个 DAG 指定执行器,请使用现有的默认参数 Airflow 机制。然后,DAG 中的所有任务都将使用指定的执行器(除非特定任务显式覆盖)
def hello_world():
print("hello world!")
def hello_world_again():
print("hello world again!")
with DAG(
dag_id="hello_worlds",
default_args={"executor": "LocalExecutor"}, # Applies to all tasks in the DAG
) as dag:
# All tasks will use the executor from default args automatically
hw = hello_world()
hw_again = hello_world_again()
注意
任务将它们配置为在其上运行的执行器存储在 Airflow 数据库中。每次解析 DAG 后都会反映更改。
监控¶
当使用单个执行器时,Airflow 指标的行为方式与 < 2.9 时相同。但是,如果配置了多个执行器,则将为配置的每个执行器发布执行器指标(executor.open_slots
、executor.queued_slots
和 executor.running_tasks
),并将执行器名称附加到指标名称(例如,executor.open_slots.<executor class name>
)。
日志记录的工作方式与单个执行器用例相同。
静态编码的混合执行器¶
目前有两个“静态编码”的执行器,这些执行器是两个不同执行器的混合体:LocalKubernetesExecutor 和 CeleryKubernetesExecutor。它们的实现不是 Airflow 核心的本地或内在实现。这些混合执行器改为使用任务实例上的 queue
字段来指示和持久化要在哪个子执行器上运行。这是对 queue
字段的滥用,使得在使用这些混合执行器时无法将其用于其预期目的。
诸如此类的执行器还需要手工制作新的“具体”类,以创建每个可能的执行器组合的排列。随着创建更多执行器,这是站不住脚的,并且会导致更多的维护开销。不应需要定制的编码工作来使用任何执行器组合。
因此,不再建议使用这些类型的执行器。
编写自己的执行器¶
所有 Airflow 执行器都实现了一个通用接口,因此它们是可插拔的,并且任何执行器都可以访问 Airflow 中的所有功能和集成。主要是,Airflow 调度器使用此接口与执行器进行交互,但其他组件(如日志记录、CLI 和回填)也是如此。公共接口是 BaseExecutor
。您可以查看代码以获取最详细和最新的接口,但下面概述了一些重要亮点。
注意
有关 Airflow 公共接口的更多信息,请参阅Airflow 的公共接口。
您可能想要编写自定义执行器的一些原因包括
不存在适合您的特定用例的执行器,例如用于计算的特定工具或服务。
您想使用利用您首选云提供商的计算服务的执行器。
您有一个用于任务执行的私有工具/服务,仅供您或您的组织使用。
重要的 BaseExecutor 方法¶
这些方法不需要重写即可实现您自己的执行器,但了解它们很有用
heartbeat
:Airflow 调度器 Job 循环将定期调用执行器上的 heartbeat。这是 Airflow 调度器和执行器之间交互的主要点之一。此方法会更新一些指标、触发新排队的任务执行并更新正在运行/已完成任务的状态。queue_command
:Airflow 执行器将调用 BaseExecutor 的此方法,以提供要由执行器运行的任务。BaseExecutor 只是将 TaskInstance 添加到执行器内部的排队任务列表中。get_event_buffer
:Airflow 调度器调用此方法以检索执行器正在执行的 TaskInstance 的当前状态。has_task
:调度器使用此 BaseExecutor 方法来确定执行器是否已将特定的任务实例排队或正在运行。send_callback
:将任何回调发送到执行器上配置的接收器。
必须实现的方法¶
必须至少重写以下方法,以便您的执行器受 Airflow 支持
sync
:同步将在执行器心跳期间定期调用。实现此方法以更新执行器知道的任务状态。可选地,尝试执行从调度器接收到的已排队任务。execute_async
:异步执行命令。此上下文中的命令是用于运行 Airflow 任务的 Airflow CLI 命令。此方法在执行器心跳期间(在几层之后)被调用,该心跳由调度器定期运行。在实践中,此方法通常只是将任务排队到要运行的任务的内部或外部队列中(例如,KubernetesExecutor
)。但也可以直接执行任务(例如,LocalExecutor
)。这将取决于执行器。
可选的接口实现方法¶
以下方法不是必须重写才能拥有可用的 Airflow 执行器。但是,实现它们可以带来一些强大的功能和稳定性
start
:Airflow 调度器(和回填)作业将在初始化执行器对象后调用此方法。可以在此处完成执行器所需的任何其他设置。end
:Airflow 调度器(和回填)作业将在其关闭时调用此方法。应在此处完成完成运行作业所需的任何同步清理。terminate
:更强制地停止执行器,甚至杀死/停止正在进行的任务,而不是同步等待完成。cleanup_stuck_queued_tasks
:如果任务在排队状态中停留的时间长于task_queued_timeout
,则它们将由调度器收集并提供给执行器,以通过此方法有机会处理它们(执行任何正常清理/拆卸)并返回 Task Instance,以便向用户显示警告消息。try_adopt_task_instances
:已放弃的任务(例如,来自已死掉的调度器作业的任务)将提供给执行器以通过此方法采用或以其他方式处理它们。应返回任何无法采用的任务(默认情况下,BaseExecutor 假定所有任务都无法采用)。get_cli_commands
:执行器可以通过实现此方法向用户提供 CLI 命令,有关更多详细信息,请参见下面的CLI部分。get_task_log
:执行器可以通过实现此方法向 Airflow 任务日志提供日志消息,有关更多详细信息,请参见下面的日志记录部分。
兼容性属性¶
BaseExecutor
类接口包含一组属性,Airflow 核心代码使用这些属性来检查您的执行器兼容的功能。在编写自己的 Airflow 执行器时,请确保为您的用例正确设置这些属性。每个属性只是一个布尔值,用于启用/禁用功能或指示执行器是否支持/不支持功能
supports_pickling
:执行器是否支持在执行之前从数据库读取 pickled DAG(而不是从文件系统读取 DAG 定义)。supports_sentry
:执行器是否支持 Sentry。is_local
:执行器是远程的还是本地的。请参见上面的执行器类型部分。is_single_threaded
:执行器是否是单线程的。这与支持哪些数据库后端尤其相关。单线程执行器可以使用任何后端运行,包括 SQLite。is_production
:执行器是否应用于生产目的。当用户使用非生产就绪的执行器时,会向用户显示一条 UI 消息。change_sensor_mode_to_reschedule
:在 poke 模式下运行 Airflow 传感器会阻塞执行器的线程,在某些情况下会阻塞 Airflow。serve_logs
:执行器是否支持提供日志,请参见任务的日志记录。
CLI¶
执行器可以通过实现 get_cli_commands
方法来提供将包含在 airflow
命令行工具中的 CLI 命令。例如,CeleryExecutor
和 KubernetesExecutor
等执行器利用了此机制。这些命令可用于设置所需的工作人员、初始化环境或设置其他配置。命令仅针对当前配置的执行器提供。下面可以看到从执行器实现 CLI 命令提供的伪代码示例
@staticmethod
def get_cli_commands() -> list[GroupCommand]:
sub_commands = [
ActionCommand(
name="command_name",
help="Description of what this specific command does",
func=lazy_load_command("path.to.python.function.for.command"),
args=(),
),
]
return [
GroupCommand(
name="my_cool_executor",
help="Description of what this group of commands do",
subcommands=sub_commands,
),
]
注意
目前,对于 Airflow 命令命名空间没有严格的规则。由开发人员为他们的 CLI 命令使用足够唯一的名称,以免与其他 Airflow 执行器或组件造成冲突。
注意
在创建新的执行器或更新任何现有执行器时,请务必不要在模块级别导入或执行任何昂贵的操作/代码。执行器类在多个位置导入,如果它们导入速度很慢,这将对您的 Airflow 环境的性能产生负面影响,尤其是对于 CLI 命令。
日志记录¶
执行器可以通过实现 get_task_logs
方法来提供将包含在 Airflow 任务日志中的日志消息。如果执行环境在任务失败的情况下有额外的上下文,这可能会有所帮助,这可能是由于执行环境本身而不是 Airflow 任务代码造成的。包括执行环境中的设置/拆卸日志也可能有所帮助。KubernetesExecutor
利用此功能来包括来自运行特定 Airflow 任务的 pod 的日志,并将它们显示在该 Airflow 任务的日志中。下面可以看到从执行器实现任务日志提供的伪代码示例
def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
messages = []
log = []
try:
res = helper_function_to_fetch_logs_from_execution_env(ti, try_number)
for line in res:
log.append(remove_escape_codes(line.decode()))
if log:
messages.append("Found logs from execution environment!")
except Exception as e: # No exception should cause task logs to fail
messages.append(f"Failed to find logs from execution environment: {e}")
return messages, ["\n".join(log)]