airflow.models.taskinstance

模块内容

TaskInstance

任务实例存储任务实例的状态。

SimpleTaskInstance

简化的任务实例。

TaskInstanceNote

用于存储有关任务实例的任意注释。

函数

set_current_context(context)

将当前执行上下文设置为提供的上下文对象。

clear_task_instances(tis, session[, ...])

清除一组任务实例,但确保正在运行的任务实例被终止。

属性

TR

log

PAST_DEPENDS_MET

TaskInstanceStateType

airflow.models.taskinstance.TR[源代码]
airflow.models.taskinstance.log[源代码]
airflow.models.taskinstance.PAST_DEPENDS_MET = 'past_depends_met'[源代码]
airflow.models.taskinstance.set_current_context(context)[源代码]

将当前执行上下文设置为提供的上下文对象。

此方法应在每次任务执行时调用一次,在调用 operator.execute 之前。

airflow.models.taskinstance.clear_task_instances(tis, session, activate_dag_runs=None, dag=None, dag_run_state=DagRunState.QUEUED)[源代码]

清除一组任务实例,但确保正在运行的任务实例被终止。

还将 Dagrun 的 state 设置为 QUEUED,并将 start_date 设置为执行时间。但仅适用于已完成的 DR(SUCCESS 和 FAILED)。不会清除正在运行的 DR(QUEUED 和 RUNNING)的 DR 的 statestart_date,因为清除已在运行的 DR 的状态是多余的,并且清除 start_date 会影响 DR 的持续时间。

参数
class airflow.models.taskinstance.TaskInstance(task, execution_date=None, run_id=None, state=None, map_index=-1)[source]

基类:airflow.models.base.Base, airflow.utils.log.logging_mixin.LoggingMixin

任务实例存储任务实例的状态。

此表是关于任务已运行状态的权威且唯一的真实来源。

SqlAlchemy 模型特意没有与任务或 DAG 模型建立 SqlAlchemy 外键,以便更好地控制事务。

此表上的数据库事务应确保防止双重触发,以及防止多个调度程序可能触发任务实例时,对哪些任务实例准备好运行产生任何混淆。

map_index 中的值 -1 表示以下任何一种情况:未映射任务的 TI;具有尚未扩展的映射任务的 TI(状态=pending);具有扩展到空列表的映射任务的 TI(状态=skipped)。

property stats_tags: dict[str, str][source]

返回任务实例标签。

property next_try_number: int[source]
property operator_name: str | None[source]

@property:如果已设置,则使用更友好的运算符显示名称。

property log_url: str[source]

TaskInstance 的日志 URL。

property mark_success_url: str[source]

用于标记 TI 成功的 URL。

property key: airflow.models.taskinstancekey.TaskInstanceKey[source]

返回一个唯一标识任务实例的元组。

property is_premature: bool[source]

返回任务是否处于 UP_FOR_RETRY 状态且其重试间隔是否已过。

property previous_ti: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[source]

此属性已弃用。

请使用 airflow.models.taskinstance.TaskInstance.get_previous_ti

property previous_ti_success: TaskInstance | airflow.serialization.pydantic.taskinstance.TaskInstancePydantic | None[source]

此属性已弃用。

请使用 airflow.models.taskinstance.TaskInstance.get_previous_ti

property previous_start_date_success: pendulum.DateTime | None[source]

此属性已弃用。

请使用 airflow.models.taskinstance.TaskInstance.get_previous_start_date

