AWS DataSync¶
AWS DataSync 是一项数据传输服务,可简化、自动化并加速通过互联网或 AWS Direct Connect 在本地存储系统与 AWS 存储服务之间移动和复制数据。
前置任务¶
要使用这些操作符,您必须执行以下几项操作:
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 安装 Airflow®
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,不进行连接查找。否则,使用连接中存储的凭证。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包文件名。如果您想使用与 botocore 使用的 CA 证书包不同的证书包,可以指定此参数。
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构建 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。
示例:有关参数的更多详细信息,请参阅 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None
注意
指定一个空字典
{}
将覆盖 botocore.config.Config 的连接配置。
操作符¶
与 AWS DataSync 任务交互¶
您可以使用 DataSyncOperator
来查找、创建、更新、执行和删除 AWS DataSync 任务。
一旦 DataSyncOperator
确定了要运行的正确 TaskArn(无论是因为您指定了它,还是因为它已被找到),它就会被执行。每当执行一个 AWS DataSync 任务时,都会创建一个由 TaskExecutionArn 标识的 AWS DataSync TaskExecution。
TaskExecutionArn 将被监控直到完成(成功/失败),其状态将定期写入 Airflow 任务日志。
DataSyncOperator
支持向底层的 boto3.start_task_execution()
API 可选地传递额外的 kwargs。这是通过 task_execution_kwargs
参数完成的。例如,这对于限制带宽或过滤包含的文件很有用,更多详细信息请参阅 boto3 Datasync 文档。
执行任务¶
要执行特定任务,您可以将 task_arn
传递给操作符。
tests/system/amazon/aws/example_datasync.py
# Execute a specific task
execute_task_by_arn = DataSyncOperator(
task_id="execute_task_by_arn",
task_arn=created_task_arn,
)
搜索并执行任务¶
要搜索任务,您可以将 source_location_uri
和 destination_location_uri
参数指定给操作符。如果找到一个任务,该任务将被执行。如果找到多个任务,操作符将引发一个 Exception。为避免这种情况,您可以将 allow_random_task_choice
设置为 True
,以便从候选任务中随机选择一个。
tests/system/amazon/aws/example_datasync.py
# Search and execute a task
execute_task_by_locations = DataSyncOperator(
task_id="execute_task_by_locations",
source_location_uri=f"s3://{s3_bucket_source}/test",
destination_location_uri=f"s3://{s3_bucket_destination}/test",
# Only transfer files from /test/subdir folder
task_execution_kwargs={
"Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
},
)
创建并执行任务¶
搜索任务时,如果未找到任何任务,您可以选择在执行前创建一个。为此,您需要提供额外的参数 create_task_kwargs
、create_source_location_kwargs
和 create_destination_location_kwargs
。
这些额外参数提供了一种方式,让操作符在未找到合适的现有任务时自动创建任务和/或位置。如果这些参数保持默认值 (None),则不会尝试创建。
此外,由于 delete_task_after_execution
设置为 True
,任务在成功完成后将从 AWS DataSync 中删除。
tests/system/amazon/aws/example_datasync.py
# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
task_id="create_and_execute_task",
source_location_uri=f"s3://{s3_bucket_source}/test_create",
destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
create_task_kwargs={"Name": "Created by Airflow"},
create_source_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
create_destination_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
delete_task_after_execution=False,
)
创建任务时,DataSyncOperator
将尝试查找并使用现有的 LocationArns,而不是创建新的。如果多个 LocationArns 与指定的 URI 匹配,则我们需要选择一个使用。在这种情况下,操作符的行为类似于从多个任务中选择单个任务的方式
操作符将引发一个 Exception。为避免这种情况,您可以将 allow_random_location_choice
设置为 True
,以便从候选位置中随机选择一个。