airflow.operators.python
¶
模块内容¶
类¶
执行 Python 可调用对象。 |
|
工作流可以在执行此任务后“分支”或遵循路径。 |
|
允许管道基于 |
|
在自动创建和销毁的虚拟环境中运行函数。 |
|
工作流可以在虚拟环境中执行此任务后“分支”或遵循路径。 |
|
在不重新创建的虚拟环境中运行函数。 |
|
工作流可以在执行此任务后“分支”或遵循路径。 |
函数¶
通过检查 virtualenv 包是否在路径上或作为包安装来检查是否已安装该包。 |
|
|
请改用 |
检索执行上下文字典,而无需更改用户方法的签名。 |
属性¶
- airflow.operators.python.is_venv_installed()[源代码]¶
通过检查 virtualenv 包是否在路径上或作为包安装来检查是否已安装该包。
- 返回
如果是,则返回 True。无论哪种检查方式有效都可以。
- 返回类型
- airflow.operators.python.task(python_callable=None, multiple_outputs=None, **kwargs)[源代码]¶
请改用
airflow.decorators.task()
,此方法已弃用。调用
@task.python
并允许用户将 Python 函数转换为 Airflow 任务。- 参数
python_callable (Callable | None) – 对可调用对象的引用
op_kwargs – 一个关键字参数字典,将在您的函数中解包(已模板化)
op_args – 一个位置参数列表,将在调用您的可调用对象时解包(已模板化)
multiple_outputs (bool | None) – 如果设置,函数返回值将展开为多个 XCom 值。字典将展开为以键作为键的 xcom 值。默认为 False。
- class airflow.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[源代码]¶
基类:
airflow.models.baseoperator.BaseOperator
执行 Python 可调用对象。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:PythonOperator
在运行您的可调用对象时,Airflow 将传递一组可在您的函数中使用的关键字参数。这组 kwargs 与您可以在 Jinja 模板中使用的内容完全对应。为了使此工作正常进行,您需要在函数头中定义
**kwargs
,或者您可以直接添加您想要获取的关键字参数 - 例如,使用以下代码,您的可调用对象将获取ti
和next_ds
上下文变量的值。使用显式参数
def my_python_callable(ti, next_ds): pass
使用 kwargs
def my_python_callable(**kwargs): ti = kwargs["ti"] next_ds = kwargs["next_ds"]
- 参数
python_callable (Callable) – 对可调用对象的引用
op_args (Collection[Any] | None) – 一个位置参数列表,将在调用您的可调用对象时解包
op_kwargs (Mapping[str, Any] | None) – 一个关键字参数字典,将在您的函数中解包
templates_dict (dict[str, Any] | None) – 一个字典,其中值是模板,这些模板将在
__init__
和execute
之间由 Airflow 引擎进行模板化处理,并在应用模板后在您的可调用对象的上下文中可用。(已模板化)templates_exts (Sequence[str] | None) – 一个文件扩展名列表,用于在处理模板字段时解析,例如
['.sql', '.hql']
show_return_value_in_logs (bool) – 一个布尔值,指示是否在日志中显示 return_value。默认为 True,允许 return value 日志输出。当您返回大量数据(例如向 TaskAPI 传输大量 XCom)时,可以将其设置为 False 以防止返回值的日志输出。
- class airflow.operators.python.BranchPythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[source]¶
基类:
PythonOperator
,airflow.operators.branch.BranchMixIn
工作流可以在此任务执行后“分支”或遵循路径。
它派生自 PythonOperator 并期望一个 Python 函数,该函数返回一个 task_id、一个 task_group_id 或要遵循的 task_ids 和/或 task_group_ids 列表。返回的 task_id(s) 和/或 task_group_id(s) 应该指向 {self} 的直接下游的任务或任务组。所有其他“分支”或直接下游任务都标记为
skipped
状态,以便这些路径无法向前移动。skipped
状态会向下传播,以允许 DAG 状态填充并推断 DAG 运行的状态。
- class airflow.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[source]¶
基类:
PythonOperator
,airflow.models.skipmixin.SkipMixin
允许管道基于
python_callable
的结果继续运行。ShortCircuitOperator 派生自 PythonOperator,并评估
python_callable
的结果。如果返回的结果为 False 或假值,则管道将短路。根据配置的短路模式,下游任务将标记为“skipped”状态。如果返回的结果为 True 或真值,则下游任务将照常进行,并推送返回结果的XCom
。可以将短路配置为遵守或忽略为下游任务设置的
trigger_rule
。如果ignore_downstream_trigger_rules
设置为 True,这是默认设置,则所有下游任务都将在不考虑为任务定义的trigger_rule
的情况下跳过。但是,如果此参数设置为 False,则会跳过直接下游任务,但会遵守其他后续下游任务的指定trigger_rule
。在此模式下,操作符假定直接下游任务的目的是跳过,但可能不包括其他后续任务。另请参阅
有关如何使用此操作符的更多信息,请查看指南:ShortCircuitOperator
- 参数
ignore_downstream_trigger_rules (bool) – 如果设置为 True,则将跳过此操作符任务的所有下游任务。这是默认行为。如果设置为 False,则将跳过直接下游任务,但将遵守为所有其他下游任务定义的
trigger_rule
。
- class airflow.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]¶
基类:
_BasePythonVirtualenvOperator
在自动创建和销毁的虚拟环境中运行函数。
必须使用 def 定义该函数(具有某些注意事项),并且不能是类的一部分。所有导入都必须在函数内部进行,并且不能引用范围之外的任何变量。将提供一个名为 virtualenv_string_args 的全局范围变量(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递内容,并且可以使用返回值。请注意,如果您的虚拟环境在与 Airflow 不同的 Python 主要版本中运行,则不能使用返回值、op_args、op_kwargs 或使用通过插件提供给 Airflow 的任何宏。但是,可以使用 string_args。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:PythonVirtualenvOperator
- 参数
python_callable (Callable) – 一个不引用外部变量的 Python 函数,使用 def 定义,将在虚拟环境中运行。
requirements (None | Iterable[str] | str) – 一个需求字符串列表,或者一个由 pip 指定的(模板化的)“需求文件”。
python_version (str | None) – 运行虚拟环境的 Python 版本。注意 2 和 2.7 都是可接受的形式。
serializer (_SerializerTypeDef | None) –
用于序列化参数和结果的序列化器。它可以是以下之一
"pickle"
: (默认) 使用 pickle 进行序列化。包含在 Python 标准库中。"cloudpickle"
: 使用 cloudpickle 序列化更复杂的类型,这需要在您的需求中包含 cloudpickle。"dill"
: 使用 dill 序列化更复杂的类型,这需要在您的需求中包含 dill。
system_site_packages (bool) – 是否在虚拟环境中包含 system_site_packages。有关更多信息,请参阅 virtualenv 文档。
pip_install_options (list[str] | None) – 安装需求时 pip install 的选项列表。有关可用选项,请参阅 ‘pip install -h’
op_args (Collection[Any] | None) – 传递给 python_callable 的位置参数列表。
op_kwargs (Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。
string_args (Iterable[str] | None) – 存在于全局变量 virtualenv_string_args 中的字符串,在运行时以 list[str] 的形式提供给 python_callable。请注意,参数按换行符分隔。
templates_dict (dict | None) – 一个字典,其中的值是模板,这些模板将在
__init__
和execute
之间由 Airflow 引擎进行模板化,并在应用模板后在您的可调用对象的上下文中可用templates_exts (list[str] | None) – 一个文件扩展名列表,用于在处理模板化字段时解析,例如
['.sql', '.hql']
expect_airflow (bool) – 预期目标环境中安装了 Airflow。如果为 true,则如果未安装 Airflow,该操作符将引发警告,并且它将在启动时尝试加载 Airflow 宏。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此退出代码退出,则将任务保留在
skipped
状态(默认值:None)。如果设置为None
,则任何非零退出代码都将被视为失败。index_urls (None | Collection[str] | str) – 一个可选的索引 URL 列表,用于从中加载 Python 包。如果未提供,则将使用系统 pip conf 从中获取包。
venv_cache_path (None | os.PathLike[str]) – 虚拟环境父文件夹的可选路径,其中将缓存虚拟环境,创建子文件夹 venv-{hash},其中 hash 将替换为需求校验和。如果未提供,则将在每次执行时在临时文件夹中创建和删除虚拟环境。
env_vars (dict[str, str] | None) – 一个字典,包含在执行虚拟环境时要为虚拟环境设置的其他环境变量。
inherit_env (bool) – 是否在执行虚拟环境时继承当前环境变量。如果设置为
True
,则虚拟环境将继承父进程 (os.environ
) 的环境变量。如果设置为False
,则虚拟环境将在干净的环境中执行。use_dill (bool) – 已弃用,请改用
serializer
。是否使用 dill 序列化参数和结果(默认为 pickle)。这允许更复杂的类型,但需要您在需求中包含 dill。
- class airflow.operators.python.BranchPythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, venv_cache_path=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]¶
基类:
PythonVirtualenvOperator
,airflow.operators.branch.BranchMixIn
工作流可以在虚拟环境中执行此任务后“分支”或遵循路径。
它派生自 PythonVirtualenvOperator 并期望一个 Python 函数,该函数返回单个 task_id、单个 task_group_id 或要遵循的 task_ids 和/或 task_group_ids 的列表。返回的 task_id(s) 和/或 task_group_id(s) 应指向 {self} 直接下游的任务或任务组。所有其他“分支”或直接下游的任务都标记为
skipped
状态,以便这些路径无法向前移动。skipped
状态会向下传播,以便填充 DAG 状态并推断 DAG 运行的状态。另请参阅
有关如何使用此操作符的更多信息,请查看指南:BranchPythonVirtualenvOperator
- class airflow.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]¶
基类:
_BasePythonVirtualenvOperator
在不重新创建的虚拟环境中运行函数。
重用现有环境,避免创建虚拟环境的开销(但有一些注意事项)。
该函数必须使用 def 定义,不能是类的一部分。所有导入都必须在函数内部进行,并且不能引用范围之外的变量。一个名为 virtualenv_string_args 的全局范围变量将可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递数据,并且可以使用返回值。请注意,如果您的虚拟环境运行的 Python 主要版本与 Airflow 不同,则不能使用返回值、op_args、op_kwargs 或使用通过插件提供给 Airflow 的任何宏。但是,可以使用 string_args。
如果外部环境中安装的 Airflow 版本与 Operator 使用的版本不同,则 Operator 将失败。
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:ExternalPythonOperator
- 参数
python (str) – 指向虚拟环境内部 Python 二进制文件的完整路径字符串(特定于文件系统)(位于
VENV/bin
文件夹中)。应该是绝对路径(因此通常以 “/” 或 “X:/” 开头,具体取决于使用的文件系统/操作系统)。python_callable (Callable) – 一个不引用外部变量的 Python 函数,使用 def 定义,将在虚拟环境中运行。
serializer (_SerializerTypeDef | None) –
用于序列化参数和结果的序列化器。它可以是以下之一
"pickle"
: (默认) 使用 pickle 进行序列化。包含在 Python 标准库中。"cloudpickle"
: 使用 cloudpickle 序列化更复杂的类型,这需要在您的需求中包含 cloudpickle。"dill"
: 使用 dill 序列化更复杂的类型,这需要在您的需求中包含 dill。
op_args (Collection[Any] | None) – 传递给 python_callable 的位置参数列表。
op_kwargs (Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。
string_args (Iterable[str] | None) – 存在于全局变量 virtualenv_string_args 中的字符串,在运行时以 list[str] 的形式提供给 python_callable。请注意,参数按换行符分隔。
templates_dict (dict | None) – 一个字典,其中的值是模板,这些模板将在
__init__
和execute
之间由 Airflow 引擎进行模板化,并在应用模板后在您的可调用对象的上下文中可用templates_exts (list[str] | None) – 一个文件扩展名列表,用于在处理模板化字段时解析,例如
['.sql', '.hql']
expect_airflow (bool) – 预期目标环境中安装了 Airflow。如果为 true,则如果未安装 Airflow,该操作符将引发警告,并且它将在启动时尝试加载 Airflow 宏。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以此退出代码退出,则将任务保留在
skipped
状态(默认值:None)。如果设置为None
,则任何非零退出代码都将被视为失败。env_vars (dict[str, str] | None) – 一个字典,包含在执行虚拟环境时要为虚拟环境设置的其他环境变量。
inherit_env (bool) – 是否在执行虚拟环境时继承当前环境变量。如果设置为
True
,则虚拟环境将继承父进程 (os.environ
) 的环境变量。如果设置为False
,则虚拟环境将在干净的环境中执行。use_dill (bool) – 已弃用,请改用
serializer
。是否使用 dill 序列化参数和结果(默认为 pickle)。这允许更复杂的类型,但需要您在需求中包含 dill。
- class airflow.operators.python.BranchExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, use_dill=False, **kwargs)[source]¶
基类:
ExternalPythonOperator
,airflow.operators.branch.BranchMixIn
工作流可以在此任务执行后“分支”或遵循路径。
扩展了 ExternalPythonOperator,因此期望获取 Python:应该使用的虚拟环境(在
VENV/bin
文件夹中)。应该是绝对路径,以便它可以像 ExternalPythonOperator 一样在单独的虚拟环境中运行。另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:BranchExternalPythonOperator
- airflow.operators.python.get_current_context()[source]¶
检索执行上下文字典,而不更改用户方法的签名。
这是检索执行上下文字典的最简单方法。
旧样式
def my_task(**context): ti = context["ti"]
新样式
from airflow.operators.python import get_current_context def my_task(): context = get_current_context() ti = context["ti"]
仅当在 Operator 开始执行后调用此方法时,当前上下文才会有值。