管理连接

另请参阅

有关钩子和连接的概述,请参阅 连接 & 钩子

Airflow 的 Connection 对象用于存储凭据和其他连接到外部服务所需的信息。

可以通过以下方式定义连接

在环境变量中存储连接

Airflow 连接可以在环境变量中定义。

命名约定是 AIRFLOW_CONN_{CONN_ID},全部大写(注意 CONN 周围的单个下划线)。因此,如果您的连接 ID 是 my_prod_db,则变量名称应为 AIRFLOW_CONN_MY_PROD_DB

该值可以是 JSON 或 Airflow 的 URI 格式。

JSON 格式示例

2.3.0 版本新增。

如果使用 JSON 进行序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='{
    "conn_type": "my-conn-type",
    "login": "my-login",
    "password": "my-password",
    "host": "my-host",
    "port": 1234,
    "schema": "my-schema",
    "extra": {
        "param1": "val1",
        "param2": "val2"
    }
}'

生成 JSON 连接表示

2.8.0 版本新增。

为了简化连接 JSON 的生成,Connection 类具有一个方便的属性 as_json()。它可以像这样使用

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'

此外,相同的方法可以用于将连接从 URI 格式转换为 JSON 格式

>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="awesome_conn",
...     description="Example Connection",
...     uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'

URI 格式示例

如果使用 Airflow URI 进行序列化

export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1&param2=val2'

有关如何生成有效 URI 的更多详细信息,请参阅 连接 URI 格式

注意

在环境变量中定义的连接不会显示在 Airflow UI 中或使用 airflow connections list

在 Secrets 后端中存储连接

您可以将 Airflow 连接存储在外部 Secrets 后端中,例如 HashiCorp Vault、AWS SSM Parameter Store 和其他此类服务。有关更多详细信息,请参阅 Secrets 后端

在数据库中存储连接

另请参阅

连接也可以存储在 环境变量外部 Secrets 后端 中,例如 HashiCorp Vault、AWS SSM Parameter Store 等。

将连接存储在数据库中时,您可以使用 Web UI 或 Airflow CLI 来管理它们。

使用 UI 创建连接

打开 UI 的 Admin->Connections 部分。单击 Create 链接以创建新连接。

../_images/connection_create.png
  1. Connection Id 字段中填写所需的连接 ID。建议您使用小写字符,并用下划线分隔单词。

  2. 使用 Connection Type 字段选择连接类型。

  3. 填写其余字段。有关属于不同连接类型的字段的描述,请参阅 处理 extra 中的任意 dict

  4. 单击 Save 按钮以创建连接。

使用 UI 编辑连接

打开 UI 的 Admin->Connections 部分。单击连接列表中您要编辑的连接旁边的铅笔图标。

../_images/connection_edit.png

修改连接属性,然后单击 Save 按钮以保存更改。

从 CLI 创建连接

您可以从 CLI 将连接添加到数据库。

您可以使用 JSON 格式添加连接(从 2.3.0 版本开始)

airflow connections add 'my_prod_db' \
    --conn-json '{
        "conn_type": "my-conn-type",
        "login": "my-login",
        "password": "my-password",
        "host": "my-host",
        "port": 1234,
        "schema": "my-schema",
        "extra": {
            "param1": "val1",
            "param2": "val2"
        }
    }'

或者,您可以使用 Airflow 的连接 URI 格式(请参阅 生成连接 URI)。

airflow connections add 'my_prod_db' \
    --conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1&param2=val2&...'

最后,您还可以单独指定每个参数

airflow connections add 'my_prod_db' \
    --conn-type 'my-conn-type' \
    --conn-login 'login' \
    --conn-password 'password' \
    --conn-host 'host' \
    --conn-port 'port' \
    --conn-schema 'schema' \
    ...

将连接导出到文件

您可以将存储在数据库中的连接导出到文件(例如,用于将连接从一个环境迁移到另一个环境)。有关用法,请参阅 导出连接

