Amazon Web Services 连接¶
Amazon Web Services 连接类型启用 AWS 集成。
重要提示
可以在 UI/API 中或通过调用 test_connection() 来测试 Amazon Web Services 连接,**重要**的是正确解释此测试的结果。在此测试期间,Amazon Provider 的组件会调用 AWS Security Token Service API GetCallerIdentity。该服务只能检查您的凭证是否有效。遗憾的是,无法验证凭证是否对特定的 AWS 服务拥有访问权限。
如果您使用 Amazon Provider 与兼容 AWS API 的服务(如 MinIO、LocalStack 等)通信,连接测试失败 **并不意味着** 您的连接凭证错误。许多兼容服务仅提供有限的 AWS API 服务,并且大多数未实现 AWS STS 的 GetCallerIdentity 方法。
认证至 AWS¶
认证可以使用 Boto3 指南 Credentials 中描述的任意选项进行。或者,也可以将凭证作为连接初始化参数传入。
要使用 IAM 实例配置文件,请创建一个“空”连接(即未指定 AWS Access Key ID 或 AWS Secret Access Key,或使用 aws://)。
默认连接 ID¶
默认的连接 ID 为 aws_default。如果运行 Airflow 的环境/机器在 ${HOME}/.aws/ 中有凭证文件,并且默认连接的用户和密码字段为空,则会自动使用该处的凭证。
重要提示
之前,在安装时 aws_default 连接的 “extras” 字段被设为 {\"region_name\": \"us-east-1\"}。这意味着默认情况下 aws_default 连接使用 us-east-1 区域。现在情况已变更,需要手动设置区域,可在 Airflow 的连接界面中设置,或通过 AWS_DEFAULT_REGION 环境变量设置。
注意
如果您未运行 “airflow connections create-default-connections” 命令,通常意味着没有 aws_default。出于历史原因,Amazon Provider 组件(Hooks、Operators、Sensors 等)在缺少连接 ID 时会回退到默认的 boto3 凭证策略。
如果您需要使用默认的 boto3 凭证策略(环境变量中的凭证、IAM 配置文件等),请提供 None 而不是缺失的连接 ID,以避免日志中出现警告。
配置连接¶
- AWS Access Key ID(可选)
指定用于初始连接的 AWS Access Key ID。如果在 Extra 字段中指定
role_arn进行 **Assume Role**,则后续对 AWS 的调用将使用临时凭证。- AWS Secret Access Key(可选)
指定用于初始连接的 AWS Secret Access Key。如果在 Extra 字段中指定
role_arn进行 **Assume Role**,则后续对 AWS 的调用将使用临时凭证。- 额外(可选)
指定可在 AWS 连接中使用的额外参数(JSON 字典形式)。所有参数均为可选。
以下额外参数用于创建初始
boto3.session.Sessionaws_access_key_id:用于初始连接的 AWS Access Key ID。aws_secret_access_key:用于初始连接的 AWS Secret Access Key。aws_session_token:如果使用外部凭证,则用于初始连接的 AWS 会话令牌。您需自行负责刷新该令牌。region_name:连接所使用的 AWS 区域。profile_name:要使用的配置文件名称,列于配置和凭证文件设置中。
以下额外参数用于 **Assume Role**
role_arn:如果指定,则假定该角色,并使用assume_role_method获取一组临时安全凭证。assume_role_method:AWS STS 客户端方法,可为 assume_role、assume_role_with_saml 或 assume_role_with_web_identity。如果未指定,则使用 **assume_role**。assume_role_kwargs:传递给assume_role_method的额外 **kwargs**。
当
assume_role_method设置为assume_role_with_web_identity时,可使用以下额外参数assume_role_with_web_identity_federation:联合类型,用于决定使用哪种令牌加载器来获取访问令牌。目前支持file和google。assume_role_with_web_identity_token_file:文件系统中包含用于通过 AWS STS 服务进行身份验证的访问令牌的文件路径(适用于file联合类型)。如果未指定,则使用环境变量AWS_WEB_IDENTITY_TOKEN_FILE的值。assume_role_with_web_identity_federation_audience:访问令牌的aud声明(适用于google联合类型)。
以下额外参数会传递给
boto3.session.Session.client()或boto3.session.Session.resource()。config_kwargs:用于构造 botocore.config.Config 的额外 **kwargs**。若要匿名访问公共 AWS 资源(等价于 signature_version=botocore.UNSIGNED),请在 config_kwargs 中设定 “signature_version”=”unsigned”。endpoint_url:全局端点 URL。您也可以通过service_config为每个 AWS 服务单独指定端点 URL,详细信息请参见 AWS Service Endpoint URL configuration。verify:是否验证 SSL 证书。False- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书捆绑文件名。如果您想使用与 botocore 不同的 CA 证书捆绑,可指定此参数。
以下额外参数用于特定的 AWS 服务
service_config:json,用于为每个 AWS 服务 / Amazon provider hook 指定配置/参数,详细信息请参见 Per-service configuration。
如果通过 URI 配置连接,请确保 URI 的所有组件已进行 URL 编码。
示例¶
创建连接并转换为 URI 的示例代码¶
import os from airflow.models.connection import Connection conn = Connection( conn_id="sample_aws_connection", conn_type="aws", login="AKIAIOSFODNN7EXAMPLE", # Reference to AWS Access Key ID password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", # Reference to AWS Secret Access Key extra={ # Specify extra parameters here "region_name": "eu-central-1", }, ) # Generate Environment Variable Name and Connection URI env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}" conn_uri = conn.get_uri() print(f"{env_key}={conn_uri}") # AIRFLOW_CONN_SAMPLE_AWS_CONNECTION=aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?region_name=eu-central-1 os.environ[env_key] = conn_uri print(conn.test_connection()) # Validate connection credentials.警告
使用 Airflow CLI 时,当
登录
password
host
port
未提供主机时,可能需要添加
@,请参见下面的示例。这是已知的 Airflow 限制。
airflow connections add aws_conn --conn-uri aws://@/?region_name=eu-west-1
使用实例配置文件¶
这将使用 boto 的默认凭证查找链(~/.boto/ 配置文件中的 “default” 配置文件,以及在 AWS 内部运行时使用实例配置文件)。
URI 格式示例
export AIRFLOW_CONN_AWS_DEFAULT=aws://JSON 格式示例
export AIRFLOW_CONN_AWS_DEFAULT='{"conn_type": "aws"}'
使用 AWS IAM 密钥对¶
URI 格式示例
export AIRFLOW_CONN_AWS_DEFAULT=aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@请注意,密钥已进行 URL 编码(将
/替换为%2F),并且末尾必须保留@(若缺失,会被当作<host>:<port>处理而失效)。JSON 格式示例
export AIRFLOW_CONN_AWS_DEFAULT='{ "conn_type": "aws", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" }'
**Extra** 字段示例¶
使用 ~/.aws/credentials 和 ~/.aws/config 文件,并指定 profile。
这假设所有其他连接字段(例如 AWS Access Key ID 或 AWS Secret Access Key)均为空。
{
"profile_name": "my_profile"
}
指定 role_arn 进行 assume 并设置 region_name
{
"role_arn": "arn:aws:iam::112223334444:role/my_role",
"region_name": "ap-southeast-2"
}
另请参阅
配置出站 HTTP 代理
{
"config_kwargs": {
"proxies": {
"http": "http://myproxy.mycompany.local:8080",
"https": "http://myproxy.mycompany.local:8080"
}
}
}
使用 AssumeRoleWithWebIdentity(基于文件的令牌)
{
"role_arn": "arn:aws:iam::112223334444:role/my_role",
"assume_role_method": "assume_role_with_web_identity",
"assume_role_with_web_identity_federation": "file",
"assume_role_with_web_identity_token_file": "/path/to/access_token"
}
使用 AssumeRoleWithSAML
{
"region_name":"eu-west-1",
"role_arn":"arn:aws:iam::112223334444:role/my_role",
"assume_role_method":"assume_role_with_saml",
"assume_role_with_saml":{
"principal_arn":"arn:aws:iam::112223334444:saml-provider/my_saml_provider",
"idp_url":"https://idp.mycompany.local/.../saml/clients/amazon-aws",
"idp_auth_method":"http_spegno_auth",
"mutual_authentication":"OPTIONAL",
"idp_request_kwargs":{
"headers":{"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"},
"verify":false
},
"idp_request_retry_kwargs": {
"total": 10,
"backoff_factor":1,
"status":10,
"status_forcelist": [400, 429, 500, 502, 503, 504]
},
"log_idp_response":false,
"saml_response_xpath":"////INPUT[@NAME='SAMLResponse']/@VALUE",
},
"assume_role_kwargs": { "something":"something" }
}
以下设置可在 Extra 中的 assume_role_with_saml 容器内使用。
principal_arn:在 IAM 中创建的 SAML 提供商的 ARN,用于描述身份提供商。
idp_url:指向您的 IDP 端点的 URL,提供 SAML 断言。
idp_auth_method:指定 “http_spegno_auth” 以使用 Pythonrequests_gssapi库。该库比requests_kerberos更为新颖且向后兼容。请参阅 PyPI 上的requests_gssapi文档。
mutual_authentication:可设为 “REQUIRED”、 “OPTIONAL” 或 “DISABLED”。详情请参考 PyPI 上的requests_gssapi文档。
idp_request_kwargs:在向 IDP(通过 HTTP/S)请求时传递给requests的额外kwargs。
idp_request_retry_kwargs:用于构造 urllib3.util.Retry 的额外kwargs,作为向 IDP 请求时的重试策略。
log_idp_response:调试用 - 如指定,将把 IDP 响应内容打印到日志。请注意,成功的响应可能包含敏感信息!
saml_response_xpath:使用 XML / HTML xpath 查询 IDP 响应的方式。
assume_role_kwargs:传递给sts_client.assume_role_with_saml的额外kwargs。
注意
requests_gssapi 库用于从您的 IDP 获取 SAML 响应。您可能需要先执行 pip uninstall python-gssapi 然后 pip install gssapi 才能正常工作。python-gssapi 已过时,并且会与 Airflow 其它地方使用的某些 paramiko 版本冲突。
另请参阅
每项服务配置¶
AWS Service Endpoint URL 配置¶
若要在单个连接中为特定 AWS 服务使用 endpoint_url,可以在 service_config 中进行设置。若想强制使用默认的 botocore/boto3 行为,可将该值设为 null。优先级如下:
endpoint_url在服务级别单独指定。
endpoint_url在连接 extra 的根级别指定。请注意,**sts** 客户端(在 assume role 或 test connection 中使用)不使用全局参数。默认
botocore/boto3行为
{
"endpoint_url": "s3.amazonaws.com"
"service_config": {
"s3": {
"endpoint_url": "https://s3.eu-west-1.amazonaws.com"
},
"sts": {
"endpoint_url": "https://sts.eu-west-2.amazonaws.com"
},
"ec2": {
"endpoint_url": null
}
}
}
S3 存储桶配置¶
若要在 S3Hook 方法中使用每个连接的 S3 存储桶名称,请在连接的 extra 字段中提供相应选项。
注意
Hook 方法中的 bucket_name 参数会覆盖此连接设置。
{
"service_config": {
"s3": {
"bucket_name": "awesome-bucket"
}
}
}
避免 Throttling 异常¶
Amazon Web Services 对同时调用的 API 有配额限制,频繁调用时 apache-airflow-providers-amazon 组件可能因限流异常而失败,例如 ThrottlingException、ProvisionedThroughputExceededException。
botocore.config.Config 开箱即支持不同的指数退避模式:legacy、standard、adaptive。
默认情况下,botocore.config.Config 使用 legacy 模式,最多重试 5 次,在某些情况下可能不够。
如果遇到限流异常,您可以将模式改为 standard 并增加重试次数。
另请参阅
Boto3 指南:Retries
在连接中设置¶
- 连接的 extra 字段:
{ "config_kwargs": { "retries": { "mode": "standard", "max_attempts": 10 } } }
在 AWS 配置文件中设置¶
- ~/.aws/config:
[profile awesome_aws_profile] retry_mode = standard max_attempts = 10
- 连接的 extra 字段:
{ "profile_name": "awesome_aws_profile" }
通过环境变量设置¶
注意
这将在所有连接上设置重试模式,除非对特定连接显式设置了其他重试配置。
export AWS_RETRY_MODE=standard export AWS_MAX_ATTEMPTS=10
会话工厂¶
默认的 BaseSessionFactory 能处理大多数 AWS 认证方式。如果您希望完全控制 boto3.session.Session 的创建,或使用需要外部进程提供凭证的自定义 联合,可以子类化 BaseSessionFactory 并根据需求覆盖 create_session 和/或 _create_basic_session 方法。
您还需要在 AwsBaseHook 的配置中提供自定义实现的完整路径,以便使用自定义会话工厂。
示例¶
- 配置:
[aws] session_factory = my_company.aws.MyCustomSessionFactory
- 连接的 extra 字段:
{ "federation": { "username": "my_username", "password": "my_password" } }
- 自定义会话工厂:
def get_federated_aws_credentials(username: str, password: str): """ Mock interaction with federation endpoint/process and returns AWS credentials. """ return { "Version": 1, "AccessKeyId": "key", "SecretAccessKey": "secret", "SessionToken": "token", "Expiration": "2050-12-31T00:00:00.000Z", } class MyCustomSessionFactory(BaseSessionFactory): @property def federated(self): return "federation" in self.extra_config def _create_basic_session(self, session_kwargs: dict[str, Any]) -> boto3.session.Session: if self.federated: return self._create_federated_session(session_kwargs) else: return super()._create_basic_session(session_kwargs) def _create_federated_session(self, session_kwargs: dict[str, Any]) -> boto3.session.Session: username = self.extra_config["federation"]["username"] region_name = self._get_region_name() self.log.debug( f"Creating federated session with username={username} region_name={region_name} for " f"connection {self.conn.conn_id}" ) credentials = RefreshableCredentials.create_from_metadata( metadata=self._refresh_federated_credentials(), refresh_using=self._refresh_federated_credentials, method="custom-federation", ) session = botocore.session.get_session() session._credentials = credentials session.set_config_variable("region", region_name) return boto3.session.Session(botocore_session=session, **session_kwargs) def _refresh_federated_credentials(self) -> dict[str, str]: self.log.debug("Refreshing federated AWS credentials") credentials = get_federated_aws_credentials(**self.extra_config["federation"]) access_key_id = credentials["AccessKeyId"] expiry_time = credentials["Expiration"] self.log.info( f"New federated AWS credentials received with aws_access_key_id={access_key_id} and " f"expiry_time={expiry_time} for connection {self.conn.conn_id}" ) return { "access_key": access_key_id, "secret_key": credentials["SecretAccessKey"], "token": credentials["SessionToken"], "expiry_time": expiry_time, }
使用 Web 身份联合进行 Google Cloud 到 AWS 认证¶
得益于 Web Identity Federation,您可以使用 Google Cloud 平台的凭证来授权访问 Amazon Web Service 平台。如果再结合从 metadata server 或 Workload Identity 获取的访问令牌进行授权,则可以通过消除长期凭证来提升环境的安全性。
Google Cloud 的凭证会通过 AWS Security Token Service 交换为 Amazon Web Service 的 临时凭证。
下图展示了获取 AWS 凭证的典型通信流程。
Communication Flow Diagram¶
角色设置¶
为了让 AWS 能识别 Google 身份,必须在 AWS 中配置角色。
您可以使用角色向导或 Terraform 完成此操作。
角色向导¶
创建用于 Web 身份联合的 IAM 角色
登录 AWS 管理控制台并打开 IAM 控制台。
在导航窗格中,选择 Roles,然后选择 Create role。
选择 Web identity 角色类型。
在 “Identity provider” 下拉框中选择 Google。
在 Audience 框中输入服务帐号的电子邮件地址(格式为
<NAME>@<PROJECT_ID>.iam.gserviceaccount.com)。检查您的 Web 身份信息,然后选择 Next: Permissions。
选择要用于权限的策略,或选择 Create policy 打开新标签页并从头创建新策略。更多信息请参见 创建 IAM 策略。
选择 Next: Tags。
(可选) 通过添加键‑值对形式的标签为角色附加元数据。有关在 IAM 中使用标签的更多信息,请参阅 标记 IAM 用户和角色。
选择 Next: Review。
在 Role name 中输入角色名称。角色名称在您的 AWS 账户内必须唯一。
(可选) 在 Role description** 中为新角色输入描述。
检查角色信息后,选择 Create role。
更多信息请参见:在控制台中为 Web 身份或 OpenID Connect 联合创建角色
最终,您应得到一个类似于下方策略的角色。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "accounts.google.com"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"accounts.google.com:aud": "<NAME>@<PROJECT_ID>.iam.gserviceaccount.com"
}
}
}
]
}
为了防止 Google OpenID 令牌被滥用,您还可以通过为每个受众配置限制来限制使用范围。您需要在连接中配置相同的值,该值也会包含在 ID Token 中。AWS 将检查此值是否匹配。为此,您可以在策略中添加新条件。
{
"Condition": {
"StringEquals": {
"accounts.google.com:aud": "<NAME>@<PROJECT_ID>.iam.gserviceaccount.com",
"accounts.google.com:oaud": "service-amp.my-company.com"
}
}
}
创建角色后,您需要在 Airflow 中配置相应的连接。
Terraform¶
为快速配置新角色,您可以使用下面的 Terraform 脚本,它会同时配置 AWS 角色及其对应的策略。使用前请在 locals 部分根据您自己的环境修改变量。
google_service_account- 将拥有使用该角色权限的服务帐号的电子邮件地址。google_openid_audience- 在 Airflow 角色和连接中配置的常量值,用于防止 Google ID 令牌被滥用。aws_role_name- 新建 AWS 角色的名称。aws_policy_name- 新建 AWS 策略的名称。
更多关于使用 Terraform 脚本的信息,请参阅:Terraform 文档 - 入门 - AWS
执行计划后,您应在 Airflow 中配置相应的连接。
连接设置¶
要使用 Google 身份,连接的 "assume_role_method" 必须设为 "assume_role_with_web_identity",并且在 extra 部分的 "assume_role_with_web_identity_federation" 必须设为 "google"。同时需在 "role_arn" 字段中配置相应角色。可选地,您可以通过配置 "assume_role_with_web_identity_federation_audience" 字段来限制 Google Open ID 令牌的使用范围。这些字段的值必须与角色中配置的值相匹配。
Airflow 将基于 Application Default Credentials 获取 Google 的凭证。
以下是一个示例连接配置。
{
"role_arn": "arn:aws:iam::240057002457:role/WebIdentity-Role",
"assume_role_method": "assume_role_with_web_identity",
"assume_role_with_web_identity_federation": "google",
"assume_role_with_web_identity_federation_audience": "service_a.apache.com"
}
您同样可以使用环境变量 AIRFLOW_CONN_{CONN_ID} 来配置连接。
export AIRFLOW_CONN_AWS_DEFAULT="aws://\
?role_arn=arn%3Aaws%3Aiam%3A%3A240057002457%3Arole%2FWebIdentity-Role&\
assume_role_method=assume_role_with_web_identity&\
assume_role_with_web_identity_federation=google&\
assume_role_with_web_identity_federation_audience=aaa.polidea.com"
在 EKS 上使用服务账户的 IAM 角色 (IRSA)¶
如果您在 Amazon EKS 上运行 Airflow,可以通过将 IAM 角色授予其服务账户的方式,为 Airflow 服务授予 AWS 相关权限(例如远程日志的 S3 读/写)。IRSA 为在 EKS 上运行并使用其他 AWS 服务(如 S3、Secrets Manager、CloudWatch、DynamoDB 等)的应用(如 pod)提供细粒度的权限管理。
要启用此功能,需要遵循以下步骤:
在 EKS 集群上创建 IAM OIDC Provider。
创建 IAM 角色和策略,并将其附加到第 1 步创建的 Web 身份提供商对应的 Airflow 服务账户。
将对应的 IAM 角色作为注解添加到 Airflow 服务账户。
随后您可以在相应 pod 的环境变量中找到 AWS_ROLE_ARN 和 AWS_WEB_IDENTITY_TOKEN_FILE(这些变量由 Amazon EKS Pod Identity Web Hook 添加)。boto3 将使用这些变量配置凭证。若要在 Airflow 中使用 IRSA,需创建一个所有字段均为空的 AWS 连接。如果设置了诸如 role-arn 等字段,Airflow 将不会遵循 boto3 的默认流程,而是手动使用连接字段创建会话。如果未更改默认连接 ID,名为 aws_default 的空 AWS 连接即可满足需求。
使用 eksctl 为服务账户创建 IAM 角色(IRSA)¶
eksctl 是一款用于在 EKS 上创建和管理集群的简易 CLI 工具。请按照下列步骤为 Airflow 创建 IRSA。
在终端中配置 AWS 凭证,以便运行
eksctl命令。默认情况下 IAM OIDC Provider 未启用,您可以使用以下命令启用它。
eksctl utils associate-iam-oidc-provider --cluster="<EKS_CLUSTER_ID>" --approve
4. 将 EKS_CLUSTER_ID、SERVICE_ACCOUNT_NAME 和 NAMESPACE 替换为实际值后执行以下命令。该命令会使用已有的 EKS 集群 ID 创建 IAM 角色、服务账户和命名空间。
eksctl create iamserviceaccount --cluster="<EKS_CLUSTER_ID>" --name="<SERVICE_ACCOUNT_NAME>" --namespace="<NAMESPACE>" --attach-policy-arn="<IAM_POLICY_ARN>" --approve``
以下为示例命令,其中使用了附带全部 S3 权限的托管策略。强烈建议您自行创建仅包含 S3、Secrets Manager、CloudWatch 等所需权限的受限 IAM 策略,并使用 --attach-policy-arn 指定。
eksctl create iamserviceaccount --cluster=airflow-eks-cluster --name=airflow-sa --namespace=airflow --attach-policy-arn=arn:aws:iam::aws:policy/AmazonS3FullAccess --approve
在 Airflow Helm Chart 部署或使用 Kubernetes Pod Operator 时使用该服务账户名称。
使用 Terraform 为服务账户创建 IAM 角色(IRSA)¶
对于 Terraform 用户,可使用 Amazon EKS Blueprints for Terraform 模块创建 IRSA 角色。
该模块会创建新的 IAM 角色、服务账户和命名空间,并将 IAM 角色关联到服务账户并添加相应的注解。您需要自行创建包含容器所需权限的 IAM 策略。将 IAM_POLICY_ARN 替换为您的 IAM 策略 ARN,填入其他必要输入后执行 terraform apply。
module "airflow_irsa" {
source = "github.com/aws-ia/terraform-aws-eks-blueprints//modules/irsa"
eks_cluster_id = "<EKS_CLUSTER_ID>"
eks_oidc_provider_arn = "<EKS_CLUSTER_OIDC_PROVIDER_ARN>"
irsa_iam_policies = ["<IAM_POLICY_ARN>"]
kubernetes_namespace = "<NAMESPACE>"
kubernetes_service_account = "<SERVICE_ACCOUNT_NAME>"
}
模块应用后,您即可在 Airflow 部署或使用 Kubernetes Pod Operator 时使用该服务账户。