Airflow 峰会 2025 将于 10 月 07-09 日举行。立即注册享早鸟票优惠!

模板参考

变量、宏和过滤器可以在模板中使用(参阅 Jinja 模板化 部分)

以下是 Airflow 开箱即用的功能。可以通过 插件 全局添加额外的自定义宏,或通过 DAG.user_defined_macros 参数在 DAG 级别添加。

变量

Airflow 引擎默认传递一些在所有模板中都可访问的变量

变量

类型

描述

{{ data_interval_start }}

pendulum.DateTime

数据区间的开始。版本 2.2 中添加。

{{ data_interval_end }}

pendulum.DateTime

数据区间的结束。版本 2.2 中添加。

{{ logical_date }}

pendulum.DateTime

逻辑上标识当前 DAG 运行的日期时间。此值不包含任何语义,仅用于标识。
如果您想要具有实际语义的值,请改用 data_interval_startdata_interval_end
例如,根据时间戳从数据库获取一部分行。

{{ exception }}

None | str | Exception KeyboardInterrupt

运行任务实例时发生错误。


{{ prev_data_interval_start_success }}

pendulum.DateTime | None

前一个成功 DagRun 的数据区间开始。
版本 2.2 中添加。

{{ prev_data_interval_end_success }}

pendulum.DateTime | None

前一个成功 DagRun 的数据区间结束。
版本 2.2 中添加。

{{ prev_start_date_success }}

pendulum.DateTime | None

前一个成功 DagRun 的开始日期(如果可用)。

{{ prev_end_date_success }}

pendulum.DateTime | None

前一个成功 DagRun 的结束日期(如果可用)。

{{ start_date }}

pendulum.DateTime

当前任务开始的日期时间。

{{ inlets }}

list

任务上声明的输入列表。

{{ inlet_events }}

dict[str, …]

访问输入资产的过去事件。参阅 资产。版本 2.10 中添加。

{{ outlets }}

list

任务上声明的输出列表。

{{ outlet_events }}

dict[str, …]

用于将信息附加到当前任务将发出的资产事件的访问器。
参阅 资产。版本 2.10 中添加。

{{ dag }}

DAG

当前正在运行的 DAG。您可以在 DAG 部分阅读更多关于 DAG 的信息。

{{ task }}

BaseOperator

当前正在运行的 BaseOperator。您可以在 Operator 部分阅读更多关于任务的信息

{{ task_reschedule_count }}

int

当前任务已被重新调度了多少次。与 mode="reschedule" 传感器相关。

{{ macros }}

对宏包的引用。参阅下方的

{{ task_instance }}

TaskInstance

当前正在运行的 TaskInstance

{{ ti }}

TaskInstance

{{ task_instance }} 相同。

{{ params }}

dict[str, Any]

用户定义的参数。这可以被传递给
trigger_dag -c 的映射覆盖,如果 dag_run_conf_overrides_params
airflow.cfg 中启用。

{{ var.value }}

Airflow 变量。参阅下方的 模板中的 Airflow 变量

{{ var.json }}

Airflow 变量。参阅下方的 模板中的 Airflow 变量

{{ conn }}

Airflow 连接。参阅下方的 模板中的 Airflow 连接

{{ task_instance_key_str }}

str

任务实例的唯一、人类可读的键。格式为
{dag_id}__{task_id}__{ds_nodash}.

{{ run_id }}

str

当前正在运行的 DagRun 的运行 ID。

{{ dag_run }}

DagRun

当前正在运行的 DagRun

{{ test_mode }}

bool

任务实例是否由 airflow test 命令行接口运行。

{{ map_index_template }}

None | str

用于渲染映射任务的扩展任务实例的模板。设置此值将反映在渲染结果中。

{{ expanded_ti_count }}

int | None

映射任务扩展成的任务实例数量。如果
当前任务未被映射,此值应为 None
版本 2.5 中添加。

{{ triggering_asset_events }}

dict[str, list[AssetEvent]]

如果在资产调度的 DAG 中,则为资产 URI 到触发 AssetEvent 列表的映射
(可能有多个,如果存在多个具有不同频率的资产)。
在此阅读更多 资产
版本 2.4 中添加。

以下变量仅在 DagRun 具有 logical_date 时可用

变量

类型

描述

{{ ds }}

str

