airflow.providers.standard.operators.python¶
属性¶
类¶
所有 Python 操作符的基类。 |
|
工作流可以在此任务执行后“分支”或沿某条路径继续。 |
|
允许管道根据 |
|
在自动创建和销毁的 virtualenv 中运行函数。 |
|
工作流可以在虚拟环境中此任务执行后“分支”或沿某条路径继续。 |
|
在未重新创建的 virtualenv 中运行函数。 |
|
工作流可以在此任务执行后“分支”或沿某条路径继续。 |
函数¶
在不更改用户方法签名的情况下检索执行上下文字典。 |
模块内容¶
- class airflow.providers.standard.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[source]¶
Bases:
airflow.providers.common.compat.standard.operators.BaseAsyncOperator所有 Python 操作符的基类。
另请参阅
欲了解如何使用此操作符的更多信息,请参阅指南: PythonOperator
在运行可调用函数时,Airflow 会传入一组关键字参数,可在函数中使用。这组 kwargs 与您在 Jinja 模板中使用的完全对应。为使其生效,您需要在函数签名中定义 **kwargs,或者直接添加您想获取的关键字参数——例如下面的代码中,可调用函数将会获得 ti 上下文变量的值。
使用显式参数
def my_python_callable(ti): pass
使用 kwargs
def my_python_callable(**kwargs): ti = kwargs["ti"]
- 参数:
python_callable (collections.abc.Callable) – 对可调用对象的引用
op_args (collections.abc.Collection[Any] | None) – 调用可调用函数时将被解包的位置参数列表
op_kwargs (collections.abc.Mapping[str, Any] | None) – 将在函数调用时解包的关键字参数字典
templates_dict (dict[str, Any] | None) – 一个字典,值为模板,这些模板将在
__init__与execute之间的某个时刻由 Airflow 引擎渲染,并在模板应用后在可调用函数的上下文中可用。(已模板化)templates_exts (collections.abc.Sequence[str] | None) – 处理模板化字段时要解析的文件扩展名列表,例如 ['.sql', '.hql']
show_return_value_in_logs (bool) – 布尔值,指示是否显示 return_value 日志。默认 True,允许输出返回值日志。若设为 False,可在返回大量数据(如向 TaskAPI 传输大量 XCom)时阻止返回值日志输出。
- template_fields: collections.abc.Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')[source]¶
- shallow_copy_attrs: collections.abc.Sequence[str] = ('python_callable', 'op_kwargs')[source]¶
- class airflow.providers.standard.operators.python.BranchPythonOperator(*, task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=DEFAULT_EMAIL_ON_RETRY, email_on_failure=DEFAULT_EMAIL_ON_FAILURE, retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=0, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, render_template_as_native_obj=None, **kwargs)[source]¶
Bases:
airflow.providers.common.compat.sdk.BaseBranchOperator,PythonOperator工作流可以在此任务执行后“分支”或沿某条路径继续。
它继承自 PythonOperator,并期望一个返回单个 task_id、单个 task_group_id,或任务 ID 和/或任务组 ID 列表的 Python 函数。返回的 task_id(s) 或 task_group_id(s) 应直接指向位于 {self} 下游的任务或任务组。所有其他“分支”或直接下游任务将被标记为
skipped状态,以阻止这些路径继续前进。skipped状态会向下游传播,以便填充 DAG 状态并推断 DAG 运行的状态。
- class airflow.providers.standard.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[source]¶
Bases:
PythonOperator,airflow.providers.common.compat.sdk.SkipMixin允许管道根据
python_callable的结果继续执行。ShortCircuitOperator 派生自 PythonOperator,并评估
python_callable的返回结果。如果返回结果为 False 或任何等价的假值,管道将被短路。下游任务将根据配置的短路模式被标记为 “skipped” 状态。如果返回结果为 True 或任何等价的真值,下游任务将正常继续,并将返回结果作为XCom推送。短路的行为可以配置为遵守或忽略下游任务设置的
trigger_rule。如果ignore_downstream_trigger_rules为 True(默认设置),所有下游任务将在不考虑其trigger_rule的情况下被跳过。然而,如果此参数设为 False,则直接下游任务会被跳过,但对其他后续下游任务的指定trigger_rule将被尊重。在此模式下,操作符假定直接下游任务被有意跳过,而其他后续任务则可能不被跳过。另请参阅
欲了解如何使用此操作符的更多信息,请参阅指南: ShortCircuitOperator
- 参数:
ignore_downstream_trigger_rules (bool) – 如果设为 True,则此操作符任务的所有下游任务都会被跳过。这是默认行为。如果设为 False,则直接下游任务会被跳过,但对所有其他下游任务定义的
trigger_rule将被遵守。
- class airflow.providers.standard.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, index_urls_from_connection_ids=None, venv_cache_path=None, env_vars=None, inherit_env=True, **kwargs)[source]¶
Bases:
_BasePythonVirtualenvOperator在自动创建和销毁的 virtualenv 中运行函数。
该函数(有一定限制)必须使用 def 定义,且不能是类的一部分。所有导入必须在函数内部进行,且不能引用作用域外的变量。一个名为 virtualenv_string_args 的全局变量将可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递参数,并且可以使用返回值。请注意,如果您的 virtualenv 使用的 Python 主版本与 Airflow 不同,则无法使用返回值、op_args、op_kwargs,或使用通过插件提供给 Airflow 的任何宏。不过可以使用 string_args。
另请参阅
欲了解如何使用此操作符的更多信息,请参阅指南: PythonVirtualenvOperator
- 参数:
python_callable (collections.abc.Callable) – 使用 def 定义的 Python 函数,不引用外部变量,将在虚拟环境中运行。
requirements (None | collections.abc.Iterable[str] | str) – 可以是需求字符串列表,或者是 pip 指定的(模板化的)“requirements 文件”。
python_version (str | None) – 用于运行虚拟环境的 Python 版本。注意,2 和 2.7 都是可接受的写法。
serializer (_SerializerTypeDef | None) –
用于序列化参数和结果的序列化器。它可以是以下之一:
"pickle": (默认)使用 pickle 进行序列化。包含在 Python 标准库中。"cloudpickle": 使用 cloudpickle 序列化更复杂的类型,这需要在你的需求中包含 cloudpickle。"dill": 使用 dill 序列化更复杂的类型,这需要在你的需求中包含 dill。
system_site_packages (bool) – 是否在虚拟环境中包含 system_site_packages。更多信息请参阅 virtualenv 文档。
pip_install_options (list[str] | None) – 安装需求时的 pip 安装选项列表。可用选项请参见 ‘pip install -h’。
op_args (collections.abc.Collection[Any] | None) – 传递给 python_callable 的位置参数列表。
op_kwargs (collections.abc.Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。
string_args (collections.abc.Iterable[str] | None) – 在全局变量 virtualenv_string_args 中存在的字符串,运行时作为 list[str] 提供给 python_callable。注意,参数会按换行符拆分。
templates_dict (dict | None) – 一个字典,其值为模板,模板将在
__init__与execute之间的某个时刻由 Airflow 引擎渲染,并在模板应用后在你的 callable 上下文中可用。templates_exts (list[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如
['.sql', '.hql']。expect_airflow (bool) – 期望在目标环境中安装 Airflow。如果为 true,若未安装 Airflow,操作符将发出警告,并在启动时尝试加载 Airflow 宏。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以该退出码退出,则将任务置为
skipped状态(默认:None)。若设置为None,任何非零退出码都将视为失败。index_urls (None | collections.abc.Collection[str] | str) – 可选的 Python 包索引 URL 列表。若未提供,则使用系统 pip 配置的源。
index_urls_from_connection_ids (None | collections.abc.Collection[str] | str) – 可选的 `PackageIndex` 连接 ID 列表。会追加到 `index_urls`。
venv_cache_path (None | os.PathLike[str]) – 可选的虚拟环境父文件夹路径,虚拟环境将在此缓存,会创建子文件夹 venv-{hash}(hash 为需求的校验和)。若未提供,虚拟环境将在每次执行时于临时文件夹中创建并删除。
inherit_env (bool) – 执行虚拟环境时是否继承当前环境变量。若设为
True,虚拟环境将继承父进程的环境变量(os.environ)。若设为False,虚拟环境将在干净的环境中执行。
- template_fields: collections.abc.Sequence[str][source]¶
- template_ext: collections.abc.Sequence[str] = ('.txt',)[source]¶
- class airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator(*, task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=DEFAULT_EMAIL_ON_RETRY, email_on_failure=DEFAULT_EMAIL_ON_FAILURE, retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=0, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, render_template_as_native_obj=None, **kwargs)[source]¶
Bases:
airflow.providers.common.compat.sdk.BaseBranchOperator,PythonVirtualenvOperator工作流可以在虚拟环境中执行此任务后“分支”或选择路径。
它继承自 PythonVirtualenvOperator,并期望一个返回单个 task_id、单个 task_group_id,或一个 task_id 和/或 task_group_id 列表的 Python 函数。返回的 task_id(s) 和/或 task_group_id(s) 应指向直接下游于 {self} 的任务或任务组。所有其他“分支”或直接下游任务会被标记为
skipped状态,以防这些路径继续前进。skipped状态会向下游传播,以便填满 DAG 状态并推断 DAG 运行的状态。另请参阅
欲了解如何使用此操作符的更多信息,请参阅指南: BranchPythonVirtualenvOperator
- class airflow.providers.standard.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, **kwargs)[source]¶
Bases:
_BasePythonVirtualenvOperator在未重新创建的 virtualenv 中运行函数。
直接复用,无需创建虚拟环境的开销(有一定限制)。
函数必须使用 def 定义,且不能是类的一部分。所有导入必须在函数内部完成,且不能引用作用域之外的变量。全局作用域变量 virtualenv_string_args(由 string_args 填充)将可用。此外,可以通过 op_args 和 op_kwargs 传递参数,也可以使用返回值。注意,如果你的虚拟环境使用的 Python 主版本与 Airflow 不同,则不能使用返回值、op_args、op_kwargs,也不能使用通过插件提供给 Airflow 的任何宏。但可以使用 string_args。
如果外部环境中安装的 Airflow 版本与操作符使用的版本不同,操作符将失败。
另请参阅
欲了解如何使用此操作符的更多信息,请参阅指南: ExternalPythonOperator
- 参数:
python (str) – 完整的路径字符串(与文件系统相关),指向虚拟环境中应使用的 Python 可执行文件(位于
VENV/bin文件夹)。应为绝对路径(通常以 “/” 或 “X:/” 开头,取决于所用的文件系统/操作系统)。python_callable (collections.abc.Callable) – 使用 def 定义的 Python 函数,不引用外部变量,将在虚拟环境中运行。
serializer (_SerializerTypeDef | None) –
用于序列化参数和结果的序列化器。它可以是以下之一:
"pickle": (默认)使用 pickle 进行序列化。包含在 Python 标准库中。"cloudpickle": 使用 cloudpickle 序列化更复杂的类型,这需要在你的需求中包含 cloudpickle。"dill": 使用 dill 序列化更复杂的类型,这需要在你的需求中包含 dill。
op_args (collections.abc.Collection[Any] | None) – 传递给 python_callable 的位置参数列表。
op_kwargs (collections.abc.Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。
string_args (collections.abc.Iterable[str] | None) – 在全局变量 virtualenv_string_args 中存在的字符串,运行时作为 list[str] 提供给 python_callable。注意,参数会按换行符拆分。
templates_dict (dict | None) – 一个字典,其值为模板,模板将在
__init__与execute之间的某个时刻由 Airflow 引擎渲染,并在模板应用后在你的 callable 上下文中可用。templates_exts (list[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如
['.sql', '.hql']。expect_airflow (bool) – 期望在目标环境中安装 Airflow。如果为 true,若未安装 Airflow,操作符将发出警告,并在启动时尝试加载 Airflow 宏。
skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以该退出码退出,则将任务置为
skipped状态(默认:None)。若设置为None,任何非零退出码都将视为失败。inherit_env (bool) – 执行虚拟环境时是否继承当前环境变量。若设为
True,虚拟环境将继承父进程的环境变量(os.environ)。若设为False,虚拟环境将在干净的环境中执行。
- template_fields: collections.abc.Sequence[str][source]¶
- class airflow.providers.standard.operators.python.BranchExternalPythonOperator(*, task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=DEFAULT_EMAIL_ON_RETRY, email_on_failure=DEFAULT_EMAIL_ON_FAILURE, retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=0, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, render_template_as_native_obj=None, **kwargs)[source]¶
基类:
airflow.providers.common.compat.sdk.BaseBranchOperator,ExternalPythonOperator工作流可以在此任务执行后“分支”或沿某条路径继续。
扩展自 ExternalPythonOperator,因此期望获取应使用的 Python 虚拟环境(位于
VENV/bin文件夹)。应为绝对路径,以便能够像 ExternalPythonOperator 那样在独立的虚拟环境中运行。另请参阅
欲了解如何使用此运算符的更多信息,请参阅指南: BranchExternalPythonOperator
- airflow.providers.standard.operators.python.get_current_context()[source]¶
检索执行上下文字典,而不更改用户方法的签名。
这是检索执行上下文字典的最简单方法。
旧式
def my_task(**context): ti = context["ti"]
新式
from airflow.providers.standard.operators.python import get_current_context def my_task(): context = get_current_context() ti = context["ti"]
仅当在运算符开始执行后调用此方法时,当前上下文才会有值。