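Web server authentication
By default, Airflow requires users to specify a password before logging in. You can create an account with the following CLI command: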
# create an admin user
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email [email protected]
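To deactivate authentication and let users be identified as Anonymous, set the following entry in $AIRFLOW_HOME/webserver_config.py to the role that the Anonymous user should have by default: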
AUTH_ROLE_PUBLIC = 'Admin'
Be sure to check the API documentation for how to secure the API.
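Note
Airflow uses Python's config parser, which interpolates "%" signs. Make sure to escape any % sign in your config file (but not in environment variables) by writing it as %%. Otherwise Airflow might leak passwords to a log when the config parser raises an exception.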
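The escaping rule can be checked directly against Python's standard `configparser`. This is a minimal sketch, not Airflow's own config loader: the `[database]` section, the `sql_alchemy_conn` value, and the `read_value` helper are illustrative assumptions.

```python
import configparser


def read_value(config_text: str, section: str, key: str) -> str:
    """Parse config_text and return one value with interpolation applied."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    return parser.get(section, key)


# "%%" is the escape sequence for a literal "%".
escaped = "[database]\nsql_alchemy_conn = postgresql://user:p%%40ss@host/db\n"
value = read_value(escaped, "database", "sql_alchemy_conn")

# A bare "%" that is not followed by "%" or "(" is rejected on read.
unescaped = "[database]\nsql_alchemy_conn = postgresql://user:p%40ss@host/db\n"
try:
    read_value(unescaped, "database", "sql_alchemy_conn")
    error_name, error_text = None, ""
except configparser.InterpolationSyntaxError as exc:
    error_name, error_text = type(exc).__name__, str(exc)
```

The exception message quotes the part of the value that follows the unescaped `%`, which is how a fragment of a password can end up in a log.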
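Other methods
Since Airflow 2.0, the default UI is the Flask App Builder RBAC. A webserver_config.py configuration file is generated automatically and can be used to configure Airflow to support authentication methods such as OAuth, OpenID, LDAP, and REMOTE_USER. Note that, due to limitations of Flask AppBuilder and Authlib, only a selection of OAuth2 providers is supported. This list includes github, githublocal, twitter, linkedin, google, azure, openshift, okta, keycloak, and keycloak_before_17.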
The default authentication option described in the Web Authentication section corresponds to the following entry in $AIRFLOW_HOME/webserver_config.py.
AUTH_TYPE = AUTH_DB
A WSGI middleware can be used to manage very specific forms of authentication (for example, SPNEGO) and leverage the REMOTE_USER method:
from typing import Any, Callable

from flask import current_app
from flask_appbuilder.const import AUTH_REMOTE_USER


class CustomMiddleware:
    def __init__(self, wsgi_app: Callable) -> None:
        self.wsgi_app = wsgi_app

    def __call__(self, environ: dict, start_response: Callable) -> Any:
        # Custom authenticating logic here
        # ...
        environ["REMOTE_USER"] = "username"
        return self.wsgi_app(environ, start_response)


current_app.wsgi_app = CustomMiddleware(current_app.wsgi_app)

AUTH_TYPE = AUTH_REMOTE_USER
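To see the middleware pattern in isolation, the following sketch uses only the standard library and can be run without Airflow or Flask. It assumes a reverse proxy has already authenticated the request and passes the user name in an `X-Forwarded-User` header; that header name, the `HeaderToRemoteUser` class, and the `echo_app` stand-in are hypothetical and not part of Airflow.

```python
from typing import Any, Callable, Iterable


class HeaderToRemoteUser:
    """WSGI middleware that copies a proxy-supplied header into REMOTE_USER."""

    def __init__(self, wsgi_app: Callable, header_key: str = "HTTP_X_FORWARDED_USER") -> None:
        self.wsgi_app = wsgi_app
        self.header_key = header_key

    def __call__(self, environ: dict, start_response: Callable) -> Any:
        username = environ.get(self.header_key)
        if username:
            environ["REMOTE_USER"] = username
        else:
            # Never let a stale or client-supplied REMOTE_USER through.
            environ.pop("REMOTE_USER", None)
        return self.wsgi_app(environ, start_response)


def echo_app(environ: dict, start_response: Callable) -> Iterable[bytes]:
    """Stand-in for the wrapped application: reports the user it was given."""
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [environ.get("REMOTE_USER", "anonymous").encode()]


app = HeaderToRemoteUser(echo_app)
statuses = []
body = b"".join(app({"HTTP_X_FORWARDED_USER": "peter"}, lambda status, headers: statuses.append(status)))
anonymous_body = b"".join(app({}, lambda status, headers: None))
```

A header-based scheme like this is only safe when the proxy strips the header from incoming client requests, since otherwise any client could claim an arbitrary identity.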
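Another way to create users is through the UI login page, by allowing users to register themselves with a "Register" button. Edit the following entries in $AIRFLOW_HOME/webserver_config.py to enable this: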
AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Desired Role For The Self Registered User"
RECAPTCHA_PRIVATE_KEY = 'private_key'
RECAPTCHA_PUBLIC_KEY = 'public_key'
MAIL_SERVER = 'smtp.gmail.com'
MAIL_USE_TLS = True
MAIL_USERNAME = '[email protected]'
MAIL_PASSWORD = 'passwordformail'
MAIL_DEFAULT_SENDER = '[email protected]'
The Flask-Mail package must be installed through pip to allow user self-registration, because this feature is provided by the Flask-AppBuilder framework.
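To support authentication through a third-party provider, update the AUTH_TYPE entry with the desired option (such as OAuth, OpenID, or LDAP), then uncomment and configure the lines in $AIRFLOW_HOME/webserver_config.py that refer to the chosen option.
For more details, see the Security section of the FAB documentation.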
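As one illustration of switching AUTH_TYPE, the fragment below sketches an LDAP setup using Flask-AppBuilder's LDAP settings in webserver_config.py. The server URL, search base, and bind DN are placeholder values, and `AIRFLOW_LDAP_BIND_PASSWORD` is a hypothetical environment variable name; adapt all of them to your own directory. LDAP authentication in Flask-AppBuilder also needs the python-ldap package installed.

```python
import os

from flask_appbuilder.security.manager import AUTH_LDAP

AUTH_TYPE = AUTH_LDAP

# Placeholder directory settings: replace with values for your LDAP server.
AUTH_LDAP_SERVER = "ldap://ldap.example.com"
AUTH_LDAP_USE_TLS = False
AUTH_LDAP_SEARCH = "ou=users,dc=example,dc=com"
AUTH_LDAP_UID_FIELD = "uid"

# Service account used to search the directory; the password is read from
# a hypothetical environment variable rather than stored in the file.
AUTH_LDAP_BIND_USER = "uid=airflow,ou=services,dc=example,dc=com"
AUTH_LDAP_BIND_PASSWORD = os.getenv("AIRFLOW_LDAP_BIND_PASSWORD")

# Create an Airflow user on first successful LDAP login.
AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Viewer"
```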
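Example using team-based authorization with GitHub OAuth
A few steps are required to use team-based authorization with GitHub OAuth.
Configure OAuth through the FAB config in webserver_config.py
Create a custom security manager class and supply it to FAB in webserver_config.py
Map the roles returned by your security manager class to roles that FAB understands.
Here is an example of what you might have in your webserver_config.py: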
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
from flask_appbuilder.security.manager import AUTH_OAUTH
import os

AUTH_TYPE = AUTH_OAUTH
AUTH_ROLES_SYNC_AT_LOGIN = True  # Checks roles on every login
AUTH_USER_REGISTRATION = True  # allow users who are not already in the FAB DB to register

AUTH_ROLES_MAPPING = {
    "Viewer": ["Viewer"],
    "Admin": ["Admin"],
}

# If you wish, you can add multiple OAuth providers.
OAUTH_PROVIDERS = [
    {
        "name": "github",
        "icon": "fa-github",
        "token_key": "access_token",
        "remote_app": {
            "client_id": os.getenv("OAUTH_APP_ID"),
            "client_secret": os.getenv("OAUTH_APP_SECRET"),
            "api_base_url": "https://api.github.com",
            "client_kwargs": {"scope": "read:user, read:org"},
            "access_token_url": "https://github.com/login/oauth/access_token",
            "authorize_url": "https://github.com/login/oauth/authorize",
            "request_token_url": None,
        },
    },
]


class CustomSecurityManager(FabAirflowSecurityManagerOverride):
    pass


# Make sure to replace this with your own implementation of AirflowSecurityManager class
SECURITY_MANAGER_CLASS = CustomSecurityManager
Here is an example of defining a custom security manager. This class must be available on Python's path, and it can be defined in webserver_config.py itself if you wish.
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
import logging
from typing import Any, List, Union
import os

log = logging.getLogger(__name__)
log.setLevel(os.getenv("AIRFLOW__LOGGING__FAB_LOGGING_LEVEL", "INFO"))

FAB_ADMIN_ROLE = "Admin"
FAB_VIEWER_ROLE = "Viewer"
FAB_PUBLIC_ROLE = "Public"  # The "Public" role is given no permissions
TEAM_ID_A_FROM_GITHUB = 123  # Replace these with real team IDs for your org
TEAM_ID_B_FROM_GITHUB = 456  # Replace these with real team IDs for your org


def team_parser(team_payload: list[dict[str, Any]]) -> list[int]:
    # Parse the team payload from GitHub however you want here.
    # The payload is iterated as a list of team objects, each with an "id".
    return [team["id"] for team in team_payload]


def map_roles(team_list: list[int]) -> list[str]:
    # Associate the team IDs with Roles here.
    # The expected output is a list of roles that FAB will use to Authorize the user.
    team_role_map = {
        TEAM_ID_A_FROM_GITHUB: FAB_ADMIN_ROLE,
        TEAM_ID_B_FROM_GITHUB: FAB_VIEWER_ROLE,
    }
    return list(set(team_role_map.get(team, FAB_PUBLIC_ROLE) for team in team_list))


class GithubTeamAuthorizer(FabAirflowSecurityManagerOverride):
    # In this example, the oauth provider == 'github'.
    # If you ever want to support other providers, see how it is done here:
    # https://github.com/dpgaspar/Flask-AppBuilder/blob/master/flask_appbuilder/security/manager.py#L550
    def get_oauth_user_info(self, provider: str, resp: Any) -> dict[str, Union[str, list[str]]]:
        # Creates the user info payload from Github.
        # The user previously allowed your app to act on their behalf,
        # so now we can query the user and teams endpoints for their data.
        # Username and team membership are added to the payload and returned to FAB.
        remote_app = self.appbuilder.sm.oauth_remotes[provider]
        me = remote_app.get("user")
        user_data = me.json()
        team_data = remote_app.get("user/teams")
        teams = team_parser(team_data.json())
        roles = map_roles(teams)
        log.debug(f"User info from Github: {user_data}\nTeam info from Github: {teams}")
        return {"username": "github_" + user_data.get("login"), "role_keys": roles}
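The team-to-role mapping can be exercised on its own, without contacting GitHub. The sketch below repeats the two helper functions in a self-contained form and feeds them a made-up, heavily abbreviated teams payload; the team IDs, slugs, and `TEAM_ROLE_MAP` name are assumptions for illustration, and the result is sorted only to make the output deterministic.

```python
from typing import Any

FAB_ADMIN_ROLE = "Admin"
FAB_VIEWER_ROLE = "Viewer"
FAB_PUBLIC_ROLE = "Public"

# Hypothetical team IDs; replace with the real IDs for your organization.
TEAM_ROLE_MAP = {123: FAB_ADMIN_ROLE, 456: FAB_VIEWER_ROLE}


def team_parser(team_payload: list[dict[str, Any]]) -> list[int]:
    """Extract the numeric team IDs from a list of team objects."""
    return [team["id"] for team in team_payload]


def map_roles(team_list: list[int]) -> list[str]:
    """Map team IDs to role names; unknown teams fall back to Public."""
    return sorted({TEAM_ROLE_MAP.get(team, FAB_PUBLIC_ROLE) for team in team_list})


# Made-up payload: one mapped team and one team with no mapping.
sample_payload = [
    {"id": 123, "slug": "platform"},
    {"id": 999, "slug": "unmapped-team"},
]
roles = map_roles(team_parser(sample_payload))

# A user who belongs to no team at all yields an empty role list.
no_team_roles = map_roles(team_parser([]))
```

Two behaviors are worth noticing: membership in any unmapped team adds the Public role alongside the mapped ones, and a user with no teams receives no role keys, so you may want to decide explicitly what such users should get.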
Example using team-based authorization with KeyCloak
Here is an example of what you might have in your webserver_config.py:
import os
import jwt
import requests
import logging
from base64 import b64decode
from cryptography.hazmat.primitives import serialization
from flask_appbuilder.security.manager import AUTH_DB, AUTH_OAUTH
from airflow import configuration as conf
from airflow.www.security import AirflowSecurityManager

log = logging.getLogger(__name__)

AUTH_TYPE = AUTH_OAUTH
AUTH_USER_REGISTRATION = True
AUTH_ROLES_SYNC_AT_LOGIN = True
AUTH_USER_REGISTRATION_ROLE = "Viewer"
OIDC_ISSUER = "https://sso.keycloak.me/realms/airflow"

# Make sure you create these roles on Keycloak
AUTH_ROLES_MAPPING = {
    "Viewer": ["Viewer"],
    "Admin": ["Admin"],
    "User": ["User"],
    "Public": ["Public"],
    "Op": ["Op"],
}

OAUTH_PROVIDERS = [
    {
        "name": "keycloak",
        "icon": "fa-key",
        "token_key": "access_token",
        "remote_app": {
            "client_id": "airflow",
            "client_secret": "xxx",
            "server_metadata_url": "https://sso.keycloak.me/realms/airflow/.well-known/openid-configuration",
            "api_base_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect",
            "client_kwargs": {"scope": "email profile"},
            "access_token_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect/token",
            "authorize_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect/auth",
            "request_token_url": None,
        },
    }
]

# Fetch public key
req = requests.get(OIDC_ISSUER)
key_der_base64 = req.json()["public_key"]
key_der = b64decode(key_der_base64.encode())
public_key = serialization.load_der_public_key(key_der)


class CustomSecurityManager(AirflowSecurityManager):
    def get_oauth_user_info(self, provider, response):
        if provider == "keycloak":
            token = response["access_token"]
            me = jwt.decode(token, public_key, algorithms=["HS256", "RS256"])

            # Extract roles from resource access
            realm_access = me.get("realm_access", {})
            groups = realm_access.get("roles", [])

            log.info("groups: {0}".format(groups))

            if not groups:
                groups = ["Viewer"]

            userinfo = {
                "username": me.get("preferred_username"),
                "email": me.get("email"),
                "first_name": me.get("given_name"),
                "last_name": me.get("family_name"),
                "role_keys": groups,
            }

            log.info("user info: {0}".format(userinfo))

            return userinfo
        else:
            return {}


# Make sure to replace this with your own implementation of AirflowSecurityManager class
SECURITY_MANAGER_CLASS = CustomSecurityManager
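The step that turns decoded token claims into the user-info payload can be separated out and checked without a Keycloak server. The following is a minimal sketch under that assumption: `userinfo_from_claims` is a hypothetical helper, the sample claims are invented, and it expects claims that have already been verified and decoded (for example by `jwt.decode` as in the example above).

```python
from typing import Any

# Role keys used when the token carries no realm roles, as in the example above.
DEFAULT_ROLE_KEYS = ["Viewer"]


def userinfo_from_claims(claims: dict[str, Any]) -> dict[str, Any]:
    """Build the user-info payload from already-verified token claims."""
    roles = claims.get("realm_access", {}).get("roles", [])
    return {
        "username": claims.get("preferred_username"),
        "email": claims.get("email"),
        "first_name": claims.get("given_name"),
        "last_name": claims.get("family_name"),
        # Fall back to the default when the roles list is missing or empty.
        "role_keys": list(roles) or list(DEFAULT_ROLE_KEYS),
    }


# Invented claims for illustration only.
sample_claims = {
    "preferred_username": "peter",
    "email": "peter@example.com",
    "given_name": "Peter",
    "family_name": "Parker",
    "realm_access": {"roles": ["Admin", "offline_access"]},
}
with_roles = userinfo_from_claims(sample_claims)
without_roles = userinfo_from_claims({"preferred_username": "guest"})
```

Role keys that have no entry in AUTH_ROLES_MAPPING, such as `offline_access` in the invented claims, do not correspond to any of the roles listed there, so only create mappings for the realm roles you intend to use in Airflow.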