Web 服务器身份验证

默认情况下,Airflow 要求用户在登录前指定密码。您可以使用以下 CLI 命令创建帐户

# create an admin user
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email [email protected]

要停用身份验证并允许用户被识别为匿名用户,需要在 $AIRFLOW_HOME/webserver_config.py 中设置以下条目,并设置匿名用户默认拥有的所需角色

AUTH_ROLE_PUBLIC = 'Admin'

请务必查看 API 以了解如何保护 API。

注意

Airflow 使用 Python 的配置解析器。此配置解析器会插入“%”符号。请确保转义配置文件中的任何 % 符号(而不是环境变量),将其写为 %%,否则 Airflow 可能会在配置解析器异常时将这些密码泄露到日志中。

密码

最简单的身份验证机制之一是要求用户在登录前指定密码。

请使用命令行界面 airflow users create 创建帐户,或者在 UI 中执行此操作。

其他方法

自 Airflow 2.0 起,默认 UI 是 Flask App Builder RBAC。将自动生成 webserver_config.py 配置文件,可用于配置 Airflow 以支持 OAuth、OpenID、LDAP、REMOTE_USER 等身份验证方法。应注意的是,由于 Flask AppBuilder 和 Authlib 的限制,仅支持部分 OAuth2 提供商。此列表包括 githubgithublocaltwitterlinkedingoogleazureopenshiftoktakeycloakkeycloak_before_17

Web 身份验证 部分中描述的默认身份验证选项与 $AIRFLOW_HOME/webserver_config.py 中的以下条目相关。

AUTH_TYPE = AUTH_DB

可以使用 WSGI 中间件来管理非常具体的身份验证形式(例如,SPNEGO),并利用 REMOTE_USER 方法

from typing import Any, Callable

from flask import current_app
from flask_appbuilder.const import AUTH_REMOTE_USER


class CustomMiddleware:
    def __init__(self, wsgi_app: Callable) -> None:
        self.wsgi_app = wsgi_app

    def __call__(self, environ: dict, start_response: Callable) -> Any:
        # Custom authenticating logic here
        # ...
        environ["REMOTE_USER"] = "username"
        return self.wsgi_app(environ, start_response)


current_app.wsgi_app = CustomMiddleware(current_app.wsgi_app)

AUTH_TYPE = AUTH_REMOTE_USER

创建用户的另一种方法是在 UI 登录页面中,允许用户通过“注册”按钮进行自我注册。可以编辑 $AIRFLOW_HOME/webserver_config.py 中的以下条目来实现此目的

AUTH_USER_REGISTRATION = True
AUTH_USER_REGISTRATION_ROLE = "Desired Role For The Self Registered User"
RECAPTCHA_PRIVATE_KEY = 'private_key'
RECAPTCHA_PUBLIC_KEY = 'public_key'

MAIL_SERVER = 'smtp.gmail.com'
MAIL_USE_TLS = True
MAIL_USERNAME = '[email protected]'
MAIL_PASSWORD = 'passwordformail'
MAIL_DEFAULT_SENDER = '[email protected]'

需要通过 pip 安装 Flask-Mail 包,以允许用户自我注册,因为它是由框架 Flask-AppBuilder 提供的功能。

要支持通过第三方提供商进行身份验证,需要使用所需的选项(如 OAuth、OpenID、LDAP)更新 AUTH_TYPE 条目,并且需要取消注释并配置 $AIRFLOW_HOME/webserver_config.py 中引用所选选项的行。

有关更多详细信息,请参阅 FAB 文档的安全性部分

使用 GitHub OAuth 进行基于团队的授权的示例

要使用 GitHub OAuth 进行基于团队的授权,需要执行几个步骤。

  • 通过 webserver_config.py 中的 FAB 配置配置 OAuth

  • 创建一个自定义安全管理器类,并在 webserver_config.py 中将其提供给 FAB

  • 将安全管理器类返回的角色映射到 FAB 可以理解的角色。

以下是您在 webserver_config.py 中可能有的示例

from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
from flask_appbuilder.security.manager import AUTH_OAUTH
import os

AUTH_TYPE = AUTH_OAUTH
AUTH_ROLES_SYNC_AT_LOGIN = True  # Checks roles on every login
AUTH_USER_REGISTRATION = True  # allow users who are not already in the FAB DB to register

AUTH_ROLES_MAPPING = {
    "Viewer": ["Viewer"],
    "Admin": ["Admin"],
}
# If you wish, you can add multiple OAuth providers.
OAUTH_PROVIDERS = [
    {
        "name": "github",
        "icon": "fa-github",
        "token_key": "access_token",
        "remote_app": {
            "client_id": os.getenv("OAUTH_APP_ID"),
            "client_secret": os.getenv("OAUTH_APP_SECRET"),
            "api_base_url": "https://api.github.com",
            "client_kwargs": {"scope": "read:user, read:org"},
            "access_token_url": "https://github.com/login/oauth/access_token",
            "authorize_url": "https://github.com/login/oauth/authorize",
            "request_token_url": None,
        },
    },
]


class CustomSecurityManager(FabAirflowSecurityManagerOverride):
    pass


# Make sure to replace this with your own implementation of AirflowSecurityManager class
SECURITY_MANAGER_CLASS = CustomSecurityManager

