PythonOperator

使用 PythonOperator 来执行 Python 可调用对象。

提示

建议使用 @task 装饰器而不是经典的 PythonOperator 来执行 Python 可调用对象。

airflow/example_dags/example_python_decorator.py

@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return "Whatever you return gets printed in the logs"

run_this = print_context()

airflow/example_dags/example_python_operator.py

def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    print("::group::All kwargs")
    pprint(kwargs)
    print("::endgroup::")
    print("::group::Context variable ds")
    print(ds)
    print("::endgroup::")
    return "Whatever you return gets printed in the logs"

run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)

传入参数

您可以像处理普通 Python 函数一样,将额外参数传递给 @task 装饰的函数。

airflow/example_dags/example_python_decorator.py

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
@task
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = my_sleeping_function.override(task_id=f"sleep_for_{i}")(random_base=i / 10)

    run_this >> log_the_sql >> sleeping_task

airflow/example_dags/example_python_operator.py

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

for i in range(5):
    sleeping_task = PythonOperator(
        task_id=f"sleep_for_{i}", python_callable=my_sleeping_function, op_kwargs={"random_base": i / 10}
    )

    run_this >> log_the_sql >> sleeping_task

模板化

Airflow 会传入一组额外的关键字参数:每个 Jinja 模板变量 对应一个,还有一个 templates_dict 参数。

templates_dictop_argsop_kwargs 参数是模板化的,因此字典中的每个值都会被评估为一个 Jinja 模板

airflow/example_dags/example_python_decorator.py

@task(task_id="log_sql_query", templates_dict={"query": "sql/sample.sql"}, templates_exts=[".sql"])
def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = log_sql()

airflow/example_dags/example_python_operator.py

def log_sql(**kwargs):
    log.info("Python task decorator query: %s", str(kwargs["templates_dict"]["query"]))

log_the_sql = PythonOperator(
    task_id="log_sql_query",
    python_callable=log_sql,
    templates_dict={"query": "sql/sample.sql"},
    templates_exts=[".sql"],
)

上下文

Context 是一个字典对象,包含有关 DagRun 环境的信息。例如,选择 task_instance 将获取当前运行的 TaskInstance 对象。

它可以隐式使用,例如使用 **kwargs,也可以使用 get_current_context() 显式使用。在这种情况下,类型提示可用于静态分析。

PythonVirtualenvOperator

使用 PythonVirtualenvOperator 装饰器在新的 Python 虚拟环境中执行 Python 可调用对象。virtualenv 包需要在运行 Airflow 的环境中安装(作为可选依赖项 pip install apache-airflow[virtualenv] --constraint ...)。

此外,cloudpickle 包需要作为可选依赖项安装,使用命令 pip install [cloudpickle] --constraint ...。此包取代了当前使用的 dill 包。Cloudpickle 的强大优势在于它专注于标准的 pickling 协议,确保更广泛的兼容性和更流畅的数据交换,同时仍能有效地处理函数中的常见 Python 对象和全局变量。

提示

建议使用 @task.virtualenv 装饰器而不是经典的 PythonVirtualenvOperator 来在新的 Python 虚拟环境中执行 Python 可调用对象。

airflow/example_dags/example_python_decorator.py

@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = callable_virtualenv()

airflow/example_dags/example_python_operator.py

def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + "some red text")
    print(Back.GREEN + "and with a green background")
    print(Style.DIM + "and in dim text")
    print(Style.RESET_ALL)
    for _ in range(4):
        print(Style.DIM + "Please wait...", flush=True)
        sleep(1)
    print("Finished")

virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_python",
    python_callable=callable_virtualenv,
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
)

传入参数

您可以像处理普通 Python 函数一样,将额外参数传递给 @task.virtualenv 装饰的函数。遗憾的是,由于与底层库不兼容,Airflow 不支持序列化 vartitask_instance。对于 Airflow 上下文变量,请确保您可以通过设置 system_site_packagesTrue 来访问 Airflow,或者将 apache-airflow 添加到 requirements 参数中。否则,您将无法在 op_kwargs 中访问大多数 Airflow 上下文变量。如果您想要访问与 datetime 对象相关的上下文,例如 data_interval_start,您可以添加 pendulumlazy_object_proxy