__tablename__ = 'task_instance'[source]
task_id[source]
dag_id[source]
run_id[source]
map_index[source]
start_date[source]
end_date[source]
duration[source]
state[source]
try_number[source]
max_tries[source]
hostname[source]
unixname[source]
job_id[source]
pool[source]
pool_slots[source]
queue[source]
priority_weight[source]
operator[source]
custom_operator_name[source]
queued_dttm[source]
queued_by_job_id[source]
pid[source]
executor[source]
executor_config[source]
updated_at[source]
rendered_map_index[source]
external_executor_id[source]
trigger_id[source]
trigger_timeout[source]
next_method[source]
next_kwargs[source]
__table_args__ = ()[source]
dag_model: airflow.models.dag.DagModel[source]
trigger[source]
triggerer_job[source]
dag_run[source]
rendered_task_instance_fields[source]
execution_date[source]
task_instance_note[source]
note[source]
task: airflow.models.operator.Operator | None[source]
test_mode: bool = False[source]
is_trigger_log_context: bool = False[source]
run_as_user: str | None[source]
__hash__()[source]

返回 hash(self)。

init_on_load()[source]

初始化未存储在数据库中的属性。

task_display_name()[source]
command_as_list(mark_success=False, ignore_all_deps=False, ignore_task_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_ti_state=False, local=False, pickle_id=None, raw=False, job_id=None, pool=None, cfg_path=None)[source]

返回一个可以在任何安装了 Airflow 的地方执行的命令。

此命令是编排器发送给执行器的消息的一部分。

static generate_command(dag_id, task_id, run_id, mark_success=False, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, local=False, pickle_id=None, file_path=None, raw=False, job_id=None, pool=None, cfg_path=None, map_index=-1)[source]

生成执行此任务实例所需的 shell 命令。

参数
  • dag_id (str) – DAG ID

  • task_id (str) – 任务 ID

  • run_id (str) – 此任务的 DagRun 的 run_id

  • mark_success (bool) – 是否将任务标记为成功

  • ignore_all_deps (bool) – 忽略所有可忽略的依赖项。覆盖其他 ignore_* 参数。

  • ignore_depends_on_past (bool) – 忽略 DAG 的 depends_on_past 参数(例如,对于回填)

  • wait_for_past_depends_before_skipping (bool) – 在将 ti 标记为跳过之前等待过去的依赖项

  • ignore_task_deps (bool) – 忽略特定于任务的依赖项,例如 depends_on_past 和触发规则

  • ignore_ti_state (bool) – 忽略任务实例之前的失败/成功

  • local (bool) – 是否在本地运行任务

  • pickle_id (int | None) – 如果 DAG 已序列化到数据库,则为与 pickle DAG 关联的 ID

  • file_path (pathlib.PurePath | str | None) – 包含 DAG 定义的文件的路径

  • raw (bool) – 原始模式(需要更多详细信息)

  • job_id (str | None) – 作业 ID(需要更多详细信息)

  • pool (str | None) – 任务应在其中运行的 Airflow 池

  • cfg_path (str | None) – 配置文件的路径

返回

可用于运行任务实例的 shell 命令

返回类型

list[str]

current_state(session=NEW_SESSION)[source]

从数据库中获取最新的状态。

如果传递了会话,我们将使用它,并且查找状态成为会话的一部分,否则将使用新会话。

这里使用 sqlalchemy.inspect 获取主键,确保如果它们发生变化,它不会退化

参数

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

error(session=NEW_SESSION)[source]

强制将任务实例的状态在数据库中设置为 FAILED。

参数

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

classmethod get_task_instance(dag_id, run_id, task_id, map_index, lock_for_update=False, session=NEW_SESSION)[source]
refresh_from_db(session=NEW_SESSION, lock_for_update=False)[source]

根据主键从数据库刷新任务实例。

参数
  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

  • lock_for_update (bool) – 如果为 True,则表示数据库应锁定 TaskInstance(发出 FOR UPDATE 子句),直到提交会话。

refresh_from_task(task, pool_override=None)[源代码]

从给定的任务复制通用属性。

参数
  • task (airflow.models.operator.Operator) – 要从中复制的任务对象

  • pool_override (str | None) – 使用 pool_override 代替任务的池

clear_xcom_data(session=NEW_SESSION)[源代码]
set_state(state, session=NEW_SESSION)[源代码]

