Airflow Summit 2025 即将于 10 月 07-09 日举行。立即注册以获取早鸟票!

参数

参数允许您为任务提供运行时配置。您可以在 DAG 代码中配置默认参数,并在运行时触发 DAG 时提供额外的参数或覆盖参数值。Param 值使用 JSON Schema 进行验证。对于计划的 DAG 运行,使用默认的 Param 值。

已定义的参数也用于在手动触发时渲染友好的用户界面。当您手动触发 DAG 时,可以在 dagrun 开始前修改其参数。如果用户提供的值未通过验证,Airflow 将显示警告而不是创建 dagrun。

DAG 级别参数

要将参数添加到 DAG,请使用 params kwarg 初始化它。使用一个字典,将参数名称映射到 Param 或表示参数默认值的对象。

 from airflow.sdk import DAG
 from airflow.sdk import task
 from airflow.sdk import Param

 with DAG(
     "the_dag",
     params={
         "x": Param(5, type="integer", minimum=3),
         "my_int_param": 6
     },
 ) as dag:

     @task.python
     def example_task(params: dict):
         # This will print the default value, 6:
         dag.log.info(dag.params['my_int_param'])

         # This will print the manually-provided value, 42:
         dag.log.info(params['my_int_param'])

         # This will print the default value, 5, since it wasn't provided manually:
         dag.log.info(params['x'])

     example_task()

 if __name__ == "__main__":
     dag.test(
         run_conf={"my_int_param": 42}
     )

注意

DAG 级别参数是传递给任务的默认值。这些值不应与通过 UI 表单或 CLI 手动提供的值混淆,手动提供的值仅存在于 DagRunTaskInstance 的上下文中。对于 TaskFlow DAGs 来说,这种区别至关重要,TaskFlow DAGs 可能会在 with DAG(...) as dag: 代码块中包含逻辑。在这种情况下,用户可能会尝试使用 dag 对象访问手动提供的参数值,但这只会包含默认值。为确保访问手动提供的值,请在任务中使用模板变量,例如 paramsti

任务级别参数

您也可以将参数添加到单个任务中。

def print_my_int_param(params):
  print(params.my_int_param)

PythonOperator(
    task_id="print_my_int_param",
    params={"my_int_param": 10},
    python_callable=print_my_int_param,
)

任务级别参数优先于 DAG 级别参数,而用户提供的参数(触发 DAG 时)优先于任务级别参数。

在任务中引用参数

可以在 模板字符串 中通过 params 引用参数。例如

 PythonOperator(
     task_id="from_template",
     op_args=[
         "{{ params.my_int_param + 10 }}",
     ],
     python_callable=(
         lambda my_int_param: print(my_int_param)
     ),
 )

即使参数可以使用多种类型,模板的默认行为是为您的任务提供字符串。您可以通过在初始化 DAG 时设置 render_template_as_native_obj=True 来改变此行为。

 with DAG(
     "the_dag",
     params={"my_int_param": Param(5, type="integer", minimum=3)},
     render_template_as_native_obj=True
 ):

这样,在将 Param 提供给任务时,其类型将得到尊重。

# prints <class 'str'> by default
# prints <class 'int'> if render_template_as_native_obj=True
PythonOperator(
    task_id="template_type",
    op_args=[
        "{{ params.my_int_param }}",
    ],
    python_callable=(
        lambda my_int_param: print(type(my_int_param))
    ),
)

另一种访问参数的方式是通过任务的 context kwarg。

 def print_my_int_param(**context):
     print(context["params"]["my_int_param"])

 PythonOperator(
     task_id="print_my_int_param",
     python_callable=print_my_int_param,
     params={"my_int_param": 12345},
 )

JSON Schema 验证

Param 使用 JSON Schema,因此您可以使用 https://json-schema.fullstack.org.cn/draft/2020-12/json-schema-validation.html 中提到的完整 JSON Schema 规范来定义 Param 对象。

with DAG(
    "my_dag",
    params={
        # an int with a default value
        "my_int_param": Param(10, type="integer", minimum=0, maximum=20),

        # a required param which can be of multiple types
        # a param must have a default value
        "multi_type_param": Param(5, type=["null", "number", "string"]),

        # an enum param, must be one of three values
        "enum_param": Param("foo", enum=["foo", "bar", 42]),

        # a param which uses json-schema formatting
        "email": Param(
            default="example@example.com",
            type="string",
            format="idn-email",
            minLength=5,
            maxLength=255,
        ),
    },
):

