连接管理¶
另请参阅
有关 Hook 和连接的概览,请参阅连接与 Hook。
Airflow 的Connection
对象用于存储连接到外部服务所需的凭据及其他信息。
连接可以通过以下方式定义
在环境变量中
在外部Secrets Backend中
在Airflow 元数据数据库中(使用CLI或Web UI)
在环境变量中存储连接¶
Airflow 连接可以在环境变量中定义。
命名约定是AIRFLOW_CONN_{CONN_ID}
,全部大写(注意 CONN
周围的单下划线)。因此,如果您的连接 ID 是 my_prod_db
,则变量名应为 AIRFLOW_CONN_MY_PROD_DB
。
值可以是 JSON 格式或 Airflow 的 URI 格式。
JSON 格式示例¶
始于 2.3.0 版本。
如果使用 JSON 序列化
export AIRFLOW_CONN_MY_PROD_DATABASE='{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
生成连接的 JSON 表示¶
始于 2.8.0 版本。
为了更方便地生成连接的 JSON,Connection
类提供了一个便利属性 as_json()
。它可以这样使用
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'
此外,可以使用相同的方法将连接从 URI 格式转换为 JSON 格式
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="awesome_conn",
... description="Example Connection",
... uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'
URI 格式示例¶
如果使用 Airflow URI 序列化
export AIRFLOW_CONN_MY_PROD_DATABASE='my-conn-type://login:password@host:port/schema?param1=val1¶m2=val2'
有关如何生成有效 URI 的更多详细信息,请参阅连接 URI 格式。
注意
在环境变量中定义的连接不会显示在 Airflow UI 中,也不会通过 airflow connections list
命令显示。
在 Secrets Backend 中存储连接¶
您可以将 Airflow 连接存储在外部 secrets backend 中,例如 HashiCorp Vault、AWS SSM Parameter Store 和其他此类服务。更多详细信息请参阅Secrets Backend。
在数据库中存储连接¶
另请参阅
连接也可以存储在环境变量或外部 secrets backend 中,例如 HashiCorp Vault、AWS SSM Parameter Store 等。
将连接存储在数据库中时,您可以使用 Web UI 或 Airflow CLI 进行管理。
使用 UI 创建连接¶
打开 UI 的 Admin->Connections
部分。点击 Add Connection
链接以创建新连接。

在
Connection Id
字段中填写所需的连接 ID。建议使用小写字母并通过下划线分隔单词。在
Connection Type
字段中选择连接类型。填写其余字段。有关不同连接类型字段的描述,请参阅处理 extra 中的任意字典。
点击
Save
按钮创建连接。
使用 UI 编辑连接¶
打开 UI 的 Admin->Connections
部分。在连接列表中点击您希望编辑的连接旁边的铅笔图标。

