airflow.providers.standard.operators.python

模块内容

PythonOperator

执行 Python 可调用对象。

BranchPythonOperator

工作流可以在此任务执行后“分支”或遵循路径。

ShortCircuitOperator

允许管道根据 python_callable 的结果继续执行。

PythonVirtualenvOperator

在自动创建和销毁的 virtualenv 中运行函数。

BranchPythonVirtualenvOperator

工作流可以在此任务在虚拟环境中执行后“分支”或遵循路径。

ExternalPythonOperator

在不会重新创建的 virtualenv 中运行函数。

BranchExternalPythonOperator

工作流可以在此任务执行后“分支”或遵循路径。

函数

get_current_context()

检索执行上下文字典,而不更改用户方法的签名。

属性

log

airflow.providers.standard.operators.python.log[源代码]
class airflow.providers.standard.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[源代码]

基类:airflow.models.baseoperator.BaseOperator

执行 Python 可调用对象。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南:PythonOperator

在运行可调用对象时,Airflow 将传递一组可在函数中使用的关键字参数。这组 kwargs 与你在 jinja 模板中使用的内容完全对应。要使此功能正常工作,你需要在函数头中定义 **kwargs,或者可以直接添加你想要获取的关键字参数 - 例如,使用以下代码,你的可调用对象将获取 ti 上下文变量的值。

使用显式参数

def my_python_callable(ti):
    pass

使用 kwargs

def my_python_callable(**kwargs):
    ti = kwargs["ti"]
