管理连接

另请参见

有关钩子和连接的概述,请参阅 连接和钩子

Airflow 的 连接 对象用于存储连接到外部服务的凭据和其他必要信息。

连接可以通过以下方式定义

在环境变量中存储连接

Airflow 连接可以在环境变量中定义。

命名约定为 AIRFLOW_CONN_{CONN_ID},全部大写(注意 CONN> 周围的单下划线)。因此,如果你的连接 ID 为 my_prod_db,则变量名称应为 AIRFLOW_CONN_MY_PROD_DB

值可以是 JSON 或 Airflow 的 URI 格式。

JSON 格式示例

2.3.0 版新增功能。

如果使用 JSON 序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='{
    "conn_type": "my-conn-type",
    "login": "my-login",
    "password": "my-password",
    "host": "my-host",
    "port": 1234,
    "schema": "my-schema",
    "extra": {
        "param1": "val1",
        "param2": "val2"
    }
}'

生成 JSON 连接表示

2.8.0 版新增功能。

为了简化连接 JSON 生成,Connection 类具有一个便捷属性 as_json()。可以使用它,如下所示

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'

此外,可以使用相同的方法将连接从 URI 格式转换为 JSON 格式

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="awesome_conn",
...     description="Example Connection",
...     uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'

URI 格式示例

如果使用 Airflow URI 序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1&param2=val2'

请参阅 连接 URI 格式,了解有关如何生成有效 URI 的更多详细信息。

注意

在环境变量中定义的连接不会显示在 Airflow UI 中或使用 airflow connections list

在 Secrets Backend 中存储连接

可以在外部 secrets 后端(如 HashiCorp Vault、AWS SSM Parameter Store 和其他此类服务)中存储 Airflow 连接。有关更多详细信息,请参阅 Secrets Backend

在数据库中存储连接

另请参见

连接也可以存储在 环境变量外部 secrets 后端(如 HashiCorp Vault、AWS SSM Parameter Store 等)中。

在数据库中存储连接时,可以使用 Web UI 或 Airflow CLI 来管理它们。

使用 UI 创建连接

打开 UI 的 Admin->Connections 部分。单击 Create 链接以创建新连接。

../_images/connection_create.png
  1. 使用所需的连接 ID 填写 Connection Id 字段。建议使用小写字符,并用下划线分隔单词。

  2. 使用 Connection Type 字段选择连接类型。

  3. 填写剩余字段。请参阅 处理 extra 中的任意 dict,了解属于不同连接类型的字段的说明。

  4. 单击 Save 按钮以创建连接。

使用 UI 编辑连接

打开 UI 的 Admin->Connections 部分。单击连接列表中要编辑的连接旁边的铅笔图标。

../_images/connection_edit.png

修改连接属性,然后单击 Save 按钮以保存更改。

从 CLI 创建连接

您可以从 CLI 向数据库添加连接。

您可以使用 JSON 格式添加连接(从版本 2.3.0 开始)

airflow connections add 'my_prod_db' \
    --conn-json '{
        "conn_type": "my-conn-type",
        "login": "my-login",
        "password": "my-password",
        "host": "my-host",
        "port": 1234,
        "schema": "my-schema",
        "extra": {
            "param1": "val1",
            "param2": "val2"
        }
    }'

或者,您可以使用 Airflow 的连接 URI 格式(请参阅 生成连接 URI)。

airflow connections add 'my_prod_db' \
    --conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1&param2=val2&...'

最后,您还可以单独指定每个参数

airflow connections add 'my_prod_db' \
    --conn-type 'my-conn-type' \
    --conn-login 'login' \
    --conn-password 'password' \
    --conn-host 'host' \
    --conn-port 'port' \
    --conn-schema 'schema' \
    ...

将连接导出到文件

您可以将存储在数据库中的连接导出到文件(例如,用于将连接从一个环境迁移到另一个环境)。有关用法,请参阅 导出连接

数据库中连接的安全性

对于存储在 Airflow 元数据数据库中的连接,Airflow 使用 Fernet 来加密密码和其他可能敏感的数据。它保证在没有加密密码的情况下,无法在没有密钥的情况下处理或读取连接密码。有关配置 Fernet 的信息,请参阅 Fernet

测试连接

出于安全原因,默认情况下在 Airflow UI、API 和 CLI 中禁用测试连接功能。

有关用户功能的更多信息,请参阅文档:https://airflow.org.cn/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。强烈建议在确保只有高度可信的 UI/API 用户拥有“编辑连接”权限之前不要启用此功能。

此功能的可用性可以通过 Airflow 配置(airflow.cfg)的核心部分中的 test_connection 标志进行控制。它还可以通过环境变量 AIRFLOW__CORE__TEST_CONNECTION 进行控制。

此配置参数接受以下值

  • 已禁用:禁用测试连接功能,并在 UI 中禁用“测试连接”按钮。这也是在 Airflow 配置中设置的默认值。

  • 已启用:启用测试连接功能,并在 UI 中激活“测试连接”按钮。

  • 已隐藏:禁用测试连接功能,并在 UI 中隐藏“测试连接”按钮。

启用“测试连接”后,可以通过以下方式使用它:在 UI 中调用 创建编辑 连接页面,通过调用 连接 REST API,或运行 airflow connections test CLI 命令

警告

使用 Airflow UI 或 REST API 时,此功能对于驻留在外部机密后端的连接不可用。

要测试连接,Airflow 会从关联的钩子类调用 test_connection 方法并报告结果。可能发生的情况是,连接类型没有任何关联的钩子,或者钩子没有 test_connection 方法实现,在任何一种情况下,都会显示错误消息或禁用功能(如果你在 UI 中进行测试)。

