管理连接¶
另请参阅
有关钩子和连接的概述,请参阅 连接 & 钩子。
Airflow 的 Connection
对象用于存储凭据和其他连接到外部服务所需的信息。
可以通过以下方式定义连接
在 环境变量 中
在外部 Secrets 后端 中
在 Airflow 元数据数据库 中(使用 CLI 或 Web UI)
在环境变量中存储连接¶
Airflow 连接可以在环境变量中定义。
命名约定是 AIRFLOW_CONN_{CONN_ID}
,全部大写(注意 CONN
周围的单个下划线)。因此,如果您的连接 ID 是 my_prod_db
,则变量名称应为 AIRFLOW_CONN_MY_PROD_DB
。
该值可以是 JSON 或 Airflow 的 URI 格式。
JSON 格式示例¶
2.3.0 版本新增。
如果使用 JSON 进行序列化
export AIRFLOW_CONN_MY_PROD_DATABASE='{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
生成 JSON 连接表示¶
2.8.0 版本新增。
为了简化连接 JSON 的生成,Connection
类具有一个方便的属性 as_json()
。它可以像这样使用
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra={"this_param": "some val", "that_param": "other val*"},
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_SOME_CONN='{"conn_type": "mysql", "description": "connection description", "host": "myhost.com", "login": "myname", "password": "mypassword", "extra": {"this_param": "some val", "that_param": "other val*"}}'
此外,相同的方法可以用于将连接从 URI 格式转换为 JSON 格式
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="awesome_conn",
... description="Example Connection",
... uri="aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?__extra__=%7B%22region_name%22%3A+%22eu-central-1%22%2C+%22config_kwargs%22%3A+%7B%22retries%22%3A+%7B%22mode%22%3A+%22standard%22%2C+%22max_attempts%22%3A+10%7D%7D%7D",
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.as_json()}'")
AIRFLOW_CONN_AWESOME_CONN='{"conn_type": "aws", "description": "Example Connection", "host": "", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", "schema": "", "extra": {"region_name": "eu-central-1", "config_kwargs": {"retries": {"mode": "standard", "max_attempts": 10}}}}'
在 Secrets 后端中存储连接¶
您可以将 Airflow 连接存储在外部 Secrets 后端中,例如 HashiCorp Vault、AWS SSM Parameter Store 和其他此类服务。有关更多详细信息,请参阅 Secrets 后端。
在数据库中存储连接¶
另请参阅
连接也可以存储在 环境变量 或 外部 Secrets 后端 中,例如 HashiCorp Vault、AWS SSM Parameter Store 等。
将连接存储在数据库中时,您可以使用 Web UI 或 Airflow CLI 来管理它们。
使用 UI 创建连接¶
打开 UI 的 Admin->Connections
部分。单击 Create
链接以创建新连接。
在
Connection Id
字段中填写所需的连接 ID。建议您使用小写字符,并用下划线分隔单词。使用
Connection Type
字段选择连接类型。填写其余字段。有关属于不同连接类型的字段的描述,请参阅 处理 extra 中的任意 dict。
单击
Save
按钮以创建连接。
从 CLI 创建连接¶
您可以从 CLI 将连接添加到数据库。
您可以使用 JSON 格式添加连接(从 2.3.0 版本开始)
airflow connections add 'my_prod_db' \
--conn-json '{
"conn_type": "my-conn-type",
"login": "my-login",
"password": "my-password",
"host": "my-host",
"port": 1234,
"schema": "my-schema",
"extra": {
"param1": "val1",
"param2": "val2"
}
}'
或者,您可以使用 Airflow 的连接 URI 格式(请参阅 生成连接 URI)。
airflow connections add 'my_prod_db' \
--conn-uri '<conn-type>://<login>:<password>@<host>:<port>/<schema>?param1=val1¶m2=val2&...'
最后,您还可以单独指定每个参数
airflow connections add 'my_prod_db' \
--conn-type 'my-conn-type' \
--conn-login 'login' \
--conn-password 'password' \
--conn-host 'host' \
--conn-port 'port' \
--conn-schema 'schema' \
...
数据库中连接的安全性¶
对于存储在 Airflow 元数据数据库中的连接,Airflow 使用 Fernet 来加密密码和其他潜在的敏感数据。它保证在没有加密密码的情况下,连接密码不能被操纵或在没有密钥的情况下读取。有关配置 Fernet 的信息,请参阅 Fernet。
测试连接¶
出于安全原因,默认情况下,Airflow UI、API 和 CLI 中都禁用了测试连接功能。
有关用户功能的更多信息,请参阅文档:https://airflow.org.cn/docs/apache-airflow/stable/security/security_model.html#capabilities-of-authenticated-ui-users。强烈建议在确保只有高度信任的 UI/API 用户拥有“编辑连接”权限之前不要启用该功能。
该功能的可用性可以通过 Airflow 配置(airflow.cfg)的核心部分中的 `test_connection` 标志来控制。它也可以通过环境变量 AIRFLOW__CORE__TEST_CONNECTION
来控制。
此配置参数接受以下值:
禁用 (Disabled):禁用测试连接功能,并禁用 UI 中的“测试连接”按钮。这也是 Airflow 配置中设置的默认值。
启用 (Enabled):启用测试连接功能,并激活 UI 中的“测试连接”按钮。
隐藏 (Hidden):禁用测试连接功能,并隐藏 UI 中的“测试连接”按钮。
启用测试连接后,可以通过 UI 中创建或编辑连接页面,通过调用连接 REST API,或者运行 airflow connections test
CLI 命令来使用。
警告
当使用 Airflow UI 或 REST API 时,此功能不适用于驻留在外部 secrets 后端的连接。
要测试连接,Airflow 会调用关联的 hook 类中的 test_connection
方法并报告结果。可能会出现连接类型没有任何关联的 hook,或者 hook 没有实现 test_connection
方法的情况。在这两种情况下,都会显示错误消息或禁用该功能(如果您正在 UI 中测试)。
注意
在 Airflow UI 中测试时,测试是从 webserver 执行的,因此此功能受限于为您的 webserver 设置的网络出口规则。
注意
如果 webserver 和 worker 机器(如果通过 Airflow UI 测试)或机器/pod(如果通过 Airflow CLI 测试)安装了不同的库或 provider 包,则测试结果可能会有所不同。
自定义连接类型¶
Airflow 允许定义自定义连接类型——包括修改连接的添加/编辑表单。自定义连接类型在社区维护的 provider 中定义,但您也可以添加自定义 provider 来添加自定义连接类型。有关如何添加自定义 provider 的说明,请参阅Provider 包。
自定义连接类型是通过 provider 提供的 Hook 定义的。Hook 可以实现协议类 DiscoverableHook
中定义的方法。请注意,您的自定义 Hook 不应从此类派生,此类是一个示例,用于说明关于您的 Hook 可能定义的类字段和方法的期望。另一个好的示例是 JdbcHook
。
通过在您的 hook 中实现这些方法,并通过 provider 元数据中的 connection-types
数组(以及已弃用的 hook-class-names
)公开它们,您可以自定义 Airflow:
添加自定义连接类型
添加从连接类型自动创建 Hook 的功能
添加自定义表单小部件,以显示和编辑您的连接 URL 中的自定义“额外”参数
隐藏未用于您的连接的字段
添加占位符,显示字段应如何格式化的示例
您可以在Provider 包中了解更多关于如何添加自定义 provider 包的详细信息。
自定义连接字段¶
可以在 Airflow webserver 的连接添加/编辑视图中添加自定义表单字段。自定义字段以 JSON 格式存储在 Connection.extra
字段中。要添加自定义字段,请实现方法 get_connection_form_widgets()
。此方法应返回一个字典。键应该是字段的字符串名称,因为它应该存储在 extra
字典中。值应该是 wtforms.fields.core.Field
的继承者。
这是一个示例
@staticmethod
def get_connection_form_widgets() -> dict[str, Any]:
"""Returns connection widgets to add to connection form"""
from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
from flask_babel import lazy_gettext
from wtforms import StringField
return {
"workspace": StringField(lazy_gettext("Workspace"), widget=BS3TextFieldWidget()),
"project": StringField(lazy_gettext("Project"), widget=BS3TextFieldWidget()),
}
注意
自定义字段不再需要 extra__<conn type>__
前缀
在 Airflow 2.3 之前,如果您想要在 UI 中添加自定义字段,则必须以 extra__<conn type>__
为前缀,并且它的值将以这种方式存储在 extra
字典中。从 2.3 开始,您不再需要这样做。
方法 get_ui_field_behaviour()
允许您自定义两者的行为。例如,您可以隐藏或重新标记字段(例如,如果它未使用或重新用途),并且可以添加占位符文本。
一个例子
@staticmethod
def get_ui_field_behaviour() -> dict[str, Any]:
"""Returns custom field behaviour"""
return {
"hidden_fields": ["port", "host", "login", "schema"],
"relabeling": {},
"placeholders": {
"password": "Asana personal access token",
"workspace": "My workspace gid",
"project": "My project gid",
},
}
注意
如果要为与标准连接属性(即 login、password、host、scheme、port、extra)冲突的 extra
字段添加表单占位符,则必须以 extra__<conn type>__
为前缀。例如,extra__myservice__password
。
请查看 provider 以获取您可以执行的操作的示例,例如 JdbcHook
和 AsanaHook
都使用了此功能。
注意
已弃用的 hook-class-names
在 Airflow 2.2.0 之前,provider 中的连接通过 provider 元数据中的 hook-class-names
数组公开。但是,当在 worker 中使用单独的 hook 时,这已被证明效率低下,并且 hook-class-names
数组现在已替换为 connection-types
数组。在 provider 支持低于 2.2.0 的 Airflow 版本之前,应该同时存在 connection-types
和 hook-class-names
。CI 构建期间的自动化检查将验证这两个数组的一致性。
URI 格式¶
注意
从 2.3.0 版本开始,您可以使用 JSON 序列化连接。请参阅示例。
由于历史原因,Airflow 具有特殊的 URI 格式,可用于将 Connection 对象序列化为字符串值。
一般来说,Airflow 的 URI 格式如下:
my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2
上面的 URI 将生成一个与以下对象等效的 Connection
对象:
Connection(
conn_id="",
conn_type="my_conn_type",
description=None,
login="my-login",
password="my-password",
host="my-host",
port=5432,
schema="my-schema",
extra=json.dumps(dict(param1="val1", param2="val2")),
)
生成连接 URI¶
为了简化连接 URI 的生成,Connection
类有一个方便的方法 get_uri()
。它可以像这样使用:
>>> import json
>>> from airflow.models.connection import Connection
>>> c = Connection(
... conn_id="some_conn",
... conn_type="mysql",
... description="connection description",
... host="myhost.com",
... login="myname",
... password="mypassword",
... extra=json.dumps(dict(this_param="some val", that_param="other val*")),
... )
>>> print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
AIRFLOW_CONN_SOME_CONN='mysql://myname:[email protected]?this_param=some+val&that_param=other+val%2A'
此外,如果您创建了连接,则可以使用 airflow connections get
命令。
$ airflow connections get sqlite_default
Id: 40
Connection Id: sqlite_default
Connection Type: sqlite
Host: /tmp/sqlite_default.db
Schema: null
Login: null
Password: null
Port: null
Is Encrypted: false
Is Extra Encrypted: false
Extra: {}
URI: sqlite://%2Ftmp%2Fsqlite_default.db
处理额外字段中的任意字典¶
一些 JSON 结构无法在不丢失信息的情况下进行 URL 编码。对于此类 JSON,get_uri
会将整个字符串存储在 URL 查询参数 __extra__
下。
例如
>>> extra_dict = {"my_val": ["list", "of", "values"], "extra": {"nested": {"json": "val"}}}
>>> c = Connection(
... conn_type="scheme",
... host="host/location",
... schema="schema",
... login="user",
... password="password",
... port=1234,
... extra=json.dumps(extra_dict),
... )
>>> uri = c.get_uri()
>>> uri
'scheme://user:password@host%2Flocation:1234/schema?__extra__=%7B%22my_val%22%3A+%5B%22list%22%2C+%22of%22%2C+%22values%22%5D%2C+%22extra%22%3A+%7B%22nested%22%3A+%7B%22json%22%3A+%22val%22%7D%7D%7D'
我们可以验证它是否返回相同的字典:
>>> new_c = Connection(uri=uri)
>>> new_c.extra_dejson == extra_dict
True
但是对于仅存储键值对的最常见情况,则使用普通的 URL 编码。
您可以像这样验证 URI 是否已正确解析:
>>> from airflow.models.connection import Connection
>>> c = Connection(uri="my-conn-type://my-login:my-password@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.login)
my-login
>>> print(c.password)
my-password
处理连接参数中的特殊字符¶
注意
生成连接时,请使用生成连接 URI部分中描述的方便方法 Connection.get_uri
。此部分仅供参考。
手动构建 URI 时,某些字符需要特殊处理。
例如,如果您的密码中包含 /
,则此操作会失败:
>>> c = Connection(uri="my-conn-type://my-login:my-pa/ssword@my-host:5432/my-schema?param1=val1¶m2=val2")
ValueError: invalid literal for int() with base 10: 'my-pa'
要解决此问题,可以使用 quote_plus()
进行编码:
>>> c = Connection(uri="my-conn-type://my-login:my-pa%2Fssword@my-host:5432/my-schema?param1=val1¶m2=val2")
>>> print(c.password)
my-pa/ssword