airflow.operators.python

模块内容

PythonOperator

执行 Python 可调用对象。

BranchPythonOperator

工作流可以在执行此任务后“分支”或遵循路径。

ShortCircuitOperator

允许管道基于 python_callable 的结果继续运行。

PythonVirtualenvOperator

在自动创建和销毁的虚拟环境中运行函数。

BranchPythonVirtualenvOperator

工作流可以在虚拟环境中执行此任务后“分支”或遵循路径。

ExternalPythonOperator

在不重新创建的虚拟环境中运行函数。

BranchExternalPythonOperator

工作流可以在执行此任务后“分支”或遵循路径。

函数

is_venv_installed()

通过检查 virtualenv 包是否在路径上或作为包安装来检查是否已安装该包。

task([python_callable, multiple_outputs])

请改用 airflow.decorators.task(),此方法已弃用。

get_current_context()

检索执行上下文字典,而无需更改用户方法的签名。

属性

log

airflow.operators.python.log[源代码]
airflow.operators.python.is_venv_installed()[源代码]

通过检查 virtualenv 包是否在路径上或作为包安装来检查是否已安装该包。

返回

如果是,则返回 True。无论哪种检查方式有效都可以。

返回类型

bool

airflow.operators.python.task(python_callable=None, multiple_outputs=None, **kwargs)[源代码]

请改用 airflow.decorators.task(),此方法已弃用。

调用 @task.python 并允许用户将 Python 函数转换为 Airflow 任务。

参数
  • python_callable (Callable | None) – 对可调用对象的引用

  • op_kwargs – 一个关键字参数字典,将在您的函数中解包(已模板化)

  • op_args – 一个位置参数列表,将在调用您的可调用对象时解包(已模板化)

  • multiple_outputs (bool | None) – 如果设置,函数返回值将展开为多个 XCom 值。字典将展开为以键作为键的 xcom 值。默认为 False。

class airflow.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[源代码]

基类: airflow.models.baseoperator.BaseOperator

执行 Python 可调用对象。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:PythonOperator

在运行您的可调用对象时,Airflow 将传递一组可在您的函数中使用的关键字参数。这组 kwargs 与您可以在 Jinja 模板中使用的内容完全对应。为了使此工作正常进行,您需要在函数头中定义 **kwargs,或者您可以直接添加您想要获取的关键字参数 - 例如,使用以下代码,您的可调用对象将获取 tinext_ds 上下文变量的值。

使用显式参数

def my_python_callable(ti, next_ds):
    pass

使用 kwargs

def my_python_callable(**kwargs):
    ti = kwargs["ti"]
    next_ds = kwargs["next_ds"]
参数
  • python_callable (Callable) – 对可调用对象的引用

  • op_args (Collection[Any] | None) – 一个位置参数列表,将在调用您的可调用对象时解包

  • op_kwargs (Mapping[str, Any] | None) – 一个关键字参数字典,将在您的函数中解包

  • templates_dict (dict[str, Any] | None) – 一个字典,其中值是模板,这些模板将在 __init__execute 之间由 Airflow 引擎进行模板化处理,并在应用模板后在您的可调用对象的上下文中可用。(已模板化)

  • templates_exts (Sequence[str] | None) – 一个文件扩展名列表,用于在处理模板字段时解析,例如 ['.sql', '.hql']

  • show_return_value_in_logs (bool) – 一个布尔值,指示是否在日志中显示 return_value。默认为 True,允许 return value 日志输出。当您返回大量数据(例如向 TaskAPI 传输大量 XCom)时,可以将其设置为 False 以防止返回值的日志输出。

template_fields: Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')[源代码]
template_fields_renderers[源代码]
BLUE = '#ffefeb'[源代码]
ui_color[源代码]
shallow_copy_attrs: Sequence[str] = ('python_callable', 'op_kwargs')[source]
execute(context)[source]

在创建操作符时派生。

Context 是在渲染 Jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

determine_kwargs(context)[source]
execute_callable()[source]

使用给定的参数调用 Python 可调用对象。

返回

调用的返回值。

返回类型

任意类型

class airflow.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[source]

基类:PythonOperator, airflow.operators.branch.BranchMixIn

工作流可以在此任务执行后“分支”或遵循路径。

它派生自 PythonOperator 并期望一个 Python 函数,该函数返回一个 task_id、一个 task_group_id 或要遵循的 task_ids 和/或 task_group_ids 列表。返回的 task_id(s) 和/或 task_group_id(s) 应该指向 {self} 的直接下游的任务或任务组。所有其他“分支”或直接下游任务都标记为 skipped 状态,以便这些路径无法向前移动。 skipped 状态会向下传播,以允许 DAG 状态填充并推断 DAG 运行的状态。

execute(context)[source]

在创建操作符时派生。

Context 是在渲染 Jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

class airflow.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[source]

基类:PythonOperator, airflow.models.skipmixin.SkipMixin

允许管道基于 python_callable 的结果继续运行。

