airflow.models.baseoperator

所有操作符的基础操作符。

sphinx-autoapi-skip

模块内容

ExecutorSafeguard

ExecutorSafeguard 装饰器。

BaseOperatorMeta

BaseOperator 的元类。

BaseOperator

所有操作符的抽象基类。

函数

parse_retries(重试)

coerce_timedelta(值, *, 键)

coerce_resources(资源)

get_merged_defaults(dag, task_group, task_params, ...)

partial(operator_class, *, task_id[, dag, task_group, ...])

chain(*tasks)

给定一些任务,构建依赖链。

cross_downstream(from_tasks, to_tasks)

设置 from_tasks 中所有任务到 to_tasks 中所有任务的下游依赖关系。

chain_linear(*elements)

简化任务依赖定义。

属性

ScheduleInterval

TaskPreExecuteHook

TaskPostExecuteHook

T

logger

BASEOPERATOR_ARGS_EXPECTED_TYPES

Chainable

airflow.models.baseoperator.ScheduleInterval[源代码]
airflow.models.baseoperator.TaskPreExecuteHook[源代码]
airflow.models.baseoperator.TaskPostExecuteHook[源代码]
airflow.models.baseoperator.T[源代码]
airflow.models.baseoperator.logger[源代码]
airflow.models.baseoperator.parse_retries(retries)[源代码]
airflow.models.baseoperator.coerce_timedelta(value, *, key)[源代码]
airflow.models.baseoperator.coerce_resources(resources)[源代码]
airflow.models.baseoperator.get_merged_defaults(dag, task_group, task_params, task_default_args)[源代码]
class airflow.models.baseoperator.ExecutorSafeguard[source]

ExecutorSafeguard 装饰器。

检查是否在 TaskInstance 之外手动调用了操作符的执行方法,以避免装饰操作符和传统操作符之间发生混淆。

test_mode[source]
classmethod decorator(func)[source]
class airflow.models.baseoperator.BaseOperatorMeta[source]

基类: abc.ABCMeta

BaseOperator 的元类。

airflow.models.baseoperator.BASEOPERATOR_ARGS_EXPECTED_TYPES[source]
class airflow.models.baseoperator.BaseOperator(task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=conf.getboolean('email', 'default_email_on_retry', fallback=True), email_on_failure=conf.getboolean('email', 'default_email_on_failure', fallback=True), retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, task_concurrency=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, **kwargs)[source]

基类: airflow.models.abstractoperator.AbstractOperator

所有操作符的抽象基类。

由于操作符创建的对象会成为 DAG 中的节点,因此 BaseOperator 包含许多用于 DAG 爬取行为的递归方法。要从此类派生,您需要重写构造函数和 'execute' 方法。

从此类派生的操作符应同步执行或触发某些任务(等待完成)。操作符的示例可以是运行 Pig 作业的操作符 (PigOperator),等待分区在 Hive 中落地的传感器操作符 (HiveSensorOperator),或者将数据从 Hive 移动到 MySQL 的操作符 (Hive2MySqlOperator)。这些操作符(任务)的实例针对特定操作、运行特定脚本、函数或数据传输。

此类是抽象类,不应实例化。实例化从此类派生的类会导致创建任务对象,该对象最终成为 DAG 对象中的节点。任务依赖关系应通过使用 set_upstream 和/或 set_downstream 方法设置。

