airflow.models.baseoperator
¶
所有操作符的基础操作符。
- sphinx-autoapi-skip
模块内容¶
类¶
ExecutorSafeguard 装饰器。 |
|
BaseOperator 的元类。 |
|
所有操作符的抽象基类。 |
函数¶
|
|
|
|
|
|
|
|
|
|
|
给定一些任务,构建依赖链。 |
|
设置 from_tasks 中所有任务到 to_tasks 中所有任务的下游依赖关系。 |
|
简化任务依赖定义。 |
属性¶
- airflow.models.baseoperator.get_merged_defaults(dag, task_group, task_params, task_default_args)[源代码]¶
- class airflow.models.baseoperator.ExecutorSafeguard[source]¶
ExecutorSafeguard 装饰器。
检查是否在 TaskInstance 之外手动调用了操作符的执行方法,以避免装饰操作符和传统操作符之间发生混淆。
- class airflow.models.baseoperator.BaseOperatorMeta[source]¶
基类:
abc.ABCMeta
BaseOperator 的元类。
- class airflow.models.baseoperator.BaseOperator(task_id, owner=DEFAULT_OWNER, email=None, email_on_retry=conf.getboolean('email', 'default_email_on_retry', fallback=True), email_on_failure=conf.getboolean('email', 'default_email_on_failure', fallback=True), retries=DEFAULT_RETRIES, retry_delay=DEFAULT_RETRY_DELAY, retry_exponential_backoff=False, max_retry_delay=None, start_date=None, end_date=None, depends_on_past=False, ignore_first_depends_on_past=DEFAULT_IGNORE_FIRST_DEPENDS_ON_PAST, wait_for_past_depends_before_skipping=DEFAULT_WAIT_FOR_PAST_DEPENDS_BEFORE_SKIPPING, wait_for_downstream=False, dag=None, params=None, default_args=None, priority_weight=DEFAULT_PRIORITY_WEIGHT, weight_rule=DEFAULT_WEIGHT_RULE, queue=DEFAULT_QUEUE, pool=None, pool_slots=DEFAULT_POOL_SLOTS, sla=None, execution_timeout=DEFAULT_TASK_EXECUTION_TIMEOUT, on_execute_callback=None, on_failure_callback=None, on_success_callback=None, on_retry_callback=None, on_skipped_callback=None, pre_execute=None, post_execute=None, trigger_rule=DEFAULT_TRIGGER_RULE, resources=None, run_as_user=None, task_concurrency=None, map_index_template=None, max_active_tis_per_dag=None, max_active_tis_per_dagrun=None, executor=None, executor_config=None, do_xcom_push=True, multiple_outputs=False, inlets=None, outlets=None, task_group=None, doc=None, doc_md=None, doc_json=None, doc_yaml=None, doc_rst=None, task_display_name=None, logger_name=None, allow_nested_operators=True, **kwargs)[source]¶
基类:
airflow.models.abstractoperator.AbstractOperator
所有操作符的抽象基类。
由于操作符创建的对象会成为 DAG 中的节点,因此 BaseOperator 包含许多用于 DAG 爬取行为的递归方法。要从此类派生,您需要重写构造函数和 'execute' 方法。
从此类派生的操作符应同步执行或触发某些任务(等待完成)。操作符的示例可以是运行 Pig 作业的操作符 (PigOperator),等待分区在 Hive 中落地的传感器操作符 (HiveSensorOperator),或者将数据从 Hive 移动到 MySQL 的操作符 (Hive2MySqlOperator)。这些操作符(任务)的实例针对特定操作、运行特定脚本、函数或数据传输。
此类是抽象类,不应实例化。实例化从此类派生的类会导致创建任务对象,该对象最终成为 DAG 对象中的节点。任务依赖关系应通过使用 set_upstream 和/或 set_downstream 方法设置。
- 参数
task_id (str) – 任务的唯一且有意义的 ID
owner (str) – 任务的所有者。建议使用有意义的描述(例如,用户/个人/团队/角色名称)来明确所有权。
email (str | Iterable[str] | None) – 电子邮件警报中使用的 “收件人” 电子邮件地址。这可以是单个电子邮件或多个电子邮件。多个地址可以指定为逗号或分号分隔的字符串,或通过传递字符串列表来指定。
email_on_retry (bool) – 指示在重试任务时是否应发送电子邮件警报
email_on_failure (bool) – 指示在任务失败时是否应发送电子邮件警报
retries (int | None) – 在任务失败之前应执行的重试次数
retry_delay (datetime.timedelta | float) – 重试之间的延迟,可以设置为
timedelta
或float
秒,这将转换为timedelta
,默认值为timedelta(seconds=300)
。retry_exponential_backoff (bool) – 是否允许在重试延迟中使用指数退避算法,以实现重试之间等待时间逐渐增加(延迟将转换为秒)。
max_retry_delay (datetime.timedelta | float | None) – 重试之间的最大延迟间隔,可以设置为
timedelta
或float
秒,最终会转换为timedelta
。start_date (datetime.datetime | None) – 任务的
start_date
,决定第一个任务实例的execution_date
。最佳实践是将 start_date 四舍五入为 DAG 的schedule_interval
。每日任务的 start_date 应为某天的 00:00:00,每小时任务的 start_date 应为特定小时的 00:00。 请注意,Airflow 仅查看最新的execution_date
并添加schedule_interval
以确定下一个execution_date
。 还要注意,不同任务的依赖关系需要在时间上对齐。如果任务 A 依赖于任务 B,并且它们的 start_date 存在偏移,导致它们的 execution_date 不对齐,则任务 A 的依赖关系将永远无法满足。 如果您想延迟任务,例如在凌晨 2 点运行每日任务,请查看TimeSensor
和TimeDeltaSensor
。 我们不建议使用动态start_date
,而建议使用固定的 start_date。有关更多信息,请阅读关于 start_date 的常见问题解答。end_date (datetime.datetime | None) – 如果指定,调度程序将不会超过此日期。
depends_on_past (bool) – 当设置为 true 时,任务实例将按顺序运行,并且仅当上一个实例成功或已跳过时才运行。允许运行 start_date 的任务实例。
wait_for_past_depends_before_skipping (bool) – 当设置为 true 时,如果任务实例应标记为跳过,并且 depends_on_past 为 true,则该 ti 将保持 None 状态,等待上一次运行的任务。
wait_for_downstream (bool) – 当设置为 true 时,任务 X 的实例将等待先前任务 X 实例的下游任务成功完成或跳过,然后才会运行。如果任务 X 的不同实例修改同一资产,并且此资产被任务 X 的下游任务使用,则此设置很有用。 请注意,只要使用 wait_for_downstream,depends_on_past 都会强制为 True。 还要注意,只会等待先前任务实例的直接下游任务;任何更下游的任务的状态都会被忽略。
dag (airflow.models.dag.DAG | None) – 指向任务所附加的 dag 的引用(如果有)。
priority_weight (int) – 此任务相对于其他任务的优先级权重。这允许执行器在任务积压时优先触发更高优先级的任务。为更重要的任务设置更高的 priority_weight 值。 由于并非所有数据库引擎都支持 64 位整数,因此值的上限为 32 位。有效范围为 -2,147,483,648 到 2,147,483,647。
weight_rule (str | airflow.task.priority_strategy.PriorityWeightStrategy) – 用于任务的有效总优先级权重的加权方法。选项包括:
{ downstream | upstream | absolute }
默认值为downstream
。 当设置为downstream
时,任务的有效权重是所有下游后代的总和。因此,当使用正权重值时,上游任务将具有更高的权重,并且会被更积极地调度。当您有多个 dag 运行实例,并且希望在每个 dag 可以继续处理下游任务之前完成所有运行的所有上游任务时,这非常有用。 当设置为upstream
时,有效权重是所有上游祖先的总和。这是相反的,当使用正权重值时,下游任务具有更高的权重,并且会被更积极地调度。当您有多个 dag 运行实例,并且希望每个 dag 在开始其他 dag 的上游任务之前完成时,这非常有用。当设置为absolute
时,有效权重是指定的精确的priority_weight
,没有额外的加权。当您确切知道每个任务应具有什么优先级权重时,您可能需要这样做。此外,当设置为absolute
时,对于非常大的 DAG,还有一个额外的好处是可以显着加快任务创建过程。 可以将选项设置为字符串,或者使用静态类airflow.utils.WeightRule
中定义的常量。无论使用何种权重规则,生成的优先级值的上限为 32 位。 这是一个实验性功能。 自 2.9.0 起,Airflow 允许通过创建airflow.task.priority_strategy.PriorityWeightStrategy
的子类并在插件中注册,然后通过weight_rule
参数提供类路径或类实例来定义自定义优先级权重策略。自定义优先级权重策略将用于计算任务实例的有效总优先级权重。queue (str) – 运行此作业时要定位的队列。并非所有执行器都实现了队列管理,CeleryExecutor 支持定位特定队列。
pool (str | None) – 此任务应运行的槽池,槽池是限制某些任务并发的一种方法。
pool_slots (int) – 此任务应使用的池槽数(>= 1)。不允许使用小于 1 的值。
sla (datetime.timedelta | None) – 作业预期成功的截止时间。请注意,这表示周期关闭后的
timedelta
。 例如,如果您设置了 1 小时的 SLA,如果2016-01-01
实例尚未成功,则调度程序会在2016-01-02
之后的 1:00 AM 之后不久发送电子邮件。调度程序会特别注意具有 SLA 的作业,并为 SLA 未命中发送警报电子邮件。SLA 未命中也会记录在数据库中以供将来参考。共享同一 SLA 时间的所有任务都会捆绑在一封电子邮件中,在该时间之后不久发送。每个任务实例的 SLA 通知仅发送一次。execution_timeout (datetime.timedelta | None) – 此任务实例允许执行的最大时间,如果超出该时间,则会引发错误并失败。
on_failure_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 此任务的任务实例失败时要调用的一个或多个函数。 一个上下文字典作为单个参数传递给此函数。上下文包含对任务实例相关对象的引用,并在 API 的宏部分中记录。
on_execute_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 与
on_failure_callback
非常相似,只是它在任务执行之前执行。on_retry_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 与
on_failure_callback
非常相似,只是它在发生重试时执行。on_success_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 与
on_failure_callback
非常相似,只是它在任务成功时执行。on_skipped_callback (None | airflow.models.abstractoperator.TaskStateChangeCallback | list[airflow.models.abstractoperator.TaskStateChangeCallback]) – 与
on_failure_callback
非常相似,只是它在发生跳过时执行;只有在引发 AirflowSkipException 时才会调用此回调。 明确地说,如果由于 DAG 中的先前分支决策或导致跳过执行的触发规则而未开始执行任务,则不会调用此回调,因此不会调度任务执行。pre_execute (TaskPreExecuteHook | None) –
在任务执行之前立即调用的函数,接收上下文字典;引发异常将阻止执行任务。
这是一个实验性功能。
post_execute (TaskPostExecuteHook | None) –
在任务执行之后立即调用的函数,接收上下文字典和任务结果;引发异常将阻止任务成功。
这是一个实验性功能。
trigger_rule (str) – 定义任务触发时应用依赖关系的规则。选项包括:
{ all_success | all_failed | all_done | all_skipped | one_success | one_done | one_failed | none_failed | none_failed_min_one_success | none_skipped | always}
默认值为all_success
。选项可以设置为字符串,也可以使用静态类airflow.utils.TriggerRule
中定义的常量。resources (dict[str, Any] | None) – 资源参数名称(Resources 构造函数的参数名称)与其值的映射。
run_as_user (str | None) – 运行任务时要模拟的 Unix 用户名。
max_active_tis_per_dag (int | None) – 设置后,任务将能够限制跨执行日期的并发运行。
max_active_tis_per_dagrun (int | None) – 设置后,任务将能够限制每个 DAG 运行的并发任务实例数。
executor (str | None) – 运行此任务时要定位的执行器。 尚未支持。
executor_config (dict | None) –
特定执行器解释的其他任务级配置参数。参数由执行器的名称命名空间。
示例:通过 KubernetesExecutor 在特定 Docker 容器中运行此任务
MyOperator(..., executor_config={"KubernetesExecutor": {"image": "myCustomDockerImage"}})
do_xcom_push (bool) – 如果为 True,则会推送一个包含 Operator 结果的 XCom。
multiple_outputs (bool) – 如果为 True 且 do_xcom_push 为 True,则推送多个 XCom,返回的字典结果中每个键对应一个 XCom。如果为 False 且 do_xcom_push 为 True,则推送单个 XCom。
task_group (airflow.utils.task_group.TaskGroup | None) – 任务应属于的任务组。这通常在不使用 TaskGroup 作为上下文管理器时提供。
doc (str | None) – 为您的任务对象添加文档或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。
doc_md (str | None) – 为您的任务对象添加文档(Markdown 格式)或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。
doc_rst (str | None) – 为您的任务对象添加文档(RST 格式)或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。
doc_json (str | None) – 为您的任务对象添加文档(JSON 格式)或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。
doc_yaml (str | None) – 为您的任务对象添加文档(YAML 格式)或注释,这些文档或注释在 Web 服务器的任务实例详细信息视图中可见。
task_display_name (str | None) – 任务的显示名称,该名称显示在 UI 上。
logger_name (str | None) – Operator 用于发出日志的记录器名称。如果设置为 None (默认),则记录器名称将回退到 airflow.task.operators.{class.__module__}.{class.__name__} (例如,SimpleHttpOperator 的记录器将为 airflow.task.operators.airflow.providers.http.operators.http.SimpleHttpOperator)。
allow_nested_operators (bool) –
如果为 True,则在另一个运算符内执行运算符时,将记录一条警告消息。如果为 False,则如果运算符使用不当(例如,嵌套在另一个运算符内),则会引发异常。在 Airflow 的未来版本中,将删除此参数,并且当运算符彼此嵌套时,始终会引发异常(默认值为 True)。
示例:错误的运算符混合使用示例
@task(provide_context=True) def say_hello_world(**context): hello_world_task = BashOperator( task_id="hello_world_task", bash_command="python -c \"print('Hello, world!')\"", dag=dag, ) hello_world_task.execute(context)
- property dag:airflow.models.dag.DAG[源代码]¶
返回 Operator 的 DAG(如果已设置),否则引发错误。
- property operator_class:type[BaseOperator][源代码]¶
- property roots:list[BaseOperator][源代码]¶
DAGNode 所需。
- property leaves:list[BaseOperator][源代码]¶
DAGNode 所需。
- operator_extra_links: Collection[airflow.models.baseoperatorlink.BaseOperatorLink] = ()[源代码]¶
- subdag: airflow.models.dag.DAG | None[源代码]¶
- start_trigger_args: airflow.triggers.base.StartTriggerArgs | None[源代码]¶
- deps: frozenset[airflow.ti_deps.deps.base_ti_dep.BaseTIDep][源代码]¶
返回操作符的依赖关系集合。这些与执行上下文依赖关系不同,因为它们特定于任务,并且可以被子类扩展/覆盖。
- set_xcomargs_dependencies()[source]¶
解析任务的上游依赖关系。
这样,将
XComArg
作为模板字段的值传递,将导致在两个任务之间创建上游关系。示例:
with DAG(...): generate_content = GenerateContentOperator(task_id="generate_content") send_email = EmailOperator(..., html_content=generate_content.output) # This is equivalent to with DAG(...): generate_content = GenerateContentOperator(task_id="generate_content") send_email = EmailOperator(..., html_content="{{ task_instance.xcom_pull('generate_content') }}") generate_content >> send_email
- abstract execute(context)[source]¶
在创建操作符时派生。
Context 与渲染 Jinja 模板时使用的字典相同。
有关更多上下文,请参阅 get_template_context。
- render_template_fields(context, jinja_env=None)[source]¶
模板化 self.template_fields 中列出的所有属性。
这会就地改变属性,并且是不可逆的。
- 参数
context (airflow.utils.context.Context) – 带有要应用于内容的 Context 值的字典。
jinja_env (jinja2.Environment | None) – Jinja 用于渲染的环境。
- clear(start_date=None, end_date=None, upstream=False, downstream=False, session=NEW_SESSION)[source]¶
根据指定的参数清除与任务关联的任务实例的状态。
- get_task_instances(start_date=None, end_date=None, session=NEW_SESSION)[source]¶
获取与此任务相关的特定日期范围的任务实例。
- run(start_date=None, end_date=None, ignore_first_depends_on_past=True, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, mark_success=False, test_mode=False, session=NEW_SESSION)[source]¶
运行日期范围内的任务实例集。
- static xcom_push(context, key, value, execution_date=None)[source]¶
使 XCom 可供任务拉取。
- 参数
context (Any) – 执行上下文字典
key (str) – XCom 的键
value (Any) – XCom 的值。该值将被 pickle 序列化并存储在数据库中。
execution_date (datetime.datetime | None) – 如果提供,则 XCom 在此日期之前不可见。 例如,这可以用来在未来的某个日期向任务发送消息,而无需立即可见。
- static xcom_pull(context, task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=None, session=NEW_SESSION)[source]¶
拉取可选地满足特定条件的 XComs。
key 的默认值将搜索限制为由其他任务返回的 XComs(而不是手动推送的)。要删除此筛选器,请传递 key=None(或任何所需的值)。
如果提供单个 task_id 字符串,则结果是来自该 task_id 的最近匹配的 XCom 的值。如果提供多个 task_ids,则返回匹配值的元组。如果找不到任何匹配项,则返回 None。
- 参数
context (Any) – 执行上下文字典
key (str) – XCom 的键。如果提供,则仅返回具有匹配键的 XCom。默认键为 ‘return_value’,也可作为常量 XCOM_RETURN_KEY 使用。此键会自动分配给任务返回的 XCom(而不是手动推送的)。要删除筛选器,请传递 key=None。
task_ids (str | list[str] | None) – 仅拉取来自具有匹配 ID 的任务的 XCom。可以传递 None 以删除筛选器。
dag_id (str | None) – 如果提供,则仅从此 DAG 拉取 XCom。如果为 None(默认值),则使用调用任务的 DAG。
include_prior_dates (bool | None) – 如果为 False,则仅返回来自当前 execution_date 的 XCom。如果为 True,则也会返回来自之前日期的 XCom。
- airflow.models.baseoperator.chain(*tasks)[source]¶
给定一些任务,构建依赖链。
此函数接受 BaseOperator(又名任务)、EdgeModifiers(又名标签)、XComArg、TaskGroups 或包含这些类型的任意组合的列表(或同一列表中的组合)的值。如果要链接两个列表,必须确保它们具有相同的长度。
使用经典操作符/传感器
chain(t1, [t2, t3], [t4, t5], t6)
等效于
/ -> t2 -> t4 \ t1 -> t6 \ -> t3 -> t5 /
t1.set_downstream(t2) t1.set_downstream(t3) t2.set_downstream(t4) t3.set_downstream(t5) t4.set_downstream(t6) t5.set_downstream(t6)
使用任务修饰的函数,又名 XComArgs
chain(x1(), [x2(), x3()], [x4(), x5()], x6())
等效于
/ -> x2 -> x4 \ x1 -> x6 \ -> x3 -> x5 /
x1 = x1() x2 = x2() x3 = x3() x4 = x4() x5 = x5() x6 = x6() x1.set_downstream(x2) x1.set_downstream(x3) x2.set_downstream(x4) x3.set_downstream(x5) x4.set_downstream(x6) x5.set_downstream(x6)
使用 TaskGroups
chain(t1, task_group1, task_group2, t2) t1.set_downstream(task_group1) task_group1.set_downstream(task_group2) task_group2.set_downstream(t2)
也可以在经典操作符/传感器、EdgeModifiers、XComArg 和 TaskGroups 之间混合使用
chain(t1, [Label("branch one"), Label("branch two")], [x1(), x2()], task_group1, x3())
等效于
/ "branch one" -> x1 \ t1 -> task_group1 -> x3 \ "branch two" -> x2 /
x1 = x1() x2 = x2() x3 = x3() label1 = Label("branch one") label2 = Label("branch two") t1.set_downstream(label1) label1.set_downstream(x1) t2.set_downstream(label2) label2.set_downstream(x2) x1.set_downstream(task_group1) x2.set_downstream(task_group1) task_group1.set_downstream(x3) # or x1 = x1() x2 = x2() x3 = x3() t1.set_downstream(x1, edge_modifier=Label("branch one")) t1.set_downstream(x2, edge_modifier=Label("branch two")) x1.set_downstream(task_group1) x2.set_downstream(task_group1) task_group1.set_downstream(x3)
- 参数
tasks (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 要设置依赖关系的单个和/或任务、EdgeModifiers、XComArgs 或 TaskGroups 的列表
- airflow.models.baseoperator.cross_downstream(from_tasks, to_tasks)[source]¶
设置 from_tasks 中所有任务到 to_tasks 中所有任务的下游依赖关系。
使用经典操作符/传感器
cross_downstream(from_tasks=[t1, t2, t3], to_tasks=[t4, t5, t6])
等效于
t1 ---> t4 \ / t2 -X -> t5 / \ t3 ---> t6
t1.set_downstream(t4) t1.set_downstream(t5) t1.set_downstream(t6) t2.set_downstream(t4) t2.set_downstream(t5) t2.set_downstream(t6) t3.set_downstream(t4) t3.set_downstream(t5) t3.set_downstream(t6)
使用任务修饰的函数,又名 XComArgs
cross_downstream(from_tasks=[x1(), x2(), x3()], to_tasks=[x4(), x5(), x6()])
等效于
x1 ---> x4 \ / x2 -X -> x5 / \ x3 ---> x6
x1 = x1() x2 = x2() x3 = x3() x4 = x4() x5 = x5() x6 = x6() x1.set_downstream(x4) x1.set_downstream(x5) x1.set_downstream(x6) x2.set_downstream(x4) x2.set_downstream(x5) x2.set_downstream(x6) x3.set_downstream(x4) x3.set_downstream(x5) x3.set_downstream(x6)
也可以在经典操作符/传感器和 XComArg 任务之间混合使用
cross_downstream(from_tasks=[t1, x2(), t3], to_tasks=[x1(), t2, x3()])
等效于
t1 ---> x1 \ / x2 -X -> t2 / \ t3 ---> x3
x1 = x1() x2 = x2() x3 = x3() t1.set_downstream(x1) t1.set_downstream(t2) t1.set_downstream(x3) x2.set_downstream(x1) x2.set_downstream(t2) x2.set_downstream(x3) t3.set_downstream(x1) t3.set_downstream(t2) t3.set_downstream(x3)
- 参数
from_tasks (Sequence[airflow.models.taskmixin.DependencyMixin]) – 要从中开始的任务或 XComArgs 的列表。
to_tasks (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 要设置为下游依赖项的任务或 XComArgs 的列表。
- airflow.models.baseoperator.chain_linear(*elements)[source]¶
简化任务依赖定义。
例如:假设您想要如下优先级
╭─op2─╮ ╭─op4─╮ op1─┤ ├─├─op5─┤─op7 ╰-op3─╯ ╰-op6─╯
那么你可以这样完成
chain_linear(op1, [op2, op3], [op4, op5, op6], op7)
- 参数
elements (airflow.models.taskmixin.DependencyMixin | Sequence[airflow.models.taskmixin.DependencyMixin]) – 操作符/操作符列表的列表