注意

如果为 DAG 定义了 schedule,则带有默认值的参数必须有效。这在 DAG 解析期间进行验证。如果 schedule=None,则参数不会在 DAG 解析期间进行验证,而是在触发 DAG 之前进行验证。这在 DAG 作者不想提供默认值但希望强制用户在触发时提供有效参数的情况下非常有用。

注意

目前,出于安全原因,不能使用派生自自定义类的 Param 对象。我们计划为自定义 Param 类建立一个注册系统,就像 Operator ExtraLinks 那样。

使用参数提供触发 UI 表单

在版本 2.6.0 中添加。

DAG 级别参数用于渲染用户友好的触发表单。当用户点击“触发 DAG”按钮时,会提供此表单。

触发 UI 表单是根据预定义的 DAG 参数渲染的。如果 DAG 没有定义参数,则会跳过触发表单。表单元素可以使用 Param 类定义,其属性定义表单字段如何显示。

触发 UI 表单支持以下特性

  • 顶级 DAG 参数中的直接标量值(布尔型、整型、字符串、列表、字典)会自动装箱成 Param 对象。从原生的 Python 数据类型会自动检测到 type 属性。因此,这些简单类型会渲染为相应的字段类型。参数的名称用作标签,不再进行进一步验证,所有值都被视为可选。

  • 如果您使用 Param 类来定义参数值,可以添加以下属性

    • Param 属性 title 用于渲染输入框的表单字段标签。如果未定义 title,则改用参数名称/键。

    • Param 属性 description 作为帮助文本以灰色显示在输入字段下方。如果您想提供特殊格式或链接,则需要使用 Param 属性 description_md。请参阅教程 DAG 参数 UI 示例 DAG 查看示例。

    • Param 属性 type 影响字段的渲染方式。支持以下类型

      参数类型

      表单元素类型

      支持的附加属性

      示例

      string

      生成单行文本框或文本区域以编辑文本。

      • minLength: 最小文本长度

      • maxLength: 最大文本长度

      • format="date": 生成日期选择器
        带有日历弹出窗口
      • format="date-time": 生成日期和
        时间选择器带有日历弹出窗口
      • format="time": 生成时间选择器

      • format="multiline": 生成多行文本区域

      • enum=["a", "b", "c"]: 生成一个
        用于标量值的下拉选择列表。
        根据 JSON 验证,必须选择一个值,
        否则必须明确将字段标记为
        可选。另请参阅
        JSON Schema Enum 描述 中的详细信息。
      • values_display={"a": "Alpha", "b": "Beta"}:
        对于通过 enum 生成的
        选择下拉列表,您可以添加属性
        values_display,其值为字典,可以将数据
        值映射到显示标签。
      • examples=["One", "Two", "Three"]: 如果您
        想提供值的建议
        (不限制用户使用固定的 enum
        如上所述),您可以使用 examples
        它是一个项目列表。
      另请参阅
      这些选项在 DAG 触发前的后端进行检查。

      Param("default", type="string", maxLength=10)

      Param(f"{datetime.date.today()}", type="string", format="date")

      number

      integer

      生成一个字段,该字段只允许添加
      数值。HTML 浏览器通常
      也会在右侧添加一个微调器
      来增加或减少值。
      integer 只允许整数,
      number 也允许
      小数。
      • minimum: 最小数值

      • maximum: 最大数值

      另请参阅
      这些选项在 DAG 触发前的后端进行检查。

      Param(42, type="integer", minimum=14, multipleOf=7)

      boolean

      生成一个可用于 TrueFalse 的切换按钮。
      作为 TrueFalse

      无。

      Param(True, type="boolean")

      array

      生成一个 HTML 多行文本字段,
      编辑的每一行都将成为值中的
      一个字符串数组。
      • 如果您添加带有列表的属性 examples
        将生成多值选择选项,而不是自由文本字段。
        字段。
      • values_display={"a": "Alpha", "b": "Beta"}:
        对于多值选择的 examples,您可以添加
        属性 values_display,其值为字典,并
        将数据值映射到显示标签。
      • 如果您添加带有字典的属性 items
        该字典包含一个 type 字段,其
        值不是“string”,则会生成一个 JSON 输入
        字段,用于更多数组类型和
        附加类型验证,如

      Param(["a", "b", "c"], type="array")

      Param(["two", "three"], type="array", examples=["one", "two", "three", "four", "five"])

      Param(["one@example.com", "two@example.com"], type="array", items={"type": "string", "format": "idn-email"})

      object

      生成一个带有文本验证的 JSON 输入字段。
      文本验证。
      HTML 表单仅验证 JSON 输入的语法。
      为了验证特定结构的内
      容,请查看

      Param({"key": "value"}, type=["object", "null"])

      null

      指定不期望任何内容。
      任何内容。
      单独使用意义不大,但对于
      类型组合非常有用,例如
      type=["null", "string"],因为
      type 属性也接受类型列表。
      默认情况下,如果指定了类型,由于 JSON 验证,
      该字段将被设为必填项并要求
      输入。
      如果您希望字段值是可选的,
      则必须允许 JSON schema 验证
      允许 null 值。
      值。

      Param(None, type=["null", "string"])

  • 如果表单字段留空,则会以 None 值传递给 params 字典。

  • 表单字段按照 DAG 中 params 定义的顺序渲染。

  • 如果您想向表单添加部分,请为每个字段添加属性 section。文本将用作部分标签。没有 section 的字段将在默认区域中渲染。附加部分默认会折叠。

  • 如果您想隐藏参数,请使用 const 属性。这些参数将被提交但不会显示在表单中。const 值必须与默认值匹配才能通过 JSON Schema 验证

  • 在表单底部可以展开生成的 JSON 配置。如果您想手动更改值,可以调整 JSON 配置。表单字段更改时会覆盖手动更改。

  • 要在发布触发表单的链接时预填充表单中的值,可以调用触发 URL /dags/<dag_name>/trigger,并在 URL 中添加查询参数,格式为 name=value,例如 /dags/example_params_ui_tutorial/trigger?required_field=some%20text。要预定义 DAG 运行的 run id,请使用 URL 参数 run_id

  • 字段可以是必填或可选的。根据 JSON schema 验证的要求,有类型的字段默认是必填的。要使有类型的字段可选,必须允许“null”类型。

  • 没有“section”的字段将在默认区域中渲染。附加部分默认会折叠。