重要

用于执行的 Python 函数体将从 DAG 中剪切出来,放入一个临时文件,不包含周围的代码。正如示例所示,您需要再次添加所有导入,并且不能依赖于全局 Python 上下文中的变量。

如果您想将变量传递给经典的 PythonVirtualenvOperator,请使用 op_argsop_kwargs

如果需要额外的参数来安装包,请通过 pip_install_options 参数传递,或者像下面的示例一样使用 requirements.txt

SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives

requirements 文件格式 中列出了所有支持的选项。

模板化

可以像 PythonOperator 中描述的那样使用 Jinja 模板化。

虚拟环境设置选项

虚拟环境是根据工作节点上全局 Python pip 配置创建的。您可以在环境中使用额外的 ENV 变量或根据 pip 配置 中描述的方式调整通用的 pip 配置。

如果您想使用额外的任务特定私有 Python 仓库来设置虚拟环境,您可以传递 index_urls 参数,这将调整 pip 安装配置。传递的 index urls 将替换标准的系统配置的 index url 设置。为了避免在您的 DAG 代码中添加私有仓库的秘密信息,您可以使用 Airflow 连接和 Hooks。为此,可以使用连接类型 Package Index (Python)

在您想要阻止设置虚拟环境的远程调用这种特殊情况下,将 index_urls 作为空列表传递,例如 index_urls=[],这将强制 pip 安装程序使用 --no-index 选项。

缓存和重用

虚拟环境在任务执行时在临时目录中设置。执行后虚拟环境会被再次删除。请确保您的工作节点上的 $tmp 文件夹有足够的磁盘空间。通常(如果未另行配置),将使用本地 pip 缓存,从而避免每次执行都重新下载包。

但每次执行仍需要一些时间来设置虚拟环境。对于重复执行,您可以将选项 venv_cache_path 设置为工作节点上的文件系统文件夹。在这种情况下,虚拟环境将设置一次并被重用。如果使用虚拟环境缓存,则会根据每个唯一的 requirements 集在缓存路径中创建不同的虚拟环境子文件夹。因此,取决于您的系统中 DAG 的变动情况,需要足够的磁盘空间。

请注意,在缓存模式下,不会进行自动清理。所有工作节点槽位共享同一个虚拟环境,但如果任务反复调度到不同的工作节点上,则可能发生虚拟环境在多个工作节点上单独创建的情况。此外,如果工作节点在 Kubernetes POD 中启动,工作节点的重启将丢失缓存(假设 venv_cache_path 不在持久卷上)。

如果在运行时遇到损坏的缓存虚拟环境问题,您可以通过将 Airflow 变量 PythonVirtualenvOperator.cache_key 设置为任意文本来影响缓存目录哈希值。此变量的内容用于计算缓存目录键的向量。

请注意,对缓存的虚拟环境进行任何修改(例如二进制路径中的临时文件、后安装更多 requirements)可能会污染缓存的虚拟环境,并且 operator 不会维护或清理缓存路径。

ExternalPythonOperator

ExternalPythonOperator 可以帮助您使用与 Adags 和主 Airflow 环境不同的 Python 库集来运行部分任务。这可能是一个虚拟环境或任何预安装在运行 Airflow 任务的环境中的 Python 安装。operator 将 Python 二进制文件作为 python 参数。请注意,即使是虚拟环境,python 路径也应指向虚拟环境内的 python 二进制文件(通常在虚拟环境的 bin 子目录中)。与虚拟环境的常规用法相反,不需要 activation 环境。仅仅使用 python 二进制文件即可自动激活它。在下面的两个示例中,PATH_TO_PYTHON_BINARY 就是这样的路径,指向可执行的 Python 二进制文件。