设置 TaskInstance 状态。

参数
  • state (str | None) – 要为 TI 设置的状态

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

返回

状态是否已更改

返回类型

bool

are_dependents_done(session=NEW_SESSION)[源代码]

检查此任务实例的直接依赖项是否已成功或已跳过。

这旨在供 wait_for_downstream 使用。

当您不希望在依赖项完成之前开始处理任务的下一个计划时,这很有用。例如,如果任务删除并重新创建表。

参数

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

get_previous_dagrun(state=None, session=None)[源代码]

返回在此任务实例的 DagRun 之前运行的 DagRun。

参数
  • state (airflow.utils.state.DagRunState | None) – 如果传递,则仅考虑特定状态的实例。

  • session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM 会话。

get_previous_ti(state=None, session=NEW_SESSION)[源代码]

返回在此任务实例之前运行的任务的任务实例。

参数
  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

  • state (airflow.utils.state.DagRunState | None) – 如果传递,则仅考虑特定状态的实例。

get_previous_execution_date(state=None, session=NEW_SESSION)[源代码]

从属性 previous_ti_success 返回执行日期。

参数
  • state (airflow.utils.state.DagRunState | None) – 如果传递,则仅考虑特定状态的实例。

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

get_previous_start_date(state=None, session=NEW_SESSION)[源代码]

从属性 previous_ti_success 返回开始日期。

参数
  • state (airflow.utils.state.DagRunState | None) – 如果传递,则仅考虑特定状态的实例。

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

are_dependencies_met(dep_context=None, session=NEW_SESSION, verbose=False)[源代码]

给定依赖项的上下文,是否满足此任务实例运行的所有条件。

(例如,从 UI 强制运行的任务实例将忽略某些依赖项)。

参数
  • dep_context (airflow.ti_deps.dep_context.DepContext | None) – 确定应评估的依赖项的执行上下文。

  • session (sqlalchemy.orm.session.Session) – 数据库会话

  • verbose (bool) – 是否在 info 或 debug 日志级别记录有关失败依赖项的详细信息

get_failed_dep_statuses(dep_context=None, session=NEW_SESSION)[源代码]

获取失败的依赖项。

__repr__()[源代码]

返回 repr(self)。

next_retry_datetime()[源代码]

如果任务实例失败,则获取下一次重试的日期时间。

对于指数退避,retry_delay 用作基数,并将转换为秒。

ready_for_retry()[源代码]

检查任务实例是否处于正确的状态和时间范围内以进行重试。

get_dagrun(session=NEW_SESSION)[源代码]

返回此 TaskInstance 的 DagRun。

参数

session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

返回

DagRun

返回类型

airflow.models.dagrun.DagRun

classmethod ensure_dag(task_instance, session=NEW_SESSION)[源代码]

确保任务具有关联的 dag 对象,该对象可能已被序列化删除。

check_and_change_state_before_execution(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, external_executor_id=None, session=NEW_SESSION)[source]
emit_state_change_metric(new_state)[source]

发送一个时间指标,表示给定状态转换所花费的时间。

之前的状态和指标名称是从任务所处的状态推断出来的。

参数

new_state (airflow.utils.state.TaskInstanceState) – 为此任务刚刚设置的状态。我们不使用 self.state,因为有时状态是直接在数据库中更新的,而不是在本地 TaskInstance 对象中更新的。支持的状态:QUEUED 和 RUNNING

clear_next_method_args()[source]

确保我们取消设置 next_method 和 next_kwargs,以确保任何重试都不会重用它们。

defer_task(exception, session=NEW_SESSION)[source]

将任务标记为延迟,并设置在引发 TaskDeferred 时恢复任务所需的触发器。

run(verbose=True, ignore_all_deps=False, ignore_depends_on_past=False, wait_for_past_depends_before_skipping=False, ignore_task_deps=False, ignore_ti_state=False, mark_success=False, test_mode=False, job_id=None, pool=None, session=NEW_SESSION, raise_on_defer=False)[source]