注意

如果字段是必填项,则默认值也必须根据 schema 有效。如果 DAG 定义为 schedule=None,则参数值验证在触发时进行。

有关示例,请查看提供的两个示例 DAG:Params trigger example DAG参数 UI 示例 DAG

src/airflow/example_dags/example_params_trigger_ui.py

with DAG(
    dag_id=Path(__file__).stem,
    dag_display_name="Params Trigger UI",
    description=__doc__.partition(".")[0],
    doc_md=__doc__,
    schedule=None,
    start_date=datetime.datetime(2022, 3, 4),
    catchup=False,
    tags=["example", "params"],
    params={
        "names": Param(
            ["Linda", "Martha", "Thomas"],
            type="array",
            description="Define the list of names for which greetings should be generated in the logs."
            " Please have one name per line.",
            title="Names to greet",
        ),
        "english": Param(True, type="boolean", title="English"),
        "german": Param(True, type="boolean", title="German (Formal)"),
        "french": Param(True, type="boolean", title="French"),
    },
) as dag:

    @task(task_id="get_names", task_display_name="Get names")
    def get_names(**kwargs) -> list[str]:
        params = kwargs["params"]
        if "names" not in params:
            print("Uuups, no names given, was no UI used to trigger?")
            return []
        return params["names"]

    @task.branch(task_id="select_languages", task_display_name="Select languages")
    def select_languages(**kwargs) -> list[str]:
        params = kwargs["params"]
        selected_languages = []
        for lang in ["english", "german", "french"]:
            if params[lang]:
                selected_languages.append(f"generate_{lang}_greeting")
        return selected_languages

    @task(task_id="generate_english_greeting", task_display_name="Generate English greeting")
    def generate_english_greeting(name: str) -> str:
        return f"Hello {name}!"

    @task(task_id="generate_german_greeting", task_display_name="Erzeuge Deutsche Begrüßung")
    def generate_german_greeting(name: str) -> str:
        return f"Sehr geehrter Herr/Frau {name}."

    @task(task_id="generate_french_greeting", task_display_name="Produire un message d'accueil en français")
    def generate_french_greeting(name: str) -> str:
        return f"Bonjour {name}!"

    @task(task_id="print_greetings", task_display_name="Print greetings", trigger_rule=TriggerRule.ALL_DONE)
    def print_greetings(greetings1, greetings2, greetings3) -> None:
        for g in greetings1 or []:
            print(g)
        for g in greetings2 or []:
            print(g)
        for g in greetings3 or []:
            print(g)
        if not (greetings1 or greetings2 or greetings3):
            print("sad, nobody to greet :-(")

    lang_select = select_languages()
    names = get_names()
    english_greetings = generate_english_greeting.expand(name=names)
    german_greetings = generate_german_greeting.expand(name=names)
    french_greetings = generate_french_greeting.expand(name=names)
    lang_select >> [english_greetings, german_greetings, french_greetings]
    results_print = print_greetings(english_greetings, german_greetings, french_greetings)