使用 ExternalPythonOperator 在预定义的环境中执行 Python 可调用对象。virtualenv 包应预安装在运行 Python 的环境中。如果使用 dill,则必须将其预安装在环境中(版本与主 Airflow 环境中安装的版本相同)。

提示

建议使用 @task.external_python 装饰器而不是经典的 ExternalPythonOperator 来在预定义的 Python 环境中执行 Python 代码。

airflow/example_dags/example_python_decorator.py

@task.external_python(task_id="external_python", python=PATH_TO_PYTHON_BINARY)
def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = callable_external_python()

airflow/example_dags/example_python_operator.py

def callable_external_python():
    """
    Example function that will be performed in a virtual environment.

    Importing at the module level ensures that it will not attempt to import the
    library before it is installed.
    """
    import sys
    from time import sleep

    print(f"Running task via {sys.executable}")
    print("Sleeping")
    for _ in range(4):
        print("Please wait...", flush=True)
        sleep(1)
    print("Finished")

external_python_task = ExternalPythonOperator(
    task_id="external_python",
    python_callable=callable_external_python,
    python=PATH_TO_PYTHON_BINARY,
)

传入参数

您可以像处理普通 Python 函数一样,将额外参数传递给 @task.external_python 装饰的函数。遗憾的是,由于与底层库不兼容,Airflow 不支持序列化 varti / task_instance。对于 Airflow 上下文变量,请确保 Airflow 也作为虚拟环境的一部分安装,且版本与任务运行时的 Airflow 版本相同。否则,您将无法在 op_kwargs 中访问大多数 Airflow 上下文变量。如果您想要访问与 datetime 对象相关的上下文,例如 data_interval_start,您可以在虚拟环境中添加 pendulumlazy_object_proxy

重要

用于执行的 Python 函数体将从 DAG 中剪切出来,放入一个临时文件,不包含周围的代码。正如示例所示,您需要再次添加所有导入,并且不能依赖于全局 Python 上下文中的变量。

如果您想将变量传递给经典的 ExternalPythonOperator,请使用 op_argsop_kwargs

模板化

可以像 PythonOperator 中描述的那样使用 Jinja 模板化。

PythonBranchOperator

使用 PythonBranchOperator 来执行 Python 分支 任务。

提示

建议使用 @task.branch 装饰器而不是经典的 PythonBranchOperator 来执行 Python 代码。

airflow/example_dags/example_branch_operator_decorator.py

@task.branch()
def branching(choices: list[str]) -> str:
    return f"branch_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py

branching = BranchPythonOperator(
    task_id="branching",
    python_callable=lambda: f"branch_{random.choice(options)}",
)

传入参数和模板化

参数传递和模板化选项与 PythonOperator 相同。

BranchPythonVirtualenvOperator

使用 BranchPythonVirtualenvOperator 装饰器来执行 Python 分支 任务,它是 PythonBranchOperator 与在虚拟环境中执行的混合体。

提示

建议使用 @task.branch_virtualenv 装饰器而不是经典的 BranchPythonVirtualenvOperator 来执行 Python 代码。

airflow/example_dags/example_branch_operator_decorator.py

# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
#       Run the example a second time and see that it reuses it and is faster.
VENV_CACHE_PATH = tempfile.gettempdir()

@task.branch_virtualenv(requirements=["numpy~=1.26.0"], venv_cache_path=VENV_CACHE_PATH)
def branching_virtualenv(choices) -> str:
    import random

    import numpy as np

    print(f"Some numpy stuff: {np.arange(6)}")
    return f"venv_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py

# Note: Passing a caching dir allows to keep the virtual environment over multiple runs
#       Run the example a second time and see that it reuses it and is faster.
VENV_CACHE_PATH = Path(tempfile.gettempdir())

def branch_with_venv(choices):
    import random

    import numpy as np

    print(f"Some numpy stuff: {np.arange(6)}")
    return f"venv_{random.choice(choices)}"

branching_venv = BranchPythonVirtualenvOperator(
    task_id="branching_venv",
    requirements=["numpy~=1.26.0"],
    venv_cache_path=VENV_CACHE_PATH,
    python_callable=branch_with_venv,
    op_args=[options],
)

