airflow.models.dagrun
¶
模块内容¶
类¶
DagRun.task_instance_scheduling_decisions 的返回类型。 |
|
DAG 的调用实例。 |
|
用于存储关于 dagrun 实例的任意注释。 |
属性¶
- airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|dataset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[源代码]¶
- class airflow.models.dagrun.TISchedulingDecision[源代码]¶
基类:
NamedTuple
DagRun.task_instance_scheduling_decisions 的返回类型。
- schedulable_tis: list[airflow.models.taskinstance.TaskInstance][源代码]¶
- unfinished_tis: list[airflow.models.taskinstance.TaskInstance][源代码]¶
- finished_tis: list[airflow.models.taskinstance.TaskInstance][源代码]¶
- class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, queued_at=NOTSET, execution_date=None, start_date=None, external_trigger=None, conf=None, state=None, run_type=None, dag_hash=None, creating_job_id=None, data_interval=None)[源代码]¶
基类:
airflow.models.base.Base
,airflow.utils.log.logging_mixin.LoggingMixin
DAG 的调用实例。
DAG 运行可以通过调度器创建(即计划运行),也可以通过外部触发器创建(即手动运行)。
- property logical_date: datetime.datetime[源代码]¶
- set_state(state)[源代码]¶
更改 DagRan 的状态。
属性的更改根据下表实现(行表示旧状态,列表示新状态)
¶ 已排队
正在运行
成功
失败
无
queued_at = timezone.utcnow()
如果为空: start_date = timezone.utcnow() end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
已排队
queued_at = timezone.utcnow()
如果为空: start_date = timezone.utcnow() end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
正在运行
queued_at = timezone.utcnow() start_date = None end_date = None
end_date = timezone.utcnow()
end_date = timezone.utcnow()
成功
queued_at = timezone.utcnow() start_date = None end_date = None
start_date = timezone.utcnow() end_date = None
失败
queued_at = timezone.utcnow() start_date = None end_date = None
start_date = timezone.utcnow() end_date = None
- refresh_from_db(session=NEW_SESSION)[源代码]¶
从数据库重新加载当前 dagrun。
- 参数
session (sqlalchemy.orm.Session) – 数据库会话
- classmethod active_runs_of_dags(dag_ids=None, only_running=False, session=NEW_SESSION)[源代码]¶
获取每个 DAG 的活跃 DAG 运行次数。
- classmethod next_dagruns_to_examine(state, session, max_number=None)[source]¶
返回调度器应尝试调度的下一个 DagRun。
这将返回零个或多个 DagRun 行,这些行使用 “SELECT … FOR UPDATE” 查询进行行级锁定。您应确保在单个事务中做出所有调度决策,因为一旦事务提交,它将被解锁。
- classmethod find(dag_id=None, run_id=None, execution_date=None, state=None, external_trigger=None, no_backfills=False, run_type=None, session=NEW_SESSION, execution_start_date=None, execution_end_date=None)[source]¶
返回符合给定搜索条件的 DAG 运行集合。
- 参数
dag_id (str | list[str] | None) – 要查找 DAG 运行的 dag_id 或 dag_id 列表
run_id (Iterable[str] | None) – 定义此 DAG 运行的运行 ID
run_type (airflow.utils.types.DagRunType | None) – DagRun 的类型
execution_date (datetime.datetime | Iterable[datetime.datetime] | None) – 执行日期
state (airflow.utils.state.DagRunState | None) – DAG 运行的状态
external_trigger (bool | None) – 此 DAG 运行是否由外部触发
no_backfills (bool) – 不返回回填 (True),返回所有 (False)。默认为 False
session (sqlalchemy.orm.Session) – 数据库会话
execution_start_date (datetime.datetime | None) – 从此日期执行的 DAG 运行
execution_end_date (datetime.datetime | None) – 执行到此日期的 DAG 运行
- classmethod find_duplicate(dag_id, run_id, execution_date, session=NEW_SESSION)[source]¶
返回具有特定 run_id 或 execution_date 的 DAG 的现有运行。
如果未找到此类 DAG 运行,则返回 None。
- 参数
dag_id (str) – 要查找重复项的 dag_id
run_id (str) – 定义此 DAG 运行的运行 ID
execution_date (datetime.datetime) – 执行日期
session (sqlalchemy.orm.Session) – 数据库会话
- static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]¶
返回此 DAG 运行的任务实例。
- get_task_instances(state=None, session=NEW_SESSION)[source]¶
返回此 DAG 运行的任务实例。
重定向到 DagRun.fetch_task_instances 方法。保留此方法,因为它在代码中被广泛使用。
- get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]¶
返回此 DAG 运行中由 task_id 指定的任务实例。
- 参数
task_id (str) – 任务 ID
session (sqlalchemy.orm.Session) – Sqlalchemy ORM 会话
- static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]¶
返回此 DAG 运行中由 task_id 指定的任务实例。
- 参数
dag_id (str) – DAG ID
dag_run_id (str) – DAG 运行 ID
task_id (str) – 任务 ID
session (sqlalchemy.orm.Session) – Sqlalchemy ORM 会话
- static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]¶
返回上一个 DagRun,如果存在。
- 参数
dag_run (DagRun | airflow.serialization.pydantic.dag_run.DagRunPydantic) – DAG 运行
session (sqlalchemy.orm.Session) – SQLAlchemy ORM 会话
状态 (airflow.utils.state.DagRunState | None) – DAG 运行状态
- 静态 get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[源代码]¶
返回上一个 SCHEDULED 的 DagRun,如果存在的话。
- 参数
dag_run_id (int) – DAG 运行 ID
session (sqlalchemy.orm.Session) – SQLAlchemy ORM 会话
- update_state(session=NEW_SESSION, execute_callbacks=True)[源代码]¶
根据其任务实例的状态确定 DagRun 的整体状态。
- 参数
session (sqlalchemy.orm.Session) – Sqlalchemy ORM 会话
execute_callbacks (bool) – 是否应直接调用 dag 回调(成功/失败、SLA 等)(默认值:true),还是将回调记录为
returned_callback
属性中的待处理请求
- 返回
包含可在当前循环中调度的 tis 以及需要执行的 returned_callback 的元组
- 返回类型
tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]
- verify_integrity(*, session=NEW_SESSION)[源代码]¶
通过检查已删除的任务或尚未在数据库中的任务来验证 DagRun。
如果需要,它会将状态设置为 removed 或添加任务。
- Missing_indexes
一个任务与缺少索引的字典。
- 参数
session (sqlalchemy.orm.Session) – Sqlalchemy ORM 会话