airflow.models.dagrun

模块内容

TISchedulingDecision

DagRun.task_instance_scheduling_decisions 的返回类型。

DagRun

DAG 的调用实例。

DagRunNote

用于存储关于 dagrun 实例的任意注释。

属性

CreatedTasks

RUN_ID_REGEX

airflow.models.dagrun.CreatedTasks[源代码]
airflow.models.dagrun.RUN_ID_REGEX = '^(?:manual|scheduled|dataset_triggered)__(?:\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\+00:00)$'[源代码]
class airflow.models.dagrun.TISchedulingDecision[源代码]

基类:NamedTuple

DagRun.task_instance_scheduling_decisions 的返回类型。

tis: list[airflow.models.taskinstance.TaskInstance][源代码]
schedulable_tis: list[airflow.models.taskinstance.TaskInstance][源代码]
changed_tis: bool[源代码]
unfinished_tis: list[airflow.models.taskinstance.TaskInstance][源代码]
finished_tis: list[airflow.models.taskinstance.TaskInstance][源代码]
class airflow.models.dagrun.DagRun(dag_id=None, run_id=None, queued_at=NOTSET, execution_date=None, start_date=None, external_trigger=None, conf=None, state=None, run_type=None, dag_hash=None, creating_job_id=None, data_interval=None)[源代码]

基类:airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

DAG 的调用实例。

DAG 运行可以通过调度器创建(即计划运行),也可以通过外部触发器创建(即手动运行)。

property stats_tags: dict[str, str][源代码]
property logical_date: datetime.datetime[源代码]
property state[源代码]
property is_backfill: bool[源代码]
__tablename__ = 'dag_run'[源代码]
id[源代码]
dag_id[源代码]
queued_at[源代码]
execution_date[源代码]
start_date[源代码]
end_date[源代码]
run_id[源代码]
creating_job_id[源代码]
external_trigger[源代码]
run_type[源代码]
conf[源代码]
data_interval_start[源代码]
data_interval_end[源代码]
last_scheduling_decision[源代码]
dag_hash[源代码]
log_template_id[源代码]
updated_at[源代码]
clear_number[源代码]
__table_args__ = ()[源代码]
task_instances[源代码]
dag_model[源代码]
dag_run_note[源代码]
note[源代码]
DEFAULT_DAGRUNS_TO_EXAMINE[源代码]
__repr__()[源代码]

返回 repr(self)。

validate_run_id(key, run_id)[源代码]
get_state()[源代码]
set_state(state)[源代码]

更改 DagRan 的状态。

属性的更改根据下表实现(行表示旧状态,列表示新状态)

状态转换矩阵

已排队

正在运行

成功

失败

queued_at = timezone.utcnow()

如果为空: start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

已排队

queued_at = timezone.utcnow()

如果为空: start_date = timezone.utcnow() end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

正在运行

queued_at = timezone.utcnow() start_date = None end_date = None

end_date = timezone.utcnow()

end_date = timezone.utcnow()

成功

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

失败

queued_at = timezone.utcnow() start_date = None end_date = None

start_date = timezone.utcnow() end_date = None

refresh_from_db(session=NEW_SESSION)[源代码]

从数据库重新加载当前 dagrun。

参数

session (sqlalchemy.orm.Session) – 数据库会话

classmethod active_runs_of_dags(dag_ids=None, only_running=False, session=NEW_SESSION)[源代码]

获取每个 DAG 的活跃 DAG 运行次数。

classmethod next_dagruns_to_examine(state, session, max_number=None)[source]

返回调度器应尝试调度的下一个 DagRun。

这将返回零个或多个 DagRun 行,这些行使用 “SELECT … FOR UPDATE” 查询进行行级锁定。您应确保在单个事务中做出所有调度决策,因为一旦事务提交,它将被解锁。

classmethod find(dag_id=None, run_id=None, execution_date=None, state=None, external_trigger=None, no_backfills=False, run_type=None, session=NEW_SESSION, execution_start_date=None, execution_end_date=None)[source]

返回符合给定搜索条件的 DAG 运行集合。

参数
  • dag_id (str | list[str] | None) – 要查找 DAG 运行的 dag_id 或 dag_id 列表

  • run_id (Iterable[str] | None) – 定义此 DAG 运行的运行 ID

  • run_type (airflow.utils.types.DagRunType | None) – DagRun 的类型

  • execution_date (datetime.datetime | Iterable[datetime.datetime] | None) – 执行日期

  • state (airflow.utils.state.DagRunState | None) – DAG 运行的状态

  • external_trigger (bool | None) – 此 DAG 运行是否由外部触发

  • no_backfills (bool) – 不返回回填 (True),返回所有 (False)。默认为 False

  • session (sqlalchemy.orm.Session) – 数据库会话

  • execution_start_date (datetime.datetime | None) – 从此日期执行的 DAG 运行

  • execution_end_date (datetime.datetime | None) – 执行到此日期的 DAG 运行