运行 TaskInstance。

dry_run()[source]

仅渲染 TI 的模板。

classmethod fetch_handle_failure_context(ti, error, test_mode=None, context=None, force_fail=False, *, session, fail_stop=False)[source]

处理 TaskInstance 的失败。

参数

fail_stop (bool) – 如果为 true,则停止 DAG 中剩余的任务

static save_to_db(ti, session=NEW_SESSION)[source]
handle_failure(error, test_mode=None, context=None, force_fail=False, session=NEW_SESSION)[source]

处理任务实例的失败。

参数
  • error (None | str | BaseException) – 如果指定,则记录抛出的特定异常

  • session (sqlalchemy.orm.session.Session) – SQLAlchemy ORM 会话

  • test_mode (bool | None) – 如果为 True,则不在数据库中记录成功或失败

  • context (airflow.utils.context.Context | None) – Jinja2 上下文

  • force_fail (bool) – 如果为 True,则任务不会重试

is_eligible_to_retry()[source]

任务实例是否有资格进行重试。

get_template_context(session=None, ignore_param_exceptions=True)[source]

返回 TI 上下文。

参数
  • session (sqlalchemy.orm.session.Session | None) – SQLAlchemy ORM 会话

  • ignore_param_exceptions (bool) – 在初始化 ParamsDict 时,抑制值异常的标志

get_rendered_template_fields(session=NEW_SESSION)[source]

更新任务,使其包含 UI 中展示的渲染后的模板字段。

如果任务已经运行,将从数据库中获取;否则将进行渲染。

overwrite_params_with_dag_run_conf(params, dag_run)[source]

使用 DagRun.conf 覆盖任务参数。

render_templates(context=None, jinja_env=None)[source]

渲染操作符字段中的模板。

如果任务最初是映射的,这可能会将 self.task 替换为未映射的、完全渲染的 BaseOperator。 原始替换前的 self.task 会被返回。

render_k8s_pod_yaml()[source]

渲染 k8s pod yaml。

get_rendered_k8s_spec(session=NEW_SESSION)[source]

渲染 k8s pod yaml。

get_email_subject_content(exception, task=None)[source]

获取异常的电子邮件主题内容。

参数
  • exception (BaseException) – 电子邮件中发送的异常

  • task (airflow.models.baseoperator.BaseOperator | None) –

email_alert(exception, task)[source]

发送包含异常信息的警报电子邮件。

参数
  • exception – 异常

  • task (airflow.models.baseoperator.BaseOperator) – 与异常相关的任务

set_duration()[source]

设置任务实例的持续时间。

xcom_push(key, value, execution_date=None, session=NEW_SESSION)[source]

使 XCom 可供任务拉取。

参数
  • key (str) – 存储值的键。

  • value (Any) – 要存储的值。可能的类型取决于 enable_xcom_pickling 是否为 true。 如果为 true,则可以是任何可 pickling 的对象;否则只能使用 JSON 可序列化的对象。

  • execution_date (datetime.datetime | None) – 已弃用的参数,不起作用。

xcom_pull(task_ids=None, dag_id=None, key=XCOM_RETURN_KEY, include_prior_dates=False, session=NEW_SESSION, *, map_indexes=None, default=None)[source]

拉取可选地满足某些条件的 XCom。

参数
  • key (str) – XCom 的键。如果提供,则只返回具有匹配键的 XCom。默认键为 'return_value',也可以作为常量 XCOM_RETURN_KEY 使用。 此键会自动赋给由任务返回的 XCom(与手动推送相反)。要移除筛选器,请传递 None

  • task_ids (str | Iterable[str] | None) – 只会拉取来自具有匹配 ID 的任务的 XCom。传递 None 可移除筛选器。

  • dag_id (str | None) – 如果提供,则只会从这个 DAG 中拉取 XCom。如果为 None(默认值),则会使用调用任务的 DAG。

  • map_indexes (int | Iterable[int] | None) – 如果提供,则只拉取具有匹配索引的 XCom。如果为 None(默认值),则会从正在拉取的任务中推断出来(详情见下文)。

  • include_prior_dates (bool) – 如果为 False,则只返回当前 execution_date 的 XCom。如果为 True,则还会返回之前日期的 XCom。