参数
  • task_id (str) – 任务的唯一且有意义的 ID

  • owner (str) – 任务的所有者。建议使用有意义的描述(例如,用户/个人/团队/角色名称)来明确所有权。

  • email (str | Iterable[str] | None) – 电子邮件警报中使用的 “收件人” 电子邮件地址。这可以是单个电子邮件或多个电子邮件。多个地址可以指定为逗号或分号分隔的字符串,或通过传递字符串列表来指定。

  • email_on_retry (bool) – 指示在重试任务时是否应发送电子邮件警报

  • email_on_failure (bool) – 指示在任务失败时是否应发送电子邮件警报

  • retries (int | None) – 在任务失败之前应执行的重试次数

  • retry_delay (datetime.timedelta | float) – 重试之间的延迟,可以设置为 timedeltafloat 秒,这将转换为 timedelta,默认值为 timedelta(seconds=300)

  • retry_exponential_backoff (bool) – 是否允许在重试延迟中使用指数退避算法,以实现重试之间等待时间逐渐增加(延迟将转换为秒)。

  • max_retry_delay (datetime.timedelta | float | None) – 重试之间的最大延迟间隔,可以设置为 timedeltafloat 秒,最终会转换为 timedelta

  • start_date (datetime.datetime | None) – 任务的 start_date,决定第一个任务实例的 execution_date。最佳实践是将 start_date 四舍五入为 DAG 的 schedule_interval。每日任务的 start_date 应为某天的 00:00:00,每小时任务的 start_date 应为特定小时的 00:00。 请注意,Airflow 仅查看最新的 execution_date 并添加 schedule_interval 以确定下一个 execution_date。 还要注意,不同任务的依赖关系需要在时间上对齐。如果任务 A 依赖于任务 B,并且它们的 start_date 存在偏移,导致它们的 execution_date 不对齐,则任务 A 的依赖关系将永远无法满足。 如果您想延迟任务,例如在凌晨 2 点运行每日任务,请查看 TimeSensorTimeDeltaSensor。 我们不建议使用动态 start_date,而建议使用固定的 start_date。有关更多信息,请阅读关于 start_date 的常见问题解答。

  • end_date (datetime.datetime | None) – 如果指定,调度程序将不会超过此日期。

  • depends_on_past (bool) – 当设置为 true 时,任务实例将按顺序运行,并且仅当上一个实例成功或已跳过时才运行。允许运行 start_date 的任务实例。

  • wait_for_past_depends_before_skipping (bool) – 当设置为 true 时,如果任务实例应标记为跳过,并且 depends_on_past 为 true,则该 ti 将保持 None 状态,等待上一次运行的任务。

  • wait_for_downstream (bool) – 当设置为 true 时,任务 X 的实例将等待先前任务 X 实例的下游任务成功完成或跳过,然后才会运行。如果任务 X 的不同实例修改同一资产,并且此资产被任务 X 的下游任务使用,则此设置很有用。 请注意,只要使用 wait_for_downstream,depends_on_past 都会强制为 True。 还要注意,只会等待先前任务实例的直接下游任务;任何更下游的任务的状态都会被忽略。

  • dag (airflow.models.dag.DAG | None) – 指向任务所附加的 dag 的引用(如果有)。

  • priority_weight (int) – 此任务相对于其他任务的优先级权重。这允许执行器在任务积压时优先触发更高优先级的任务。为更重要的任务设置更高的 priority_weight 值。 由于并非所有数据库引擎都支持 64 位整数,因此值的上限为 32 位。有效范围为 -2,147,483,648 到 2,147,483,647。

  • weight_rule (str | airflow.task.priority_strategy.PriorityWeightStrategy) – 用于任务的有效总优先级权重的加权方法。选项包括: { downstream | upstream | absolute } 默认值为 downstream。 当设置为 downstream 时,任务的有效权重是所有下游后代的总和。因此,当使用正权重值时,上游任务将具有更高的权重,并且会被更积极地调度。当您有多个 dag 运行实例,并且希望在每个 dag 可以继续处理下游任务之前完成所有运行的所有上游任务时,这非常有用。 当设置为 upstream 时,有效权重是所有上游祖先的总和。这是相反的,当使用正权重值时,下游任务具有更高的权重,并且会被更积极地调度。当您有多个 dag 运行实例,并且希望每个 dag 在开始其他 dag 的上游任务之前完成时,这非常有用。当设置为 absolute 时,有效权重是指定的精确的 priority_weight,没有额外的加权。当您确切知道每个任务应具有什么优先级权重时,您可能需要这样做。此外,当设置为 absolute 时,对于非常大的 DAG,还有一个额外的好处是可以显着加快任务创建过程。 可以将选项设置为字符串,或者使用静态类 airflow.utils.WeightRule 中定义的常量。无论使用何种权重规则,生成的优先级值的上限为 32 位。 这是一个实验性功能。 自 2.9.0 起,Airflow 允许通过创建 airflow.task.priority_strategy.PriorityWeightStrategy 的子类并在插件中注册,然后通过 weight_rule 参数提供类路径或类实例来定义自定义优先级权重策略。自定义优先级权重策略将用于计算任务实例的有效总优先级权重。

  • queue (str) – 运行此作业时要定位的队列。并非所有执行器都实现了队列管理,CeleryExecutor 支持定位特定队列。

  • pool (str | None) – 此任务应运行的槽池,槽池是限制某些任务并发的一种方法。

  • pool_slots (int) – 此任务应使用的池槽数(>= 1)。不允许使用小于 1 的值。

  • sla (datetime.timedelta | None) – 作业预期成功的截止时间。请注意,这表示周期关闭后的 timedelta。 例如,如果您设置了 1 小时的 SLA,如果 2016-01-01 实例尚未成功,则调度程序会在 2016-01-02 之后的 1:00 AM 之后不久发送电子邮件。调度程序会特别注意具有 SLA 的作业,并为 SLA 未命中发送警报电子邮件。SLA 未命中也会记录在数据库中以供将来参考。共享同一 SLA 时间的所有任务都会捆绑在一封电子邮件中,在该时间之后不久发送。每个任务实例的 SLA 通知仅发送一次。

  • execution_timeout (datetime.timedelta | None) – 此任务实例允许执行的最大时间,如果超出该时间,则会引发错误并失败。

  • on_failure_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 此任务的任务实例失败时要调用的一个或多个函数。 一个上下文字典作为单个参数传递给此函数。上下文包含对任务实例相关对象的引用,并在 API 的宏部分中记录。

  • on_execute_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 与 on_failure_callback 非常相似,只是它在任务执行之前执行。

  • on_retry_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 与 on_failure_callback 非常相似,只是它在发生重试时执行。

  • on_success_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 与 on_failure_callback 非常相似,只是它在任务成功时执行。

  • on_skipped_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 与 on_failure_callback 非常相似,只是它在发生跳过时执行;只有在引发 AirflowSkipException 时才会调用此回调。 明确地说,如果由于 DAG 中的先前分支决策或导致跳过执行的触发规则而未开始执行任务,则不会调用此回调,因此不会调度任务执行。

  • pre_execute (TaskPreExecuteHook | None) –

    在任务执行之前立即调用的函数,接收上下文字典;引发异常将阻止执行任务。

    这是一个实验性功能

  • post_execute (TaskPostExecuteHook | None) –

    在任务执行之后立即调用的函数,接收上下文字典和任务结果;引发异常将阻止任务成功。

    这是一个实验性功能

  • trigger_rule (str) – 定义任务触发时应用依赖关系的规则。选项包括: { all_success | all_failed | all_done | all_skipped | one_success | one_done | one_failed | none_failed | none_failed_min_one_success | none_skipped | always} 默认值为 all_success。选项可以设置为字符串,也可以使用静态类 airflow.utils.TriggerRule 中定义的常量。

  • resources (dict[str, Any] | None) – 资源参数名称(Resources 构造函数的参数名称)与其值的映射。

  • run_as_user (str | None) – 运行任务时要模拟的 Unix 用户名。

  • max_active_tis_per_dag (int | None) – 设置后,任务将能够限制跨执行日期的并发运行。

  • max_active_tis_per_dagrun (int | None) – 设置后,任务将能够限制每个 DAG 运行的并发任务实例数。

  • executor (str | None) – 运行此任务时要定位的执行器。 尚未支持。

  • executor_config (dict | None) –

    特定执行器解释的其他任务级配置参数。参数由执行器的名称命名空间。

    示例:通过 KubernetesExecutor 在特定 Docker 容器中运行此任务

    MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})
    

  • do_xcom_push (bool) – 如果为 True,则会推送一个包含 Operator 结果的 XCom。

  • multiple_outputs (bool) – 如果为 True 且 do_xcom_push 为 True,则推送多个 XCom,返回的字典结果中每个键对应一个 XCom。如果为 False 且 do_xcom_push 为 True,则推送单个 XCom。

  • task_group (airflow.utils.task_group.TaskGroup | None) – 任务应属于的任务组。这通常在不使用 TaskGroup 作为上下文管理器时提供。

  • doc (str | None) – 为您的任务对象添加文档或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。

  • doc_md (str | None) – 为您的任务对象添加文档(Markdown 格式)或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。

  • doc_rst (str | None) – 为您的任务对象添加文档(RST 格式)或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。

  • doc_json (str | None) – 为您的任务对象添加文档(JSON 格式)或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。

  • doc_yaml (str | None) – 为您的任务对象添加文档(YAML 格式)或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。

  • task_display_name (str | None) – 任务的显示名称,该名称显示在 UI 上。

  • logger_name (str | None) – Operator 用于发出日志的记录器名称。如果设置为 None (默认),则记录器名称将回退到 airflow.task.operators.{class.__module__}.{class.__name__} (例如,SimpleHttpOperator 的记录器将为 airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator)。

  • allow_nested_operators (bool) –

    如果为 True,则在另一个运算符内执行运算符时,将记录一条警告消息。如果为 False,则如果运算符使用不当(例如,嵌套在另一个运算符内),则会引发异常。在 Airflow 的未来版本中,将删除此参数,并且当运算符彼此嵌套时,始终会引发异常(默认值为 True)。

    示例:错误的运算符混合使用示例

    @task(provide_context=True)
    def say_hello_world(**context):
        hello_world_task = BashOperator(
            task_id="hello_world_task",
            bash_command="python -c \"print('Hello, world!')\"",
            dag=dag,
        )
        hello_world_task.execute(context)
    