数据库中连接的安全性

对于存储在 Airflow 元数据数据库中的连接,Airflow 使用 Fernet 来加密密码和其他潜在的敏感数据。它保证在没有加密密码的情况下,连接密码不能被操纵或在没有密钥的情况下读取。有关配置 Fernet 的信息,请参阅 Fernet

测试连接

出于安全原因,默认情况下,Airflow UI、API 和 CLI 中都禁用了测试连接功能。

有关用户功能的更多信息,请参阅文档:https://airflow.org.cn/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。强烈建议在确保只有高度信任的 UI/API 用户拥有“编辑连接”权限之前不要启用该功能。

该功能的可用性可以通过 Airflow 配置(airflow.cfg)的核心部分中的 `test_connection` 标志来控制。它也可以通过环境变量 AIRFLOW__CORE__TEST_CONNECTION 来控制。

此配置参数接受以下值:

  • 禁用 (Disabled):禁用测试连接功能,并禁用 UI 中的“测试连接”按钮。这也是 Airflow 配置中设置的默认值。

  • 启用 (Enabled):启用测试连接功能,并激活 UI 中的“测试连接”按钮。

  • 隐藏 (Hidden):禁用测试连接功能,并隐藏 UI 中的“测试连接”按钮。

启用测试连接后,可以通过 UI 中创建编辑连接页面,通过调用连接 REST API,或者运行 airflow connections test CLI 命令来使用。

警告

当使用 Airflow UI 或 REST API 时,此功能不适用于驻留在外部 secrets 后端的连接。

要测试连接,Airflow 会调用关联的 hook 类中的 test_connection 方法并报告结果。可能会出现连接类型没有任何关联的 hook,或者 hook 没有实现 test_connection 方法的情况。在这两种情况下,都会显示错误消息或禁用该功能(如果您正在 UI 中测试)。

注意

在 Airflow UI 中测试时,测试是从 webserver 执行的,因此此功能受限于为您的 webserver 设置的网络出口规则。

注意

如果 webserver 和 worker 机器(如果通过 Airflow UI 测试)或机器/pod(如果通过 Airflow CLI 测试)安装了不同的库或 provider 包,则测试结果可能会有所不同。

自定义连接类型

Airflow 允许定义自定义连接类型——包括修改连接的添加/编辑表单。自定义连接类型在社区维护的 provider 中定义,但您也可以添加自定义 provider 来添加自定义连接类型。有关如何添加自定义 provider 的说明,请参阅Provider 包

自定义连接类型是通过 provider 提供的 Hook 定义的。Hook 可以实现协议类 DiscoverableHook 中定义的方法。请注意,您的自定义 Hook 不应从此类派生,此类是一个示例,用于说明关于您的 Hook 可能定义的类字段和方法的期望。另一个好的示例是 JdbcHook

通过在您的 hook 中实现这些方法,并通过 provider 元数据中的 connection-types 数组(以及已弃用的 hook-class-names)公开它们,您可以自定义 Airflow:

  • 添加自定义连接类型

  • 添加从连接类型自动创建 Hook 的功能

  • 添加自定义表单小部件,以显示和编辑您的连接 URL 中的自定义“额外”参数

  • 隐藏未用于您的连接的字段

  • 添加占位符,显示字段应如何格式化的示例

您可以在Provider 包中了解更多关于如何添加自定义 provider 包的详细信息。

自定义连接字段

可以在 Airflow webserver 的连接添加/编辑视图中添加自定义表单字段。自定义字段以 JSON 格式存储在 Connection.extra 字段中。要添加自定义字段,请实现方法 get_connection_form_widgets()。此方法应返回一个字典。键应该是字段的字符串名称,因为它应该存储在 extra 字典中。值应该是 wtforms.fields.core.Field 的继承者。

这是一个示例

@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
    """Returns connection widgets to add to connection form"""
    from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
    from flask_babel import lazy_gettext
    from wtforms import StringField

    return {
        "workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
        "project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
    }

