airflow.providers.standard.operators.python

属性

log

PythonOperator

所有 Python 操作符的基类。

BranchPythonOperator

工作流可以在此任务执行后“分支”或沿某条路径继续。

ShortCircuitOperator

允许管道根据 python_callable 的结果继续执行。

PythonVirtualenvOperator

在自动创建和销毁的 virtualenv 中运行函数。

BranchPythonVirtualenvOperator

工作流可以在虚拟环境中此任务执行后“分支”或沿某条路径继续。

ExternalPythonOperator

在未重新创建的 virtualenv 中运行函数。

BranchExternalPythonOperator

工作流可以在此任务执行后“分支”或沿某条路径继续。

函数

get_current_context()

在不更改用户方法签名的情况下检索执行上下文字典。

模块内容

airflow.providers.standard.operators.python.log[source]
class airflow.providers.standard.operators.python.PythonOperator(*, python_callable, op_args=None, op_kwargs=None, templates_dict=None, templates_exts=None, show_return_value_in_logs=True, **kwargs)[source]

Bases: airflow.providers.common.compat.standard.operators.BaseAsyncOperator

所有 Python 操作符的基类。

另请参阅

欲了解如何使用此操作符的更多信息,请参阅指南: PythonOperator

在运行可调用函数时,Airflow 会传入一组关键字参数,可在函数中使用。这组 kwargs 与您在 Jinja 模板中使用的完全对应。为使其生效,您需要在函数签名中定义 **kwargs,或者直接添加您想获取的关键字参数——例如下面的代码中,可调用函数将会获得 ti 上下文变量的值。

使用显式参数

def my_python_callable(ti):
    pass

使用 kwargs

def my_python_callable(**kwargs):
    ti = kwargs["ti"]
参数:
  • python_callable (collections.abc.Callable) – 对可调用对象的引用

  • op_args (collections.abc.Collection[Any] | None) – 调用可调用函数时将被解包的位置参数列表

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 将在函数调用时解包的关键字参数字典

  • templates_dict (dict[str, Any] | None) – 一个字典,值为模板,这些模板将在 __init__execute 之间的某个时刻由 Airflow 引擎渲染,并在模板应用后在可调用函数的上下文中可用。(已模板化)

  • templates_exts (collections.abc.Sequence[str] | None) – 处理模板化字段时要解析的文件扩展名列表,例如 ['.sql', '.hql']

  • show_return_value_in_logs (bool) – 布尔值,指示是否显示 return_value 日志。默认 True,允许输出返回值日志。若设为 False,可在返回大量数据(如向 TaskAPI 传输大量 XCom)时阻止返回值日志输出。

template_fields: collections.abc.Sequence[str] = ('templates_dict', 'op_args', 'op_kwargs')[source]
template_fields_renderers[source]
BLUE = '#ffefeb'[source]
ui_color = '#ffefeb'[source]
shallow_copy_attrs: collections.abc.Sequence[str] = ('python_callable', 'op_kwargs')[source]
python_callable[source]
op_args = ()[source]
op_kwargs[source]
templates_dict = None[source]
show_return_value_in_logs = True[source]
property is_async: bool[source]
execute(context)[source]

在事件循环中运行 aexecute()

determine_kwargs(context)[source]
execute_callable()[source]

使用给定参数调用 python 可调用对象。

返回:

调用的返回值。

返回类型:

Any

async aexecute(context)[source]

execute() 的异步版本。子类应实现此方法。

class airflow.providers.standard.operators.python.BranchPythonOperator(*, task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=DEFAULT_EMAIL_ON_RETRY, email_on_failure=DEFAULT_EMAIL_ON_FAILURE, retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=0, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, render_template_as_native_obj=None, **kwargs)[source]

Bases: airflow.providers.common.compat.sdk.BaseBranchOperator, PythonOperator

工作流可以在此任务执行后“分支”或沿某条路径继续。

它继承自 PythonOperator,并期望一个返回单个 task_id、单个 task_group_id,或任务 ID 和/或任务组 ID 列表的 Python 函数。返回的 task_id(s) 或 task_group_id(s) 应直接指向位于 {self} 下游的任务或任务组。所有其他“分支”或直接下游任务将被标记为 skipped 状态,以阻止这些路径继续前进。skipped 状态会向下游传播,以便填充 DAG 状态并推断 DAG 运行的状态。

choose_branch(context)[source]

抽象方法,用于选择要运行的分支。