src/airflow/example_dags/example_params_ui_tutorial.py

    params={
        # Let's start simple: Standard dict values are detected from type and offered as entry form fields.
        # Detected types are numbers, text, boolean, lists and dicts.
        # Note that such auto-detected parameters are treated as optional (not required to contain a value)
        "number_param": 3,
        "text_param": "Hello World!",
        "bool_param": False,
        "list_param": ["one", "two", "three", "actually one value is made per line"],
        "dict_param": {"key": "value"},
        # You can arrange the entry fields in sections so that you can have a better overview for the user
        # Therefore you can add the "section" attribute.
        # But of course you might want to have it nicer! Let's add some description to parameters.
        # Note if you can add any Markdown formatting to the description, you need to use the description_md
        # attribute.
        "most_loved_number": Param(
            42,
            type="integer",
            title="Your favorite number",
            description_md="Everybody should have a **favorite** number. Not only _math teachers_. "
            "If you can not think of any at the moment please think of the 42 which is very famous because"
            "of the book [The Hitchhiker's Guide to the Galaxy]"
            "(https://en.wikipedia.org/wiki/Phrases_from_The_Hitchhiker%27s_Guide_to_the_Galaxy#"
            "The_Answer_to_the_Ultimate_Question_of_Life,_the_Universe,_and_Everything_is_42).",
            minimum=0,
            maximum=128,
            section="Typed parameters with Param object",
        ),
        # If you want to have a selection list box then you can use the enum feature of JSON schema
        "pick_one": Param(
            "value 42",
            type="string",
            title="Select one Value",
            description="You can use JSON schema enum's to generate drop down selection boxes.",
            enum=[f"value {i}" for i in range(16, 64)],
            section="Typed parameters with Param object",
        ),

src/airflow/example_dags/example_params_ui_tutorial.py

        "required_field": Param(
            # In this example we have no default value
            # Form will enforce a value supplied by users to be able to trigger
            type="string",
            title="Required text field",
            minLength=10,
            maxLength=30,
            description="This field is required. You can not submit without having text in here.",
            section="Typed parameters with Param object",
        ),
        "optional_field": Param(
            "optional text, you can trigger also w/o text",
            type=["null", "string"],
            title="Optional text field",
            description_md="This field is optional. As field content is JSON schema validated you must "
            "allow the `null` type.",
            section="Typed parameters with Param object",
        ),

src/airflow/example_dags/example_params_ui_tutorial.py

    @task(task_display_name="Show used parameters")
    def show_params(**kwargs) -> None:
        params = kwargs["params"]
        print(f"This DAG was triggered with the following parameters:\n\n{json.dumps(params, indent=4)}\n")

    show_params()

参数 UI 教程分为 4 个部分,展示了最常见的示例。第一部分展示了不使用 Param 类的基本用法。

../_images/trigger-dag-tutorial-form-1.png

第二部分展示了如何使用 Param 类定义更多属性。

../_images/trigger-dag-tutorial-form-2.png

第三部分展示了如何建模选择列表和下拉列表。

../_images/trigger-dag-tutorial-form-3.png

最后,第四部分展示了高级表单元素。

../_images/trigger-dag-tutorial-form-4.png

在版本 3.0.0 中更改: 默认情况下不允许使用自定义 HTML,以防止注入脚本或其他恶意 HTML 代码。以前名为 description_html 的字段现已被属性 description_md 取代。不再支持 description_html。使用属性 custom_html_form 的自定义表单元素在版本 2.8.0 中已弃用,并在 3.0.0 中移除了支持。

禁用运行时参数修改

在触发 DAG 时更新参数的能力取决于标志 core.dag_run_conf_overrides_params。将此配置设置为 False 将使您的默认参数实际上变成常量。

此条目有帮助吗?