AWS DataSync¶
AWS DataSync 是一种数据传输服务,可简化、自动化和加速本地存储系统与 AWS 存储服务之间通过互联网或 AWS Direct Connect 进行的数据移动和复制。
先决条件任务¶
要使用这些操作符,您需要执行以下操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow™ 的安装
设置连接.
通用参数¶
- aws_conn_id
对 Amazon Web Services 连接 ID 的引用。如果此参数设置为
None
,则使用默认的 boto3 行为,不进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果您想使用与 botocore 使用的 CA 证书包不同的 CA 证书包,则可以指定此参数。
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。
{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则将使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None
注意
指定一个空字典
{}
将覆盖 botocore.config.Config 的连接配置
操作符¶
与 AWS DataSync 任务交互¶
您可以使用 DataSyncOperator
来查找、创建、更新、执行和删除 AWS DataSync 任务。
一旦 DataSyncOperator
确定了要运行的正确 TaskArn(因为您指定了它,或者因为它被找到),它就会被执行。每当执行 AWS DataSync 任务时,它都会创建一个 AWS DataSync TaskExecution,由 TaskExecutionArn 标识。
将监控 TaskExecutionArn 直到完成(成功/失败),并定期将其状态写入 Airflow 任务日志。
DataSyncOperator
支持可选地将额外的 kwargs 传递给底层的 boto3.start_task_execution()
API。这是通过 task_execution_kwargs
参数完成的。例如,这对于限制带宽或过滤包含的文件很有用,有关更多详细信息,请参阅 boto3 Datasync 文档。
执行任务¶
要执行特定任务,您可以将 task_arn
传递给操作符。
# Execute a specific task
execute_task_by_arn = DataSyncOperator(
task_id="execute_task_by_arn",
task_arn=created_task_arn,
)
搜索并执行任务¶
要搜索任务,您可以将 source_location_uri
和 destination_location_uri
指定给操作符。如果找到一个任务,则将执行该任务。如果找到多个任务,则操作符将引发异常。为避免这种情况,您可以将 allow_random_task_choice
设置为 True
以从候选任务中随机选择。
# Search and execute a task
execute_task_by_locations = DataSyncOperator(
task_id="execute_task_by_locations",
source_location_uri=f"s3://{s3_bucket_source}/test",
destination_location_uri=f"s3://{s3_bucket_destination}/test",
# Only transfer files from /test/subdir folder
task_execution_kwargs={
"Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
},
)
创建并执行任务¶
搜索任务时,如果没有找到任务,您可以选择在执行任务之前创建一个任务。为此,您需要提供额外的参数 create_task_kwargs
、create_source_location_kwargs
和 create_destination_location_kwargs
。
如果未找到合适的现有任务,则这些额外参数为操作符提供了一种自动创建任务和/或位置的方法。如果将这些参数保留为默认值 (None),则不会尝试创建。
此外,由于 delete_task_after_execution
设置为 True
,因此任务将在成功完成后从 AWS DataSync 中删除。
# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
task_id="create_and_execute_task",
source_location_uri=f"s3://{s3_bucket_source}/test_create",
destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
create_task_kwargs={"Name": "Created by Airflow"},
create_source_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
create_destination_location_kwargs={
"Subdirectory": "test_create",
"S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
"S3Config": {
"BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
},
},
delete_task_after_execution=False,
)
创建任务时,DataSyncOperator
将尝试查找并使用现有的 LocationArn,而不是创建新的 LocationArn。如果多个 LocationArn 与指定的 URI 匹配,则我们需要选择一个使用。在这种情况下,操作符的行为类似于它从多个任务中选择单个任务的方式
操作符将引发异常。为避免这种情况,您可以将 allow_random_location_choice
设置为 True
以从候选位置中随机选择。