Airflow 峰会 2025 将于 10 月 07 日至 09 日举行。立即注册以获取早鸟票!

airflow.models.dag

属性

log

AssetT

TAG_MAX_LEN

DagStateChangeCallback

ScheduleInterval

ScheduleArg

dag

异常

InconsistentDataInterval

当模型错误地填充数据间隔字段时引发的异常。

DAG

DAG(有向无环图)是具有方向依赖关系的任务集合。

DagTag

每个 DAG 的标签名称,以便在 DAG 视图中快速过滤。

DagOwnerAttributes

定义不同所有者属性的表。

DagModel

包含 DAG 属性的表。

函数

get_last_dagrun(dag_id, session[, ...])

返回某个 DAG 的最后一次运行,如果没有则返回 None。

get_asset_triggered_next_run_info(dag_ids, *, session)

获取给定 dag_ids 列表的下一次运行信息。

模块内容

airflow.models.dag.log[source]
airflow.models.dag.AssetT[source]
airflow.models.dag.TAG_MAX_LEN = 100[source]
airflow.models.dag.DagStateChangeCallback[source]
airflow.models.dag.ScheduleInterval[source]
airflow.models.dag.ScheduleArg[source]
exception airflow.models.dag.InconsistentDataInterval(instance, start_field_name, end_field_name)[source]

基类: airflow.exceptions.AirflowException

当模型错误地填充数据间隔字段时引发的异常。

数据间隔字段应要么都为 None(对于 AIP-39 之前调度的运行),要么都为 datetime(对于 AIP-39 实现后调度的运行)。如果其中恰好一个字段为 None,则会引发此异常。

__str__()[source]

返回 str(self)。

airflow.models.dag.get_last_dagrun(dag_id, session, include_manually_triggered=False)[source]

返回某个 DAG 的最后一次运行,如果没有则返回 None。

最后一次 DAG 运行可以是任何类型的运行,例如调度或回填。忽略被覆盖的 DagRun。

airflow.models.dag.get_asset_triggered_next_run_info(dag_ids, *, session)[source]

获取给定 dag_ids 列表的下一次运行信息。

给定一个 dag_ids 列表,获取一个字符串表示任何资产触发的 DAG 距离下一次运行有多近,例如“2 个资产中 1 个已更新”。

airflow.models.dag.dag[source]
class airflow.models.dag.DAG(context=None)[source]

基类: airflow.sdk.definitions.dag.DAG, airflow.utils.log.logging_mixin.LoggingMixin

DAG(有向无环图)是具有方向依赖关系的任务集合。

一个 DAG 也有一个调度、一个开始日期和一个结束日期(可选)。对于每个调度(例如每日或每小时),DAG 需要在其依赖项满足时运行每个单独的任务。某些任务具有依赖于其自身过去的属性,这意味着它们在完成其先前的调度(以及上游任务)之前无法运行。

DAGs 本质上充当任务的命名空间。一个 task_id 只能添加到同一个 DAG 一次。

请注意,如果您计划使用时区,所有提供的日期都应该是 pendulum 日期。请参阅 时区感知 DAGs

版本 2.4 新增: 用于指定基于时间的调度逻辑 (timetable) 或资产驱动触发器的 schedule 参数。

版本 3.0 变更: schedule 的默认值已更改为 None(无调度)。先前的默认值为 timedelta(days=1)

