模板参考¶
变量、宏和过滤器可以在模板中使用(请参阅Jinja 模板部分)
以下内容是 Airflow 开箱即用的。可以通过插件全局添加其他自定义宏,或者通过 DAG.user_defined_macros
参数在 DAG 级别添加。
变量¶
Airflow 引擎默认传递一些变量,这些变量可以在所有模板中访问
变量 |
类型 |
描述 |
---|---|---|
|
数据间隔的开始时间。在 2.2 版本中添加。 |
|
|
数据间隔的结束时间。在 2.2 版本中添加。 |
|
|
逻辑上标识当前 DAG 运行的日期时间。此值不包含任何语义,而只是一个用于标识的值。
如果要获取具有实际语义的值,请使用
data_interval_start 和 data_interval_end ,例如,根据时间戳从数据库中获取行切片。
|
|
|
str |
DAG 运行的逻辑日期,格式为
YYYY-MM-DD 。与
{{ logical_date | ds }} 相同。 |
|
str |
与 |
|
None | str | 异常 KeyboardInterrupt |
运行任务实例时发生错误。
|
|
str |
与
{{ logical_date | ts }} 相同。示例:
2018-01-01T00:00:00+00:00 。 |
|
str |
与
{{ logical_date | ts_nodash_with_tz }} 相同。示例:
20180101T000000+0000 。 |
|
str |
与
{{ logical_date | ts_nodash }} 相同。示例:
20180101T000000 。 |
|
pendulum.DateTime | |
先前成功
DagRun 的数据间隔的开始时间。在 2.2 版本中添加。
|
|
pendulum.DateTime | |
先前成功
DagRun 的数据间隔的结束时间。在 2.2 版本中添加。
|
|
pendulum.DateTime | |
先前成功 |
|
pendulum.DateTime | |
先前成功 |
|
list |
任务上声明的入口列表。 |
|
dict[str, …] |
访问入口数据集的过去事件。请参阅数据集。在 2.10 版本中添加。 |
|
list |
任务上声明的出口列表。 |
|
dict[str, …] |
用于将信息附加到当前任务将发出的数据集事件的访问器。
请参阅数据集。在 2.10 版本中添加。
|
|
DAG |
|
|
BaseOperator |
当前正在运行的
BaseOperator 。您可以在运算符中阅读有关任务的更多信息 |
|
对宏包的引用。请参阅下面的宏。
|
|
|
TaskInstance |
当前正在运行的 |
|
TaskInstance |
与 |
|
dict[str, Any] |
用户定义的参数。如果
dag_run_conf_overrides_params 在
airflow.cfg 中启用,则可以通过传递给 trigger_dag -c 的映射覆盖此参数。在
airflow.cfg 中启用。 |
|
Airflow 变量。请参阅下面的模板中的 Airflow 变量。 |
|
|
Airflow 变量。请参阅下面的模板中的 Airflow 变量。 |
|
|
Airflow 连接。请参阅下面的模板中的 Airflow 连接。 |
|
|
str |
任务实例的唯一、人类可读的键。格式为
{dag_id}__{task_id}__{ds_nodash} . |
|
AirflowConfigParser |
表示您的
airflow.cfg 内容的完整配置对象。请参阅 airflow.configuration.conf 。airflow.cfg 。 |
|
str |
当前正在运行的 |
|
DagRun |
当前正在运行的 |
|
bool |
任务实例是否由 |
|
None | str |
用于呈现映射任务的展开任务实例的模板。设置此值将反映在呈现的结果中。 |
|
int | |
映射任务展开为的任务实例数。如果
当前任务未映射,则应为
None 。在 2.5 版本中添加。
|
|
dict[str, list[DatasetEvent]] |
如果在数据集调度 DAG 中,则将数据集 URI 映射到触发
DatasetEvent 的列表(如果存在多个具有不同频率的数据集,则可能存在多个)。
在此处阅读更多信息 数据集。
在 2.4 版本中添加。
|
注意
DAG 运行的逻辑日期以及由此派生的值,例如 ds
和 ts
,不应被视为 DAG 中的唯一标识。请改用 run_id
。
从 TaskFlow 任务访问 Airflow 上下文变量¶
虽然使用 @task
装饰的任务不支持渲染作为参数传递的 Jinja 模板,但上述所有变量都可以直接从任务中访问。以下代码块示例展示了如何从任务访问 task_instance
对象。
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(task_instance: TaskInstance | None = None, dag_run: DagRun | None = None): print(f"Run ID: {task_instance.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {task_instance.duration}") # Duration: 0.972019 print(f"DAG Run queued at: {dag_run.queued_at}") # 2023-08-10 00:00:01+02:20
已弃用的变量¶
以下变量已弃用。保留它们是为了向后兼容,但您应该将现有代码转换为使用其他变量。
已弃用的变量 |
描述 |
---|---|
|
执行日期(逻辑日期),与 |
|
下一个计划运行的逻辑日期(如果适用);您可以使用 |
|
下一个执行日期,格式为 |
|
下一个执行日期,格式为 |
|
上一个计划运行的逻辑日期(如果适用) |
|
上一个执行日期,格式为 |
|
上一个执行日期,格式为 |
|
执行日期的前一天,格式为 |
|
执行日期的前一天,格式为 |
|
执行日期的后一天,格式为 |
|
执行日期的后一天,格式为 |
|
先前成功 DAG 运行的执行日期;如果您用于 DAG 的时间表/计划定义了与旧版 |
请注意,您可以使用简单的点表示法访问对象的属性和方法。以下是一些可能的示例:{{ task.owner }}
、{{ task.task_id }}
、{{ ti.hostname }}
,… 有关对象属性和方法的更多信息,请参阅模型文档。
模板中的 Airflow 变量¶
var
模板变量允许您访问 Airflow 变量。您可以将其作为纯文本或 JSON 访问。如果您使用 JSON,您还可以遍历嵌套结构,例如字典:{{ var.json.my_dict_var.key1 }}
。
如果需要,也可以使用字符串获取变量(例如,您的变量键包含点),方法是使用 {{ var.value.get('my.var', 'fallback') }}
或 {{ var.json.get('my.dict.var', {'key1': 'val1'}) }}
。如果变量不存在,可以提供默认值。
模板中的 Airflow 连接¶
类似地,可以通过 conn
模板变量访问 Airflow 连接数据。例如,您可以在模板中使用如下表达式:{{ conn.my_conn_id.login }}
、{{ conn.my_conn_id.password }}
等。
就像使用 var
一样,可以使用字符串获取连接(例如 {{ conn.get('my_conn_id_'+index).host }}
)或提供默认值(例如 {{ conn.get('my_conn_id', {"host": "host1", "login": "user1"}).host }}
)。
此外,可以使用 extra_dejson
字段将连接的 extras
字段作为 Python 字典获取,例如 conn.my_aws_conn_id.extra_dejson.region_name
将从 extras
中获取 region_name
。这样,也可以在 extras
中提供默认值(例如 {{ conn.my_aws_conn_id.extra_dejson.get('region_name', 'Europe (Frankfurt)') }}
)。
过滤器¶
Airflow 定义了一些 Jinja 过滤器,可用于格式化值。
例如,使用 {{ logical_date | ds }}
将以 YYYY-MM-DD
格式输出 logical_date。
过滤器 |
操作于 |
描述 |
---|---|---|
|
datetime |
将 datetime 格式化为 |
|
datetime |
将 datetime 格式化为 |
|
datetime |
与 |
|
datetime |
与 |
|
datetime |
与 |
宏¶
宏是将对象公开到模板并在模板中的 macros
命名空间下使用的工具。
提供了一些常用的库和方法。
变量 |
描述 |
---|---|
|
标准库的 |
|
标准库的 |
|
对 |
|
标准库的 |
|
标准库的 |
|
标准库的 |
还定义了一些 Airflow 特有的宏
- airflow.macros.datetime_diff_for_humans(dt, since=None)[源代码]¶
返回 datetime 之间的人类可读/近似差异。
如果仅提供一个 datetime,则比较将基于当前时间。
- 参数
dt (Any) – 要显示差异的 datetime
since (DateTime | None) – 从哪个时间显示日期。如果为
None
,则差异为dt
和当前时间之间的差异。
- airflow.macros.ds_add(ds, days)[源代码]¶
从 YYYY-MM-DD 添加或减去天数。
>>> ds_add("2015-01-01", 5) '2015-01-06' >>> ds_add("2015-01-06", -5) '2015-01-01'
- airflow.macros.ds_format(ds, input_format, output_format)[源代码]¶
以给定格式输出日期时间字符串。
- 参数
>>> ds_format("2015-01-01", "%Y-%m-%d", "%m-%d-%y") '01-01-15' >>> ds_format("1/5/2015", "%m/%d/%Y", "%Y-%m-%d") '2015-01-05' >>> ds_format("12/07/2024", "%d/%m/%Y", "%A %d %B %Y", "en_US") 'Friday 12 July 2024'
- airflow.macros.ds_format_locale(ds, input_format, output_format, locale=None)[源代码]¶
以给定的 Babel 格式输出本地化的日期时间字符串。
- 参数
>>> ds_format("2015-01-01", "%Y-%m-%d", "MM-dd-yy") '01-01-15' >>> ds_format("1/5/2015", "%m/%d/%Y", "yyyy-MM-dd") '2015-01-05' >>> ds_format("12/07/2024", "%d/%m/%Y", "EEEE dd MMMM yyyy", "en_US") 'Friday 12 July 2024'
2.10.0 版本新增。
- airflow.macros.random() x 在区间 [0, 1) 内。 ¶