ShortCircuitOperator 派生自 PythonOperator,并评估 python_callable 的结果。如果返回的结果为 False 或假值,则管道将短路。根据配置的短路模式,下游任务将标记为“skipped”状态。如果返回的结果为 True 或真值,则下游任务将照常进行,并推送返回结果的 XCom

可以将短路配置为遵守或忽略为下游任务设置的 trigger_rule。如果 ignore_downstream_trigger_rules 设置为 True,这是默认设置,则所有下游任务都将在不考虑为任务定义的 trigger_rule 的情况下跳过。但是,如果此参数设置为 False,则会跳过直接下游任务,但会遵守其他后续下游任务的指定 trigger_rule。在此模式下,操作符假定直接下游任务的目的是跳过,但可能不包括其他后续任务。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:ShortCircuitOperator

参数

ignore_downstream_trigger_rules (bool) – 如果设置为 True,则将跳过此操作符任务的所有下游任务。这是默认行为。如果设置为 False,则将跳过直接下游任务,但将遵守为所有其他下游任务定义的 trigger_rule

execute(context)[source]

在创建操作符时派生。

Context 是在渲染 Jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

class airflow.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]

基类:_BasePythonVirtualenvOperator

在自动创建和销毁的虚拟环境中运行函数。

必须使用 def 定义该函数(具有某些注意事项),并且不能是类的一部分。所有导入都必须在函数内部进行,并且不能引用范围之外的任何变量。将提供一个名为 virtualenv_string_args 的全局范围变量(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递内容,并且可以使用返回值。请注意,如果您的虚拟环境在与 Airflow 不同的 Python 主要版本中运行,则不能使用返回值、op_args、op_kwargs 或使用通过插件提供给 Airflow 的任何宏。但是,可以使用 string_args。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:PythonVirtualenvOperator

参数
  • python_callable (Callable) – 一个不引用外部变量的 Python 函数,使用 def 定义,将在虚拟环境中运行。

  • requirements (None | Iterable[str] | str) – 一个需求字符串列表,或者一个由 pip 指定的(模板化的)“需求文件”。

  • python_version (str | None) – 运行虚拟环境的 Python 版本。注意 2 和 2.7 都是可接受的形式。

  • serializer (_SerializerTypeDef | None) –

    用于序列化参数和结果的序列化器。它可以是以下之一

    • "pickle": (默认) 使用 pickle 进行序列化。包含在 Python 标准库中。

    • "cloudpickle": 使用 cloudpickle 序列化更复杂的类型,这需要在您的需求中包含 cloudpickle。

    • "dill": 使用 dill 序列化更复杂的类型,这需要在您的需求中包含 dill。

  • system_site_packages (bool) – 是否在虚拟环境中包含 system_site_packages。有关更多信息,请参阅 virtualenv 文档。

  • pip_install_options (list[str] | None) – 安装需求时 pip install 的选项列表。有关可用选项,请参阅 ‘pip install -h’

  • op_args (Collection[Any] | None) – 传递给 python_callable 的位置参数列表。

  • op_kwargs (Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。

  • string_args (Iterable[str] | None) – 存在于全局变量 virtualenv_string_args 中的字符串,在运行时以 list[str] 的形式提供给 python_callable。请注意,参数按换行符分隔。

  • templates_dict (dict | None) – 一个字典,其中的值是模板,这些模板将在 __init__execute 之间由 Airflow 引擎进行模板化,并在应用模板后在您的可调用对象的上下文中可用

  • templates_exts (list[str] | None) – 一个文件扩展名列表,用于在处理模板化字段时解析,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 预期目标环境中安装了 Airflow。如果为 true,则如果未安装 Airflow,该操作符将引发警告,并且它将在启动时尝试加载 Airflow 宏。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此退出代码退出,则将任务保留在 skipped 状态(默认值:None)。如果设置为 None,则任何非零退出代码都将被视为失败。

  • index_urls (None | Collection[str] | str) – 一个可选的索引 URL 列表,用于从中加载 Python 包。如果未提供,则将使用系统 pip conf 从中获取包。

  • venv_cache_path (None | os.PathLike[str]) – 虚拟环境父文件夹的可选路径,其中将缓存虚拟环境,创建子文件夹 venv-{hash},其中 hash 将替换为需求校验和。如果未提供,则将在每次执行时在临时文件夹中创建和删除虚拟环境。

  • env_vars (dict[str, str] | None) – 一个字典,包含在执行虚拟环境时要为虚拟环境设置的其他环境变量。

  • inherit_env (bool) – 是否在执行虚拟环境时继承当前环境变量。如果设置为 True,则虚拟环境将继承父进程 (os.environ) 的环境变量。如果设置为 False,则虚拟环境将在干净的环境中执行。

  • use_dill (bool) – 已弃用,请改用 serializer。是否使用 dill 序列化参数和结果(默认为 pickle)。这允许更复杂的类型,但需要您在需求中包含 dill。

template_fields: Sequence[str][source]
template_ext: Sequence[str] = ('.txt',)[source]
execute_callable()[source]

使用给定的参数调用 Python 可调用对象。

返回

调用的返回值。

class airflow.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]

基类: PythonVirtualenvOperator, airflow.operators.branch.BranchMixIn

工作流可以在虚拟环境中执行此任务后“分支”或遵循路径。

它派生自 PythonVirtualenvOperator 并期望一个 Python 函数,该函数返回单个 task_id、单个 task_group_id 或要遵循的 task_ids 和/或 task_group_ids 的列表。返回的 task_id(s) 和/或 task_group_id(s) 应指向 {self} 直接下游的任务或任务组。所有其他“分支”或直接下游的任务都标记为 skipped 状态,以便这些路径无法向前移动。skipped 状态会向下传播,以便填充 DAG 状态并推断 DAG 运行的状态。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:BranchPythonVirtualenvOperator

execute(context)[source]

在创建操作符时派生。

Context 是在渲染 Jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

class airflow.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]

