Amazon Web Services 连接¶
Amazon Web Services 连接类型启用了AWS 集成。
重要
可以在 UI/API 中或通过调用 test_connection()
来测试 Amazon Web Services 连接。**重要**的是正确解释此测试的结果。在此测试期间,Amazon Provider 的组件会调用 AWS Security Token Service API GetCallerIdentity。此服务**仅**能检查您的凭据是否有效。遗憾的是,无法验证凭据是否具有访问特定 AWS 服务的权限。
如果您使用 Amazon Provider 与 AWS API 兼容服务(MinIO、LocalStack 等)通信,则测试连接失败**并不意味着**您的连接凭据错误。许多兼容服务仅提供有限数量的 AWS API 服务,并且大多数服务不实现 AWS STS GetCallerIdentity 方法。
向 AWS 进行身份验证¶
可以使用 Boto3 指南 凭据 中描述的任何选项执行身份验证。或者,可以将凭据作为连接初始化参数传入。
要使用 IAM 实例配置文件,请创建一个“空”连接(即,未指定 AWS 访问密钥 ID 或 AWS 秘密访问密钥,或使用 aws://
的连接)。
默认连接 ID¶
默认连接 ID 为 aws_default
。如果您运行 Airflow 的环境/计算机在 ${HOME}/.aws/
中具有文件凭据,并且默认连接的用户和密码字段为空,则它将自动从此处获取凭据。
重要
以前,aws_default
连接的“extras”字段在安装时设置为 {"region_name": "us-east-1"}
。这意味着默认情况下,aws_default
连接使用 us-east-1
区域。现在不再是这种情况,需要手动设置区域,可以在 Airflow 的连接屏幕中设置,也可以通过 AWS_DEFAULT_REGION
环境变量设置。
注意
如果您不运行“airflow connections create-default-connections”命令,则很可能没有 aws_default
。出于历史原因,Amazon Provider 组件(Hook、操作符、传感器等)在缺少连接 ID 的情况下会回退到默认的 boto3 凭据策略。
如果您需要使用默认的 boto3 凭据策略(环境变量中的凭据、IAM 配置文件等),请提供 None
,而不是缺少连接 ID,以避免在日志中收到警告。
配置连接¶
- AWS 访问密钥 ID(可选)
指定用于初始连接的 AWS 访问密钥 ID。如果您通过在 **Extra** 字段中指定
role_arn
来 承担角色,则临时凭据将用于后续对 AWS 的调用。- AWS 秘密访问密钥(可选)
指定用于初始连接的 AWS 秘密访问密钥。如果您通过在 **Extra** 字段中指定
role_arn
来 承担角色,则临时凭据将用于后续对 AWS 的调用。- 额外(可选)
指定可在 AWS 连接中使用的额外参数(作为 JSON 字典)。所有参数都是可选的。
以下额外参数用于创建初始
boto3.session.Session
aws_access_key_id
:用于初始连接的 AWS 访问密钥 ID。aws_secret_access_key
:用于初始连接的 AWS 秘密访问密钥。aws_session_token
:如果您使用外部凭据,则用于初始连接的 AWS 会话令牌。您负责更新这些令牌。region_name
:连接的 AWS 区域。profile_name
:要使用的配置文件名称,该名称在 配置和凭据文件设置 中列出。
以下额外参数用于 承担角色
role_arn
:如果指定,则承担此角色,使用assume_role_method
获取一组临时安全凭据。assume_role_method
:AWS STS 客户端方法,可以是 assume_role、assume_role_with_saml 或 assume_role_with_web_identity,如果未指定,则使用 **assume_role**。assume_role_kwargs
:传递给assume_role_method
的附加 **kwargs**。
如果
assume_role_method
设置为assume_role_with_web_identity
,则可以使用以下额外参数assume_role_with_web_identity_federation
:联合类型,用于确定使用哪个令牌加载器来检索访问令牌。目前支持file
和google
。assume_role_with_web_identity_token_file
:文件系统中包含用于使用 AWS STS 服务进行身份验证的访问令牌的文件路径,适用于file
联合类型。如果未指定,则将使用AWS_WEB_IDENTITY_TOKEN_FILE
环境变量的值。assume_role_with_web_identity_federation_audience
:如果使用google
联合类型,则为访问令牌的aud
声明。
以下额外参数将传递给
boto3.session.Session.client()
或boto3.session.Session.resource()
。config_kwargs
:用于构造 botocore.config.Config 的附加 **kwargs**。要匿名访问公共 AWS 资源(相当于 signature_version=botocore.UNSGINED),请在 config_kwargs 中设置 “signature_version”=”unsigned”。endpoint_url
:连接的全局端点 URL。您可以通过使用service_config
为每个 AWS 服务指定端点 URL,有关更多详细信息,请参阅 AWS 服务端点 URL 配置verify
:是否验证 SSL 证书。False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果您想使用与 botocore 使用的 CA 证书包不同的证书包,则可以指定此参数。
以下额外参数用于特定的 AWS 服务
service_config
:用于指定每个 AWS 服务/Amazon Provider Hook 的配置/参数的 JSON,有关更多详细信息,请参阅 每个服务配置。
如果您通过 URI 配置连接,请确保 URI 的所有组件都经过 URL 编码。
示例¶
创建连接并转换为 URI 的代码片段¶
import os from airflow.models.connection import Connection conn = Connection( conn_id="sample_aws_connection", conn_type="aws", login="AKIAIOSFODNN7EXAMPLE", # Reference to AWS Access Key ID password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY", # Reference to AWS Secret Access Key extra={ # Specify extra parameters here "region_name": "eu-central-1", }, ) # Generate Environment Variable Name and Connection URI env_key = f"AIRFLOW_CONN_{conn.conn_id.upper()}" conn_uri = conn.get_uri() print(f"{env_key}={conn_uri}") # AIRFLOW_CONN_SAMPLE_AWS_CONNECTION=aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@/?region_name=eu-central-1 os.environ[env_key] = conn_uri print(conn.test_connection()) # Validate connection credentials.警告
当使用 Airflow CLI 时,当
登录名
密码
主机
端口
未给出时,可能需要添加
@
,请参见下面的示例。这是一个已知的 Airflow 限制。
airflow connections add aws_conn --conn-uri aws://@/?region_name=eu-west-1
使用实例配置文件¶
这将使用 boto 的默认凭据查找链(来自 ~/.boto/ 配置文件中名为“default”的配置文件,以及在 AWS 内部运行时使用的实例配置文件)
URI 格式示例
export AIRFLOW_CONN_AWS_DEFAULT=aws://JSON 格式示例
export AIRFLOW_CONN_AWS_DEFAULT='{"conn_type": "aws"}'
使用 AWS IAM 密钥对¶
URI 格式示例
export AIRFLOW_CONN_AWS_DEFAULT=aws://AKIAIOSFODNN7EXAMPLE:wJalrXUtnFEMI%2FK7MDENG%2FbPxRfiCYEXAMPLEKEY@请注意,此处秘密访问密钥已进行 URL 编码(将
/
更改为%2F
),并且末尾的@
也进行了编码(没有它,将被视为<host>:<port>
,并且无法正常工作)JSON 格式示例
export AIRFLOW_CONN_AWS_DEFAULT='{ "conn_type": "aws", "login": "AKIAIOSFODNN7EXAMPLE", "password": "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" }'
Extra 字段的示例¶
使用 ~/.aws/credentials 和 ~/.aws/config 文件,并指定一个配置文件。
这里假设所有其他连接字段,例如 AWS 访问密钥 ID 或 AWS 秘密访问密钥 都是空的。
{
"profile_name": "my_profile"
}
指定要承担的角色 ARN 和区域名称
{
"role_arn": "arn:aws:iam::112223334444:role/my_role",
"region_name": "ap-southeast-2"
}
另请参阅
配置出站 HTTP 代理
{
"config_kwargs": {
"proxies": {
"http": "http://myproxy.mycompany.local:8080",
"https": "http://myproxy.mycompany.local:8080"
}
}
}
使用 AssumeRoleWithWebIdentity(基于文件的令牌)
{
"role_arn": "arn:aws:iam::112223334444:role/my_role",
"assume_role_method": "assume_role_with_web_identity",
"assume_role_with_web_identity_federation": "file",
"assume_role_with_web_identity_token_file": "/path/to/access_token"
}
使用 AssumeRoleWithSAML
{
"region_name":"eu-west-1",
"role_arn":"arn:aws:iam::112223334444:role/my_role",
"assume_role_method":"assume_role_with_saml",
"assume_role_with_saml":{
"principal_arn":"arn:aws:iam::112223334444:saml-provider/my_saml_provider",
"idp_url":"https://idp.mycompany.local/.../saml/clients/amazon-aws",
"idp_auth_method":"http_spegno_auth",
"mutual_authentication":"OPTIONAL",
"idp_request_kwargs":{
"headers":{"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8"},
"verify":false
},
"idp_request_retry_kwargs": {
"total": 10,
"backoff_factor":1,
"status":10,
"status_forcelist": [400, 429, 500, 502, 503, 504]
},
"log_idp_response":false,
"saml_response_xpath":"////INPUT[@NAME='SAMLResponse']/@VALUE",
},
"assume_role_kwargs": { "something":"something" }
}
以下设置可以在 Extra 中的 assume_role_with_saml
容器中使用。
principal_arn
: IAM 中创建的 SAML 提供商的 ARN,用于描述身份提供商。
idp_url
: 指向您的 IDP 端点的 URL,该端点提供 SAML 断言。
idp_auth_method
: 指定“http_spegno_auth”以使用 Pythonrequests_gssapi
库。这个库比requests_kerberos
更新,并且向后兼容。请参阅 PyPI 上的requests_gssapi
文档。
mutual_authentication
: 可以是“REQUIRED”、“OPTIONAL”或“DISABLED”。请参阅 PyPI 上的requests_gssapi
文档。
idp_request_kwargs
: 从 IDP 请求(通过 HTTP/S)时传递给requests
的其他kwargs
。
idp_request_retry_kwargs
: 用于构建 urllib3.util.Retry 的其他kwargs
,当从 IDP 请求时,用作重试策略。
log_idp_response
: 对调试很有用 - 如果指定,则将 IDP 响应内容打印到日志。请注意,成功的响应将包含敏感信息!
saml_response_xpath
: 如何使用 XML/HTML xpath 查询 IDP 响应。
assume_role_kwargs
: 传递给sts_client.assume_role_with_saml
的其他kwargs
。
注意
requests_gssapi
库用于从您的 IDP 获取 SAML 响应。 您可能需要 pip uninstall python-gssapi
并 pip install gssapi
来使其工作。 python-gssapi
库已过时,并且与 Airflow 在其他地方使用的某些版本的 paramiko
冲突。
另请参阅
按服务配置¶
AWS 服务端点 URL 配置¶
要在单个连接中为每个特定的 AWS 服务使用 endpoint_url
,您可以在服务配置中进行设置。要强制执行默认的 botocore
/boto3
行为,您可以将值设置为 null
。优先级规则如下
每个服务级别指定的
endpoint_url
。在连接额外的根级别指定的
endpoint_url
。请注意,在承担角色或测试连接中使用的 sts 客户端不使用全局参数。默认的
botocore
/boto3
行为
{
"endpoint_url": "s3.amazonaws.com"
"service_config": {
"s3": {
"endpoint_url": "https://s3.eu-west-1.amazonaws.com"
},
"sts": {
"endpoint_url": "https://sts.eu-west-2.amazonaws.com"
},
"ec2": {
"endpoint_url": null
}
}
}
避免限制异常¶
Amazon Web Services 对同时进行的 API 调用有配额限制,因此频繁调用 apache-airflow-providers-amazon
组件可能会在执行期间因限制异常而失败,例如 *ThrottlingException*、*ProvisionedThroughputExceededException*。
botocore.config.Config
开箱即用地支持不同的指数退避模式:legacy
、standard
、adaptive
默认情况下,botocore.config.Config
使用 legacy
模式,最多重试 5 次,在某些情况下可能不够。
如果遇到限制异常,您可以将模式更改为 standard
并增加重试次数。
另请参阅
Boto3 指南: 重试
在连接中设置¶
- 连接 extra 字段:
{ "config_kwargs": { "retries": { "mode": "standard", "max_attempts": 10 } } }
在 AWS 配置文件中设置¶
- ~/.aws/config:
[profile awesome_aws_profile] retry_mode = standard max_attempts = 10
- 连接 extra 字段:
{ "profile_name": "awesome_aws_profile" }
通过环境变量设置¶
注意
这将在所有连接上设置重试模式,除非在特定连接上显式设置了另一个重试配置。
export AWS_RETRY_MODE=standard export AWS_MAX_ATTEMPTS=10
会话工厂¶
连接的默认 BaseSessionFactory
可以处理 AWS 的大多数身份验证方法。如果您希望完全控制 boto3.session.Session
创建,或者您正在使用自定义 联合身份验证,这需要 外部进程来获取凭据,则可以子类化 BaseSessionFactory
并根据您的需要重写 create_session
和/或 _create_basic_session
方法。
您还需要添加 AwsBaseHook
的配置,以便通过它们的完整路径使用自定义实现。
示例¶
- 配置:
[aws] session_factory = my_company.aws.MyCustomSessionFactory
- 连接 extra 字段:
{ "federation": { "username": "my_username", "password": "my_password" } }
- 自定义会话工厂:
def get_federated_aws_credentials(username: str, password: str): """ Mock interaction with federation endpoint/process and returns AWS credentials. """ return { "Version": 1, "AccessKeyId": "key", "SecretAccessKey": "secret", "SessionToken": "token", "Expiration": "2050-12-31T00:00:00.000Z", } class MyCustomSessionFactory(BaseSessionFactory): @property def federated(self): return "federation" in self.extra_config def _create_basic_session(self, session_kwargs: dict[str, Any]) -> boto3.session.Session: if self.federated: return self._create_federated_session(session_kwargs) else: return super()._create_basic_session(session_kwargs) def _create_federated_session(self, session_kwargs: dict[str, Any]) -> boto3.session.Session: username = self.extra_config["federation"]["username"] region_name = self._get_region_name() self.log.debug( f"Creating federated session with username={username} region_name={region_name} for " f"connection {self.conn.conn_id}" ) credentials = RefreshableCredentials.create_from_metadata( metadata=self._refresh_federated_credentials(), refresh_using=self._refresh_federated_credentials, method="custom-federation", ) session = botocore.session.get_session() session._credentials = credentials session.set_config_variable("region", region_name) return boto3.session.Session(botocore_session=session, **session_kwargs) def _refresh_federated_credentials(self) -> dict[str, str]: self.log.debug("Refreshing federated AWS credentials") credentials = get_federated_aws_credentials(**self.extra_config["federation"]) access_key_id = credentials["AccessKeyId"] expiry_time = credentials["Expiration"] self.log.info( f"New federated AWS credentials received with aws_access_key_id={access_key_id} and " f"expiry_time={expiry_time} for connection {self.conn.conn_id}" ) return { "access_key": access_key_id, "secret_key": credentials["SecretAccessKey"], "token": credentials["SessionToken"], "expiry_time": expiry_time, }
使用 Web 身份联合的 Google Cloud 到 AWS 身份验证¶
感谢 Web 身份联合,您可以使用 Google Cloud 平台的凭据来授权访问 Amazon Web Service 平台。如果您额外使用从 元数据服务器 或 Workload Identity 获取的访问令牌进行授权,则可以通过消除长期存在的凭据来提高环境的安全性。
Google Cloud 凭据通过 AWS Security Token Service 交换为 Amazon Web Service 临时凭据。
下图说明了用于获取 AWS 凭据的典型通信流程。
角色设置¶
为了让 AWS 识别 Google 身份,您必须在 AWS 中配置角色。
您可以使用角色向导或使用 Terraform 来完成此操作。
角色向导¶
要为 Web 身份联合创建 IAM 角色
登录 AWS 管理控制台,并在 https://console.aws.amazon.com/iam/ 打开 IAM 控制台。
在导航窗格中,选择 Roles(角色),然后选择 Create role(创建角色)。
选择 Web identity(Web 身份)角色类型。
对于 Identity provider(身份提供商),选择 Google。
在 Audience(受众)框中键入服务帐户电子邮件地址(格式为
<NAME>@<PROJECT_ID>.iam.gserviceaccount.com
)。查看您的 Web 身份信息,然后选择 Next: Permissions(下一步:权限)。
选择要用于权限策略的策略,或者选择 Create policy(创建策略)以打开新的浏览器选项卡并从头开始创建新策略。有关更多信息,请参阅 创建 IAM 策略。
选择 Next: Tags(下一步:标签)。
(可选)通过附加标签作为键值对,将元数据添加到角色。有关在 IAM 中使用标签的更多信息,请参阅 标记 IAM 用户和角色。
选择 Next: Review(下一步:审核)。
对于 Role name(角色名称),键入角色名称。角色名称在您的 AWS 账户中必须是唯一的。
(可选)对于 Role description(角色描述),键入新角色的描述。
查看角色,然后选择创建角色。
更多信息,请参考:为 Web 身份或 OpenID 连接联合身份创建角色(控制台)
最后,您应该获得一个与下面类似的策略的角色。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"Federated": "accounts.google.com"
},
"Action": "sts:AssumeRoleWithWebIdentity",
"Condition": {
"StringEquals": {
"accounts.google.com:aud": "<NAME>@<PROJECT_ID>.iam.gserviceaccount.com"
}
}
}
]
}
为了防止滥用 Google OpenID 令牌,您还可以通过配置每个受众的限制来限制使用范围。您需要为连接配置相同的值,并且该值也包含在 ID 令牌中。AWS 将测试此值是否匹配。为此,您可以向策略添加新的条件。
{
"Condition": {
"StringEquals": {
"accounts.google.com:aud": "<NAME>@<PROJECT_ID>.iam.gserviceaccount.com",
"accounts.google.com:oaud": "service-amp.my-company.com"
}
}
}
创建角色后,您应该在 Airflow 中配置连接。
Terraform¶
为了快速配置新角色,您可以使用以下 Terraform 脚本,该脚本配置 AWS 角色以及分配的策略。在使用它之前,您需要更正 locals
部分中的变量以适合您的环境。
google_service_account
- 将有权使用此角色的服务帐户的电子邮件地址。google_openid_audience
- 在 Airflow 角色和连接中配置的常量值。它可以防止滥用 Google ID 令牌。aws_role_name
- 新的 AWS 角色的名称。aws_policy_name
- 新的 AWS 策略的名称。
有关使用 Terraform 脚本的更多信息,请参考:Terraform 文档 - 入门 - AWS
执行计划后,您应该在 Airflow 中配置连接。
连接设置¶
为了使用 Google 身份,在连接设置的 extra 部分中,字段 "assume_role_method"
必须为 "assume_role_with_web_identity"
,字段 "assume_role_with_web_identity_federation"
必须为 "google"
。它还要求您在 "role_arn"
字段中设置角色。可选地,您可以通过配置 "assume_role_with_web_identity_federation_audience"
字段来限制 Google Open ID 令牌的使用。这些字段的值必须与角色中配置的值匹配。
Airflow 将根据 应用程序默认凭据建立 Google 的凭据。
下面是一个连接配置的示例。
{
"role_arn": "arn:aws:iam::240057002457:role/WebIdentity-Role",
"assume_role_method": "assume_role_with_web_identity",
"assume_role_with_web_identity_federation": "google",
"assume_role_with_web_identity_federation_audience": "service_a.apache.com"
}
您也可以使用环境变量 AIRFLOW_CONN_{CONN_ID}
配置连接。
export AIRFLOW_CONN_AWS_DEFAULT="aws://\
?role_arn=arn%3Aaws%3Aiam%3A%3A240057002457%3Arole%2FWebIdentity-Role&\
assume_role_method=assume_role_with_web_identity&\
assume_role_with_web_identity_federation=google&\
assume_role_with_web_identity_federation_audience=aaa.polidea.com"
在 EKS 上使用服务帐户的 IAM 角色 (IRSA)¶
如果您在 Amazon EKS 上运行 Airflow,您可以通过向其服务帐户授予 IAM 角色,从而向 Airflow 服务授予 AWS 相关权限(例如,用于远程日志记录的 S3 读/写权限)。IRSA 为在 EKS 上运行并使用其他 AWS 服务(例如使用 S3、任何其他 AWS 服务(如 Secrets Manager、CloudWatch、DynamoDB 等)的应用程序(例如,Pod)提供精细的权限管理。
要激活此功能,必须遵循以下步骤
在 EKS 集群上创建 IAM OIDC 提供程序。
创建 IAM 角色和策略,以使用在步骤 1 中创建的 Web 身份提供程序附加到 Airflow 服务帐户。
将相应的 IAM 角色作为注释添加到 Airflow 服务帐户。
然后,您可以在 Amazon EKS Pod 身份 Web 钩子添加的相应 Pod 的环境变量中找到 AWS_ROLE_ARN
和 AWS_WEB_IDENTITY_TOKEN_FILE
。然后 boto3 将使用这些变量配置凭据。为了在 Airflow 中使用 IRSA,您必须创建一个所有字段都为空的 aws 连接。如果设置了 role-arn
等字段,Airflow 将不会遵循 boto3 默认流程,因为它会使用连接字段手动创建会话。如果您没有更改默认连接 ID,则一个名为 aws_default
的空 AWS 连接就足够了。
使用 eksctl 为服务帐户 (IRSA) 创建 IAM 角色¶
eksctl 是一个简单的 CLI 工具,用于在 EKS 上创建和管理集群。请按照步骤为 Airflow 创建 IRSA。
在您的终端中设置 AWS 凭据以运行
eksctl
命令。默认情况下,IAM OIDC 提供程序未启用,您可以使用以下命令启用它。
eksctl utils associate-iam-oidc-provider --cluster="<EKS_CLUSTER_ID>" --approve
4. 替换 EKS_CLUSTER_ID
、SERVICE_ACCOUNT_NAME
和 NAMESPACE
,然后执行以下命令。此命令将使用现有的 EKS 集群 ID 并创建一个 IAM 角色、服务帐户和命名空间。
eksctl create iamserviceaccount --cluster="<EKS_CLUSTER_ID>" --name="<SERVICE_ACCOUNT_NAME>" --namespace="<NAMESPACE>" --attach-policy-arn="<IAM_POLICY_ARN>" --approve``
这是一个带有值的示例命令。此示例使用附加到 IAM 角色的具有完全 S3 权限的托管策略。我们强烈建议您创建一个具有 S3、Secrets Manager、CloudWatch 等必要权限的受限 IAM 策略,并将其与 --attach-policy-arn
一起使用。
eksctl create iamserviceaccount --cluster=airflow-eks-cluster --name=airflow-sa --namespace=airflow --attach-policy-arn=arn:aws:iam::aws:policy/AmazonS3FullAccess --approve
在 Airflow Helm 图表部署中或与 Kubernetes Pod Operator 一起使用服务帐户名称。
使用 Terraform 为服务帐户 (IRSA) 创建 IAM 角色¶
对于 Terraform 用户,可以使用 Amazon EKS Blueprints for Terraform 模块创建 IRSA 角色。
此模块将创建一个新的 IAM 角色、服务帐户和命名空间。这会将 IAM 角色与服务帐户关联,并将注释添加到服务帐户。您需要创建一个具有您希望 Pod 中容器拥有的必要权限的 IAM 策略。将 IAM_POLICY_ARN
替换为您的 IAM 策略 ARN,其他必需的输入如下所示,然后运行 terraform apply
。
module "airflow_irsa" {
source = "github.com/aws-ia/terraform-aws-eks-blueprints//modules/irsa"
eks_cluster_id = "<EKS_CLUSTER_ID>"
eks_oidc_provider_arn = "<EKS_CLUSTER_OIDC_PROVIDER_ARN>"
irsa_iam_policies = ["<IAM_POLICY_ARN>"]
kubernetes_namespace = "<NAMESPACE>"
kubernetes_service_account = "<SERVICE_ACCOUNT_NAME>"
}
应用 Terraform 模块后,您可以在 Airflow 部署中或与 Kubernetes Pod Operator 一起使用该服务帐户。