Amazon S3 到 DynamoDB¶
使用 S3ToDynamoDBOperator
传输将存储在 Amazon Simple Storage Service (S3) 存储桶中的数据加载到现有或新的 Amazon DynamoDB 表中。
先决条件任务¶
要使用这些操作符,您需要做一些事情
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow™ 安装
设置连接.
操作符¶
Amazon S3 到 DynamoDB 传输操作符¶
此操作符将数据从 Amazon S3 加载到 Amazon DynamoDB 表中。它使用与不同 AWS 服务(如 Amazon S3 和 CloudWatch)交互的 Amazon DynamoDB ImportTable 服务。默认行为是将 S3 数据加载到新的 Amazon DynamoDB 表中。该服务目前不支持导入到现有表中。因此,操作符使用自定义方法。它创建一个临时的 DynamoDB 表并将 S3 数据加载到该表中。然后,它扫描临时的 Amazon DynamoDB 表并将接收到的记录写入目标表。
要获取更多信息,请访问: S3ToDynamoDBOperator
示例用法
transfer_1 = S3ToDynamoDBOperator(
task_id="s3_to_dynamodb",
s3_bucket=bucket_name,
s3_key=s3_key,
dynamodb_table_name=new_table_name,
input_format="CSV",
import_table_kwargs={
"InputFormatOptions": {
"Csv": {
"Delimiter": ",",
}
}
},
dynamodb_attributes=[
{"AttributeName": "cocktail_id", "AttributeType": "S"},
],
dynamodb_key_schema=[
{"AttributeName": "cocktail_id", "KeyType": "HASH"},
],
)
要将 S3 数据加载到现有的 DynamoDB 表中,请使用
transfer_2 = S3ToDynamoDBOperator(
task_id="s3_to_dynamodb_new_table",
s3_bucket=bucket_name,
s3_key=s3_key,
dynamodb_table_name=existing_table_name,
use_existing_table=True,
input_format="CSV",
import_table_kwargs={
"InputFormatOptions": {
"Csv": {
"Delimiter": ",",
}
}
},
dynamodb_attributes=[
{"AttributeName": "cocktail_id", "AttributeType": "S"},
],
dynamodb_key_schema=[
{"AttributeName": "cocktail_id", "KeyType": "HASH"},
],
)