基类:_BasePythonVirtualenvOperator

在不重新创建的虚拟环境中运行函数。

重用现有环境,避免创建虚拟环境的开销(但有一些注意事项)。

该函数必须使用 def 定义,不能是类的一部分。所有导入都必须在函数内部进行,并且不能引用范围之外的变量。一个名为 virtualenv_string_args 的全局范围变量将可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递数据,并且可以使用返回值。请注意,如果您的虚拟环境运行的 Python 主要版本与 Airflow 不同,则不能使用返回值、op_args、op_kwargs 或使用通过插件提供给 Airflow 的任何宏。但是,可以使用 string_args。

如果外部环境中安装的 Airflow 版本与 Operator 使用的版本不同,则 Operator 将失败。

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南:ExternalPythonOperator

参数
  • python (str) – 指向虚拟环境内部 Python 二进制文件的完整路径字符串(特定于文件系统)(位于 VENV/bin 文件夹中)。应该是绝对路径(因此通常以 “/” 或 “X:/” 开头,具体取决于使用的文件系统/操作系统)。

  • python_callable (Callable) – 一个不引用外部变量的 Python 函数,使用 def 定义,将在虚拟环境中运行。

  • serializer (_SerializerTypeDef | None) –

    用于序列化参数和结果的序列化器。它可以是以下之一

    • "pickle": (默认) 使用 pickle 进行序列化。包含在 Python 标准库中。

    • "cloudpickle": 使用 cloudpickle 序列化更复杂的类型,这需要在您的需求中包含 cloudpickle。

    • "dill": 使用 dill 序列化更复杂的类型,这需要在您的需求中包含 dill。

  • op_args (Collection[Any] | None) – 传递给 python_callable 的位置参数列表。

  • op_kwargs (Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。

  • string_args (Iterable[str] | None) – 存在于全局变量 virtualenv_string_args 中的字符串,在运行时以 list[str] 的形式提供给 python_callable。请注意,参数按换行符分隔。

  • templates_dict (dict | None) – 一个字典,其中的值是模板,这些模板将在 __init__execute 之间由 Airflow 引擎进行模板化,并在应用模板后在您的可调用对象的上下文中可用

  • templates_exts (list[str] | None) – 一个文件扩展名列表,用于在处理模板化字段时解析,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 预期目标环境中安装了 Airflow。如果为 true,则如果未安装 Airflow,该操作符将引发警告,并且它将在启动时尝试加载 Airflow 宏。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此退出代码退出,则将任务保留在 skipped 状态(默认值:None)。如果设置为 None,则任何非零退出代码都将被视为失败。

  • env_vars (dict[str, str] | None) – 一个字典,包含在执行虚拟环境时要为虚拟环境设置的其他环境变量。

  • inherit_env (bool) – 是否在执行虚拟环境时继承当前环境变量。如果设置为 True,则虚拟环境将继承父进程 (os.environ) 的环境变量。如果设置为 False,则虚拟环境将在干净的环境中执行。

  • use_dill (bool) – 已弃用,请改用 serializer。是否使用 dill 序列化参数和结果(默认为 pickle)。这允许更复杂的类型,但需要您在需求中包含 dill。

template_fields: Sequence[str][source]
execute_callable()[source]

使用给定的参数调用 Python 可调用对象。

返回

调用的返回值。

class airflow.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]

基类:ExternalPythonOperator, airflow.operators.branch.BranchMixIn

工作流可以在此任务执行后“分支”或遵循路径。

扩展了 ExternalPythonOperator,因此期望获取 Python:应该使用的虚拟环境(在 VENV/bin 文件夹中)。应该是绝对路径,以便它可以像 ExternalPythonOperator 一样在单独的虚拟环境中运行。

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南:BranchExternalPythonOperator

execute(context)[source]

在创建操作符时派生。

Context 是在渲染 Jinja 模板时使用的相同字典。

有关更多上下文,请参阅 get_template_context。

airflow.operators.python.get_current_context()[source]

检索执行上下文字典,而不更改用户方法的签名。

这是检索执行上下文字典的最简单方法。

旧样式

def my_task(**context):
    ti = context["ti"]

新样式

from airflow.operators.python import get_current_context


def my_task():
    context = get_current_context()
    ti = context["ti"]

仅当在 Operator 开始执行后调用此方法时,当前上下文才会有值。

此条目是否有帮助?