AWS Lambda¶
借助 AWS Lambda,您无需预置或管理服务器即可运行代码。您只需为消耗的计算时间付费,代码未运行时无需付费。您可以运行几乎任何类型的应用程序或后端服务的代码,而无需任何管理。只需上传您的代码,Lambda 会负责运行和扩展您的代码所需的一切,并具有高可用性。您可以设置您的代码自动触发其他 AWS 服务,或直接从任何 Web 或移动应用程序调用它。
先决条件任务¶
要使用这些 operators,您必须完成一些步骤
使用 AWS Console 或 AWS CLI 创建必要的资源。
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 安装 Airflow®
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,不进行连接查找。否则,使用存储在连接中的凭证。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果您想使用与 botocore 使用的不同的 CA 证书捆绑包,可以指定此参数。
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。
示例,有关参数的更多详细信息,请参阅 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None
注意
指定一个空字典,
{}
,将覆盖 botocore.config.Config 的连接配置。
Operators¶
创建 AWS Lambda 函数¶
要创建 AWS lambda 函数,您可以使用 LambdaCreateFunctionOperator
。通过传递参数 deferrable=True
,此 operator 可以运行在可推迟模式下。这需要安装 aiobotocore 模块。
tests/system/amazon/aws/example_lambda.py
create_lambda_function = LambdaCreateFunctionOperator(
task_id="create_lambda_function",
function_name=lambda_function_name,
runtime="python3.9",
role=role_arn,
handler="lambda_function.test",
code={
"ZipFile": create_zip(CODE_CONTENT),
},
)
调用 AWS Lambda 函数¶
要调用 AWS lambda 函数,您可以使用 LambdaInvokeFunctionOperator
。
注意
根据 Lambda.Client.invoke 文档,对于具有较长超时的同步调用(invocation_type="RequestResponse"
),您的客户端在等待响应期间可能会断开连接。
如果发生这种情况,您将看到类似于以下的 ReadTimeoutError
异常
urllib3.exceptions.ReadTimeoutError: AWSHTTPSConnectionPool(host='lambda.us-east-1.amazonaws.com', port=443): Read timed out. (read timeout=60)
如果您遇到此问题,需要提供 botocore.config.Config 以使用具有超时或 keep-alive 设置的长时间连接。
通过提供 botocore_config 作为 operator 参数
{
"connect_timeout": 900,
"read_timeout": 900,
"tcp_keepalive": True,
}
或在相关的 AWS 连接额外参数 中指定 config_kwargs
{
"config_kwargs": {
"connect_timeout": 900,
"read_timeout": 900,
"tcp_keepalive": true
}
}
此外,您可能需要配置防火墙、代理或操作系统,以允许具有超时或 keep-alive 设置的长时间连接。
注意
您无法描述 AWS Lambda 函数的异步调用(invocation_type="Event"
)。唯一的方法是 配置异步调用的目标 并感知目标。
tests/system/amazon/aws/example_lambda.py
invoke_lambda_function = LambdaInvokeFunctionOperator(
task_id="invoke_lambda_function",
function_name=lambda_function_name,
payload=json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}),
)
Sensors¶
等待 AWS Lambda 函数部署状态¶
要检查 AWS Lambda 函数的部署状态,直到它达到目标状态或另一个终端状态,您可以使用 LambdaFunctionStateSensor
。
tests/system/amazon/aws/example_lambda.py
wait_lambda_function_state = LambdaFunctionStateSensor(
task_id="wait_lambda_function_state",
function_name=lambda_function_name,
)