property dagairflow.models.dag.DAG[源代码]

返回 Operator 的 DAG(如果已设置),否则引发错误。

property task_display_namestr[源代码]
property operator_classtype[BaseOperator][源代码]
property task_typestr[源代码]

@property: 任务的类型。

property operator_namestr[源代码]

@property: 如果设置了,则使用更友好的运算符显示名称。

property rootslist[BaseOperator][源代码]

DAGNode 所需。

property leaveslist[BaseOperator][源代码]

DAGNode 所需。

property output:airflow.models.xcom_arg.XComArg[源代码]

返回当前运算符推送的 XCom 的引用。

property inherits_from_empty_operator[源代码]

用于确定 Operator 是否继承自 EmptyOperator。

template_fields: Sequence[str] = ()[源代码]
template_ext: Sequence[str] = ()[源代码]
template_fields_renderers: dict[str, str][源代码]
ui_color: str = '#fff'[源代码]
ui_fgcolor: str = '#000'[源代码]
pool: str = ''[源代码]
shallow_copy_attrs: Sequence[str] = ()[源代码]
partial: Callable[Ellipsis, airflow.models.mappedoperator.OperatorPartial][源代码]
supports_lineage = False[源代码]
task_group: airflow.utils.task_group.TaskGroup | None[源代码]
subdag: airflow.models.dag.DAG | None[源代码]
start_date: pendulum.DateTime | None[源代码]
end_date: pendulum.DateTime | None[源代码]
start_trigger_args: airflow.triggers.base.StartTriggerArgs | None[源代码]
start_from_trigger: bool = False[源代码]
deps: frozenset[airflow.ti_deps.deps.base_ti_dep.BaseTIDep][源代码]

