AWS DataSync

AWS DataSync 是一种数据传输服务,可简化、自动化和加速本地存储系统与 AWS 存储服务之间通过互联网或 AWS Direct Connect 进行的数据移动和复制。

先决条件任务

要使用这些操作符,您需要执行以下操作

通用参数

aws_conn_id

Amazon Web Services 连接 ID 的引用。如果此参数设置为 None,则使用默认的 boto3 行为,不进行连接查找。否则,使用存储在连接中的凭据。默认值:aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则将使用 AWS 连接额外参数 中的 region_name。否则,使用指定的值而不是连接值。默认值:None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果您想使用与 botocore 使用的 CA 证书包不同的 CA 证书包,则可以指定此参数。

如果此参数设置为 None 或省略,则将使用 AWS 连接额外参数 中的 verify。否则,使用指定的值而不是连接值。默认值:None

botocore_config

提供的字典用于构造 botocore.config.Config。此配置可用于配置 避免限流异常、超时等。

示例,有关参数的更多详细信息,请查看 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则将使用 AWS 连接额外参数 中的 config_kwargs。否则,使用指定的值而不是连接值。默认值:None

注意

指定一个空字典 {} 将覆盖 botocore.config.Config 的连接配置

操作符

与 AWS DataSync 任务交互

您可以使用 DataSyncOperator 来查找、创建、更新、执行和删除 AWS DataSync 任务。

一旦 DataSyncOperator 确定了要运行的正确 TaskArn(因为您指定了它,或者因为它被找到),它就会被执行。每当执行 AWS DataSync 任务时,它都会创建一个 AWS DataSync TaskExecution,由 TaskExecutionArn 标识。

将监控 TaskExecutionArn 直到完成(成功/失败),并定期将其状态写入 Airflow 任务日志。

DataSyncOperator 支持可选地将额外的 kwargs 传递给底层的 boto3.start_task_execution() API。这是通过 task_execution_kwargs 参数完成的。例如,这对于限制带宽或过滤包含的文件很有用,有关更多详细信息,请参阅 boto3 Datasync 文档

执行任务

要执行特定任务,您可以将 task_arn 传递给操作符。

tests/system/providers/amazon/aws/example_datasync.py[源代码]

# Execute a specific task
execute_task_by_arn = DataSyncOperator(
    task_id="execute_task_by_arn",
    task_arn=created_task_arn,
)

搜索并执行任务

要搜索任务,您可以将 source_location_uridestination_location_uri 指定给操作符。如果找到一个任务,则将执行该任务。如果找到多个任务,则操作符将引发异常。为避免这种情况,您可以将 allow_random_task_choice 设置为 True 以从候选任务中随机选择。

tests/system/providers/amazon/aws/example_datasync.py[源代码]

# Search and execute a task
execute_task_by_locations = DataSyncOperator(
    task_id="execute_task_by_locations",
    source_location_uri=f"s3://{s3_bucket_source}/test",
    destination_location_uri=f"s3://{s3_bucket_destination}/test",
    # Only transfer files from /test/subdir folder
    task_execution_kwargs={
        "Includes": [{"FilterType": "SIMPLE_PATTERN", "Value": "/test/subdir"}],
    },
)

创建并执行任务

搜索任务时,如果没有找到任务,您可以选择在执行任务之前创建一个任务。为此,您需要提供额外的参数 create_task_kwargscreate_source_location_kwargscreate_destination_location_kwargs

如果未找到合适的现有任务,则这些额外参数为操作符提供了一种自动创建任务和/或位置的方法。如果将这些参数保留为默认值 (None),则不会尝试创建。

此外,由于 delete_task_after_execution 设置为 True,因此任务将在成功完成后从 AWS DataSync 中删除。

tests/system/providers/amazon/aws/example_datasync.py[源代码]

# Create a task (the task does not exist)
create_and_execute_task = DataSyncOperator(
    task_id="create_and_execute_task",
    source_location_uri=f"s3://{s3_bucket_source}/test_create",
    destination_location_uri=f"s3://{s3_bucket_destination}/test_create",
    create_task_kwargs={"Name": "Created by Airflow"},
    create_source_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_source),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    create_destination_location_kwargs={
        "Subdirectory": "test_create",
        "S3BucketArn": get_s3_bucket_arn(s3_bucket_destination),
        "S3Config": {
            "BucketAccessRoleArn": test_context[ROLE_ARN_KEY],
        },
    },
    delete_task_after_execution=False,
)

创建任务时,DataSyncOperator 将尝试查找并使用现有的 LocationArn,而不是创建新的 LocationArn。如果多个 LocationArn 与指定的 URI 匹配,则我们需要选择一个使用。在这种情况下,操作符的行为类似于它从多个任务中选择单个任务的方式

操作符将引发异常。为避免这种情况,您可以将 allow_random_location_choice 设置为 True 以从候选位置中随机选择。

此条目有帮助吗?