airflow.providers.amazon.aws.operators.datasync

创建、获取、更新、执行和删除 AWS DataSync 任务。

DataSyncOperator

查找、创建、更新、执行和删除 AWS DataSync 任务。

模块内容

class airflow.providers.amazon.aws.operators.datasync.DataSyncOperator(*, wait_interval_seconds=30, max_iterations=60, wait_for_completion=True, task_arn=None, source_location_uri=None, destination_location_uri=None, allow_random_task_choice=False, allow_random_location_choice=False, create_task_kwargs=None, create_source_location_kwargs=None, create_destination_location_kwargs=None, update_task_kwargs=None, task_execution_kwargs=None, delete_task_after_execution=False, **kwargs)[source]

Bases: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.datasync.DataSyncHook]

查找、创建、更新、执行和删除 AWS DataSync 任务。

如果 do_xcom_push 为 True,则将已执行的 DataSync TaskArn 和 TaskExecutionArn 推送到 XCom。

另请参阅

有关如何使用此 Operator 的更多信息,请查阅指南:与 AWS DataSync 任务交互

注意

您的 AWS 环境中可能存在 0 个、1 个或多个已定义的 DataSync 任务。默认行为是如果任务数量为 0,则创建新任务;如果任务数量为 1,则执行该任务;如果任务数量过多,则失败。

参数:
  • wait_interval_seconds (int) – 检查 TaskExecution 状态的两次连续调用之间等待的时间。

  • max_iterations (int) – 检查 TaskExecution 状态的最大连续调用次数。

  • wait_for_completion (bool) – 如果为 True,则等待任务执行达到最终状态

  • task_arn (str | None) – 要使用的 AWS DataSync TaskArn。如果为 None,则此 Operator 将尝试搜索现有任务或尝试创建新任务。

  • source_location_uri (str | None) – 要搜索的源位置 URI。将考虑所有 LocationArn 与此 URI 匹配的 DataSync 任务。示例:smb://server/subdir

  • destination_location_uri (str | None) – 要搜索的目标位置 URI。将考虑所有 LocationArn 与此 URI 匹配的 DataSync 任务。示例:s3://airflow_bucket/stuff

  • allow_random_task_choice (bool) – 如果多个任务匹配,则必须选择一个任务来执行。如果 allow_random_task_choice 为 True,则随机选择一个。

  • allow_random_location_choice (bool) – 如果多个位置匹配,则在创建任务时必须选择一个位置。如果 allow_random_location_choice 为 True,则随机选择一个。

  • create_task_kwargs (dict | None) – 如果未找到合适的 TaskArn,如果定义了 create_task_kwargs,则将创建一个。然后 create_task_kwargs 将在内部这样使用:boto3.create_task(**create_task_kwargs) 示例:{'Name': 'xyz', 'Options': ..., 'Excludes': ..., 'Tags': ...}

  • create_source_location_kwargs (dict | None) – 如果未找到合适的 LocationArn,如果定义了 create_source_location_kwargs,则将创建一个 Location。然后 create_source_location_kwargs 将在内部这样使用:boto3.create_location_xyz(**create_source_location_kwargs) xyz 根据 source_location_uri 的前缀确定,例如 smb:/...s3:/... 示例:{'Subdirectory': ..., 'ServerHostname': ..., ...}

  • create_destination_location_kwargs (dict | None) – 如果未找到合适的 LocationArn,如果定义了 create_destination_location_kwargs,则将创建一个 Location。然后 create_destination_location_kwargs 将在内部这样使用:boto3.create_location_xyz(**create_destination_location_kwargs) xyz 根据 destination_location_uri 的前缀确定,例如 smb:/...` or ``s3:/... 示例:{'S3BucketArn': ..., 'S3Config': {'BucketAccessRoleArn': ...}, ...}

  • update_task_kwargs (dict | None) – 如果找到或创建了合适的 TaskArn,如果定义了 update_task_kwargs,则将更新该 TaskArn。然后 update_task_kwargs 将在内部这样使用:boto3.update_task(TaskArn=task_arn, **update_task_kwargs) 示例:{'Name': 'xyz', 'Options': ..., 'Excludes': ...}

  • task_execution_kwargs (dict | None) – 启动任务执行时直接传递的其他 kwargs,在内部这样使用:boto3.start_task_execution(TaskArn=task_arn, **task_execution_kwargs)

  • delete_task_after_execution (bool) – 如果为 True,则在成功完成后,将从 AWS DataSync 中删除已执行的 TaskArn。

  • aws_conn_id – 用于 AWS 凭据的 Airflow 连接。如果此参数为 None 或为空,则使用默认的 boto3 行为。如果在分布式环境中运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(且必须在每个 worker 节点上维护)。

  • region_name – AWS region_name。如果未指定,则使用默认的 boto3 行为。

  • verify – 是否验证 SSL 证书。请参阅:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • botocore_config – botocore 客户端的配置字典(键值对)。请参阅:https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html

引发:
  • AirflowException – 如果未指定 task_arn,或者未指定 source_location_uridestination_location_uri

  • AirflowException – 如果找不到源或目标 Location 且无法创建。

  • AirflowException – 如果 choose_taskchoose_location 失败。

  • AirflowException – 如果任务创建、更新、执行或删除失败。

aws_hook_class[source]
template_fields: collections.abc.Sequence[str][source]
template_fields_renderers[source]
ui_color = '#44b5e2'[source]
wait_interval_seconds = 30[source]
max_iterations = 60[source]
wait_for_completion = True[source]
task_arn = None[source]
source_location_uri = None[source]
destination_location_uri = None[source]
allow_random_task_choice = False[source]
allow_random_location_choice = False[source]
create_task_kwargs[source]
create_source_location_kwargs[source]
create_destination_location_kwargs[source]
update_task_kwargs[source]
task_execution_kwargs[source]
delete_task_after_execution = False[source]
candidate_source_location_arns: list[str] | None = None[source]
candidate_destination_location_arns: list[str] | None = None[source]
candidate_task_arns: list[str] | None = None[source]
source_location_arn: str | None = None[source]
destination_location_arn: str | None = None[source]
task_execution_arn: str | None = None[source]
execute(context)[source]

创建 Operator 时派生。

Context 与渲染 jinja 模板时使用的字典相同。

更多上下文信息请参考 get_template_context。

choose_task(task_arn_list)[source]

从列表中选择 1 个 DataSync TaskArn。

choose_location(location_arn_list)[source]

从列表中选择 1 个 DataSync LocationArn。

on_kill()[source]

当任务实例被杀死时,重写此方法以清理子进程。

Operator 中任何使用 threading、subprocess 或 multiprocessing 模块的地方都需要进行清理,否则会留下僵尸进程。

此条目有帮助吗?