airflow.providers.standard.operators.python¶
属性¶
类¶
执行一个 Python 可调用对象。 |
|
工作流可以在此任务执行后“分支”或遵循一条路径。 |
|
根据 |
|
在一个自动创建和销毁的虚拟环境中运行一个函数。 |
|
工作流可以在此任务在虚拟环境中执行后“分支”或遵循一条路径。 |
|
在不重新创建的虚拟环境中运行一个函数。 |
|
工作流可以在此任务执行后“分支”或遵循一条路径。 |
函数¶
检索执行上下文字典,而无需更改用户方法的签名。 |
模块内容¶
- class airflow.providers.standard.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[source]¶
基类:
airflow.models.baseoperator.BaseOperator
执行一个 Python 可调用对象。
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南: PythonOperator
运行您的可调用对象时,Airflow 将传递一组可在函数中使用的关键字参数。这组 kwargs 与您在 jinja 模板中使用的内容完全对应。为此,您需要在函数头中定义
**kwargs
,或者您可以直接添加您想要获取的关键字参数 - 例如,使用下面的代码,您的可调用对象将获取ti
上下文变量的值。使用显式参数
def my_python_callable(ti): pass
使用 kwargs
def my_python_callable(**kwargs): ti = kwargs["ti"]
- 参数:
python_callable (Callable) – 对可调用对象的引用
op_args (collections.abc.Collection[Any] | None) – 调用您的可调用对象时将被解包的位置参数列表
op_kwargs (collections.abc.Mapping[str, Any] | None) – 一个将在您的函数中被解包的关键字参数字典
templates_dict (dict[str, Any] | None) – 一个字典,其值是模板,这些模板将在
__init__
和execute
执行之间被 Airflow 引擎模板化,并在应用模板后在您的可调用对象的上下文中可用。(已模板化)templates_exts (collections.abc.Sequence[str] | None) – 处理模板化字段时要解析的文件扩展名列表,例如
['.sql', '.hql']
show_return_value_in_logs (bool) – 一个布尔值,指示是否显示返回值日志。默认为 True,允许输出返回值日志。当您返回大量数据(例如通过 XCom 向 TaskAPI 传输大量数据)时,可以将其设置为 False 以防止输出返回值日志。
- template_fields: collections.abc.Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')[source]¶
- shallow_copy_attrs: collections.abc.Sequence[str] = ('python_callable', 'op_kwargs')[source]¶
- class airflow.providers.standard.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[source]¶
基类:
PythonOperator
,airflow.providers.standard.operators.branch.BranchMixIn
工作流可以在此任务执行后“分支”或遵循一条路径。
它派生自 PythonOperator,并期望一个 Python 函数返回一个 task_id、一个 task_group_id,或一个要遵循的 task_id(s) 和/或 task_group_id(s) 列表。返回的 task_id(s) 和/或 task_group_id(s) 应该指向 {self} 的直接下游任务或任务组。所有其他“分支”或直接下游任务都被标记为
skipped
状态,以便这些路径无法继续前进。skipped
状态会向下游传播,以允许 DAG 状态填充并推断 DAG 运行状态。
- class airflow.providers.standard.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[source]¶
基类:
PythonOperator
,airflow.providers.standard.utils.skipmixin.SkipMixin
根据
python_callable
的结果允许管道继续执行。ShortCircuitOperator 派生自 PythonOperator,并评估
python_callable
的结果。如果返回结果为 False 或假值,则管道将被短路。下游任务将根据配置的短路模式标记为“skipped”状态。如果返回结果为 True 或真值,下游任务将正常进行,并且返回结果的XCom
将被推送。短路可以配置为遵循或忽略为下游任务设置的
trigger_rule
。如果将ignore_downstream_trigger_rules
设置为 True(默认设置),则所有下游任务都将被跳过,而不考虑为任务定义的trigger_rule
。但是,如果将此参数设置为 False,则直接下游任务将被跳过,但将遵循为所有其他后续下游任务指定的trigger_rule
。在此模式下,Operator 假定直接下游任务是有意跳过的,但其他后续任务可能不是。另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南: ShortCircuitOperator
- 参数:
ignore_downstream_trigger_rules (bool) – 如果设置为 True,则从此 Operator 任务开始的所有下游任务都将被跳过。这是默认行为。如果设置为 False,则直接下游任务将被跳过,但将遵循为所有其他下游任务定义的
trigger_rule
。
- class airflow.providers.standard.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, **kwargs)[source]¶
基类:
_BasePythonVirtualenvOperator
在一个自动创建和销毁的虚拟环境中运行一个函数。
该函数(有一些注意事项)必须使用 def 定义,且不能是类的一部分。所有导入都必须发生在函数内部,并且不能引用作用域外部的任何变量。一个名为 virtualenv_string_args 的全局作用域变量将可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递内容,并且可以使用返回值。请注意,如果您的 virtualenv 运行的 Python 主要版本与 Airflow 不同,则不能使用返回值、op_args、op_kwargs,或使用通过插件提供给 Airflow 的任何宏。但您可以使用 string_args。
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南: PythonVirtualenvOperator
- 参数:
python_callable (`Callable`) – 一个没有外部变量引用的 Python 函数,使用 def 定义,将在虚拟环境中运行。
requirements (`None` `|` `collections.abc.Iterable`[`str`] `|` `str`) – 一个需求字符串列表,或 pip 指定的(模板化)“需求文件”。
python_version (`str` `|` `None`) – 运行虚拟环境时使用的 Python 版本。请注意,2 和 2.7 都是可接受的形式。
serializer (`_SerializerTypeDef` `|` `None`) –
用于序列化参数和结果的序列化器。可以是以下之一:
"pickle"
:(默认)使用 pickle 进行序列化。包含在 Python 标准库中。"cloudpickle"
: 使用 cloudpickle 序列化更复杂的类型,这需要在您的需求中包含 cloudpickle。"dill"
: 使用 dill 序列化更复杂的类型,这需要在您的需求中包含 dill。
system_site_packages (`bool`) – 是否在您的虚拟环境中包含 system_site_packages。更多信息请参见 virtualenv 文档。
pip_install_options (`list`[`str`] `|` `None`) – 安装需求时使用的 pip 安装选项列表。有关可用选项,请参见‘pip install -h’。
op_args (`collections.abc.Collection`[`Any`] `|` `None`) – 要传递给 python_callable 的位置参数列表。
op_kwargs (`collections.abc.Mapping`[`str`, `Any`] `|` `None`) – 要传递给 python_callable 的关键字参数字典。
string_args (`collections.abc.Iterable`[`str`] `|` `None`) – 存在于全局变量 virtualenv_string_args 中的字符串,在运行时作为 list[str] 可供 python_callable 使用。注意参数按换行符分割。
templates_dict (`dict` `|` `None`) – 一个字典,其值是模板,这些模板将在
__init__
和execute
执行之间被 Airflow 引擎模板化,并在应用模板后在您的可调用对象的上下文中可用templates_exts (`list`[`str`] `|` `None`) – 处理模板化字段时要解析的文件扩展名列表,例如
['.sql', '.hql']
expect_airflow (`bool`) – 期望 Airflow 安装在目标环境中。如果为 true,则如果 Airflow 未安装,Operator 将发出警告,并在启动时尝试加载 Airflow 宏。
skip_on_exit_code (`int` `|` `collections.abc.Container`[`int`] `|` `None`) – 如果 python_callable 以此退出代码退出,则将任务保留在
skipped
状态(默认值: None)。如果设置为None
,则任何非零退出代码都将被视为失败。index_urls (`None` `|` `collections.abc.Collection`[`str`] `|` `str`) – 用于加载 Python 包的可选索引 url 列表。如果未提供,将使用系统 pip conf 来获取包。
venv_cache_path (`None` `|` `os.PathLike`[`str`]) – 虚拟环境父文件夹的可选路径,虚拟环境将在此文件夹中缓存,会创建一个子文件夹 venv-{hash},其中 hash 将替换为需求的校验和。如果未提供,虚拟环境将在每次执行时在临时文件夹中创建和删除。
env_vars (`dict`[`str`, `str`] `|` `None`) – 一个包含为虚拟环境执行时设置的附加环境变量的字典。
inherit_env (`bool`) – 执行虚拟环境时是否继承当前环境变量。如果设置为
True
,虚拟环境将继承父进程 (os.environ
) 的环境变量。如果设置为False
,虚拟环境将使用干净的环境执行。
- template_fields: collections.abc.Sequence[str][source]¶
- template_ext: collections.abc.Sequence[str] = ('.txt',)[source]¶
- class airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, **kwargs)[source]¶
基类:
PythonVirtualenvOperator
,airflow.providers.standard.operators.branch.BranchMixIn
工作流可以在此任务在虚拟环境中执行后“分支”或遵循一条路径。
它继承自 PythonVirtualenvOperator 并需要一个 Python 函数,该函数返回单个 task_id、单个 task_group_id,或要遵循的 task_ids 和/或 task_group_ids 列表。返回的 task_id(s) 和/或 task_group_id(s) 应指向紧接在 {self} 下游的任务或任务组。所有其他“分支”或直接下游任务都会被标记为
skipped
状态,这样这些路径就无法继续。skipped
状态会向下游传播,以便填充 DAG 状态并推断 DAG 运行的状态。另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:BranchPythonVirtualenvOperator
- class airflow.providers.standard.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, **kwargs)[source]¶
基类:
_BasePythonVirtualenvOperator
在不重新创建的虚拟环境中运行一个函数。
在不创建虚拟环境的开销下(带有一些注意事项)直接重用。
函数必须使用 def 定义,且不能是类的一部分。所有导入必须发生在函数内部,不能引用作用域之外的变量。一个名为 virtualenv_string_args 的全局作用域变量将可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递内容,并且可以使用返回值。请注意,如果您的虚拟环境运行的 Python 主版本与 Airflow 不同,则不能使用返回值、op_args、op_kwargs,或使用通过插件提供给 Airflow 的任何宏。但您可以使用 string_args。
如果 Airflow 安装在外部环境中的版本与 Operator 使用的版本不同,则 Operator 将会失败。
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:ExternalPythonOperator
- 参数:
python (str) – 完整路径字符串(文件系统特定),指向虚拟环境中应使用的 Python 二进制文件(位于
VENV/bin
文件夹中)。应为绝对路径(因此通常根据所使用的文件系统/操作系统以“/”或“X:/”开头)。python_callable (`Callable`) – 一个没有外部变量引用的 Python 函数,使用 def 定义,将在虚拟环境中运行。
serializer (`_SerializerTypeDef` `|` `None`) –
用于序列化参数和结果的序列化器。可以是以下之一:
"pickle"
:(默认)使用 pickle 进行序列化。包含在 Python 标准库中。"cloudpickle"
: 使用 cloudpickle 序列化更复杂的类型,这需要在您的需求中包含 cloudpickle。"dill"
: 使用 dill 序列化更复杂的类型,这需要在您的需求中包含 dill。
op_args (`collections.abc.Collection`[`Any`] `|` `None`) – 要传递给 python_callable 的位置参数列表。
op_kwargs (`collections.abc.Mapping`[`str`, `Any`] `|` `None`) – 要传递给 python_callable 的关键字参数字典。
string_args (`collections.abc.Iterable`[`str`] `|` `None`) – 存在于全局变量 virtualenv_string_args 中的字符串,在运行时作为 list[str] 可供 python_callable 使用。注意参数按换行符分割。
templates_dict (`dict` `|` `None`) – 一个字典,其值是模板,这些模板将在
__init__
和execute
执行之间被 Airflow 引擎模板化,并在应用模板后在您的可调用对象的上下文中可用templates_exts (`list`[`str`] `|` `None`) – 处理模板化字段时要解析的文件扩展名列表,例如
['.sql', '.hql']
expect_airflow (`bool`) – 期望 Airflow 安装在目标环境中。如果为 true,则如果 Airflow 未安装,Operator 将发出警告,并在启动时尝试加载 Airflow 宏。
skip_on_exit_code (`int` `|` `collections.abc.Container`[`int`] `|` `None`) – 如果 python_callable 以此退出代码退出,则将任务保留在
skipped
状态(默认值: None)。如果设置为None
,则任何非零退出代码都将被视为失败。env_vars (`dict`[`str`, `str`] `|` `None`) – 一个包含为虚拟环境执行时设置的附加环境变量的字典。
inherit_env (`bool`) – 执行虚拟环境时是否继承当前环境变量。如果设置为
True
,虚拟环境将继承父进程 (os.environ
) 的环境变量。如果设置为False
,虚拟环境将使用干净的环境执行。
- template_fields: collections.abc.Sequence[str][source]¶
- class airflow.providers.standard.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, **kwargs)[source]¶
基类:
ExternalPythonOperator
,airflow.providers.standard.operators.branch.BranchMixIn
工作流可以在此任务执行后“分支”或遵循一条路径。
继承 ExternalPythonOperator,因此需要获取 Python:应使用的虚拟环境(位于
VENV/bin
文件夹中)。应为绝对路径,以便它可以在单独的虚拟环境中运行,类似于 ExternalPythonOperator。另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:BranchExternalPythonOperator
- airflow.providers.standard.operators.python.get_current_context()[source]¶
检索执行上下文字典,而不改变用户方法的签名。
这是检索执行上下文字典的最简单方法。
旧样式
def my_task(**context): ti = context["ti"]
新样式
from airflow.providers.standard.operators.python import get_current_context def my_task(): context = get_current_context() ti = context["ti"]
仅当在 Operator 开始执行后调用此方法时,当前上下文才会有值。