Amazon S3 到 DynamoDB¶
使用 S3ToDynamoDBOperator
传输 operator 将存储在 Amazon Simple Storage Service (S3) 存储桶中的数据加载到现有或新的 Amazon DynamoDB 表中。
先决任务¶
要使用这些 operator,您必须做几件事
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参见 Airflow® 安装
设置连接.
Operator¶
Amazon S3 到 DynamoDB 传输 operator¶
此 operator 将数据从 Amazon S3 加载到 Amazon DynamoDB 表中。它使用 Amazon DynamoDB ImportTable 服务,该服务与 Amazon S3 和 CloudWatch 等不同的 AWS 服务交互。默认行为是将 S3 数据加载到新的 Amazon DynamoDB 表中。目前服务不支持导入到现有表中。因此,此 operator 采用自定义方法。它创建一个临时 DynamoDB 表,并将 S3 数据加载到该表中。然后它扫描临时 Amazon DynamoDB 表,并将接收到的记录写入目标表。
更多信息请访问: S3ToDynamoDBOperator
示例用法
tests/system/amazon/aws/example_s3_to_dynamodb.py
transfer_1 = S3ToDynamoDBOperator(
task_id="s3_to_dynamodb",
s3_bucket=bucket_name,
s3_key=s3_key,
dynamodb_table_name=new_table_name,
input_format="CSV",
import_table_kwargs={
"InputFormatOptions": {
"Csv": {
"Delimiter": ",",
}
}
},
dynamodb_attributes=[
{"AttributeName": "cocktail_id", "AttributeType": "S"},
],
dynamodb_key_schema=[
{"AttributeName": "cocktail_id", "KeyType": "HASH"},
],
)
将 S3 数据加载到现有 DynamoDB 表中,请使用
tests/system/amazon/aws/example_s3_to_dynamodb.py
transfer_2 = S3ToDynamoDBOperator(
task_id="s3_to_dynamodb_new_table",
s3_bucket=bucket_name,
s3_key=s3_key,
dynamodb_table_name=existing_table_name,
use_existing_table=True,
input_format="CSV",
import_table_kwargs={
"InputFormatOptions": {
"Csv": {
"Delimiter": ",",
}
}
},
dynamodb_attributes=[
{"AttributeName": "cocktail_id", "AttributeType": "S"},
],
dynamodb_key_schema=[
{"AttributeName": "cocktail_id", "KeyType": "HASH"},
],
)