子类应实现此方法,执行选择分支所需的逻辑,并返回 task_id 或 task_id 列表。如果返回 None,则所有下游任务将被跳过。

参数:

context (airflow.providers.common.compat.sdk.Context) – 传递给 execute() 的上下文字典。

class airflow.providers.standard.operators.python.ShortCircuitOperator(*, ignore_downstream_trigger_rules=True, **kwargs)[source]

Bases: PythonOperator, airflow.providers.common.compat.sdk.SkipMixin

允许管道根据 python_callable 的结果继续执行。

ShortCircuitOperator 派生自 PythonOperator,并评估 python_callable 的返回结果。如果返回结果为 False 或任何等价的假值,管道将被短路。下游任务将根据配置的短路模式被标记为 “skipped” 状态。如果返回结果为 True 或任何等价的真值,下游任务将正常继续,并将返回结果作为 XCom 推送。

短路的行为可以配置为遵守或忽略下游任务设置的 trigger_rule。如果 ignore_downstream_trigger_rules 为 True(默认设置),所有下游任务将在不考虑其 trigger_rule 的情况下被跳过。然而,如果此参数设为 False,则直接下游任务会被跳过,但对其他后续下游任务的指定 trigger_rule 将被尊重。在此模式下,操作符假定直接下游任务被有意跳过,而其他后续任务则可能不被跳过。

另请参阅

欲了解如何使用此操作符的更多信息,请参阅指南: ShortCircuitOperator

参数:

ignore_downstream_trigger_rules (bool) – 如果设为 True,则此操作符任务的所有下游任务都会被跳过。这是默认行为。如果设为 False,则直接下游任务会被跳过,但对所有其他下游任务定义的 trigger_rule 将被遵守。

inherits_from_skipmixin = True[source]

用于判断运算符是否继承自 SkipMixin 或其子类(例如 BranchMixin)。

ignore_downstream_trigger_rules = True[source]
execute(context)[source]

在事件循环中运行 aexecute()

class airflow.providers.standard.operators.python.PythonVirtualenvOperator(*, python_callable, requirements=None, python_version=None, serializer=None, system_site_packages=True, pip_install_options=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, skip_on_exit_code=None, index_urls=None, index_urls_from_connection_ids=None, venv_cache_path=None, env_vars=None, inherit_env=True, **kwargs)[source]

Bases: _BasePythonVirtualenvOperator

在自动创建和销毁的 virtualenv 中运行函数。

该函数(有一定限制)必须使用 def 定义,且不能是类的一部分。所有导入必须在函数内部进行,且不能引用作用域外的变量。一个名为 virtualenv_string_args 的全局变量将可用(由 string_args 填充)。此外,可以通过 op_args 和 op_kwargs 传递参数,并且可以使用返回值。请注意,如果您的 virtualenv 使用的 Python 主版本与 Airflow 不同,则无法使用返回值、op_args、op_kwargs,或使用通过插件提供给 Airflow 的任何宏。不过可以使用 string_args。

另请参阅

欲了解如何使用此操作符的更多信息,请参阅指南: PythonVirtualenvOperator