注意

在 Airflow UI 中进行测试时,测试会从 Web 服务器执行,因此此功能受为 Web 服务器设置的网络出口规则约束。

注意

如果 Web 服务器和工作机(如果通过 Airflow UI 进行测试)或机器/Pod(如果通过 Airflow CLI 进行测试)安装了不同的库或提供程序包,则测试结果可能会有所不同。

自定义连接类型

Airflow 允许定义自定义连接类型,包括修改连接的添加/编辑表单。自定义连接类型在社区维护的提供程序中定义,但你还可以添加一个添加自定义连接类型的自定义提供程序。有关如何添加自定义提供程序的说明,请参阅 提供程序包

自定义连接类型通过提供程序提供的钩子定义。钩子可以实现协议类 DiscoverableHook 中定义的方法。请注意,您的自定义钩子不应派生自此类,此类是一个示例,用于记录有关您的钩子可能定义的类字段和方法的期望。另一个好的示例是 JdbcHook

通过在您的钩子中实现这些方法并通过 connection-types 数组(和已弃用的 hook-class-names)在提供程序元数据中公开它们,您可以通过以下方式自定义 Airflow

  • 添加自定义连接类型

  • 添加从连接类型自动创建钩子

  • 添加自定义表单小部件以显示和编辑连接 URL 中的自定义“额外”参数

  • 隐藏未用于您的连接的字段

  • 添加占位符,显示字段应如何格式化的示例

您可以在 提供程序包 中阅读有关如何添加自定义提供程序包的详细信息

自定义连接字段

可以在 Airflow Web 服务器的连接添加/编辑视图中添加自定义表单字段。自定义字段存储在 Connection.extra 字段中,格式为 JSON。要添加自定义字段,请实现方法 get_connection_form_widgets()。此方法应返回一个字典。键应该是字段的字符串名称,因为它应存储在 extra 字典中。值应该是 wtforms.fields.core.Field 的继承者。

这是一个示例

@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
    """Returns connection widgets to add to connection form"""
    from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import StringField

    return {
        "workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
        "project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
    }

注意

自定义字段不再需要 extra__<conn type>__ 前缀

在 Airflow 2.3 之前,如果您想要在 UI 中添加一个自定义字段,则必须使用 extra__<conn type>__ 为其添加前缀,并且这是其值在 extra 字典中的存储方式。从 2.3 开始,您不再需要这样做。

方法 get_ui_field_behaviour() 允许您自定义这两个方法的行为。例如,您可以隐藏或重新标记一个字段(例如,如果它未被使用或重新用于其他用途),并且您可以添加占位符文本。

一个示例

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
    """Returns custom field behaviour"""
    return {
        "hidden_fields": ["port", "host", "login", "schema"],
        "relabeling": {},
        "placeholders": {
            "password": "Asana personal access token",
            "workspace": "My workspace gid",
            "project": "My project gid",
        },
    }

注意

如果您想为名称与标准连接属性(即登录名、密码、主机、方案、端口、额外)冲突的 extra 字段添加表单占位符,则必须使用 extra__<conn type>__ 为其添加前缀。例如 extra__myservice__password

查看提供程序以了解您可以执行的操作示例,例如 JdbcHookAsanaHook 都使用了此功能。

注意

已弃用的 hook-class-names

在 Airflow 2.2.0 之前,提供程序中的连接已通过提供程序元数据中的 hook-class-names 数组公开。但是,当在工作进程中使用单独的挂钩时,这已被证明效率低下,并且 hook-class-names 数组现在已被 connection-types 数组替换。在提供程序支持低于 2.2.0 的 Airflow 之前,connection-typeshook-class-names 都应存在。CI 构建期间的自动化检查将验证这两个数组的一致性。

URI 格式

注意

从版本 2.3.0 开始,您可以使用 JSON 序列化连接。请参阅 示例

出于历史原因,Airflow 具有特殊的 URI 格式,可用于将 Connection 对象序列化为字符串值。

通常,Airflow 的 URI 格式如下所示

my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2

上述 URI 将生成等效于以下内容的 Connection 对象

Connection(
    conn_id="",
    conn_type="my_conn_type",
    description=None,
    login="my-login",
    password="my-password",
    host="my-host",
    port=5432,
    schema="my-schema",
    extra=json.dumps(dict(param1="val1", param2="val2")),
)

生成连接 URI

为了简化连接 URI 生成,Connection 类提供了一个便捷方法 get_uri()。它可以这样使用

>>> import json
>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:[email protected]?this_param=some+val&that_param=other+val%2A'

此外,如果你已创建连接,可以使用 airflow connections get 命令。

$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db

处理 extra 中的任意 dict

某些 JSON 结构无法在不丢失的情况下进行 URL 编码。对于此类 JSON,get_uri 会将整个字符串存储在 URL 查询参数 __extra__ 下。

例如

>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
...     conn_type="scheme",
...     host="host/location",
...     schema="schema",
...     login="user",
...     password="password",
...     port=1234,
...     extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'

我们可以验证它返回相同的字典

>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True

但对于仅存储键值对的最常见情况,将使用纯 URL 编码。

你可以这样验证 URI 是否已正确解析

>>> from airflow.models.connection import Connection

>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password

处理连接参数中的特殊字符

注意

在生成连接时,使用便捷方法 Connection.get_uri,如 生成连接 URI 一节中所述。本节仅供参考。

手动构建 URI 时,某些字符需要特殊处理。

例如,如果你的密码包含 /,则会失败

>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1&param2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'

要解决此问题,可以使用 quote_plus() 进行编码

>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.password)
my-pa/ssword

此条目是否有用?