参数:
  • dag_id – DAG 的 ID;必须仅包含字母数字字符、破折号、点和下划线(所有 ASCII 字符)

  • description – DAG 的描述,例如用于在 webserver 上显示

  • schedule – 如果提供,这将定义 DAG 运行的调度规则。可能的值包括 cron 表达式字符串、timedelta 对象、Timetable 或 Asset 对象列表。另请参阅 使用 Timetables 定制 DAG 调度

  • start_date – 调度器将尝试回填的时间戳。如果未提供,则必须手动使用明确的时间范围进行回填。

  • end_date – DAG 不会运行的日期,留空(None)表示开放式调度。

  • template_searchpath – 此文件夹列表(非相对路径)定义了 jinja 查找模板的位置。顺序很重要。请注意,jinja/airflow 默认包含您的 DAG 文件路径

  • template_undefined – 模板未定义类型。

  • user_defined_macros – 在 jinja 模板中公开的宏字典。例如,将 dict(foo='bar') 作为参数传递,可以使您在此 DAG 相关的所有 jinja 模板中使用 {{ foo }}。请注意,您可以在此处传递任何类型的对象。

  • user_defined_filters – 在 jinja 模板中公开的过滤器字典。例如,将 dict(hello=lambda name: 'Hello %s' % name) 作为参数传递,可以使您在此 DAG 相关的所有 jinja 模板中使用 {{ 'world' | hello }}

  • default_args – 用于在初始化操作符时作为构造函数关键字参数的默认参数字典。请注意,操作符具有相同的 hook,并且优先于此处定义的参数,这意味着如果您的字典中包含 ‘depends_on_past’: True,而操作符调用的 default_args 中包含 ‘depends_on_past’: False,则实际值将为 False

  • params – DAG 级别的参数字典,可在模板中访问,位于 params 命名空间下。这些参数可以在任务级别被覆盖。

  • max_active_tasks – 允许同时运行的任务实例数量

  • max_active_runs – 最大活跃 DAG 运行数,超过此数量处于运行状态的 DAG 运行后,调度器将不会创建新的活跃 DAG 运行

  • max_consecutive_failed_dag_runs – (实验性)最大连续失败 DAG 运行数,超过此数量后,调度器将禁用该 DAG

  • dagrun_timeout – 指定 DagRun 在超时或失败前允许运行的持续时间。DagRun 超时时正在运行的任务实例将被标记为跳过。

  • sla_miss_callback – 已弃用 - SLA 功能在 Airflow 3.0 中移除,将在 3.1 中由新实现取代

  • catchup – 执行调度器追赶(或仅运行最新的)?默认为 False

  • on_failure_callback – 当此 DAG 的 DagRun 失败时要调用的函数或函数列表。一个上下文字典作为单个参数传递给此函数。

  • on_success_callback – 与 on_failure_callback 非常相似,只是它在 DAG 成功时执行。

  • access_control – 指定可选的 DAG 级别操作,例如“{‘role1’: {‘can_read’}, ‘role2’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”,或者如果存在 DAG 运行资源,则可以指定资源名称,例如“{‘role1’: {‘DAG Runs’: {‘can_create’}}, ‘role2’: {‘DAGs’: {‘can_read’, ‘can_edit’, ‘can_delete’}}”

  • is_paused_upon_creation – 指定 DAG 在首次创建时是否暂停。如果 DAG 已存在,此标志将被忽略。如果未指定此可选参数,将使用全局配置设置。

  • jinja_environment_kwargs

    要传递给 Jinja Environment 以进行模板渲染的额外配置选项

    示例: 为了避免 Jinja 从模板字符串中移除末尾的换行符

    DAG(
        dag_id="my-dag",
        jinja_environment_kwargs={
            "keep_trailing_newline": True,
            # some other jinja2 Environment options here
        },
    )
    

    参见: Jinja Environment 文档

  • render_template_as_native_obj – 如果为 True,使用 Jinja NativeEnvironment 将模板渲染为本地 Python 类型。如果为 False,使用 Jinja Environment 将模板渲染为字符串值。

  • tags – 标签列表,用于帮助在 UI 中过滤 DAG。

  • owner_links – 所有者及其链接的字典,可在 DAG 视图 UI 上点击。可用作 HTTP 链接(例如指向您的 Slack 频道),或 mailto 链接。例如: {“dag_owner”: “https://airflow.org.cn/”}

  • auto_register – 当在 with 块中使用此 DAG 时自动注册它

  • fail_fast – 当 DAG 中的任务失败时,使当前正在运行的任务失败。警告: 快速失败的 DAG 只能包含具有默认触发规则(“all_success”)的任务。如果快速失败的 DAG 中的任何任务具有非默认触发规则,则会引发异常。

  • dag_display_name – 显示在 UI 上的 DAG 显示名称。

partial: bool = False[source]
last_loaded: datetime.datetime | None[source]
max_consecutive_failed_dag_runs: int[source]
property safe_dag_id[source]
validate()[source]

验证 DAG 的设置是否一致。

此方法在 DAG bag 组织 DAG 之前调用。

validate_executor_field()[source]
next_dagrun_info(last_automated_dagrun, *, restricted=True)[source]

获取此 DAG 在 date_last_automated_dagrun 之后的下一次 DagRun 的信息。

此方法根据 DAG 的 timetable、start_date、end_date 等计算下一次 DagRun 应操作的时间间隔(其逻辑日期)以及何时可以调度。此方法不检查最大活跃运行数或任何其他“max_active_tasks”类型的限制,仅根据此 DAG 及其任务的各种日期和间隔字段执行计算。

参数:
  • last_automated_dagrun (None | airflow.timetables.base.DataInterval) – 此 DAG 现有“自动化” DagRun(调度或回填,非手动)的 max(logical_date)

  • restricted (bool) – 如果设置为 False(默认为 True),忽略 DAG 或任务上指定的 start_dateend_datecatchup

返回:

下一次 dagrun 的 DagRunInfo,如果不会调度 dagrun 则返回 None。

返回类型:

airflow.timetables.base.DagRunInfo | None

iter_dagrun_infos_between(earliest, latest, *, align=True)[source]

使用此 DAG 的 timetable 在给定间隔内生成 DagRunInfo。

如果 DagRunInfo 实例的 logical_date 不早于 earliest,也不晚于 latest,则生成这些实例。实例按其 logical_date 从最早到最新排序。

如果 alignFalse,则第一次运行将立即在 earliest 时发生,即使它不符合逻辑 timetable 调度。默认为 True

示例: 一个 DAG 被调度为每午夜运行一次 (0 0 * * *)。如果 earliest2021-06-03 23:00:00,则如果 align=False,第一次 DagRunInfo 将是 2021-06-03 23:00:00,如果 align=True,则为 2021-06-04 00:00:00

get_last_dagrun(session=NEW_SESSION, include_manually_triggered=False)[source]
has_dag_runs(session=NEW_SESSION, include_manually_triggered=True)[source]
property dag_id: str[source]
property timetable_summary: str[source]
get_concurrency_reached(session=NEW_SESSION)[source]

返回一个布尔值,指示此 DAG 是否已达到 max_active_tasks 限制。

get_is_active(session=NEW_SESSION)[source]

返回一个布尔值,指示此 DAG 是否活跃。

get_is_stale(session=NEW_SESSION)[source]

返回一个布尔值,指示此 DAG 是否陈旧。

get_is_paused(session=NEW_SESSION)[source]

返回一个布尔值,指示此 DAG 是否已暂停。

get_bundle_name(session=NEW_SESSION)[source]

返回此 DAG 所在的 bundle 名称。

get_bundle_version(session=NEW_SESSION)[source]

返回处理此 DAG 时看到的 bundle 版本。

classmethod get_serialized_fields()[source]

字符串化的 DAGs 和操作符恰好包含这些字段。

static fetch_callback(dag, run_id, success=True, reason=None, *, session=NEW_SESSION)[source]

根据 success 的值获取相应的回调。

此方法获取属于此 DAG 运行的单个任务实例的上下文,并连同回调列表一起返回。

参数:
  • dag (DAG) – DAG 对象

  • run_id (str) – DAG 运行 ID

  • success (bool) – 指示是调用失败回调还是成功回调的标志

  • reason (str | None) – 完成原因

  • session (sqlalchemy.orm.session.Session) – 数据库会话

handle_callback(dagrun, success=True, reason=None, session=NEW_SESSION)[source]

触发相应的 on_failure_callback 或 on_success_callback。

此方法获取属于此 DAG 运行的单个任务实例的上下文,并将其连同“reason”(主要用于区分 DAG 运行失败)一起传递给可调用对象。

参数:
  • dagrun (airflow.models.dagrun.DagRun) – DagRun 对象

  • success – 指示是调用失败回调还是成功回调的标志

  • reason – 完成原因

  • session – 数据库会话

classmethod execute_callback(callbacks, context, dag_id)[source]

使用给定的上下文触发回调。

参数:
  • callbacks (list[Callable] | None) – 要调用的回调列表

  • context (airflow.utils.context.Context | None) – 要传递给所有回调的上下文

  • dag_id (str) – 要查找的 DAG 的 dag_id。

get_active_runs()[source]

返回当前正在运行的 DAG 运行的逻辑日期列表。

返回:

逻辑日期列表

static fetch_dagrun(dag_id, run_id, session=NEW_SESSION)[source]

返回给定 run_id 的 DAG 运行(如果存在),否则返回 None。

参数:
返回:

如果找到,则为 DagRun;否则为 None。

返回类型:

airflow.models.dagrun.DagRun

get_dagrun(run_id, session=NEW_SESSION)[source]
get_dagruns_between(start_date, end_date, session=NEW_SESSION)[source]

返回 start_date(包含)和 end_date(包含)之间的 DAG 运行列表。

参数:
  • start_date – 要查找的 DagRun 的起始逻辑日期。

  • end_date – 要查找的 DagRun 的结束逻辑日期。

  • session

返回:

找到的 DagRun 列表。

get_latest_logical_date(session=NEW_SESSION)[source]

返回存在至少一个 DAG 运行的最新日期。

get_task_instances_before(base_date, num, *, session=NEW_SESSION)[source]

获取 base_date 之前(含)的 num 个任务实例。

返回的列表可能包含与任何 DagRunType 对应的恰好 num 个任务实例。如果在 base_date 之前计划的 DAG 运行少于 num 个,则返回的数量可能会少于 num

get_task_instances(start_date=None, end_date=None, state=None, session=NEW_SESSION)[source]
set_task_instance_state(*, task_id, map_indexes=None, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

设置任务实例的状态,并清除处于失败或上游失败状态的下游任务。

参数:
  • task_id (str) – 任务实例的任务 ID

  • map_indexes (collections.abc.Collection[int] | None) – 仅在 map_index 匹配时设置任务实例。如果为 None(默认),则设置该任务的所有映射任务实例。

  • run_id (str | None) – 任务实例的 run_id

  • state (airflow.utils.state.TaskInstanceState) – 要将任务实例设置为的状态

  • upstream (bool) – 包含给定 task_id 的所有上游任务

  • downstream (bool) – 包含给定 task_id 的所有下游任务

  • future (bool) – 包含给定 task_id 的所有未来的任务实例

  • commit (bool) – 提交更改

  • past (bool) – 包含给定 task_id 的所有过去的任务实例

set_task_group_state(*, group_id, run_id=None, state, upstream=False, downstream=False, future=False, past=False, commit=True, session=NEW_SESSION)[source]

将任务组设置为给定状态,并清除处于失败或上游失败状态的下游任务。

参数:
  • group_id (str) – 任务组的 group_id

  • run_id (str | None) – 任务实例的 run_id

  • state (airflow.utils.state.TaskInstanceState) – 要将任务实例设置为的状态

  • upstream (bool) – 包含给定 task_id 的所有上游任务

  • downstream (bool) – 包含给定 task_id 的所有下游任务

  • future (bool) – 包含给定 task_id 的所有未来的任务实例

  • commit (bool) – 提交更改

  • past (bool) – 包含给定 task_id 的所有过去的任务实例

  • session (sqlalchemy.orm.session.Session) – 新会话

clear(*, dry_run: airflow.typing_compat.Literal[True], task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, run_id: str, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) list[airflow.models.taskinstance.TaskInstance][source]
clear(*, task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, run_id: str, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, dry_run: airflow.typing_compat.Literal[False] = False, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) int
clear(*, dry_run: airflow.typing_compat.Literal[True], task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) list[airflow.models.taskinstance.TaskInstance]
clear(*, task_ids: collections.abc.Collection[str | tuple[str, int]] | None = None, start_date: datetime.datetime | None = None, end_date: datetime.datetime | None = None, only_failed: bool = False, only_running: bool = False, confirm_prompt: bool = False, dag_run_state: airflow.utils.state.DagRunState = DagRunState.QUEUED, dry_run: airflow.typing_compat.Literal[False] = False, session: sqlalchemy.orm.session.Session = NEW_SESSION, dag_bag: airflow.models.dagbag.DagBag | None = None, exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(), exclude_run_ids: frozenset[str] | None = frozenset()) int

在指定日期范围内清除与当前 DAG 关联的一组任务实例。

参数:
  • task_ids – 要清除的任务 ID 列表或 (task_id, map_index) 元组

  • run_id – 要清除任务的运行 ID

  • start_date – 要清除的最小逻辑日期

  • end_date – 要清除的最大逻辑日期

  • only_failed – 仅清除失败的任务

  • only_running – 仅清除正在运行的任务。

  • confirm_prompt – 请求确认

  • dag_run_state – 要将 DagRun 设置成的状态。如果设置为 False,则不会更改 dagrun 状态。

  • dry_run – 查找要清除的任务,但不实际清除它们。

  • session – 要使用的 SQLAlchemy 会话

  • dag_bag – 用于查找 DAG 的 DagBag(可选)

  • exclude_task_ids – 一组不应清除的 task_id 或 (task_id, map_index) 元组

  • exclude_run_ids – 一组 run_id 或 (run_id)

classmethod clear_dags(dags, start_date=None, end_date=None, only_failed=False, only_running=False, confirm_prompt=False, dag_run_state=DagRunState.QUEUED, dry_run=False)[source]
cli()[source]

提供一个针对此 DAG 的 CLI。

test(run_after=None, logical_date=None, run_conf=None, conn_file_path=None, variable_file_path=None, use_executor=False, mark_success_pattern=None, session=NEW_SESSION)[source]

为给定的 DAG 和逻辑日期执行单个 DagRun。

参数:
  • run_after (datetime.datetime | None) – 在此日期时间之前 DAG 不能运行。

  • logical_date (datetime.datetime | None) – 此 DAG 运行的逻辑日期

  • run_conf (dict[str, Any] | None) – 传递给新创建的 dagrun 的配置

  • conn_file_path (str | None) – YAML 或 JSON 格式的连接文件的文件路径

  • variable_file_path (str | None) – YAML 或 JSON 格式的变量文件的文件路径

  • use_executor (bool) – 如果设置,则使用执行器测试此 DAG

  • mark_success_pattern (re.Pattern | str | None) – 要标记为成功(而非运行中)的 task_id 的正则表达式

  • session (sqlalchemy.orm.session.Session) – 数据库连接(可选)

classmethod bulk_write_to_db(bundle_name, bundle_version, dags, session=NEW_SESSION)[source]

确保数据库中 dag 表中给定 DAG 的 DagModel 行是最新的。

参数:

dags (collections.abc.Collection[airflow.serialization.serialized_objects.MaybeSerializedDAG]) – 要保存到数据库的 DAG 对象

返回:

sync_to_db(session=NEW_SESSION)[source]

将此 DAG 的属性保存到数据库。

返回:

static deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION)[source]

给定一个已知 DAG 列表,停用 ORM 中被标记为活动的任何其他 DAG。

参数:

active_dag_ids – 处于活动状态的 DAG ID 列表

返回:

static deactivate_stale_dags(expiration_date, session=NEW_SESSION)[source]

停用在过期日期之前最后一次被调度器触碰的任何 DAG。

这些 DAG 可能已被删除。

参数:

expiration_date – 设置在此时间之前被触碰的非活动状态的 DAG

返回:

static get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION)[source]

返回给定 DAG 中任务实例的数量。

参数:
  • session – ORM 会话

  • dag_id – 要获取任务实例数量的 DAG 的 ID

  • run_id – 要获取任务实例数量的 DAG 运行的 ID

  • task_ids – 给定 DAG 的有效任务 ID 列表

  • states – 如果提供,则用于过滤的状态列表

返回:

正在运行的任务数量

返回类型:

int

get_task_assets(inlets=True, outlets=True, of_type=Asset)[source]
classmethod from_sdk_dag(dag)[source]

从 TaskSDKDag 创建一个新的(调度器)DAG 对象。

class airflow.models.dag.DagTag[source]

基类:airflow.models.base.Base

每个 DAG 的标签名称,以便在 DAG 视图中快速过滤。

__tablename__ = 'dag_tag'[source]
name[source]
dag_id[source]
__table_args__[source]
__repr__()[source]
class airflow.models.dag.DagOwnerAttributes[source]

基类:airflow.models.base.Base

定义不同所有者属性的表。

例如,一个所有者的链接,将作为超链接传递到“DAGs”视图。

__tablename__ = 'dag_owner_attributes'[source]
dag_id[source]
owner[source]
__repr__()[source]
classmethod get_all(session)[source]
class airflow.models.dag.DagModel(**kwargs)[source]

基类:airflow.models.base.Base

包含 DAG 属性的表。

__tablename__ = 'dag'[source]

这些项存储在数据库中,用于与状态相关的信息

dag_id[source]
is_paused_at_creation = True[source]
is_paused[source]
is_stale[source]
last_parsed_time[source]
last_expired[source]
fileloc[source]
relative_fileloc[source]
bundle_name[source]
bundle_version[source]
owners[source]
description[source]
timetable_summary[source]
timetable_description[source]
asset_expression[source]
tags[source]
max_active_tasks[source]
max_active_runs[source]
max_consecutive_failed_dag_runs[source]
has_task_concurrency_limits[source]
has_import_errors[source]
next_dagrun[source]
next_dagrun_data_interval_start[source]
next_dagrun_data_interval_end[source]
next_dagrun_create_after[source]
__table_args__[source]
schedule_asset_references[source]
schedule_asset_alias_references[source]
schedule_asset_name_references[source]
schedule_asset_uri_references[source]
schedule_assets[source]
task_outlet_asset_references[source]
NUM_DAGS_PER_DAGRUN_QUERY[source]
dag_versions[source]
__repr__()[source]
property next_dagrun_data_interval: airflow.timetables.base.DataInterval | None[source]
property timezone[source]
static get_dagmodel(dag_id, session=NEW_SESSION)[source]
classmethod get_current(dag_id, session=NEW_SESSION)[source]
get_last_dagrun(session=NEW_SESSION, include_manually_triggered=False)[source]
get_is_paused(*, session=None)[source]

提供与‘DAG’的接口兼容性。

get_is_active(*, session=None)[source]

提供与‘DAG’的接口兼容性。

static get_paused_dag_ids(dag_ids, session=NEW_SESSION)[source]

给定一个 dag_id 列表,获取一组暂停的 Dag Id。

参数:
返回:

暂停的 Dag Id

返回类型:

set[str]

property safe_dag_id[source]
set_is_paused(is_paused, session=NEW_SESSION)[source]

暂停/取消暂停一个 DAG。

参数:
  • is_paused (bool) – DAG 是否暂停

  • session – 会话

dag_display_name()[source]
classmethod deactivate_deleted_dags(bundle_name, rel_filelocs, session=NEW_SESSION)[source]

对于已移除 DAG 文件的 DAG,将其 is_active=False 设置为 False。

参数:
classmethod dags_needing_dagruns(session)[source]

返回(并锁定)一个 Dag 对象列表,这些对象已到期创建新的 DagRun。

这将返回一个行级别锁定的行结果集,使用“SELECT … FOR UPDATE”查询。您应确保在单个事务中做出所有调度决策——一旦事务提交,锁定将被释放。

calculate_dagrun_date_fields(dag, last_automated_dag_run)[source]

计算 next_dagrunnext_dagrun_create_after`

参数:
  • dag (DAG) – DAG 对象

  • last_automated_dag_run (None | airflow.timetables.base.DataInterval) – 此 DAG 最近一次运行的数据间隔(或 datetime),如果尚未调度,则为 None。

get_asset_triggered_next_run_info(*, session=NEW_SESSION)[source]

本条目是否有帮助?