PythonOperator¶
使用 PythonOperator
执行 Python 可调用对象。
提示
建议使用 @task
装饰器而不是传统的 PythonOperator
来执行 Python 可调用对象。
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
pprint(kwargs)
print(ds)
return "Whatever you return gets printed in the logs"
run_this = print_context()
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
print("::group::All kwargs")
pprint(kwargs)
print("::endgroup::")
print("::group::Context variable ds")
print(ds)
print("::endgroup::")
return "Whatever you return gets printed in the logs"
run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)
传入参数¶
像使用普通的 Python 函数一样,将额外的参数传递给 @task
装饰的函数。
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
for i in range(5):
sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)
run_this >> log_the_sql >> sleeping_task
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
for i in range(5):
sleeping_task = PythonOperator(
task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
)
run_this >> log_the_sql >> sleeping_task
模板化¶
Airflow 传入一组额外的关键字参数:每个 Jinja 模板变量 一个,以及一个 templates_dict
参数。
templates_dict
参数是模板化的,因此字典中的每个值都将评估为 Jinja 模板。
@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
log_the_sql = log_sql()
def log_sql(**kwargs):
log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
log_the_sql = PythonOperator(
task_id="log_sql_query",
python_callable=log_sql,
templates_dict={"query": "sql/sample.sql"},
templates_exts=[".sql"],
)
PythonVirtualenvOperator¶
使用 PythonVirtualenvOperator
装饰器,在新 Python 虚拟环境中执行 Python 可调用对象。需要在运行 Airflow 的环境中安装 virtualenv
包(作为可选依赖项 pip install apache-airflow[virtualenv] --constraint ...
)。
此外,需要使用命令 pip install [cloudpickle] --constraint ...
安装 cloudpickle
包作为可选依赖项。此包是当前使用的 dill
包的替代品。Cloudpickle 的主要优势在于它专注于标准 pickle 协议,确保更广泛的兼容性和更顺畅的数据交换,同时仍然可以有效地处理函数内的常见 Python 对象和全局变量。
提示
建议使用 @task.virtualenv
装饰器而不是传统的 PythonVirtualenvOperator
,在新 Python 虚拟环境中执行 Python 可调用对象。
@task.virtualenv(
task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = callable_virtualenv()
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_python",
python_callable=callable_virtualenv,
requirements=["colorama==0.4.0"],
system_site_packages=False,
)
传入参数¶
像使用普通的 Python 函数一样,将额外的参数传递给 @task.virtualenv
装饰的函数。不幸的是,由于与底层库的不兼容,Airflow 不支持序列化 var
、ti
和 task_instance
。对于 Airflow 上下文变量,请确保可以通过将 system_site_packages
设置为 True
或将 apache-airflow
添加到 requirements
参数来访问 Airflow。否则,您将无法在 op_kwargs
中访问 Airflow 的大多数上下文变量。如果您想要与日期时间对象相关的上下文,例如 data_interval_start
,您可以添加 pendulum
和 lazy_object_proxy
。
重要
定义要执行的 Python 函数体从 DAG 中剪切到一个临时文件中,没有周围的代码。如示例所示,您需要再次添加所有导入,并且不能依赖全局 Python 上下文中的变量。
如果要将变量传递到传统的 PythonVirtualenvOperator
中,请使用 op_args
和 op_kwargs
。
如果需要其他软件包安装参数,请通过 pip_install_options
参数传递,或者使用如下示例中的 requirements.txt
。
SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives
所有受支持的选项都列在 requirements 文件格式中。
虚拟环境设置选项¶
虚拟环境是根据您的工作程序上的全局 python pip 配置创建的。在您的环境中或在 pip config 中描述的常规 pip 配置中使用其他 ENV 或调整。
如果要使用其他任务特定的私有 python 存储库来设置虚拟环境,您可以传递 index_urls
参数,这将调整 pip 安装配置。传递的索引 URL 将替换标准系统配置的索引 URL 设置。为了防止在 DAG 代码中向私有存储库添加秘密,您可以使用 Airflow 连接 & 钩子。为此,可以使用连接类型 Package Index (Python)
。
在特殊情况下,如果您想阻止远程调用来设置虚拟环境,请将 index_urls
作为空列表传递为 index_urls=[]
,这将强制 pip 安装程序使用 --no-index
选项。
缓存和重用¶
虚拟环境的设置是在临时目录中按任务执行进行的。执行后,虚拟环境将再次删除。确保您的工作程序上的 $tmp
文件夹有足够的磁盘空间。通常(如果未进行不同的配置),将使用本地 pip 缓存,以防止每次执行都重新下载软件包。
但是,仍然需要一些时间来为每次执行设置虚拟环境。对于重复执行,您可以将选项 venv_cache_path
设置为工作程序上的文件系统文件夹。在这种情况下,虚拟环境将设置一次并重复使用。如果使用虚拟环境缓存,则在缓存路径中为每个唯一的 requirements 集创建不同的虚拟环境子文件夹。因此,根据系统中 DAG 的变化,需要足够的磁盘空间。
请注意,在缓存模式下,不会进行自动清理。所有 worker 插槽共享同一个虚拟环境,但如果任务在不同的 worker 上反复调度,则可能会在多个 worker 上单独创建虚拟环境。此外,如果 worker 在 Kubernetes POD 中启动,worker 的重启将删除缓存(假设 venv_cache_path
不在持久卷上)。
如果在运行时遇到缓存的虚拟环境损坏的问题,可以通过将 Airflow 变量 PythonVirtualenvOperator.cache_key
设置为任意文本来影响缓存目录哈希。此变量的内容用于计算缓存目录键的向量。
请注意,对缓存的虚拟环境的任何修改(例如二进制路径中的临时文件、安装后进一步的需求)都可能会污染缓存的虚拟环境,并且操作符不会维护或清理缓存路径。
ExternalPythonOperator¶
ExternalPythonOperator
可以帮助您使用与其它任务(以及主 Airflow 环境)不同的 Python 库集来运行一些任务。这可以是虚拟环境,也可以是在 Airflow 任务运行的环境中预安装并可用的任何 Python 安装。该操作符将 Python 二进制文件作为 python
参数。请注意,即使在虚拟环境的情况下,python
路径也应指向虚拟环境内的 python 二进制文件(通常在虚拟环境的 bin
子目录中)。与虚拟环境的常规使用相反,不需要 activation
环境。仅使用 python
二进制文件即可自动激活它。在下面的两个示例中,PATH_TO_PYTHON_BINARY
都是指向可执行 Python 二进制文件的路径。
使用 ExternalPythonOperator
在预定义的环境中执行 Python 可调用对象。 virtualenv 包应预安装在 Python 运行的环境中。如果使用 dill
,则必须在环境中预安装它(与主 Airflow 环境中安装的版本相同)。
提示
建议使用 @task.external_python
装饰器来执行预定义的 Python 环境中的 Python 代码,而不是传统的 ExternalPythonOperator
。
@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = callable_external_python()
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = ExternalPythonOperator(
task_id="external_python",
python_callable=callable_external_python,
python=PATH_TO_PYTHON_BINARY,
)
传入参数¶
像使用普通的 Python 函数一样,将额外的参数传递给 @task.external_python
装饰的函数。遗憾的是,由于与底层库不兼容,Airflow 不支持序列化 var
和 ti
/ task_instance
。对于 Airflow 上下文变量,请确保 Airflow 也作为虚拟环境的一部分安装,并且版本与任务运行的 Airflow 版本相同。否则,您将无法在 op_kwargs
中访问 Airflow 的大多数上下文变量。如果您想要与日期时间对象相关的上下文,如 data_interval_start
,可以将 pendulum
和 lazy_object_proxy
添加到您的虚拟环境中。
重要
定义要执行的 Python 函数体从 DAG 中剪切到一个临时文件中,没有周围的代码。如示例所示,您需要再次添加所有导入,并且不能依赖全局 Python 上下文中的变量。
如果想将变量传递到传统的 ExternalPythonOperator
中,请使用 op_args
和 op_kwargs
。
PythonBranchOperator¶
使用 PythonBranchOperator
执行 Python 分支 任务。
提示
建议使用 @task.branch
装饰器来执行 Python 代码,而不是传统的 PythonBranchOperator
。
@task.branch()
def branching(choices: list[str]) -> str:
return f"branch_{random.choice(choices)}"
branching = BranchPythonOperator(
task_id="branching",
python_callable=lambda: f"branch_{random.choice(options)}",
)
参数传递和模板化选项与 PythonOperator 相同。
BranchPythonVirtualenvOperator¶
使用 BranchPythonVirtualenvOperator
装饰器来执行 Python 分支 任务,它是 PythonBranchOperator
与在虚拟环境中执行的混合体。
提示
建议使用 @task.branch_virtualenv
装饰器来执行 Python 代码,而不是传统的 BranchPythonVirtualenvOperator
。
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = tempfile.gettempdir()
@task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
def branching_virtualenv(choices) -> str:
import random
import numpy as np
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())
def branch_with_venv(choices):
import random
import numpy as np
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
branching_venv = BranchPythonVirtualenvOperator(
task_id="branching_venv",
requirements=["numpy~=1.26.0"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=branch_with_venv,
op_args=[options],
)
参数传递和模板化选项与 PythonVirtualenvOperator 相同。
BranchExternalPythonOperator¶
使用 BranchExternalPythonOperator
来执行 Python 分支 任务,它是 PythonBranchOperator
与在外部 Python 环境中执行的混合体。
提示
建议使用 @task.branch_external_python
装饰器来执行 Python 代码,而不是传统的 BranchExternalPythonOperator
。
@task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
def branching_ext_python(choices) -> str:
import random
return f"ext_py_{random.choice(choices)}"
def branch_with_external_python(choices):
import random
return f"ext_py_{random.choice(choices)}"
branching_ext_py = BranchExternalPythonOperator(
task_id="branching_ext_python",
python=PATH_TO_PYTHON_BINARY,
python_callable=branch_with_external_python,
op_args=[options],
)
参数传递和模板化选项与 ExternalPythonOperator 相同。
ShortCircuitOperator¶
使用 ShortCircuitOperator
来控制在满足条件或获得真值时管道是否继续。
此条件和真值的评估通过可调用对象的输出来完成。如果可调用对象返回 True 或真值,则允许管道继续,并且会推送输出的 XCom。如果输出为 False 或假值,则会根据配置的短路(稍后会详细介绍)来短路管道。在下面的示例中,当“condition_is_true”任务执行时,其后的任务将执行,而“condition_is_false”任务的下游任务将被跳过。
提示
建议使用 @task.short_circuit
装饰器来通过 Python 可调用对象短路管道,而不是传统的 ShortCircuitOperator
。
@task.short_circuit()
def check_condition(condition):
return condition
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]
condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)
chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)
cond_true = ShortCircuitOperator(
task_id="condition_is_True",
python_callable=lambda: True,
)
cond_false = ShortCircuitOperator(
task_id="condition_is_False",
python_callable=lambda: False,
)
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]
chain(cond_true, *ds_true)
chain(cond_false, *ds_false)
可以将“短路”配置为遵守或忽略为下游任务定义的 触发规则。如果 ignore_downstream_trigger_rules
设置为 True(默认配置),则所有下游任务都将被跳过,而无需考虑为任务定义的 trigger_rule
。如果此参数设置为 False,则会跳过直接下游任务,但会遵守其他后续下游任务的指定 trigger_rule
。在此短路配置中,操作符假定直接下游任务是有意跳过的,但可能不是其他后续任务。如果仅应短路管道的一部分,而不是短路任务之后的所有任务,则此配置特别有用。
在下面的示例中,请注意,“short_circuit”任务配置为遵守下游触发规则。这意味着,当装饰函数返回 False 时,将跳过“short_circuit”任务之后的任务,但“task_7”仍将执行,因为它设置为在即使状态不满足(即 TriggerRule.ALL_DONE
触发规则)的情况下,上游任务完成后执行。
[task_1, task_2, task_3, task_4, task_5, task_6] = [
EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
condition=False
)
chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
[task_1, task_2, task_3, task_4, task_5, task_6] = [
EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
short_circuit = ShortCircuitOperator(
task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)
chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
传入参数¶
像使用普通的 Python 函数一样,将额外的参数传递给 @task.short_circuit
装饰的函数。
模板化¶
Jinja 模板化可以像 PythonOperator 中描述的方式使用。
PythonSensor¶
PythonSensor
执行任意可调用对象并等待其返回值为 True。
提示
推荐使用 @task.sensor
装饰器,而不是经典的 PythonSensor
,来执行 Python 可调用对象以检查 True 条件。
# Using a sensor operator to wait for the upstream data to be ready.
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
return PokeReturnValue(is_done=True, xcom_value="xcom_value")
t9 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable)
t10 = PythonSensor(
task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable
)