返回操作符的依赖关系集合。这些与执行上下文依赖关系不同,因为它们特定于任务,并且可以被子类扩展/覆盖。

__eq__(other)[源代码]

返回 self==value。

__ne__(other)[源代码]

返回 self!=value。

__hash__()[源代码]

返回 hash(self)。

__or__(other)[源代码]

返回 [此操作符] | [操作符]。

other 的入口将设置为从该操作符获取出口。other 将被设置为该操作符的下游任务。

__gt__(other)[源代码]

返回 [操作符] > [出口]。

如果 other 是一个带有属性注释的对象,则它被设置为此操作符的出口。

__lt__(other)[源代码]

返回 [入口] > [操作符] 或 [操作符] < [入口]。

如果 other 是一个带有 attr 注解的对象,它将被设置为该操作符的入口。

__setattr__(key, value)[source]

实现 setattr(self, name, value)。

add_inlets(inlets)[source]

设置此操作符的入口。

add_outlets(outlets)[source]

定义此操作符的出口。

get_dag()[source]
has_dag()[source]

如果该操作符已分配给 DAG,则返回 True。

prepare_for_execution()[source]

锁定任务以进行执行,禁用 __setattr__ 中的自定义操作,并返回一个副本。

set_xcomargs_dependencies()[source]