传入参数和模板化

参数传递和模板化选项与 PythonOperator 相同。

BranchExternalPythonOperator

使用 BranchExternalPythonOperator 来执行 Python 分支 任务,它是 PythonBranchOperator 与在外部 Python 环境中执行的混合体。

提示

建议使用 @task.branch_external_python 装饰器而不是经典的 BranchExternalPythonOperator 来执行 Python 代码。

airflow/example_dags/example_branch_operator_decorator.py

@task.branch_external_python(python=PATH_TO_PYTHON_BINARY)
def branching_ext_python(choices) -> str:
    import random

    return f"ext_py_{random.choice(choices)}"

airflow/example_dags/example_branch_operator.py

def branch_with_external_python(choices):
    import random

    return f"ext_py_{random.choice(choices)}"

branching_ext_py = BranchExternalPythonOperator(
    task_id="branching_ext_python",
    python=PATH_TO_PYTHON_BINARY,
    python_callable=branch_with_external_python,
    op_args=[options],
)

传入参数和模板化

参数传递和模板化选项与 PythonOperator 相同。

ShortCircuitOperator

使用 ShortCircuitOperator 来控制当条件满足或获得真值时流水线是否继续。

此条件和真值的评估是通过可调用对象的输出完成的。如果可调用对象返回 True 或一个真值,则允许流水线继续,并将输出的 XCom 推送出去。如果输出是 False 或一个假值,则流水线将根据配置的短路逻辑进行短路(稍后会详细介绍)。在下面的示例中,“condition_is_true”任务之后的任务将执行,而“condition_is_false”任务下游的任务将被跳过。

提示

建议使用 @task.short_circuit 装饰器而不是经典的 ShortCircuitOperator 来通过 Python 可调用对象对流水线进行短路。

airflow/example_dags/example_short_circuit_decorator.py

@task.short_circuit()
def check_condition(condition):
    return condition

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

condition_is_true = check_condition.override(task_id="condition_is_true")(condition=True)
condition_is_false = check_condition.override(task_id="condition_is_false")(condition=False)

chain(condition_is_true, *ds_true)
chain(condition_is_false, *ds_false)

airflow/example_dags/example_short_circuit_operator.py

cond_true = ShortCircuitOperator(
    task_id="condition_is_True",
    python_callable=lambda: True,
)

cond_false = ShortCircuitOperator(
    task_id="condition_is_False",
    python_callable=lambda: False,
)

ds_true = [EmptyOperator(task_id=f"true_{i}") for i in [1, 2]]
ds_false = [EmptyOperator(task_id=f"false_{i}") for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)

“短路”可以配置为尊重或忽略下游任务定义的 触发规则。如果 ignore_downstream_trigger_rules 设置为 True(默认配置),则所有下游任务都会被跳过,不考虑为任务定义的 trigger_rule。如果此参数设置为 False,则直接下游任务会被跳过,但后续其他下游任务指定的 trigger_rule 会被尊重。在此短路配置中,operator 假定直接下游任务被特意跳过,但后续其他任务可能不会被跳过。此配置特别有用,如果只需要短路流水线的 *部分*,而不是跟随短路任务的所有任务。

在下面的示例中,请注意“short_circuit”任务被配置为尊重下游触发规则。这意味着虽然跟随“short_circuit”任务的任务会被跳过(因为装饰函数返回 False),但“task_7”仍会执行,因为它被设置为在其上游任务无论状态如何完成运行时执行(即 TriggerRule.ALL_DONE 触发规则)。

airflow/example_dags/example_short_circuit_decorator.py

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = check_condition.override(task_id="short_circuit", ignore_downstream_trigger_rules=False)(
    condition=False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

airflow/example_dags/example_short_circuit_operator.py

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = ShortCircuitOperator(
    task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)

传入参数和模板化

参数传递和模板化选项与 PythonOperator 相同。

此条目有帮助吗?