以下是定义自定义安全管理器的示例。此类必须在 Python 的路径中可用,并且如果您愿意,可以在 webserver_config.py 本身中定义。

from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
import logging
from typing import Any, List, Union
import os

log = logging.getLogger(__name__)
log.setLevel(os.getenv("AIRFLOW__LOGGING__FAB_LOGGING_LEVEL", "INFO"))

FAB_ADMIN_ROLE = "Admin"
FAB_VIEWER_ROLE = "Viewer"
FAB_PUBLIC_ROLE = "Public"  # The "Public" role is given no permissions
TEAM_ID_A_FROM_GITHUB = 123  # Replace these with real team IDs for your org
TEAM_ID_B_FROM_GITHUB = 456  # Replace these with real team IDs for your org


def team_parser(team_payload: dict[str, Any]) -> list[int]:
    # Parse the team payload from GitHub however you want here.
    return [team["id"] for team in team_payload]


def map_roles(team_list: list[int]) -> list[str]:
    # Associate the team IDs with Roles here.
    # The expected output is a list of roles that FAB will use to Authorize the user.

    team_role_map = {
        TEAM_ID_A_FROM_GITHUB: FAB_ADMIN_ROLE,
        TEAM_ID_B_FROM_GITHUB: FAB_VIEWER_ROLE,
    }
    return list(set(team_role_map.get(team, FAB_PUBLIC_ROLE) for team in team_list))


class GithubTeamAuthorizer(FabAirflowSecurityManagerOverride):
    # In this example, the oauth provider == 'github'.
    # If you ever want to support other providers, see how it is done here:
    # https://github.com/dpgaspar/Flask-AppBuilder/blob/master/flask_appbuilder/security/manager.py#L550
    def get_oauth_user_info(self, provider: str, resp: Any) -> dict[str, Union[str, list[str]]]:
        # Creates the user info payload from Github.
        # The user previously allowed your app to act on their behalf,
        #   so now we can query the user and teams endpoints for their data.
        # Username and team membership are added to the payload and returned to FAB.

        remote_app = self.appbuilder.sm.oauth_remotes[provider]
        me = remote_app.get("user")
        user_data = me.json()
        team_data = remote_app.get("user/teams")
        teams = team_parser(team_data.json())
        roles = map_roles(teams)
        log.debug(f"User info from Github: {user_data}\nTeam info from Github: {teams}")
        return {"username": "github_" + user_data.get("login"), "role_keys": roles}

使用 KeyCloak 进行基于团队的授权的示例

以下是您在 webserver_config.py 中可能有的示例

import os
import jwt
import requests
import logging
from base64 import b64decode
from cryptography.hazmat.primitives import serialization
from flask_appbuilder.security.manager import AUTH_DB, AUTH_OAUTH
from airflow import configuration as conf
from airflow.www.security import AirflowSecurityManager

log = logging.getLogger(__name__)

AUTH_TYPE = AUTH_OAUTH
AUTH_USER_REGISTRATION = True
AUTH_ROLES_SYNC_AT_LOGIN = True
AUTH_USER_REGISTRATION_ROLE = "Viewer"
OIDC_ISSUER = "https://sso.keycloak.me/realms/airflow"

# Make sure you create these role on Keycloak
AUTH_ROLES_MAPPING = {
    "Viewer": ["Viewer"],
    "Admin": ["Admin"],
    "User": ["User"],
    "Public": ["Public"],
    "Op": ["Op"],
}

OAUTH_PROVIDERS = [
    {
        "name": "keycloak",
        "icon": "fa-key",
        "token_key": "access_token",
        "remote_app": {
            "client_id": "airflow",
            "client_secret": "xxx",
            "server_metadata_url": "https://sso.keycloak.me/realms/airflow/.well-known/openid-configuration",
            "api_base_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect",
            "client_kwargs": {"scope": "email profile"},
            "access_token_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect/token",
            "authorize_url": "https://sso.keycloak.me/realms/airflow/protocol/openid-connect/auth",
            "request_token_url": None,
        },
    }
]

# Fetch public key
req = requests.get(OIDC_ISSUER)
key_der_base64 = req.json()["public_key"]
key_der = b64decode(key_der_base64.encode())
public_key = serialization.load_der_public_key(key_der)


class CustomSecurityManager(AirflowSecurityManager):
    def get_oauth_user_info(self, provider, response):
        if provider == "keycloak":
            token = response["access_token"]
            me = jwt.decode(token, public_key, algorithms=["HS256", "RS256"])

            # Extract roles from resource access
            realm_access = me.get("realm_access", {})
            groups = realm_access.get("roles", [])

            log.info("groups: {0}".format(groups))

            if not groups:
                groups = ["Viewer"]

            userinfo = {
                "username": me.get("preferred_username"),
                "email": me.get("email"),
                "first_name": me.get("given_name"),
                "last_name": me.get("family_name"),
                "role_keys": groups,
            }

            log.info("user info: {0}".format(userinfo))

            return userinfo
        else:
            return {}


# Make sure to replace this with your own implementation of AirflowSecurityManager class
SECURITY_MANAGER_CLASS = CustomSecurityManager

此条目是否有帮助?