解析任务的上游依赖关系。

这样,将 XComArg 作为模板字段的值传递,将导致在两个任务之间创建上游关系。

示例:

with DAG(...):
    generate_content = GenerateContentOperator(task_id="generate_content")
    send_email = EmailOperator(..., html_content=generate_content.output)

# This is equivalent to
with DAG(...):
    generate_content = GenerateContentOperator(task_id="generate_content")
    send_email = EmailOperator(..., html_content="{{ task_instance.xcom_pull('generate_content') }}")
    generate_content >> send_email
pre_execute(context)[source]

在调用 self.execute() 之前立即执行。

abstract execute(context)[source]

在创建操作符时派生。

Context 与渲染 Jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

post_execute(context, result=None)[source]

在调用 self.execute() 之后立即执行。

它被传递执行上下文和操作符返回的任何结果。

on_kill()[source]

当任务实例被终止时,重写此方法以清理子进程。

操作符中任何使用线程、子进程或多进程模块都需要进行清理,否则它将留下幽灵进程。

__deepcopy__(memo)[source]
__getstate__()[source]
__setstate__(state)[source]
render_template_fields(context, jinja_env=None)[source]

模板化 self.template_fields 中列出的所有属性。

这会就地改变属性,并且是不可逆的。

参数
  • context (airflow.utils.context.Context) – 带有要应用于内容的 Context 值的字典。

  • jinja_env (jinja2.Environment | None) – Jinja 用于渲染的环境。

clear(start_date=None, end_date=None, upstream=False, downstream=False, session=NEW_SESSION)[source]

根据指定的参数清除与任务关联的任务实例的状态。

get_task_instances(start_date=None, end_date=None, session=NEW_SESSION)[source]

获取与此任务相关的特定日期范围的任务实例。

run(start_date=None, end_date=None, ignore_first_depends_on_past=True, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, mark_success=False, test_mode=False, session=NEW_SESSION)[source]

运行日期范围内的任务实例集。

dry_run()[source]

对操作符执行空运行 - 仅渲染模板字段。

get_direct_relatives(upstream=False)[source]

获取当前任务的直接相关任务列表,上游或下游。

__repr__()[source]

返回 repr(self)。

static xcom_push(context, key, value, execution_date=None)[source]

使 XCom 可供任务拉取。