classmethod find_duplicate(dag_id, run_id, execution_date, session=NEW_SESSION)[source]

返回具有特定 run_id 或 execution_date 的 DAG 的现有运行。

如果未找到此类 DAG 运行,则返回 None

参数
  • dag_id (str) – 要查找重复项的 dag_id

  • run_id (str) – 定义此 DAG 运行的运行 ID

  • execution_date (datetime.datetime) – 执行日期

  • session (sqlalchemy.orm.Session) – 数据库会话

static generate_run_id(run_type, execution_date)[source]

基于运行类型和执行日期生成运行 ID。

static fetch_task_instances(dag_id=None, run_id=None, task_ids=None, state=None, session=NEW_SESSION)[source]

返回此 DAG 运行的任务实例。

get_task_instances(state=None, session=NEW_SESSION)[source]

返回此 DAG 运行的任务实例。

重定向到 DagRun.fetch_task_instances 方法。保留此方法,因为它在代码中被广泛使用。

get_task_instance(task_id, session=NEW_SESSION, *, map_index=-1)[source]

返回此 DAG 运行中由 task_id 指定的任务实例。

参数
  • task_id (str) – 任务 ID

  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM 会话

static fetch_task_instance(dag_id, dag_run_id, task_id, session=NEW_SESSION, map_index=-1)[source]

返回此 DAG 运行中由 task_id 指定的任务实例。

参数
  • dag_id (str) – DAG ID

  • dag_run_id (str) – DAG 运行 ID

  • task_id (str) – 任务 ID

  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM 会话

get_dag()[source]

返回与此 DagRun 关联的 DAG。

返回

DAG

返回类型

airflow.models.dag.DAG

static get_previous_dagrun(dag_run, state=None, session=NEW_SESSION)[source]

返回上一个 DagRun,如果存在。

参数
  • dag_run (DagRun | airflow.serialization.pydantic.dag_run.DagRunPydantic) – DAG 运行

  • session (sqlalchemy.orm.Session) – SQLAlchemy ORM 会话

  • 状态 (airflow.utils.state.DagRunState | None) – DAG 运行状态

静态 get_previous_scheduled_dagrun(dag_run_id, session=NEW_SESSION)[源代码]

返回上一个 SCHEDULED 的 DagRun,如果存在的话。

参数
  • dag_run_id (int) – DAG 运行 ID

  • session (sqlalchemy.orm.Session) – SQLAlchemy ORM 会话

update_state(session=NEW_SESSION, execute_callbacks=True)[源代码]

根据其任务实例的状态确定 DagRun 的整体状态。

参数
  • session (sqlalchemy.orm.Session) – Sqlalchemy ORM 会话

  • execute_callbacks (bool) – 是否应直接调用 dag 回调(成功/失败、SLA 等)(默认值:true),还是将回调记录为 returned_callback 属性中的待处理请求

返回

包含可在当前循环中调度的 tis 以及需要执行的 returned_callback 的元组

返回类型

tuple[list[airflow.models.taskinstance.TaskInstance], airflow.callbacks.callback_requests.DagCallbackRequest | None]

task_instance_scheduling_decisions(session=NEW_SESSION)[源代码]
notify_dagrun_state_changed(msg='')[源代码]
verify_integrity(*, session=NEW_SESSION)[源代码]

通过检查已删除的任务或尚未在数据库中的任务来验证 DagRun。

如果需要,它会将状态设置为 removed 或添加任务。

Missing_indexes

一个任务与缺少索引的字典。

参数

session (sqlalchemy.orm.Session) – Sqlalchemy ORM 会话

类方法 get_latest_runs(session=NEW_SESSION)[源代码]

返回每个 DAG 的最新 DagRun。

schedule_tis(schedulable_tis, session=NEW_SESSION, max_tis_per_query=None)[源代码]

将给定的任务实例设置为计划状态。

schedulable_tis 的每个元素都应已设置其 task 属性。

任何没有回调或出口的 EmptyOperator 都直接设置为成功状态。

所有 TI 都应属于此 DagRun,但此代码位于热路径中,因此未进行检查——调用方有责任仅使用来自单个 dag 运行的 TI 来调用此函数。

get_log_template(*, session=NEW_SESSION)[源代码]
get_log_filename_template(*, session=NEW_SESSION)[源代码]
airflow.models.dagrun.DagRunNote(content, user_id=None)[源代码]

基类:airflow.models.base.Base

用于存储关于 dagrun 实例的任意注释。

__tablename__ = 'dag_run_note'[源代码]
user_id[源代码]
dag_run_id[源代码]
content[源代码]
created_at[源代码]
updated_at[源代码]
dag_run[源代码]
__table_args__ = ()[源代码]
__repr__()[源代码]

返回 repr(self)。

此条目是否有帮助?