Amazon DynamoDB 到 Amazon S3¶
使用 DynamoDBToS3Operator
传输将现有 Amazon DynamoDB 表的内容复制到现有的 Amazon Simple Storage Service (S3) 存储桶。
先决条件任务¶
要使用这些操作符,您必须执行以下操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow™ 安装
设置连接.
操作符¶
Amazon DynamoDB 到 Amazon S3 传输操作符¶
此操作符将记录从 Amazon DynamoDB 表复制到 Amazon S3 存储桶中的文件。它扫描 Amazon DynamoDB 表并将接收到的记录写入本地文件系统上的文件。一旦文件大小超过用户指定的文件大小限制,它就会将文件刷新到 Amazon S3。
用户还可以使用 dynamodb_scan_kwargs
指定过滤条件,以仅复制满足条件的记录。
要获取更多信息,请访问: DynamoDBToS3Operator
示例用法
backup_db = DynamoDBToS3Operator(
task_id="backup_db",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
# Max output file size in bytes. If the Table is too large, multiple files will be created.
file_size=20,
)
要并行复制,用户可以使用 TotalSegments
参数创建多个 DynamoDBToS3Operator
任务。例如,要以 2 的并行度进行复制,请创建两个任务
# Segmenting allows the transfer to be parallelized into {segment} number of parallel tasks.
backup_db_segment_1 = DynamoDBToS3Operator(
task_id="backup_db_segment_1",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
# Max output file size in bytes. If the Table is too large, multiple files will be created.
file_size=1000,
s3_key_prefix=f"{S3_KEY_PREFIX}-1-",
dynamodb_scan_kwargs={
"TotalSegments": 2,
"Segment": 0,
},
)
backup_db_segment_2 = DynamoDBToS3Operator(
task_id="backup_db_segment_2",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
# Max output file size in bytes. If the Table is too large, multiple files will be created.
file_size=1000,
s3_key_prefix=f"{S3_KEY_PREFIX}-2-",
dynamodb_scan_kwargs={
"TotalSegments": 2,
"Segment": 1,
},
)
用户还可以将 export_time
参数传递给 DynamoDBToS3Operator
以从某个时间点恢复数据。
backup_db_to_point_in_time = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time",
dynamodb_table_name=table_name,
file_size=1000,
s3_bucket_name=bucket_name,
export_time=export_time,
s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
)