修改连接属性并点击 Save
按钮保存更改。
从 CLI 创建连接¶
您可以从 CLI 向数据库添加连接。
您可以使用 JSON 格式添加连接(始于 2.3.0 版本)
airflow connections add 'my_prod_db' \
--conn-json '{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
或者,您可以使用 Airflow 的连接 URI 格式(请参阅生成连接 URI)。
airflow connections add 'my_prod_db' \
--conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1¶m2=val2&...'
最后,您也可以单独指定每个参数
airflow connections add 'my_prod_db' \
--conn-type 'my-conn-type' \
--conn-login 'login' \
--conn-password 'password' \
--conn-host 'host' \
--conn-port 'port' \
--conn-schema 'schema' \
...
导出连接到文件¶
您可以将存储在数据库中的连接导出到文件(例如,用于将连接从一个环境迁移到另一个环境)。有关用法,请参阅导出连接。
数据库中连接的安全性¶
对于存储在 Airflow 元数据数据库中的连接,Airflow 使用 Fernet 加密密码和其他潜在敏感数据。它保证如果没有加密密码,连接密码就无法在没有密钥的情况下被篡改或读取。有关配置 Fernet 的信息,请查看Fernet。
测试连接¶
出于安全原因,Airflow UI、API 和 CLI 中的测试连接功能默认处于禁用状态。
有关用户权限的更多信息,请参阅文档:https://airflow.org.cn/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。强烈建议在确保只有高度信任的 UI/API 用户拥有“编辑连接”权限后,再启用此功能。
此功能的可用性可以通过 Airflow 配置 (airflow.cfg) 的 core 部分中的 test_connection 标志来控制。也可以通过环境变量 AIRFLOW__CORE__TEST_CONNECTION
来控制。
此配置参数接受以下值
Disabled(禁用):禁用测试连接功能并禁用 UI 中的“测试连接”按钮。这也是 Airflow 配置中设置的默认值。
Enabled(启用):启用测试连接功能并激活 UI 中的“测试连接”按钮。
Hidden(隐藏):禁用测试连接功能并隐藏 UI 中的“测试连接”按钮。
启用测试连接后,可以在 UI 的创建或编辑连接页面使用,或者通过调用连接 REST API,或者运行 airflow connections test
CLI 命令来使用。
警告
使用 Airflow UI 或 REST API 时,对于驻留在外部 secrets backend 中的连接,此功能将不可用。
要测试连接,Airflow 会调用相关 Hook 类中的 test_connection
方法并报告结果。连接类型可能没有关联的 Hook,或者 Hook 没有实现 test_connection
方法,这两种情况下都会显示错误消息或功能将被禁用(如果您在 UI 中测试)。
注意
在 Airflow UI 中进行测试时,测试是从 Webserver 执行的,因此此功能受限于您为 Webserver 设置的网络出站规则。
注意
如果 Webserver 和 Worker 机器(如果通过 Airflow UI 测试)或机器/Pod(如果通过 Airflow CLI 测试)安装了不同的库或 Providers,测试结果可能会有所不同。
自定义连接类型¶
Airflow 允许定义自定义连接类型——包括修改连接的添加/编辑表单。自定义连接类型在社区维护的 Providers 中定义,但您也可以添加自定义 Provider 来添加自定义连接类型。有关如何添加自定义 Provider 的描述,请参阅Providers。
自定义连接类型通过 Providers 提供的 Hook 来定义。Hook 可以实现协议类 DiscoverableHook
中定义的方法。请注意,您的自定义 Hook 不应继承自此类,此类是一个示例,用于记录您的 Hook 可能定义的类字段和方法的期望。另一个很好的例子是JdbcHook
。
通过在您的 Hook 中实现这些方法并通过 Provider 元数据中的 connection-types
数组(以及已弃用的 hook-class-names
)暴露它们,您可以自定义 Airflow:
添加自定义连接类型
添加从连接类型自动创建 Hook
添加自定义表单控件以显示和编辑连接 URL 中的自定义“额外”参数
隐藏连接中未使用的字段
添加显示字段应如何格式化的示例占位符
您可以在Providers中阅读更多关于如何添加自定义 Provider 的详细信息。
自定义连接字段¶
可以在 Airflow Webserver 的连接添加/编辑视图中添加自定义表单字段。自定义字段以 JSON 格式存储在 Connection.extra
字段中。要添加自定义字段,请实现 get_connection_form_widgets()
方法。此方法应返回一个字典。键应为字段的字符串名称,该名称应存储在 extra
字典中。值应为 wtforms.fields.core.Field
的继承者。
这是一个例子
@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField
return {
"workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
"project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
}
注意
自定义字段不再需要 extra__<conn type>__
前缀
在 Airflow 2.3 之前,如果您想在 UI 中添加自定义字段,必须在其前面加上 extra__<conn type>__
前缀,并且其值将以这种方式存储在 extra
字典中。从 2.3 版本开始,您不再需要这样做。
get_ui_field_behaviour()
方法允许您自定义两者的行为。例如,您可以隐藏或重新标记字段(例如,如果它未使用或重新用途),并且可以添加占位符文本。
一个例子
@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour"""
return {
"hidden_fields": ["port", "host", "login", "schema"],
"relabeling": {},
"placeholders": {
"password": "Asana personal access token",
"workspace": "My workspace gid",
"project": "My project gid",
},
}
注意
如果您想为一个名称与标准连接属性(即 login, password, host, scheme, port, extra)冲突的 extra
字段添加表单占位符,则必须在其前面加上 extra__<conn type>__
前缀。例如 extra__myservice__password
。
查看 Provider 可以获得一些您可以做到的例子,例如 JdbcHook
和 AsanaHook
都使用了这个功能。
注意
已弃用的 hook-class-names
在 Airflow 2.2.0 之前,Provider 中的连接通过 Provider 元数据中的 hook-class-names
数组暴露。然而,事实证明,在 Worker 中使用单个 Hook 时效率低下,现在 hook-class-names
数组已被 connection-types
数组取代。在 Provider 支持低于 2.2.0 的 Airflow 版本之前,connection-types
和 hook-class-names
都应该存在。CI 构建期间的自动化检查将验证这两个数组的一致性。
URI 格式¶
注意
从 2.3.0 版本开始,您可以使用 JSON 序列化连接。请参阅示例。
出于历史原因,Airflow 有一种特殊的 URI 格式,可用于将 Connection 对象序列化为字符串值。
通常,Airflow 的 URI 格式如下所示
my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2
上述 URI 将生成一个等同于以下内容的 Connection
对象
Connection(
conn_id="",
conn_type="my_conn_type",
description=None,
login="my-login",
password="my-password",
host="my-host",
port=5432,
schema="my-schema",
extra=json.dumps(dict(param1="val1", param2="val2")),
)
生成连接 URI¶
为了更方便地生成连接 URI,Connection
类提供了一个便利方法 get_uri()
。它可以这样使用
>>> import json
>>> from airflow.sdk import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'
注意
get_uri()
方法返回的是 Airflow 格式的连接 URI,不是 SQLAlchemy 兼容的 URI。如果您需要用于数据库连接的 SQLAlchemy 兼容 URI,请改用 sqlalchemy_url
属性。
此外,如果您已创建连接,可以使用 airflow connections get
命令。
$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db
处理 extra 中的任意字典¶
某些 JSON 结构无法进行 URL 编码而不会丢失数据。对于此类 JSON,get_uri
会将整个字符串存储在 URL 查询参数 __extra__
下。
例如
>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
... conn_type="scheme",
... host="host/location",
... schema="schema",
... login="user",
... password="password",
... port=1234,
... extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'
并且我们可以验证它返回相同的字典
>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True
但对于仅存储键值对的最常见情况,使用简单的 URL 编码。
您可以这样验证 URI 是否被正确解析
>>> from airflow.sdk import Connection
>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password
处理连接参数中的特殊字符¶
注意
生成连接时,请使用 Connection.get_uri
便利方法,如 生成连接 URI 部分所述。本节仅供参考。
手动构建 URI 时,某些字符需要特殊处理。
例如,如果您的密码包含 /
,则会失败
>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1¶m2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'
要解决此问题,您可以使用 quote_plus()
进行编码
>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.password)
my-pa/ssword