DAG 运行的逻辑日期,格式为 YYYY-MM-DD
{{ logical_date | ds }} 相同。

{{ ds_nodash }}

str

{{ logical_date | ds_nodash }} 相同。

{{ ts }}

str

{{ logical_date | ts }} 相同。
示例: 2018-01-01T00:00:00+00:00

{{ ts_nodash_with_tz }}

str

{{ logical_date | ts_nodash_with_tz }} 相同。
示例: 20180101T000000+0000

{{ ts_nodash }}

str

{{ logical_date | ts_nodash }} 相同。
示例: 20180101T000000

注意

DAG 运行的逻辑日期以及从中派生的值(例如 dsts)不应在 DAG 中视为唯一。请改用 run_id

从 TaskFlow 任务访问 Airflow 上下文变量

虽然使用 @task 装饰的任务不支持渲染作为参数传递的 Jinja 模板,但所有上述变量都可以直接从任务中访问。以下代码块是从其任务访问 task_instance 对象的示例

from airflow.models.taskinstance import TaskInstance
from airflow.models.dagrun import DagRun


@task
def print_ti_info(task_instance: TaskInstance, dag_run: DagRun):
    print(f"Run ID: {task_instance.run_id}")  # Run ID: scheduled__2023-08-09T00:00:00+00:00
    print(f"Duration: {task_instance.duration}")  # Duration: 0.972019
    print(f"DAG Run queued at: {dag_run.queued_at}")  # 2023-08-10 00:00:01+02:20

请注意,您可以使用简单的点表示法访问对象的属性和方法。以下是一些可能的示例: {{ task.owner }}, {{ task.task_id }}, {{ ti.hostname }}, … 有关对象属性和方法的更多信息,请参阅 模型文档

模板中的 Airflow 变量

var 模板变量允许您访问 Airflow 变量。您可以以纯文本或 JSON 格式访问它们。如果您使用 JSON,您还可以遍历嵌套结构,例如字典: {{ var.json.my_dict_var.key1 }}

如果需要(例如您的变量键包含点),也可以通过字符串获取变量,例如: {{ var.value.get('my.var', 'fallback') }}{{ var.json.get('my.dict.var', {'key1': 'val1'}) }}。如果变量不存在,可以提供默认值。

模板中的 Airflow 连接

类似地,Airflow 连接数据可以通过 conn 模板变量访问。例如,您可以在模板中使用表达式,如 {{ conn.my_conn_id.login }}, {{ conn.my_conn_id.password }} 等。

就像使用 var 一样,可以通过字符串获取连接(例如 {{ conn.get('my_conn_id_'+index).host }})或提供默认值(例如 {{ conn.get('my_conn_id', {"host": "host1", "login": "user1"}).host }})。

此外,连接的 extras 字段可以通过 extra_dejson 字段获取为 Python 字典,例如 conn.my_aws_conn_id.extra_dejson.region_name 将从 extras 中获取 region_name。这样,也可以为 extras 中的字段提供默认值(例如 {{ conn.my_aws_conn_id.extra_dejson.get('region_name', 'Europe (Frankfurt)') }})。

过滤器

Airflow 定义了一些 Jinja 过滤器,可用于格式化值。

例如,使用 {{ logical_date | ds }} 将以 YYYY-MM-DD 格式输出 logical_date。

过滤器

作用于

描述

ds

datetime

将 datetime 格式化为 YYYY-MM-DD

ds_nodash

datetime

将 datetime 格式化为 YYYYMMDD

ts

datetime

.isoformat() 相同,示例: 2018-01-01T00:00:00+00:00

ts_nodash

datetime

ts 过滤器相同,但不包含 -: 或时区信息。示例: 20180101T000000

ts_nodash_with_tz

datetime

ts 过滤器相同,但不包含 -:。示例 20180101T000000+0000

宏是一种将对象暴露给模板的方式,存在于模板的 macros 命名空间下。

一些常用的库和方法已可用。

变量

描述

macros.datetime

标准库的 datetime.datetime

macros.timedelta

标准库的 datetime.timedelta

macros.dateutil

dateutil 包的引用

macros.time

标准库的 time

macros.uuid

标准库的 uuid

macros.random

标准库的 random.random

还定义了一些 Airflow 特定的宏

airflow.macros.random() x 区间 [0, 1) 内。

此条目是否有帮助?