Amazon DynamoDB 到 Amazon S3¶
使用 DynamoDBToS3Operator
传输操作符将现有的 Amazon DynamoDB 表的内容复制到现有的 Amazon Simple Storage Service (S3) 存储桶。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请访问 Airflow® 的安装
设置连接.
操作符¶
Amazon DynamoDB 到 Amazon S3 传输操作符¶
此操作符将记录从 Amazon DynamoDB 表复制到 Amazon S3 存储桶中的文件。它扫描 Amazon DynamoDB 表并将接收到的记录写入本地文件系统上的文件。一旦文件大小超过用户指定的文件大小限制,它会将文件刷新到 Amazon S3。
用户还可以使用 dynamodb_scan_kwargs
指定筛选条件,以便仅复制满足条件的记录。
要获取更多信息,请访问:DynamoDBToS3Operator
使用示例
tests/system/amazon/aws/example_dynamodb_to_s3.py
backup_db = DynamoDBToS3Operator(
task_id="backup_db",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
# Max output file size in bytes. If the Table is too large, multiple files will be created.
file_size=20,
)
为了并行化复制,用户可以使用 TotalSegments
参数创建多个 DynamoDBToS3Operator
任务。例如,要以 2 的并行度进行复制,请创建两个任务
tests/system/amazon/aws/example_dynamodb_to_s3.py
# Segmenting allows the transfer to be parallelized into {segment} number of parallel tasks.
backup_db_segment_1 = DynamoDBToS3Operator(
task_id="backup_db_segment_1",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
# Max output file size in bytes. If the Table is too large, multiple files will be created.
file_size=1000,
s3_key_prefix=f"{S3_KEY_PREFIX}-1-",
dynamodb_scan_kwargs={
"TotalSegments": 2,
"Segment": 0,
},
)
backup_db_segment_2 = DynamoDBToS3Operator(
task_id="backup_db_segment_2",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
# Max output file size in bytes. If the Table is too large, multiple files will be created.
file_size=1000,
s3_key_prefix=f"{S3_KEY_PREFIX}-2-",
dynamodb_scan_kwargs={
"TotalSegments": 2,
"Segment": 1,
},
)
用户还可以将 point_in_time_export
布尔参数传递给 DynamoDBToS3Operator
,以便从某个时间点恢复数据。
完整导出使用示例
tests/system/amazon/aws/example_dynamodb_to_s3.py
backup_db_to_point_in_time_full_export = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time_full_export",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
point_in_time_export=True,
export_time=export_time,
s3_key_prefix=f"{S3_KEY_PREFIX}-3-",
)
增量导出使用示例
tests/system/amazon/aws/example_dynamodb_to_s3.py
backup_db_to_point_in_time_incremental_export = DynamoDBToS3Operator(
task_id="backup_db_to_point_in_time_incremental_export",
dynamodb_table_name=table_name,
s3_bucket_name=bucket_name,
point_in_time_export=True,
s3_key_prefix=f"{S3_KEY_PREFIX}-4-",
export_table_to_point_in_time_kwargs={
"ExportType": "INCREMENTAL_EXPORT",
"IncrementalExportSpecification": {
"ExportFromTime": start_time,
"ExportToTime": end_time,
"ExportViewType": "NEW_AND_OLD_IMAGES",
},
},
)