PythonOperator¶
使用 PythonOperator
执行 Python 可调用对象。
提示
建议使用 @task
装饰器而不是经典的 PythonOperator
来执行 Python 可调用对象。
@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
pprint(kwargs)
print(ds)
return "Whatever you return gets printed in the logs"
run_this = print_context()
def print_context(ds=None, **kwargs):
"""Print the Airflow context and ds variable from the context."""
print("::group::All kwargs")
pprint(kwargs)
print("::endgroup::")
print("::group::Context variable ds")
print(ds)
print("::endgroup::")
return "Whatever you return gets printed in the logs"
run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)
传入参数¶
像使用普通 Python 函数一样,将额外的参数传递给 @task
装饰的函数。
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
for i in range(5):
sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)
run_this >> log_the_sql >> sleeping_task
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
for i in range(5):
sleeping_task = PythonOperator(
task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
)
run_this >> log_the_sql >> sleeping_task
模板¶
Airflow 传入一组额外的关键字参数:每个 Jinja 模板变量 一个,以及一个 templates_dict
参数。
templates_dict
参数是模板化的,因此字典中的每个值都被评估为 Jinja 模板。
@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
log_the_sql = log_sql()
def log_sql(**kwargs):
log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))
log_the_sql = PythonOperator(
task_id="log_sql_query",
python_callable=log_sql,
templates_dict={"query": "sql/sample.sql"},
templates_exts=[".sql"],
)
PythonVirtualenvOperator¶
使用 PythonVirtualenvOperator
装饰器在新 Python 虚拟环境中执行 Python 可调用对象。virtualenv
包需要安装在运行 Airflow 的环境中(作为可选依赖项 pip install apache-airflow[virtualenv] --constraint ...
)。
提示
建议使用 @task.virtualenv
装饰器而不是经典的 PythonVirtualenvOperator
在新的 Python 虚拟环境中执行 Python 可调用对象。
@task.virtualenv(
task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = callable_virtualenv()
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + "some red text")
print(Back.GREEN + "and with a green background")
print(Style.DIM + "and in dim text")
print(Style.RESET_ALL)
for _ in range(4):
print(Style.DIM + "Please wait...", flush=True)
sleep(1)
print("Finished")
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_python",
python_callable=callable_virtualenv,
requirements=["colorama==0.4.0"],
system_site_packages=False,
)
传入参数¶
像使用普通 Python 函数一样,将额外的参数传递给 @task.virtualenv
装饰的函数。不幸的是,由于与底层库不兼容,Airflow 不支持序列化 var
、ti
和 task_instance
。对于 Airflow 上下文变量,请确保您可以通过将 system_site_packages
设置为 True
来访问 Airflow,或者将 apache-airflow
添加到 requirements
参数中。否则,您将无法在 op_kwargs
中访问 Airflow 的大多数上下文变量。如果您希望上下文与日期时间对象(如 data_interval_start
)相关,则可以添加 pendulum
和 lazy_object_proxy
。
重要
定义为要执行的 Python 函数体将从 DAG 中剪切到一个临时文件中,不带周围的代码。如示例所示,您需要再次添加所有导入,并且不能依赖全局 Python 上下文中的变量。
如果要将变量传递给经典的 PythonVirtualenvOperator
,请使用 op_args
和 op_kwargs
。
如果需要用于包安装的附加参数,请通过 pip_install_options
参数传递它们,或者使用 requirements.txt
,如下例所示
SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives
所有支持的选项都列在 需求文件格式 中。
虚拟环境设置选项¶
虚拟环境是根据您的工作器上的全局 python pip 配置创建的。在您的环境中使用额外的 ENV 或调整常规 pip 配置,如 pip 配置 中所述。
如果要使用其他特定于任务的私有 python 存储库来设置虚拟环境,则可以传递 index_urls
参数,该参数将调整 pip 安装配置。传递的索引 URL 将替换标准系统配置的索引 URL 设置。为了防止在 DAG 代码中向私有存储库添加机密,您可以使用 Airflow 连接和钩子。为此,可以使用连接类型 包索引 (Python)
。
在特殊情况下,如果要阻止远程调用来设置虚拟环境,请将 index_urls
作为空列表传递,如 index_urls=[]
,这将强制 pip 安装程序使用 --no-index
选项。
缓存和重用¶
虚拟环境的设置是针对临时目录中的每次任务执行进行的。执行后,虚拟环境将再次被删除。确保您的工作器上的 $tmp
文件夹有足够的磁盘空间。通常(如果没有另外配置),将使用本地 pip 缓存,以防止每次执行都重新下载包。
但是,每次执行都设置虚拟环境仍然需要一些时间。对于重复执行,您可以将选项 venv_cache_path
设置为工作器上的文件系统文件夹。在这种情况下,虚拟环境将设置一次并被重用。如果使用虚拟环境缓存,则在缓存路径中为每个唯一的要求集创建不同的虚拟环境子文件夹。因此,根据您的系统中 DAG 的变化,需要足够的磁盘空间。
请注意,在缓存模式下不会进行自动清理。所有工作器插槽共享相同的虚拟环境,但如果任务在不同的工作器上反复调度,则可能会在多个工作器上分别创建虚拟环境。此外,如果工作器在 Kubernetes POD 中启动,则重新启动工作器将丢弃缓存(假设 venv_cache_path
不在持久卷上)。
如果您在运行时遇到缓存虚拟环境损坏的问题,您可以通过将 Airflow 变量 PythonVirtualenvOperator.cache_key
设置为任何文本来影响缓存目录哈希值。此变量的内容用于向量中以计算缓存目录键。
请注意,对缓存虚拟环境的任何修改(例如二进制路径中的临时文件、后安装其他需求)都可能会污染缓存虚拟环境,并且操作员不会维护或清理缓存路径。
ExternalPythonOperator¶
ExternalPythonOperator
可以帮助您使用与其他任务(以及主 Airflow 环境)不同的 Python 库集来运行某些任务。这可以是虚拟环境,也可以是预先安装并在运行 Airflow 任务的环境中可用的任何 Python 安装。操作员将 Python 二进制文件作为 python
参数。请注意,即使在虚拟环境的情况下,python
路径也应该指向虚拟环境内的 python 二进制文件(通常位于虚拟环境的 bin
子目录中)。与常规使用虚拟环境相反,这里不需要 activation
环境。仅仅使用 python
二进制文件就会自动激活它。在以下两个示例中,PATH_TO_PYTHON_BINARY
都是这样的路径,指向可执行的 Python 二进制文件。
使用 ExternalPythonOperator
在预定义的环境中执行 Python 可调用对象。virtualenv 包应该预先安装在运行 Python 的环境中。如果使用 dill
,则必须将其预先安装在环境中(与安装在主 Airflow 环境中的版本相同)。
提示
建议使用 @task.external_python
装饰器而不是经典的 ExternalPythonOperator
在预定义的 Python 环境中执行 Python 代码。
@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = callable_external_python()
def callable_external_python():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
import sys
from time import sleep
print(f"Running task via {sys.executable}")
print("Sleeping")
for _ in range(4):
print("Please wait...", flush=True)
sleep(1)
print("Finished")
external_python_task = ExternalPythonOperator(
task_id="external_python",
python_callable=callable_external_python,
python=PATH_TO_PYTHON_BINARY,
)
传递参数¶
将额外的参数传递给 @task.external_python
装饰的函数,就像使用普通的 Python 函数一样。不幸的是,由于与底层库不兼容,Airflow 不支持序列化 var
和 ti
/ task_instance
。对于 Airflow 上下文变量,请确保 Airflow 也作为 virtualenv 环境的一部分安装,并且版本与运行任务的 Airflow 版本相同。否则,您将无法在 op_kwargs
中访问 Airflow 的大多数上下文变量。如果您希望上下文与日期时间对象相关,例如 data_interval_start
,您可以将 pendulum
和 lazy_object_proxy
添加到您的虚拟环境中。
重要
定义为要执行的 Python 函数体将从 DAG 中剪切到一个临时文件中,不带周围的代码。如示例所示,您需要再次添加所有导入,并且不能依赖全局 Python 上下文中的变量。
如果要将变量传递给经典的 ExternalPythonOperator
,请使用 op_args
和 op_kwargs
。
PythonBranchOperator¶
使用 PythonBranchOperator
执行 Python 分支 任务。
提示
建议使用 @task.branch
装饰器而不是经典的 PythonBranchOperator
来执行 Python 代码。
@task.branch()
def branching(choices: list[str]) -> str:
return f"branch_{random.choice(choices)}"
branching = BranchPythonOperator(
task_id="branching",
python_callable=lambda: f"branch_{random.choice(options)}",
)
参数传递和模板选项与 PythonOperator 相同。
BranchPythonVirtualenvOperator¶
使用 BranchPythonVirtualenvOperator
装饰器执行 Python 分支 任务,它是 PythonBranchOperator
与在虚拟环境中执行的混合体。
提示
建议使用 @task.branch_virtualenv
装饰器而不是经典的 BranchPythonVirtualenvOperator
来执行 Python 代码。
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = tempfile.gettempdir()
@task.branch_virtualenv(requirements=["numpy~=1.24.4"], venv_cache_path=VENV_CACHE_PATH)
def branching_virtualenv(choices) -> str:
import random
import numpy as np
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
# Run the example a second time and see that it re-uses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())
def branch_with_venv(choices):
import random
import numpy as np
print(f"Some numpy stuff: {np.arange(6)}")
return f"venv_{random.choice(choices)}"
branching_venv = BranchPythonVirtualenvOperator(
task_id="branching_venv",
requirements=["numpy~=1.26.0"],
venv_cache_path=VENV_CACHE_PATH,
python_callable=branch_with_venv,
op_args=[options],
)
参数传递和模板选项与 PythonVirtualenvOperator 相同。
BranchExternalPythonOperator¶
使用 BranchExternalPythonOperator
执行 Python 分支 任务,它是 PythonBranchOperator
与在外部 Python 环境中执行的混合体。
提示
建议使用 @task.branch_external_python
装饰器而不是经典的 BranchExternalPythonOperator
来执行 Python 代码。
@task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
def branching_ext_python(choices) -> str:
import random
return f"ext_py_{random.choice(choices)}"
def branch_with_external_python(choices):
import random
return f"ext_py_{random.choice(choices)}"
branching_ext_py = BranchExternalPythonOperator(
task_id="branching_ext_python",
python=PATH_TO_PYTHON_BINARY,
python_callable=branch_with_external_python,
op_args=[options],
)
参数传递和模板选项与 ExternalPythonOperator 相同。
ShortCircuitOperator¶
使用 ShortCircuitOperator
控制在满足条件或获得真值时管道是否继续。
此条件和真值的评估是通过可调用对象的输出完成的。如果可调用对象返回 True 或真值,则允许管道继续,并将推送输出的 XCom。如果输出为 False 或假值,则管道将根据配置的短路方式进行短路(稍后将详细介绍)。在下面的示例中,“condition_is_true”任务后面的任务将执行,而“condition_is_false”任务后面的任务将被跳过。
提示
建议使用 @task.short_circuit
装饰器而不是经典的 ShortCircuitOperator
通过 Python 可调用对象来短路管道。
@task.short_circuit()
def check_condition(condition):
return condition
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]
condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)
chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)
cond_true = ShortCircuitOperator(
task_id="condition_is_True",
python_callable=lambda: True,
)
cond_false = ShortCircuitOperator(
task_id="condition_is_False",
python_callable=lambda: False,
)
ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]
chain(cond_true, *ds_true)
chain(cond_false, *ds_false)
可以将“短路”配置为尊重或忽略为下游任务定义的 触发规则。如果 ignore_downstream_trigger_rules
设置为 True(默认配置),则所有下游任务都将被跳过,而不考虑为任务定义的 trigger_rule
。如果此参数设置为 False,则直接下游任务将被跳过,但其他后续下游任务的指定 trigger_rule
将被遵守。在此短路配置中,操作员假定直接下游任务是有意要跳过的,但其他后续任务可能不是。如果管道中只有 *一部分* 应该短路,而不是短路任务后面的所有任务,则此配置特别有用。
在下面的示例中,请注意“short_circuit”任务被配置为遵守下游触发规则。这意味着,虽然“short_circuit”任务后面的任务将被跳过,因为装饰函数返回 False,但“task_7”仍将执行,因为它设置为在上游任务完成运行时执行,而不管状态如何(即 TriggerRule.ALL_DONE
触发规则)。
[task_1, task_2, task_3, task_4, task_5, task_6] = [
EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
condition=False
)
chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
[task_1, task_2, task_3, task_4, task_5, task_6] = [
EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]
task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)
short_circuit = ShortCircuitOperator(
task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)
chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
传递参数¶
将额外的参数传递给 @task.short_circuit
装饰的函数,就像使用普通的 Python 函数一样。
模板¶
Jinja 模板的使用方式与 PythonOperator 中描述的相同。
PythonSensor¶
PythonSensor
执行任意可调用对象,并等待其返回值为 True。
提示
建议使用 @task.sensor
装饰器而不是经典的 PythonSensor
来执行 Python 可调用对象以检查 True 条件。
# Using a sensor operator to wait for the upstream data to be ready.
@task.sensor(poke_interval=60, timeout=3600, mode="reschedule")
def wait_for_upstream() -> PokeReturnValue:
return PokeReturnValue(is_done=True, xcom_value="xcom_value")
t9 = PythonSensor(task_id="success_sensor_python", python_callable=success_callable)
t10 = PythonSensor(
task_id="failure_timeout_sensor_python", timeout=3, soft_fail=True, python_callable=failure_callable
)