airflow.models.dag¶
属性¶
异常¶
当模型错误地填充数据间隔字段时引发的异常。 |
类¶
DAG(有向无环图)是具有方向依赖关系的任务集合。 |
|
每个 DAG 的标签名称,以便在 DAG 视图中快速过滤。 |
|
定义不同所有者属性的表。 |
|
包含 DAG 属性的表。 |
函数¶
|
返回某个 DAG 的最后一次运行,如果没有则返回 None。 |
|
获取给定 dag_ids 列表的下一次运行信息。 |
模块内容¶
- exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[source]¶
基类:
airflow.exceptions.AirflowException
当模型错误地填充数据间隔字段时引发的异常。
数据间隔字段应要么都为 None(对于 AIP-39 之前调度的运行),要么都为 datetime(对于 AIP-39 实现后调度的运行)。如果其中恰好一个字段为 None,则会引发此异常。
- airflow.models.dag.get_last_dagrun(dag_id, session, include_manually_triggered=False)[source]¶
返回某个 DAG 的最后一次运行,如果没有则返回 None。
最后一次 DAG 运行可以是任何类型的运行,例如调度或回填。忽略被覆盖的 DagRun。
- airflow.models.dag.get_asset_triggered_next_run_info(dag_ids, *, session)[source]¶
获取给定 dag_ids 列表的下一次运行信息。
给定一个 dag_ids 列表,获取一个字符串表示任何资产触发的 DAG 距离下一次运行有多近,例如“2 个资产中 1 个已更新”。
- class airflow.models.dag.DAG(context=None)[source]¶
基类:
airflow.sdk.definitions.dag.DAG
,airflow.utils.log.logging_mixin.LoggingMixin
DAG(有向无环图)是具有方向依赖关系的任务集合。
一个 DAG 也有一个调度、一个开始日期和一个结束日期(可选)。对于每个调度(例如每日或每小时),DAG 需要在其依赖项满足时运行每个单独的任务。某些任务具有依赖于其自身过去的属性,这意味着它们在完成其先前的调度(以及上游任务)之前无法运行。
DAGs 本质上充当任务的命名空间。一个 task_id 只能添加到同一个 DAG 一次。
请注意,如果您计划使用时区,所有提供的日期都应该是 pendulum 日期。请参阅 时区感知 DAGs。
版本 2.4 新增: 用于指定基于时间的调度逻辑 (timetable) 或资产驱动触发器的 schedule 参数。
版本 3.0 变更: schedule 的默认值已更改为 None(无调度)。先前的默认值为
timedelta(days=1)
。- 参数:
dag_id – DAG 的 ID;必须仅包含字母数字字符、破折号、点和下划线(所有 ASCII 字符)
description – DAG 的描述,例如用于在 webserver 上显示
schedule – 如果提供,这将定义 DAG 运行的调度规则。可能的值包括 cron 表达式字符串、timedelta 对象、Timetable 或 Asset 对象列表。另请参阅 使用 Timetables 定制 DAG 调度。
start_date – 调度器将尝试回填的时间戳。如果未提供,则必须手动使用明确的时间范围进行回填。
end_date – DAG 不会运行的日期,留空(None)表示开放式调度。
template_searchpath – 此文件夹列表(非相对路径)定义了 jinja 查找模板的位置。顺序很重要。请注意,jinja/airflow 默认包含您的 DAG 文件路径
template_undefined – 模板未定义类型。
user_defined_macros – 在 jinja 模板中公开的宏字典。例如,将
dict(foo='bar')
作为参数传递,可以使您在此 DAG 相关的所有 jinja 模板中使用{{ foo }}
。请注意,您可以在此处传递任何类型的对象。user_defined_filters – 在 jinja 模板中公开的过滤器字典。例如,将
dict(hello=lambda name: 'Hello %s' % name)
作为参数传递,可以使您在此 DAG 相关的所有 jinja 模板中使用{{ 'world' | hello }}
。default_args – 用于在初始化操作符时作为构造函数关键字参数的默认参数字典。请注意,操作符具有相同的 hook,并且优先于此处定义的参数,这意味着如果您的字典中包含 ‘depends_on_past’: True,而操作符调用的 default_args 中包含 ‘depends_on_past’: False,则实际值将为 False。
params – DAG 级别的参数字典,可在模板中访问,位于 params 命名空间下。这些参数可以在任务级别被覆盖。
max_active_tasks – 允许同时运行的任务实例数量
max_active_runs – 最大活跃 DAG 运行数,超过此数量处于运行状态的 DAG 运行后,调度器将不会创建新的活跃 DAG 运行
max_consecutive_failed_dag_runs – (实验性)最大连续失败 DAG 运行数,超过此数量后,调度器将禁用该 DAG
dagrun_timeout – 指定 DagRun 在超时或失败前允许运行的持续时间。DagRun 超时时正在运行的任务实例将被标记为跳过。
sla_miss_callback – 已弃用 - SLA 功能在 Airflow 3.0 中移除,将在 3.1 中由新实现取代
catchup – 执行调度器追赶(或仅运行最新的)?默认为 False
on_failure_callback – 当此 DAG 的 DagRun 失败时要调用的函数或函数列表。一个上下文字典作为单个参数传递给此函数。
on_success_callback – 与
on_failure_callback
非常相似,只是它在 DAG 成功时执行。access_control – 指定可选的 DAG 级别操作,例如“{‘role1’: {‘can_read’}, ‘role2’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”,或者如果存在 DAG 运行资源,则可以指定资源名称,例如“{‘role1’: {‘DAG Runs’: {‘can_create’}}, ‘role2’: {‘DAGs’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”
is_paused_upon_creation – 指定 DAG 在首次创建时是否暂停。如果 DAG 已存在,此标志将被忽略。如果未指定此可选参数,将使用全局配置设置。
jinja_environment_kwargs –
要传递给 Jinja
Environment
以进行模板渲染的额外配置选项示例: 为了避免 Jinja 从模板字符串中移除末尾的换行符
DAG( dag_id="my-dag", jinja_environment_kwargs={ "keep_trailing_newline": True, # some other jinja2 Environment options here }, )
render_template_as_native_obj – 如果为 True,使用 Jinja
NativeEnvironment
将模板渲染为本地 Python 类型。如果为 False,使用 JinjaEnvironment
将模板渲染为字符串值。tags – 标签列表,用于帮助在 UI 中过滤 DAG。
owner_links – 所有者及其链接的字典,可在 DAG 视图 UI 上点击。可用作 HTTP 链接(例如指向您的 Slack 频道),或 mailto 链接。例如: {“dag_owner”: “https://airflow.org.cn/”}
auto_register – 当在
with
块中使用此 DAG 时自动注册它fail_fast – 当 DAG 中的任务失败时,使当前正在运行的任务失败。警告: 快速失败的 DAG 只能包含具有默认触发规则(“all_success”)的任务。如果快速失败的 DAG 中的任何任务具有非默认触发规则,则会引发异常。
dag_display_name – 显示在 UI 上的 DAG 显示名称。
- last_loaded: datetime.datetime | None[source]¶
- next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]¶
获取此 DAG 在
date_last_automated_dagrun
之后的下一次 DagRun 的信息。此方法根据 DAG 的 timetable、start_date、end_date 等计算下一次 DagRun 应操作的时间间隔(其逻辑日期)以及何时可以调度。此方法不检查最大活跃运行数或任何其他“max_active_tasks”类型的限制,仅根据此 DAG 及其任务的各种日期和间隔字段执行计算。
- 参数:
last_automated_dagrun (None | airflow.timetables.base.DataInterval) – 此 DAG 现有“自动化” DagRun(调度或回填,非手动)的
max(logical_date)
。restricted (bool) – 如果设置为 False(默认为 True),忽略 DAG 或任务上指定的
start_date
、end_date
和catchup
。
- 返回:
下一次 dagrun 的 DagRunInfo,如果不会调度 dagrun 则返回 None。
- 返回类型:
- iter_dagrun_infos_between(earliest, latest, *, align=True)[source]¶
使用此 DAG 的 timetable 在给定间隔内生成 DagRunInfo。
如果 DagRunInfo 实例的
logical_date
不早于earliest
,也不晚于latest
,则生成这些实例。实例按其logical_date
从最早到最新排序。如果
align
为False
,则第一次运行将立即在earliest
时发生,即使它不符合逻辑 timetable 调度。默认为True
。示例: 一个 DAG 被调度为每午夜运行一次 (
0 0 * * *
)。如果earliest
为2021-06-03 23:00:00
,则如果align=False
,第一次 DagRunInfo 将是2021-06-03 23:00:00
,如果align=True
,则为2021-06-04 00:00:00
。
- static fetch_callback(dag, run_id, success=True, reason=None, *, session=NEW_SESSION)[source]¶
根据 success 的值获取相应的回调。
此方法获取属于此 DAG 运行的单个任务实例的上下文,并连同回调列表一起返回。
- 参数:
dag (DAG) – DAG 对象
run_id (str) – DAG 运行 ID
success (bool) – 指示是调用失败回调还是成功回调的标志
reason (str | None) – 完成原因
session (sqlalchemy.orm.session.Session) – 数据库会话
- handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]¶
触发相应的 on_failure_callback 或 on_success_callback。
此方法获取属于此 DAG 运行的单个任务实例的上下文,并将其连同“reason”(主要用于区分 DAG 运行失败)一起传递给可调用对象。
- 参数:
dagrun (airflow.models.dagrun.DagRun) – DagRun 对象
success – 指示是调用失败回调还是成功回调的标志
reason – 完成原因
session – 数据库会话
- static fetch_dagrun(dag_id, run_id, session=NEW_SESSION)[source]¶
返回给定 run_id 的 DAG 运行(如果存在),否则返回 None。
- 参数:
dag_id (str) – 要查找的 DAG 的 dag_id。
run_id (str) – 要查找的 DagRun 的 run_id。
session (sqlalchemy.orm.session.Session)
- 返回:
如果找到,则为 DagRun;否则为 None。
- 返回类型:
- get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]¶
返回 start_date(包含)和 end_date(包含)之间的 DAG 运行列表。
- 参数:
start_date – 要查找的 DagRun 的起始逻辑日期。
end_date – 要查找的 DagRun 的结束逻辑日期。
session
- 返回:
找到的 DagRun 列表。
- get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]¶
获取
base_date
之前(含)的num
个任务实例。返回的列表可能包含与任何 DagRunType 对应的恰好
num
个任务实例。如果在base_date
之前计划的 DAG 运行少于num
个,则返回的数量可能会少于num
。
- set_task_instance_state(*, task_id, map_indexes=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]¶
设置任务实例的状态,并清除处于失败或上游失败状态的下游任务。
- 参数:
task_id (str) – 任务实例的任务 ID
map_indexes (collections.abc.Collection[int] | None) – 仅在 map_index 匹配时设置任务实例。如果为 None(默认),则设置该任务的所有映射任务实例。
run_id (str | None) – 任务实例的 run_id
state (airflow.utils.state.TaskInstanceState) – 要将任务实例设置为的状态
upstream (bool) – 包含给定 task_id 的所有上游任务
downstream (bool) – 包含给定 task_id 的所有下游任务
future (bool) – 包含给定 task_id 的所有未来的任务实例
commit (bool) – 提交更改
past (bool) – 包含给定 task_id 的所有过去的任务实例
- set_task_group_state(*, group_id, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]¶
将任务组设置为给定状态,并清除处于失败或上游失败状态的下游任务。
- 参数:
group_id (str) – 任务组的 group_id
run_id (str | None) – 任务实例的 run_id
state (airflow.utils.state.TaskInstanceState) – 要将任务实例设置为的状态
upstream (bool) – 包含给定 task_id 的所有上游任务
downstream (bool) – 包含给定 task_id 的所有下游任务
future (bool) – 包含给定 task_id 的所有未来的任务实例
commit (bool) – 提交更改
past (bool) – 包含给定 task_id 的所有过去的任务实例
session (sqlalchemy.orm.session.Session) – 新会话
- clear(*, dry_run: airflow.typing_compat.Literal[True], task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, run_id: str, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) list[airflow.models.taskinstance.TaskInstance] [source]¶
- clear(*, task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, run_id: str, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, dry_run: airflow.typing_compat.Literal[False] = False, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) int
- clear(*, dry_run: airflow.typing_compat.Literal[True], task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) list[airflow.models.taskinstance.TaskInstance]
- clear(*, task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, dry_run: airflow.typing_compat.Literal[False] = False, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) int
在指定日期范围内清除与当前 DAG 关联的一组任务实例。
- 参数:
task_ids – 要清除的任务 ID 列表或 (
task_id
,map_index
) 元组run_id – 要清除任务的运行 ID
start_date – 要清除的最小逻辑日期
end_date – 要清除的最大逻辑日期
only_failed – 仅清除失败的任务
only_running – 仅清除正在运行的任务。
confirm_prompt – 请求确认
dag_run_state – 要将 DagRun 设置成的状态。如果设置为 False,则不会更改 dagrun 状态。
dry_run – 查找要清除的任务,但不实际清除它们。
session – 要使用的 SQLAlchemy 会话
dag_bag – 用于查找 DAG 的 DagBag(可选)
exclude_task_ids – 一组不应清除的
task_id
或 (task_id
,map_index
) 元组exclude_run_ids – 一组
run_id
或 (run_id
)
- classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]¶
- test(run_after=None, logical_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, use_executor=False, mark_success_pattern=None, session=NEW_SESSION)[source]¶
为给定的 DAG 和逻辑日期执行单个 DagRun。
- 参数:
run_after (datetime.datetime | None) – 在此日期时间之前 DAG 不能运行。
logical_date (datetime.datetime | None) – 此 DAG 运行的逻辑日期
conn_file_path (str | None) – YAML 或 JSON 格式的连接文件的文件路径
variable_file_path (str | None) – YAML 或 JSON 格式的变量文件的文件路径
use_executor (bool) – 如果设置,则使用执行器测试此 DAG
mark_success_pattern (re.Pattern | str | None) – 要标记为成功(而非运行中)的 task_id 的正则表达式
session (sqlalchemy.orm.session.Session) – 数据库连接(可选)
- classmethod bulk_write_to_db(bundle_name, bundle_version, dags, session=NEW_SESSION)[source]¶
确保数据库中 dag 表中给定 DAG 的 DagModel 行是最新的。
- 参数:
dags (collections.abc.Collection[airflow.serialization.serialized_objects.MaybeSerializedDAG]) – 要保存到数据库的 DAG 对象
- 返回:
无
- static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]¶
给定一个已知 DAG 列表,停用 ORM 中被标记为活动的任何其他 DAG。
- 参数:
active_dag_ids – 处于活动状态的 DAG ID 列表
- 返回:
无
- static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]¶
停用在过期日期之前最后一次被调度器触碰的任何 DAG。
这些 DAG 可能已被删除。
- 参数:
expiration_date – 设置在此时间之前被触碰的非活动状态的 DAG
- 返回:
无
- class airflow.models.dag.DagOwnerAttributes[source]¶
基类:
airflow.models.base.Base
定义不同所有者属性的表。
例如,一个所有者的链接,将作为超链接传递到“DAGs”视图。
- class airflow.models.dag.DagModel(**kwargs)[source]¶
基类:
airflow.models.base.Base
包含 DAG 属性的表。
- property next_dagrun_data_interval: airflow.timetables.base.DataInterval | None[source]¶
- static get_paused_dag_ids(dag_ids, session=NEW_SESSION)[source]¶
给定一个 dag_id 列表,获取一组暂停的 Dag Id。
- 参数:
session (sqlalchemy.orm.session.Session) – ORM 会话
- 返回:
暂停的 Dag Id
- 返回类型:
- set_is_paused(is_paused, session=NEW_SESSION)[source]¶
暂停/取消暂停一个 DAG。
- 参数:
is_paused (bool) – DAG 是否暂停
session – 会话
- classmethod deactivate_deleted_dags(bundle_name, rel_filelocs, session=NEW_SESSION)[source]¶
对于已移除 DAG 文件的 DAG,将其
is_active=False
设置为 False。- 参数:
bundle_name (str) – 文件位置的捆绑包
session (sqlalchemy.orm.session.Session) – ORM 会话
- classmethod dags_needing_dagruns(session)[source]¶
返回(并锁定)一个 Dag 对象列表,这些对象已到期创建新的 DagRun。
这将返回一个行级别锁定的行结果集,使用“SELECT … FOR UPDATE”查询。您应确保在单个事务中做出所有调度决策——一旦事务提交,锁定将被释放。
- calculate_dagrun_date_fields(dag, last_automated_dag_run)[source]¶
计算
next_dagrun
和 next_dagrun_create_after`。- 参数:
dag (DAG) – DAG 对象
last_automated_dag_run (None | airflow.timetables.base.DataInterval) – 此 DAG 最近一次运行的数据间隔(或 datetime),如果尚未调度,则为 None。