参数¶
参数使您能够为任务提供运行时配置。您可以在 DAG 代码中配置默认参数,并在触发 DAG 时在运行时提供其他参数或覆盖参数值。Param
值使用 JSON Schema 进行验证。对于计划的 DAG 运行,将使用默认的 Param
值。
定义的参数也用于在手动触发时呈现友好的 UI。当您手动触发 DAG 时,您可以在 dagrun 开始之前修改其参数。如果用户提供的值未通过验证,Airflow 将显示警告而不是创建 dagrun。
DAG 级别的参数¶
要将参数添加到 DAG
,请使用 params
kwarg 初始化它。使用一个字典,将参数名称映射到 Param
或指示参数默认值的对象。
from airflow import DAG
from airflow.decorators import task
from airflow.models.param import Param
with DAG(
"the_dag",
params={
"x": Param(5, type="integer", minimum=3),
"my_int_param": 6
},
) as dag:
@task.python
def example_task(params: dict):
# This will print the default value, 6:
dag.log.info(dag.params['my_int_param'])
# This will print the manually-provided value, 42:
dag.log.info(params['my_int_param'])
# This will print the default value, 5, since it wasn't provided manually:
dag.log.info(params['x'])
example_task()
if __name__ == "__main__":
dag.test(
run_conf={"my_int_param": 42}
)
注意
DAG 级别参数是传递给任务的默认值。不应将这些与通过 UI 表单或 CLI 手动提供的值混淆,这些值仅存在于 DagRun
和 TaskInstance
的上下文中。这种区别对于 TaskFlow DAG 至关重要,TaskFlow DAG 可能在 with DAG(...) as dag:
代码块中包含逻辑。在这种情况下,用户可能会尝试使用 dag
对象访问手动提供的参数值,但这只会包含默认值。要确保访问手动提供的值,请在您的任务中使用模板变量,例如 params
或 ti
。
任务级别的参数¶
您还可以将参数添加到单个任务。
def print_my_int_param(params):
print(params.my_int_param)
PythonOperator(
task_id="print_my_int_param",
params={"my_int_param": 10},
python_callable=print_my_int_param,
)
任务级别参数优先于 DAG 级别参数,并且用户提供的参数(在触发 DAG 时)优先于任务级别参数。
在任务中引用参数¶
可以在 模板字符串 的 params
下引用参数。例如
PythonOperator(
task_id="from_template",
op_args=[
"{{ params.my_int_param + 10 }}",
],
python_callable=(
lambda my_int_param: print(my_int_param)
),
)
即使参数可以使用各种类型,模板的默认行为是为您的任务提供一个字符串。您可以通过在初始化 DAG
时设置 render_template_as_native_obj=True
来更改此行为。
with DAG(
"the_dag",
params={"my_int_param": Param(5, type="integer", minimum=3)},
render_template_as_native_obj=True
):
这样,当 Param
的类型提供给您的任务时,它会受到尊重。
# prints <class 'str'> by default
# prints <class 'int'> if render_template_as_native_obj=True
PythonOperator(
task_id="template_type",
op_args=[
"{{ params.my_int_param }}",
],
python_callable=(
lambda my_int_param: print(type(my_int_param))
),
)
访问参数的另一种方法是通过任务的 context
kwarg。
def print_my_int_param(**context):
print(context["params"]["my_int_param"])
PythonOperator(
task_id="print_my_int_param",
python_callable=print_my_int_param,
params={"my_int_param": 12345},
)
JSON Schema 验证¶
Param
使用 JSON Schema,因此您可以使用 https://json-schema.fullstack.org.cn/draft/2020-12/json-schema-validation.html 中提到的完整 JSON Schema 规范来定义 Param
对象。
with DAG(
"my_dag",
params={
# an int with a default value
"my_int_param": Param(10, type="integer", minimum=0, maximum=20),
# a required param which can be of multiple types
# a param must have a default value
"multi_type_param": Param(5, type=["null", "number", "string"]),
# an enum param, must be one of three values
"enum_param": Param("foo", enum=["foo", "bar", 42]),
# a param which uses json-schema formatting
"email": Param(
default="[email protected]",
type="string",
format="idn-email",
minLength=5,
maxLength=255,
),
},
):
注意
如果为 DAG 定义了 schedule
,则带有默认值的参数必须有效。这会在 DAG 解析期间进行验证。如果 schedule=None
,则在 DAG 解析期间不会验证参数,而是在触发 DAG 之前验证参数。这在 DAG 作者不想提供默认值但希望强制用户在触发时提供有效参数的情况下很有用。
使用参数提供触发 UI 表单¶
2.6.0 版本中的新功能。
DAG
级别参数用于呈现用户友好的触发表单。当用户单击“触发 DAG”按钮时,将提供此表单。
触发 UI 表单是基于预定义的 DAG 参数呈现的。如果 DAG 没有定义任何参数,则会跳过触发表单。可以使用 Param
类定义表单元素,并且属性定义表单字段的显示方式。
触发 UI 表单支持以下功能
来自顶级 DAG 参数的直接标量值(布尔值、整数、字符串、列表、字典)会自动装箱到
Param
对象中。从本机 Python 数据类型自动检测type
属性。因此,这些简单类型会呈现为相应的字段类型。参数名称用作标签,并且不进行进一步验证,所有值都被视为可选。如果您使用
Param
类作为参数值的定义,则可以添加以下属性Param
属性title
用于呈现输入框的表单字段标签。如果未定义title
,则改为使用参数名称/键。Param
属性description
在输入字段下方呈现为灰色帮助文本。如果要提供特殊的格式或链接,则需要使用 Param 属性description_md
。有关示例,请参阅教程 DAG 参数 UI 示例 DAG。Param
属性type
会影响字段的呈现方式。支持以下类型参数类型
表单元素类型
其他支持的属性
示例
string
生成单行文本框或文本区域以编辑文本。
minLength
:最小文本长度maxLength
:最大文本长度format="date"
:生成日期选择器带有日历弹出窗口format="date-time"
:生成日期和带有日历弹出窗口的时间选择器format="time"
:生成时间选择器format="multiline"
:生成多行文本区域enum=["a", "b", "c"]
:生成一个用于标量值的下拉选择列表。按照 JSON 验证,必须选择一个值,或者必须将该字段标记为显式可选。另请参阅内部的详细信息values_display={"a": "Alpha", "b": "Beta"}
:对于通过以下方式生成的选择下拉列表enum
,您可以添加属性values_display
,其中包含 dict 和映射数据值以显示标签。examples=["One", "Two", "Three"]
:如果您想要为值提供建议(不将用户限制为固定的enum
如上所述),您可以使用examples
,它是一个项目列表。
Param("default", type="string", maxLength=10)
Param(f"{datetime.date.today()}", type="string", format="date")
number
或整数
生成一个字段,限制只能添加数值。HTML 浏览器通常也会在右侧添加一个微调器来增加或减少值。integer
只允许整数,number
也允许小数值。minimum
:最小值maximum
:最大值
Param(42, type="integer", minimum=14, multipleOf=7)
布尔值
生成一个切换按钮,用作True
或False
。无。
Param(True, type="boolean")
数组
生成一个 HTML 多行文本字段,编辑的每一行都会被转换为一个字符串数组作为值。- 如果添加了
examples
属性和一个列表,则会生成一个多值选择选项而不是自由文本字段。 values_display={"a": "Alpha", "b": "Beta"}
:对于多值选择,examples
可以添加values_display
属性和一个字典,将数据值映射到显示标签。- 如果添加了
items
属性和一个包含字段type
的字典,其值不是 “string”,则会为更多数组类型生成一个 JSON 条目字段,并且会进行额外的类型验证,如JSON Schema 数组项 中所述。
Param(["a", "b", "c"], type="array")
Param(["two", "three"], type="array", examples=["one", "two", "three", "four", "five"])
Param(["[email protected]", "[email protected]"], type="array", items={"type": "string", "format": "idn-email"})
对象
生成一个具有文本验证的 JSON 条目字段。Param({"key": "value"}, type=["object", "null"])
空值
指定不期望任何内容。单独使用没有太大意义,但对于类型组合很有用,例如type=["null", "string"]
,因为type 属性也接受一个类型列表。默认情况下,如果指定类型,则会使该字段成为必填字段,并进行输入 - 这是因为 JSON 验证。如果希望字段值只是可选添加,则必须允许JSON schema 验证允许空值。JSON schema validation allowing nullParam(None, type=["null", "string"])
如果表单字段留空,则会将其作为
None
值传递到 params 字典中。表单字段按照 DAG 中
params
的定义顺序呈现。如果想向表单添加节,请将
section
属性添加到每个字段。文本将用作节标签。没有section
的字段将在默认区域中呈现。其他节默认情况下会折叠。如果要使参数不显示,请使用
const
属性。这些参数将被提交,但在表单中隐藏。const
值必须与默认值匹配才能通过 JSON Schema 验证。在表单底部,可以展开生成的 JSON 配置。如果要手动更改值,可以调整 JSON 配置。当表单字段更改时,更改会被覆盖。
要在发布到触发表单的链接时预先填充表单中的值,您可以调用触发 URL
/dags/<dag_name>/trigger
并在 URL 中添加name=value
形式的查询参数,例如/dags/example_params_ui_tutorial/trigger?required_field=some%20text
。要预定义 DAG 运行的运行 ID,请使用 URL 参数run_id
。字段可以是必填或可选的。为了确保通过 JSON schema 验证,默认情况下类型字段是必填的。要使类型字段可选,必须允许 “null” 类型。
没有 “section” 的字段将在默认区域中呈现。其他节默认情况下会折叠。
注意
如果字段是必填的,则默认值也必须根据 schema 有效。如果 DAG 定义为 schedule=None
,则参数值验证将在触发时进行。
有关示例,请查看提供的两个示例 DAG:参数触发示例 DAG 和 参数 UI 示例 DAG。
with DAG(
dag_id=Path(__file__).stem,
dag_display_name="Params Trigger UI",
description=__doc__.partition(".")[0],
doc_md=__doc__,
schedule=None,
start_date=datetime.datetime(2022, 3, 4),
catchup=False,
tags=["example", "params"],
params={
"names": Param(
["Linda", "Martha", "Thomas"],
type="array",
description="Define the list of names for which greetings should be generated in the logs."
" Please have one name per line.",
title="Names to greet",
),
"english": Param(True, type="boolean", title="English"),
"german": Param(True, type="boolean", title="German (Formal)"),
"french": Param(True, type="boolean", title="French"),
},
) as dag:
@task(task_id="get_names", task_display_name="Get names")
def get_names(**kwargs) -> list[str]:
params: ParamsDict = kwargs["params"]
if "names" not in params:
print("Uuups, no names given, was no UI used to trigger?")
return []
return params["names"]
@task.branch(task_id="select_languages", task_display_name="Select languages")
def select_languages(**kwargs) -> list[str]:
params: ParamsDict = kwargs["params"]
selected_languages = []
for lang in ["english", "german", "french"]:
if params[lang]:
selected_languages.append(f"generate_{lang}_greeting")
return selected_languages
@task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
def generate_english_greeting(name: str) -> str:
return f"Hello {name}!"
@task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
def generate_german_greeting(name: str) -> str:
return f"Sehr geehrter Herr/Frau {name}."
@task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
def generate_french_greeting(name: str) -> str:
return f"Bonjour {name}!"
@task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
def print_greetings(greetings1, greetings2, greetings3) -> None:
for g in greetings1 or []:
print(g)
for g in greetings2 or []:
print(g)
for g in greetings3 or []:
print(g)
if not (greetings1 or greetings2 or greetings3):
print("sad, nobody to greet :-(")
lang_select = select_languages()
names = get_names()
english_greetings = generate_english_greeting.expand(name=names)
german_greetings = generate_german_greeting.expand(name=names)
french_greetings = generate_french_greeting.expand(name=names)
lang_select >> [english_greetings, german_greetings, french_greetings]
results_print = print_greetings(english_greetings, german_greetings, french_greetings)
params={
# Let's start simple: Standard dict values are detected from type and offered as entry form fields.
# Detected types are numbers, text, boolean, lists and dicts.
# Note that such auto-detected parameters are treated as optional (not required to contain a value)
"x": 3,
"text": "Hello World!",
"flag": False,
"a_simple_list": ["one", "two", "three", "actually one value is made per line"],
# But of course you might want to have it nicer! Let's add some description to parameters.
# Note if you can add any Markdown formatting to the description, you need to use the description_md
# attribute.
"most_loved_number": Param(
42,
type="integer",
title="Your favorite number",
description_md="Everybody should have a **favorite** number. Not only _math teachers_. "
"If you can not think of any at the moment please think of the 42 which is very famous because"
"of the book [The Hitchhiker's Guide to the Galaxy]"
"(https://en.wikipedia.org/wiki/Phrases_from_The_Hitchhiker%27s_Guide_to_the_Galaxy#"
"The_Answer_to_the_Ultimate_Question_of_Life,_the_Universe,_and_Everything_is_42).",
),
# If you want to have a selection list box then you can use the enum feature of JSON schema
"pick_one": Param(
"value 42",
type="string",
title="Select one Value",
description="You can use JSON schema enum's to generate drop down selection boxes.",
enum=[f"value {i}" for i in range(16, 64)],
),
"required_field": Param(
# In this example we have no default value
# Form will enforce a value supplied by users to be able to trigger
type="string",
title="Required text field",
description="This field is required. You can not submit without having text in here.",
),
"optional_field": Param(
"optional text, you can trigger also w/o text",
type=["null", "string"],
title="Optional text field",
description_md="This field is optional. As field content is JSON schema validated you must "
"allow the `null` type.",
),
@task(task_display_name="Show used parameters")
def show_params(**kwargs) -> None:
params: ParamsDict = kwargs["params"]
print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")
show_params()
2.7.0 版本新增: 即使没有定义参数,也可以使用配置开关 webserver.show_trigger_form_if_no_params
强制显示触发表单。
2.8.0 版本变更: 默认情况下不允许使用自定义 HTML,以防止注入脚本或其他恶意 HTML 代码。如果您信任 DAG 作者,可以通过将配置条目 webserver.allow_raw_html_descriptions
设置为 True
来更改参数描述的信任级别,从而允许原始 HTML。使用默认设置时,所有 HTML 都将显示为纯文本。这与之前使用 description_html
属性启用富格式化的功能有关,该属性现在被 description_md
属性取代。使用 custom_html_form
属性的自定义表单元素允许 DAG 作者指定原始 HTML 表单模板。这些自定义 HTML 表单元素自 2.8.0 版本起已弃用。
禁用运行时参数修改¶
在触发 DAG 时更新参数的能力取决于标志 core.dag_run_conf_overrides_params
。将此配置设置为 False
将有效地将默认参数变为常量。