当拉取一个单独的任务(task_idNone 或字符串)而没有指定 map_indexes 时,返回值会根据指定的任务是否被映射而推断出来。 如果未映射,则返回来自一个单独任务实例的值。 如果要拉取的任务是映射的,则会返回一个迭代器(不是列表),该迭代器产生来自映射任务实例的 XCom。 在这两种情况下,如果找不到匹配的 XCom,则会返回 default(如果未指定,则为 None)。

当拉取多个任务时(即 task_idmap_index 为非字符串可迭代对象),会返回一个匹配 XCom 的列表。列表中元素的顺序由 task_idmap_index 中项目的顺序确定。

get_num_running_task_instances(session, same_dagrun=False)[source]

从数据库返回正在运行的 TI 的数量。

init_run_context(raw=False)[source]

设置日志上下文。

static filter_for_tis(tis)[source]

返回 SQLAlchemy 过滤器,用于查询选定的任务实例。

schedule_downstream_tasks(session=NEW_SESSION, max_tis_per_query=None)[source]

调度此任务实例的下游任务。

get_relevant_upstream_map_indexes(upstream, ti_count, *, session)[source]

推断与此任务实例相关的上游任务的映射索引。

大部分逻辑主要用于解决以下示例描述的问题,其中“val”必须根据引用位置解析为不同的值

@task
def this_task(v):  # This is self.task.
    return v * 2


@task_group
def tg1(inp):
    val = upstream(inp)  # This is the upstream task.
    this_task(val)  # When inp is 1, val here should resolve to 2.
    return val


# This val is the same object returned by tg1.
val = tg1.expand(inp=[1, 2, 3])


@task_group
def tg2(inp):
    another_task(inp, val)  # val here should resolve to [2, 4, 6].


tg2.expand(inp=["a", "b"])

检查 upstreamself.task 周围的映射任务组,以找到一个共同的“祖先”。如果找到这样的祖先,我们需要返回特定的映射索引,以便从上游 XCom 中提取部分值。

参数
  • upstream (airflow.models.operator.Operator) – 引用的上游任务。

  • ti_count (int | None) – 此任务由调度程序扩展的任务实例总数,即模板上下文中的 expanded_ti_count

返回

要提取的特定映射索引或映射索引,如果我们要提取“完整”返回值(即不涉及映射任务组),则为 None

返回类型

int | range | None

airflow.models.taskinstance.TaskInstanceStateType[source]
class airflow.models.taskinstance.SimpleTaskInstance(dag_id, task_id, run_id, start_date, end_date, try_number, map_index, state, executor, executor_config, pool, queue, key, run_as_user=None, priority_weight=None)[source]

简化的任务实例。

用于通过队列在进程之间发送数据。

__repr__()[source]

返回 repr(self)。

__eq__(other)[source]

返回 self==value。

as_dict()[source]
classmethod from_ti(ti)[source]
classmethod from_dict(obj_dict)[source]
class airflow.models.taskinstance.TaskInstanceNote(content, user_id=None)[source]

基类:airflow.models.base.TaskInstanceDependencies

用于存储有关任务实例的任意注释。

__tablename__ = 'task_instance_note'[source]
user_id[source]
task_id[source]
dag_id[source]
run_id[source]
map_index[source]
content[source]
created_at[source]
updated_at[source]
task_instance[source]
__table_args__ = ()[source]
__repr__()[source]

返回 repr(self)。

此条目是否有帮助?