参数
  • context (Any) – 执行上下文字典

  • key (str) – XCom 的键

  • value (Any) – XCom 的值。该值将被 pickle 序列化并存储在数据库中。

  • execution_date (datetime.datetime | None) – 如果提供,则 XCom 在此日期之前不可见。 例如,这可以用来在未来的某个日期向任务发送消息,而无需立即可见。

static xcom_pull(context, task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=None, session=NEW_SESSION)[source]

拉取可选地满足特定条件的 XComs。

key 的默认值将搜索限制为由其他任务返回的 XComs(而不是手动推送的)。要删除此筛选器,请传递 key=None(或任何所需的值)。

如果提供单个 task_id 字符串,则结果是来自该 task_id 的最近匹配的 XCom 的值。如果提供多个 task_ids,则返回匹配值的元组。如果找不到任何匹配项,则返回 None。

参数
  • context (Any) – 执行上下文字典

  • key (str) – XCom 的键。如果提供,则仅返回具有匹配键的 XCom。默认键为 ‘return_value’,也可作为常量 XCOM_RETURN_KEY 使用。此键会自动分配给任务返回的 XCom(而不是手动推送的)。要删除筛选器,请传递 key=None。

  • task_ids (str | list[str] | None) – 仅拉取来自具有匹配 ID 的任务的 XCom。可以传递 None 以删除筛选器。

  • dag_id (str | None) – 如果提供,则仅从此 DAG 拉取 XCom。如果为 None(默认值),则使用调用任务的 DAG。

  • include_prior_dates (bool | None) – 如果为 False,则仅返回来自当前 execution_date 的 XCom。如果为 True,则也会返回来自之前日期的 XCom。

classmethod get_serialized_fields()[source]

字符串化的 DAG 和操作符正好包含这些字段。

serialize_for_task_group()[source]

序列化;DAGNode 需要。

defer(*, trigger, method_name, kwargs=None, timeout=None)[source]

将此操作符标记为“已延迟”,暂停其执行,直到提供的触发器触发事件。

这是通过引发一个特殊异常 (TaskDeferred) 来实现的,该异常在主 _execute_task 包装器中捕获。触发器可以将执行返回到任务或直接结束任务实例。如果触发器将自行结束任务实例,则 method_name 应为 None;否则,提供在任务中恢复执行时应使用的方法的名称。

resume_execution(next_method, next_kwargs, context)[source]

当恢复延迟的任务时,调用此方法。

airflow.models.baseoperator.Chainable[source]
airflow.models.baseoperator.chain(*tasks)[source]

给定一些任务,构建依赖链。

此函数接受 BaseOperator(又名任务)、EdgeModifiers(又名标签)、XComArg、TaskGroups 或包含这些类型的任意组合的列表(或同一列表中的组合)的值。如果要链接两个列表,必须确保它们具有相同的长度。

使用经典操作符/传感器

chain(t1, [t2, t3], [t4, t5], t6)

等效于

  / -> t2 -> t4 \
t1               -> t6
  \ -> t3 -> t5 /
t1.set_downstream(t2)
t1.set_downstream(t3)
t2.set_downstream(t4)
t3.set_downstream(t5)
t4.set_downstream(t6)
t5.set_downstream(t6)

使用任务修饰的函数,又名 XComArgs

chain(x1(), [x2(), x3()], [x4(), x5()], x6())

等效于

  / -> x2 -> x4 \
x1               -> x6
  \ -> x3 -> x5 /
x1 = x1()
x2 = x2()
x3 = x3()
x4 = x4()
x5 = x5()
x6 = x6()
x1.set_downstream(x2)
x1.set_downstream(x3)
x2.set_downstream(x4)
x3.set_downstream(x5)
x4.set_downstream(x6)
x5.set_downstream(x6)

使用 TaskGroups

chain(t1, task_group1, task_group2, t2)

t1.set_downstream(task_group1)
task_group1.set_downstream(task_group2)
task_group2.set_downstream(t2)