参数:
  • python_callable (collections.abc.Callable) – 使用 def 定义的 Python 函数,不引用外部变量,将在虚拟环境中运行。

  • requirements (None | collections.abc.Iterable[str] | str) – 可以是需求字符串列表,或者是 pip 指定的(模板化的)“requirements 文件”。

  • python_version (str | None) – 用于运行虚拟环境的 Python 版本。注意,2 和 2.7 都是可接受的写法。

  • serializer (_SerializerTypeDef | None) –

    用于序列化参数和结果的序列化器。它可以是以下之一:

    • "pickle": (默认)使用 pickle 进行序列化。包含在 Python 标准库中。

    • "cloudpickle": 使用 cloudpickle 序列化更复杂的类型,这需要在你的需求中包含 cloudpickle。

    • "dill": 使用 dill 序列化更复杂的类型,这需要在你的需求中包含 dill。

  • system_site_packages (bool) – 是否在虚拟环境中包含 system_site_packages。更多信息请参阅 virtualenv 文档。

  • pip_install_options (list[str] | None) – 安装需求时的 pip 安装选项列表。可用选项请参见 ‘pip install -h’。

  • op_args (collections.abc.Collection[Any] | None) – 传递给 python_callable 的位置参数列表。

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。

  • string_args (collections.abc.Iterable[str] | None) – 在全局变量 virtualenv_string_args 中存在的字符串,运行时作为 list[str] 提供给 python_callable。注意,参数会按换行符拆分。

  • templates_dict (dict | None) – 一个字典,其值为模板,模板将在 __init__execute 之间的某个时刻由 Airflow 引擎渲染,并在模板应用后在你的 callable 上下文中可用。

  • templates_exts (list[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 期望在目标环境中安装 Airflow。如果为 true,若未安装 Airflow,操作符将发出警告,并在启动时尝试加载 Airflow 宏。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以该退出码退出,则将任务置为 skipped 状态(默认:None)。若设置为 None,任何非零退出码都将视为失败。

  • index_urls (None | collections.abc.Collection[str] | str) – 可选的 Python 包索引 URL 列表。若未提供,则使用系统 pip 配置的源。

  • index_urls_from_connection_ids (None | collections.abc.Collection[str] | str) – 可选的 `PackageIndex` 连接 ID 列表。会追加到 `index_urls`。

  • venv_cache_path (None | os.PathLike[str]) – 可选的虚拟环境父文件夹路径,虚拟环境将在此缓存,会创建子文件夹 venv-{hash}(hash 为需求的校验和)。若未提供,虚拟环境将在每次执行时于临时文件夹中创建并删除。

  • env_vars (dict[str, str] | None) – 包含在执行虚拟环境时要设置的额外环境变量的字典。

  • inherit_env (bool) – 执行虚拟环境时是否继承当前环境变量。若设为 True,虚拟环境将继承父进程的环境变量(os.environ)。若设为 False,虚拟环境将在干净的环境中执行。

template_fields: collections.abc.Sequence[str][source]
template_ext: collections.abc.Sequence[str] = ('.txt',)[source]
python_version = None[source]
system_site_packages = True[source]
pip_install_options = None[source]
venv_cache_path = None[source]
execute_callable()[source]

使用给定参数调用 python 可调用对象。

返回:

调用的返回值。

class airflow.providers.standard.operators.python.BranchPythonVirtualenvOperator(*, task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=DEFAULT_EMAIL_ON_RETRY, email_on_failure=DEFAULT_EMAIL_ON_FAILURE, retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=0, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, render_template_as_native_obj=None, **kwargs)[source]

Bases: airflow.providers.common.compat.sdk.BaseBranchOperator, PythonVirtualenvOperator

工作流可以在虚拟环境中执行此任务后“分支”或选择路径。

它继承自 PythonVirtualenvOperator,并期望一个返回单个 task_id、单个 task_group_id,或一个 task_id 和/或 task_group_id 列表的 Python 函数。返回的 task_id(s) 和/或 task_group_id(s) 应指向直接下游于 {self} 的任务或任务组。所有其他“分支”或直接下游任务会被标记为 skipped 状态,以防这些路径继续前进。skipped 状态会向下游传播,以便填满 DAG 状态并推断 DAG 运行的状态。

另请参阅

欲了解如何使用此操作符的更多信息,请参阅指南: BranchPythonVirtualenvOperator

choose_branch(context)[source]

抽象方法,用于选择要运行的分支。

子类应实现此方法,执行选择分支所需的逻辑,并返回 task_id 或 task_id 列表。如果返回 None,则所有下游任务将被跳过。

参数:

context (airflow.providers.common.compat.sdk.Context) – 传递给 execute() 的上下文字典。

class airflow.providers.standard.operators.python.ExternalPythonOperator(*, python, python_callable, serializer=None, op_args=None, op_kwargs=None, string_args=None, templates_dict=None, templates_exts=None, expect_airflow=True, expect_pendulum=False, skip_on_exit_code=None, env_vars=None, inherit_env=True, **kwargs)[source]

Bases: _BasePythonVirtualenvOperator

在未重新创建的 virtualenv 中运行函数。

直接复用,无需创建虚拟环境的开销(有一定限制)。

函数必须使用 def 定义,且不能是类的一部分。所有导入必须在函数内部完成,且不能引用作用域之外的变量。全局作用域变量 virtualenv_string_args(由 string_args 填充)将可用。此外,可以通过 op_args 和 op_kwargs 传递参数,也可以使用返回值。注意,如果你的虚拟环境使用的 Python 主版本与 Airflow 不同,则不能使用返回值、op_args、op_kwargs,也不能使用通过插件提供给 Airflow 的任何宏。但可以使用 string_args。

如果外部环境中安装的 Airflow 版本与操作符使用的版本不同,操作符将失败。

另请参阅

欲了解如何使用此操作符的更多信息,请参阅指南: ExternalPythonOperator

参数:
  • python (str) – 完整的路径字符串(与文件系统相关),指向虚拟环境中应使用的 Python 可执行文件(位于 VENV/bin 文件夹)。应为绝对路径(通常以 “/” 或 “X:/” 开头,取决于所用的文件系统/操作系统)。

  • python_callable (collections.abc.Callable) – 使用 def 定义的 Python 函数,不引用外部变量,将在虚拟环境中运行。

  • serializer (_SerializerTypeDef | None) –

    用于序列化参数和结果的序列化器。它可以是以下之一:

    • "pickle": (默认)使用 pickle 进行序列化。包含在 Python 标准库中。

    • "cloudpickle": 使用 cloudpickle 序列化更复杂的类型,这需要在你的需求中包含 cloudpickle。

    • "dill": 使用 dill 序列化更复杂的类型,这需要在你的需求中包含 dill。

  • op_args (collections.abc.Collection[Any] | None) – 传递给 python_callable 的位置参数列表。

  • op_kwargs (collections.abc.Mapping[str, Any] | None) – 传递给 python_callable 的关键字参数字典。

  • string_args (collections.abc.Iterable[str] | None) – 在全局变量 virtualenv_string_args 中存在的字符串,运行时作为 list[str] 提供给 python_callable。注意,参数会按换行符拆分。

  • templates_dict (dict | None) – 一个字典,其值为模板,模板将在 __init__execute 之间的某个时刻由 Airflow 引擎渲染,并在模板应用后在你的 callable 上下文中可用。

  • templates_exts (list[str] | None) – 在处理模板化字段时要解析的文件扩展名列表,例如 ['.sql', '.hql']

  • expect_airflow (bool) – 期望在目标环境中安装 Airflow。如果为 true,若未安装 Airflow,操作符将发出警告,并在启动时尝试加载 Airflow 宏。

  • skip_on_exit_code (int | collections.abc.Container[int] | None) – 如果 python_callable 以该退出码退出,则将任务置为 skipped 状态(默认:None)。若设置为 None,任何非零退出码都将视为失败。

  • env_vars (dict[str, str] | None) – 包含在执行虚拟环境时要设置的额外环境变量的字典。

  • inherit_env (bool) – 执行虚拟环境时是否继承当前环境变量。若设为 True,虚拟环境将继承父进程的环境变量(os.environ)。若设为 False,虚拟环境将在干净的环境中执行。

template_fields: collections.abc.Sequence[str][source]
python[source]
expect_pendulum = False[source]
execute_callable()[source]

使用给定参数调用 python 可调用对象。

返回:

调用的返回值。

class airflow.providers.standard.operators.python.BranchExternalPythonOperator(*, task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=DEFAULT_EMAIL_ON_RETRY, email_on_failure=DEFAULT_EMAIL_ON_FAILURE, retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=0, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, render_template_as_native_obj=None, **kwargs)[source]

基类: airflow.providers.common.compat.sdk.BaseBranchOperator, ExternalPythonOperator

工作流可以在此任务执行后“分支”或沿某条路径继续。

扩展自 ExternalPythonOperator,因此期望获取应使用的 Python 虚拟环境(位于 VENV/bin 文件夹)。应为绝对路径,以便能够像 ExternalPythonOperator 那样在独立的虚拟环境中运行。

另请参阅

欲了解如何使用此运算符的更多信息,请参阅指南: BranchExternalPythonOperator

choose_branch(context)[source]

抽象方法,用于选择要运行的分支。

子类应实现此方法,执行选择分支所需的逻辑,并返回 task_id 或 task_id 列表。如果返回 None,则所有下游任务将被跳过。

参数:

context (airflow.providers.common.compat.sdk.Context) – 传递给 execute() 的上下文字典。

airflow.providers.standard.operators.python.get_current_context()[source]

检索执行上下文字典,而不更改用户方法的签名。

这是检索执行上下文字典的最简单方法。

旧式

def my_task(**context):
    ti = context["ti"]

新式

from airflow.providers.standard.operators.python import get_current_context


def my_task():
    context = get_current_context()
    ti = context["ti"]

仅当在运算符开始执行后调用此方法时,当前上下文才会有值。

此条目是否有帮助?