模板参考¶
变量、宏和过滤器可以在模板中使用(参阅 Jinja 模板化 部分)
以下是 Airflow 开箱即用的功能。可以通过 插件 全局添加额外的自定义宏,或通过 DAG.user_defined_macros
参数在 DAG 级别添加。
变量¶
Airflow 引擎默认传递一些在所有模板中都可访问的变量
变量 |
类型 |
描述 |
---|---|---|
|
数据区间的开始。版本 2.2 中添加。 |
|
|
数据区间的结束。版本 2.2 中添加。 |
|
|
逻辑上标识当前 DAG 运行的日期时间。此值不包含任何语义,仅用于标识。
如果您想要具有实际语义的值,请改用
data_interval_start 和 data_interval_end ,例如,根据时间戳从数据库获取一部分行。
|
|
|
None | str | Exception KeyboardInterrupt |
运行任务实例时发生错误。
|
|
pendulum.DateTime | |
前一个成功
DagRun 的数据区间开始。版本 2.2 中添加。
|
|
pendulum.DateTime | |
前一个成功
DagRun 的数据区间结束。版本 2.2 中添加。
|
|
pendulum.DateTime | |
前一个成功 |
|
pendulum.DateTime | |
前一个成功 |
|
当前任务开始的日期时间。 |
|
|
list |
任务上声明的输入列表。 |
|
dict[str, …] |
访问输入资产的过去事件。参阅 资产。版本 2.10 中添加。 |
|
list |
任务上声明的输出列表。 |
|
dict[str, …] |
用于将信息附加到当前任务将发出的资产事件的访问器。
参阅 资产。版本 2.10 中添加。
|
|
DAG |
|
|
BaseOperator |
当前正在运行的
BaseOperator 。您可以在 Operator 部分阅读更多关于任务的信息 |
|
int |
当前任务已被重新调度了多少次。与 |
|
对宏包的引用。参阅下方的 宏。
|
|
|
TaskInstance |
当前正在运行的 |
|
TaskInstance |
与 |
|
dict[str, Any] |
用户定义的参数。这可以被传递给
trigger_dag -c 的映射覆盖,如果 dag_run_conf_overrides_params 在
airflow.cfg 中启用。 |
|
Airflow 变量。参阅下方的 模板中的 Airflow 变量。 |
|
|
Airflow 变量。参阅下方的 模板中的 Airflow 变量。 |
|
|
Airflow 连接。参阅下方的 模板中的 Airflow 连接。 |
|
|
str |
任务实例的唯一、人类可读的键。格式为
{dag_id}__{task_id}__{ds_nodash} . |
|
str |
当前正在运行的 |
|
DagRun |
当前正在运行的 |
|
bool |
任务实例是否由 |
|
None | str |
用于渲染映射任务的扩展任务实例的模板。设置此值将反映在渲染结果中。 |
|
int | |
映射任务扩展成的任务实例数量。如果
当前任务未被映射,此值应为
None 。版本 2.5 中添加。
|
|
dict[str, list[AssetEvent]] |
以下变量仅在 DagRun 具有 logical_date
时可用
变量 |
类型 |
描述 |
---|---|---|
|
str |
DAG 运行的逻辑日期,格式为
YYYY-MM-DD 。与
{{ logical_date | ds }} 相同。 |
|
str |
与 |
|
str |
与
{{ logical_date | ts }} 相同。示例:
2018-01-01T00:00:00+00:00 。 |
|
str |
与
{{ logical_date | ts_nodash_with_tz }} 相同。示例:
20180101T000000+0000 。 |
|
str |
与
{{ logical_date | ts_nodash }} 相同。示例:
20180101T000000 。 |
注意
DAG 运行的逻辑日期以及从中派生的值(例如 ds
和 ts
)不应在 DAG 中视为唯一。请改用 run_id
。
从 TaskFlow 任务访问 Airflow 上下文变量¶
虽然使用 @task
装饰的任务不支持渲染作为参数传递的 Jinja 模板,但所有上述变量都可以直接从任务中访问。以下代码块是从其任务访问 task_instance
对象的示例
from airflow.models.taskinstance import TaskInstance from airflow.models.dagrun import DagRun @task def print_ti_info(task_instance: TaskInstance, dag_run: DagRun): print(f"Run ID: {task_instance.run_id}") # Run ID: scheduled__2023-08-09T00:00:00+00:00 print(f"Duration: {task_instance.duration}") # Duration: 0.972019 print(f"DAG Run queued at: {dag_run.queued_at}") # 2023-08-10 00:00:01+02:20
请注意,您可以使用简单的点表示法访问对象的属性和方法。以下是一些可能的示例: {{ task.owner }}
, {{ task.task_id }}
, {{ ti.hostname }}
, … 有关对象属性和方法的更多信息,请参阅 模型文档。
模板中的 Airflow 变量¶
var
模板变量允许您访问 Airflow 变量。您可以以纯文本或 JSON 格式访问它们。如果您使用 JSON,您还可以遍历嵌套结构,例如字典: {{ var.json.my_dict_var.key1 }}
。
如果需要(例如您的变量键包含点),也可以通过字符串获取变量,例如: {{ var.value.get('my.var', 'fallback') }}
或 {{ var.json.get('my.dict.var', {'key1': 'val1'}) }}
。如果变量不存在,可以提供默认值。
模板中的 Airflow 连接¶
类似地,Airflow 连接数据可以通过 conn
模板变量访问。例如,您可以在模板中使用表达式,如 {{ conn.my_conn_id.login }}
, {{ conn.my_conn_id.password }}
等。
就像使用 var
一样,可以通过字符串获取连接(例如 {{ conn.get('my_conn_id_'+index).host }}
)或提供默认值(例如 {{ conn.get('my_conn_id', {"host": "host1", "login": "user1"}).host }}
)。
此外,连接的 extras
字段可以通过 extra_dejson
字段获取为 Python 字典,例如 conn.my_aws_conn_id.extra_dejson.region_name
将从 extras
中获取 region_name
。这样,也可以为 extras
中的字段提供默认值(例如 {{ conn.my_aws_conn_id.extra_dejson.get('region_name', 'Europe (Frankfurt)') }}
)。
过滤器¶
Airflow 定义了一些 Jinja 过滤器,可用于格式化值。
例如,使用 {{ logical_date | ds }}
将以 YYYY-MM-DD
格式输出 logical_date。
过滤器 |
作用于 |
描述 |
---|---|---|
|
datetime |
将 datetime 格式化为 |
|
datetime |
将 datetime 格式化为 |
|
datetime |
与 |
|
datetime |
与 |
|
datetime |
与 |
宏¶
宏是一种将对象暴露给模板的方式,存在于模板的 macros
命名空间下。
一些常用的库和方法已可用。
变量 |
描述 |
---|---|
|
标准库的 |
|
标准库的 |
|
对 |
|
标准库的 |
|
标准库的 |
|
标准库的 |
还定义了一些 Airflow 特定的宏
- airflow.macros.random() x 在 区间 [0, 1) 内。 ¶