也可以在经典操作符/传感器、EdgeModifiers、XComArg 和 TaskGroups 之间混合使用

chain(t1, [Label("branch one"), Label("branch two")], [x1(), x2()], task_group1, x3())

等效于

  / "branch one" -> x1 \
t1                      -> task_group1 -> x3
  \ "branch two" -> x2 /
x1 = x1()
x2 = x2()
x3 = x3()
label1 = Label("branch one")
label2 = Label("branch two")
t1.set_downstream(label1)
label1.set_downstream(x1)
t2.set_downstream(label2)
label2.set_downstream(x2)
x1.set_downstream(task_group1)
x2.set_downstream(task_group1)
task_group1.set_downstream(x3)

# or

x1 = x1()
x2 = x2()
x3 = x3()
t1.set_downstream(x1, edge_modifier=Label("branch one"))
t1.set_downstream(x2, edge_modifier=Label("branch two"))
x1.set_downstream(task_group1)
x2.set_downstream(task_group1)
task_group1.set_downstream(x3)
参数

tasks (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 要设置依赖关系的单个和/或任务、EdgeModifiers、XComArgs 或 TaskGroups 的列表

airflow.models.baseoperator.cross_downstream(from_tasks, to_tasks)[source]

设置 from_tasks 中所有任务到 to_tasks 中所有任务的下游依赖关系。

使用经典操作符/传感器

cross_downstream(from_tasks=[t1, t2, t3], to_tasks=[t4, t5, t6])

等效于

t1 ---> t4
   \ /
t2 -X -> t5
   / \
t3 ---> t6
t1.set_downstream(t4)
t1.set_downstream(t5)
t1.set_downstream(t6)
t2.set_downstream(t4)
t2.set_downstream(t5)
t2.set_downstream(t6)
t3.set_downstream(t4)
t3.set_downstream(t5)
t3.set_downstream(t6)

使用任务修饰的函数,又名 XComArgs

cross_downstream(from_tasks=[x1(), x2(), x3()], to_tasks=[x4(), x5(), x6()])

等效于

x1 ---> x4
   \ /
x2 -X -> x5
   / \
x3 ---> x6
x1 = x1()
x2 = x2()
x3 = x3()
x4 = x4()
x5 = x5()
x6 = x6()
x1.set_downstream(x4)
x1.set_downstream(x5)
x1.set_downstream(x6)
x2.set_downstream(x4)
x2.set_downstream(x5)
x2.set_downstream(x6)
x3.set_downstream(x4)
x3.set_downstream(x5)
x3.set_downstream(x6)

也可以在经典操作符/传感器和 XComArg 任务之间混合使用

cross_downstream(from_tasks=[t1, x2(), t3], to_tasks=[x1(), t2, x3()])

等效于

t1 ---> x1
   \ /
x2 -X -> t2
   / \
t3 ---> x3
x1 = x1()
x2 = x2()
x3 = x3()
t1.set_downstream(x1)
t1.set_downstream(t2)
t1.set_downstream(x3)
x2.set_downstream(x1)
x2.set_downstream(t2)
x2.set_downstream(x3)
t3.set_downstream(x1)
t3.set_downstream(t2)
t3.set_downstream(x3)
参数
  • from_tasks (Sequence[airflow.models.taskmixin.DependencyMixin]) – 要从中开始的任务或 XComArgs 的列表。

  • to_tasks (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 要设置为下游依赖项的任务或 XComArgs 的列表。

airflow.models.baseoperator.chain_linear(*elements)[source]

简化任务依赖定义。

例如:假设您想要如下优先级

    ╭─op2─╮ ╭─op4─╮
op1─┤     ├─├─op5─┤─op7
    ╰-op3─╯ ╰-op6─╯

那么你可以这样完成

chain_linear(op1, [op2, op3], [op4, op5, op6], op7)
参数

elements (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 操作符/操作符列表的列表

此条目是否有帮助?