注意

自定义字段不再需要 extra__<conn type>__ 前缀

在 Airflow 2.3 之前,如果您想要在 UI 中添加自定义字段,则必须以 extra__<conn type>__ 为前缀,并且它的值将以这种方式存储在 extra 字典中。从 2.3 开始,您不再需要这样做。

方法 get_ui_field_behaviour() 允许您自定义两者的行为。例如,您可以隐藏或重新标记字段(例如,如果它未使用或重新用途),并且可以添加占位符文本。

一个例子

@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
    """Returns custom field behaviour"""
    return {
        "hidden_fields": ["port", "host", "login", "schema"],
        "relabeling": {},
        "placeholders": {
            "password": "Asana personal access token",
            "workspace": "My workspace gid",
            "project": "My project gid",
        },
    }

注意

如果要为与标准连接属性(即 login、password、host、scheme、port、extra)冲突的 extra 字段添加表单占位符,则必须以 extra__<conn type>__ 为前缀。例如,extra__myservice__password

请查看 provider 以获取您可以执行的操作的示例,例如 JdbcHookAsanaHook 都使用了此功能。

注意

已弃用的 hook-class-names

在 Airflow 2.2.0 之前,provider 中的连接通过 provider 元数据中的 hook-class-names 数组公开。但是,当在 worker 中使用单独的 hook 时,这已被证明效率低下,并且 hook-class-names 数组现在已替换为 connection-types 数组。在 provider 支持低于 2.2.0 的 Airflow 版本之前,应该同时存在 connection-typeshook-class-names。CI 构建期间的自动化检查将验证这两个数组的一致性。

URI 格式

注意

从 2.3.0 版本开始,您可以使用 JSON 序列化连接。请参阅示例

由于历史原因,Airflow 具有特殊的 URI 格式,可用于将 Connection 对象序列化为字符串值。

一般来说,Airflow 的 URI 格式如下:

my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2

上面的 URI 将生成一个与以下对象等效的 Connection 对象:

Connection(
    conn_id="",
    conn_type="my_conn_type",
    description=None,
    login="my-login",
    password="my-password",
    host="my-host",
    port=5432,
    schema="my-schema",
    extra=json.dumps(dict(param1="val1", param2="val2")),
)

生成连接 URI

为了简化连接 URI 的生成,Connection 类有一个方便的方法 get_uri()。它可以像这样使用:

>>> import json
>>> from airflow.models.connection import Connection
>>> c = Connection(
...     conn_id="some_conn",
...     conn_type="mysql",
...     description="connection description",
...     host="myhost.com",
...     login="myname",
...     password="mypassword",
...     extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:[email protected]?this_param=some+val&that_param=other+val%2A'

此外,如果您创建了连接,则可以使用 airflow connections get 命令。

$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db

处理额外字段中的任意字典

一些 JSON 结构无法在不丢失信息的情况下进行 URL 编码。对于此类 JSON,get_uri 会将整个字符串存储在 URL 查询参数 __extra__ 下。

例如

>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
...     conn_type="scheme",
...     host="host/location",
...     schema="schema",
...     login="user",
...     password="password",
...     port=1234,
...     extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'

我们可以验证它是否返回相同的字典:

>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True

但是对于仅存储键值对的最常见情况,则使用普通的 URL 编码。

您可以像这样验证 URI 是否已正确解析:

>>> from airflow.models.connection import Connection

>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password

处理连接参数中的特殊字符

注意

生成连接时,请使用生成连接 URI部分中描述的方便方法 Connection.get_uri。此部分仅供参考。

手动构建 URI 时,某些字符需要特殊处理。

例如,如果您的密码中包含 /,则此操作会失败:

>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1&param2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'

要解决此问题,可以使用 quote_plus() 进行编码:

>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1&param2=val2")
>>> print(c.password)
my-pa/ssword

此条目是否有帮助?