参数
  • python_callable (Callable) – 对可调用对象的引用

  • op_args (collections.abc.Collection[Any] | None) – 调用你的可调用对象时将解包的位置参数列表

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 将在你的函数中解包的关键字参数字典

  • templates_dict (dict[str, Any] | None) – 一个字典,其中的值是模板,这些模板将在 __init__execute 之间由 Airflow 引擎进行模板化,并在应用模板后在可调用对象的上下文中可用。(模板化)

  • templates_exts (collections.abc.Sequence[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如 ['.sql', '.hql']

  • show_return_value_in_logs (bool) – 一个布尔值,指示是否显示 return_value 日志。默认为 True,允许 return value 日志输出。可以将其设置为 False,以防止在返回大量数据(例如将大量 XCom 传输到 TaskAPI)时输出 return value 的日志。

template_fields: collections.abc.Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')[源代码]
template_fields_renderers[源代码]
BLUE = '#ffefeb'[源代码]
ui_color[源代码]
shallow_copy_attrs: collections.abc.Sequence[str] = ('python_callable', 'op_kwargs')[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

determine_kwargs(context)[源代码]
execute_callable()[源代码]

使用给定的参数调用 python 可调用对象。

返回

调用的返回值。

返回类型

Any

class airflow.providers.standard.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[源代码]

基类:PythonOperatorairflow.operators.branch.BranchMixIn

工作流可以在此任务执行后“分支”或遵循路径。

它继承自 PythonOperator,并期望一个 Python 函数,该函数返回单个 task_id、单个 task_group_id 或一个 task_ids 和/或 task_group_ids 列表以供后续执行。返回的 task_id(s) 和/或 task_group_id(s) 应指向 {self} 的直接下游任务或任务组。所有其他“分支”或直接下游任务都将被标记为 skipped 状态,以便这些路径无法继续。 skipped 状态会向下传播,以允许 DAG 状态填充完整并推断 DAG 运行的状态。

execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.standard.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[source]

基类:PythonOperator, airflow.models.skipmixin.SkipMixin

允许管道根据 python_callable 的结果继续执行。

ShortCircuitOperator 继承自 PythonOperator,并评估 python_callable 的结果。如果返回结果为 False 或假值,则管道将短路。下游任务将根据配置的短路模式标记为“skipped”状态。如果返回结果为 True 或真值,则下游任务将正常进行,并且将推送返回结果的 XCom

短路可以配置为尊重或忽略为下游任务设置的 trigger_rule。如果 ignore_downstream_trigger_rules 设置为 True(默认设置),则所有下游任务都将被跳过,而无需考虑为任务定义的 trigger_rule。但是,如果此参数设置为 False,则直接下游任务将被跳过,但将尊重其他后续下游任务的指定 trigger_rule。在此模式下,操作符假设直接下游任务被有目的地跳过,但可能不是其他后续任务。

另请参阅

有关如何使用此操作符的更多信息,请查看以下指南:ShortCircuitOperator

参数

ignore_downstream_trigger_rules (bool) – 如果设置为 True,则此操作符任务的所有下游任务都将被跳过。这是默认行为。如果设置为 False,则直接下游任务将被跳过,但将尊重为所有其他下游任务定义的 trigger_rule

execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.standard.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]

基类:_BasePythonVirtualenvOperator

在自动创建和销毁的 virtualenv 中运行函数。

该函数(有一些注意事项)必须使用 def 定义,而不是类的一部分。所有导入都必须在函数内部进行,并且不得引用范围外的变量。将有一个名为 virtualenv_string_args 的全局范围变量可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递内容,并且可以使用返回值。请注意,如果您的虚拟环境运行的 Python 主要版本与 Airflow 不同,则不能使用返回值、op_args、op_kwargs 或使用任何通过插件提供给 Airflow 的宏。但是,可以使用 string_args。

另请参阅

有关如何使用此操作符的更多信息,请查看以下指南:PythonVirtualenvOperator

参数
  • python_callable (Callable) – 一个没有外部变量引用的 python 函数,使用 def 定义,它将在虚拟环境中运行。

  • requirements (None | collections.abc.Iterable[str] | str) – 一个需求字符串列表,或 pip 指定的(模板化)“需求文件”。

  • python_version (str | None) – 用于运行虚拟环境的 Python 版本。请注意,2 和 2.7 都是可接受的形式。

  • serializer (_SerializerTypeDef | None) –

    用于序列化参数和结果的序列化器。它可以是以下之一

    • "pickle":(默认)使用 pickle 进行序列化。包含在 Python 标准库中。

    • "cloudpickle":使用 cloudpickle 序列化更复杂的类型,这需要将 cloudpickle 包含在您的需求中。

    • "dill":使用 dill 序列化更复杂的类型,这需要将 dill 包含在您的需求中。

  • system_site_packages (bool) – 是否在虚拟环境中包含 system_site_packages。有关更多信息,请参阅 virtualenv 文档。

  • pip_install_options (list[str] | None) – 安装需求时 pip 安装选项的列表。有关可用选项,请参阅“pip install -h”

  • op_args (collections.abc.Collection[Any] | None) – 传递给 python_callable 的位置参数列表。

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。

  • string_args (collections.abc.Iterable[str] | None) – 存在于全局变量 virtualenv_string_args 中的字符串,可在运行时作为 list[str] 提供给 python_callable。请注意,参数按换行符分割。

  • templates_dict (dict | None) – 一个字典,其中的值是将在 __init__execute 之间某个时间由 Airflow 引擎模板化的模板,并在应用模板后在可调用对象的上下文中可用

  • templates_exts (list[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 期望目标环境中安装了 Airflow。如果为 true,则当未安装 Airflow 时,操作符会发出警告,并且在启动时会尝试加载 Airflow 宏。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此退出代码退出,则将任务保持在 skipped 状态(默认值:None)。如果设置为 None,则任何非零退出代码都将被视为失败。

  • index_urls (None | collections.abc.Collection[str] | str) – 一个可选的索引 URL 列表,用于从中加载 Python 包。如果未提供,则将使用系统 pip 配置来获取包。

  • venv_cache_path (None | os.PathLike[str]) – 虚拟环境父文件夹的可选路径,虚拟环境将在此文件夹中缓存,创建一个子文件夹 venv-{hash},其中 hash 将被替换为需求的校验和。如果未提供,则虚拟环境将在每次执行时在临时文件夹中创建和删除。

  • env_vars (dict[str, str] | None) – 一个字典,其中包含在执行虚拟环境时要为虚拟环境设置的其他环境变量。

  • inherit_env (bool) – 是否在执行虚拟环境时继承当前环境变量。如果设置为 True,则虚拟环境将继承父进程的环境变量 (os.environ)。如果设置为 False,则虚拟环境将在干净的环境中执行。

  • use_airflow_context (bool) – 是否将 get_current_context() 提供给 python_callable。尚未实现 - 等待 AIP-72 上下文序列化。

template_fields: collections.abc.Sequence[str][source]
template_ext: collections.abc.Sequence[str] = ('.txt',)[source]
execute_callable()[source]

使用给定的参数调用 python 可调用对象。

返回

调用的返回值。

class airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]

基类:PythonVirtualenvOperator, airflow.operators.branch.BranchMixIn

工作流可以在虚拟环境中执行此任务后“分支”或遵循路径。

它派生自 PythonVirtualenvOperator,并期望一个返回单个 task_id、单个 task_group_id 或要遵循的 task_id 和/或 task_group_id 列表的 Python 函数。返回的 task_id 和/或 task_group_id 应指向 {self} 的直接下游的任务或任务组。所有其他“分支”或直接下游任务都标记为 skipped 状态,以便这些路径无法前进。skipped 状态向下游传播,以允许 DAG 状态填充,并推断 DAG 运行的状态。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:BranchPythonVirtualenvOperator

execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.standard.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]

基类:_BasePythonVirtualenvOperator

在不会重新创建的 virtualenv 中运行函数。

在不创建虚拟环境的情况下按原样重用(有一定的注意事项)。

该函数必须使用 def 定义,而不是类的一部分。所有导入都必须在函数内部进行,并且不能引用范围外的变量。一个名为 virtualenv_string_args 的全局作用域变量将可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递内容,并且可以使用返回值。请注意,如果您的虚拟环境运行的 Python 主要版本与 Airflow 不同,则无法使用返回值、op_args、op_kwargs,或使用通过插件提供给 Airflow 的任何宏。但是可以使用 string_args。

如果 Airflow 安装在外部环境中,其版本与操作符使用的版本不同,则操作符将失败。

另请参阅

有关如何使用此操作符的更多信息,请查看以下指南: ExternalPythonOperator

参数
  • python (str) – 指向虚拟环境内部 Python 二进制文件的完整路径字符串(特定于文件系统)。应该使用绝对路径(因此通常以“/”或“X:/”开头,具体取决于使用的文件系统/操作系统)。(位于 VENV/bin 文件夹中)。

  • python_callable (Callable) – 一个没有外部变量引用的 python 函数,使用 def 定义,它将在虚拟环境中运行。

  • serializer (_SerializerTypeDef | None) –

    用于序列化参数和结果的序列化器。它可以是以下之一

    • "pickle":(默认)使用 pickle 进行序列化。包含在 Python 标准库中。

    • "cloudpickle":使用 cloudpickle 序列化更复杂的类型,这需要将 cloudpickle 包含在您的需求中。

    • "dill":使用 dill 序列化更复杂的类型,这需要将 dill 包含在您的需求中。

  • op_args (collections.abc.Collection[Any] | None) – 传递给 python_callable 的位置参数列表。

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。

  • string_args (collections.abc.Iterable[str] | None) – 存在于全局变量 virtualenv_string_args 中的字符串,可在运行时作为 list[str] 提供给 python_callable。请注意,参数按换行符分割。

  • templates_dict (dict | None) – 一个字典,其中的值是将在 __init__execute 之间某个时间由 Airflow 引擎模板化的模板,并在应用模板后在可调用对象的上下文中可用

  • templates_exts (list[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 期望目标环境中安装了 Airflow。如果为 true,则当未安装 Airflow 时,操作符会发出警告,并且在启动时会尝试加载 Airflow 宏。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此退出代码退出,则将任务保持在 skipped 状态(默认值:None)。如果设置为 None,则任何非零退出代码都将被视为失败。

  • env_vars (dict[str, str] | None) – 一个字典,其中包含在执行虚拟环境时要为虚拟环境设置的其他环境变量。

  • inherit_env (bool) – 是否在执行虚拟环境时继承当前环境变量。如果设置为 True,则虚拟环境将继承父进程的环境变量 (os.environ)。如果设置为 False,则虚拟环境将在干净的环境中执行。

  • use_airflow_context (bool) – 是否将 get_current_context() 提供给 python_callable。尚未实现 - 等待 AIP-72 上下文序列化。

template_fields: collections.abc.Sequence[str][source]
execute_callable()[source]

使用给定的参数调用 python 可调用对象。

返回

调用的返回值。

class airflow.providers.standard.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_airflow_context=False, **kwargs)[source]

基类: ExternalPythonOperator, airflow.operators.branch.BranchMixIn

工作流可以在此任务执行后“分支”或遵循路径。

扩展了 ExternalPythonOperator,因此期望获得 Python:应该使用的虚拟环境(位于 VENV/bin 文件夹中)。应该是绝对路径,因此它可以在单独的虚拟环境中运行,类似于 ExternalPythonOperator。

另请参阅

有关如何使用此操作符的更多信息,请查看以下指南: BranchExternalPythonOperator

execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

airflow.providers.standard.operators.python.get_current_context()[source]

检索执行上下文字典,而不更改用户方法的签名。

这是检索执行上下文字典的最简单方法。

旧样式

def my_task(**context):
    ti = context["ti"]

新样式

from airflow.providers.standard.operators.python import get_current_context


def my_task():
    context = get_current_context()
    ti = context["ti"]

只有在操作符开始执行后调用此方法时,当前上下文才会有值。

此条目是否有帮助?