airflow.providers.amazon.aws.hooks.datasync

使用 AWS 的 boto3 库与 AWS DataSync 交互。

DataSyncHook

与 AWS DataSync 交互。

模块内容

class airflow.providers.amazon.aws.hooks.datasync.DataSyncHook(wait_interval_seconds=30, *args, **kwargs)[source]

基类: airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook

与 AWS DataSync 交互。

提供对 boto3.client("datasync") 的厚封装。

可以指定额外的参数(例如 aws_conn_id),这些参数将传递给底层的 AwsBaseHook。

参数:

wait_interval_seconds (int) – 检查 TaskExecution 状态的两次连续调用之间的等待时间。默认为 30 秒。

引发:

ValueError – 如果 wait_interval_seconds 不在 0 到 15*60 秒之间。

TASK_EXECUTION_INTERMEDIATE_STATES = ('INITIALIZING', 'QUEUED', 'LAUNCHING', 'PREPARING', 'TRANSFERRING', 'VERIFYING')[source]
TASK_EXECUTION_FAILURE_STATES = ('ERROR',)[source]
TASK_EXECUTION_SUCCESS_STATES = ('SUCCESS',)[source]
locations: list = [][source]
tasks: list = [][source]
create_location(location_uri, **create_location_kwargs)[source]

创建新位置。

参数:
  • location_uri (str) – 用于确定位置类型(S3、SMB、NFS、EFS)的位置 URI。

  • create_location_kwargs – 传递给 DataSync.Client.create_location_* 方法。

返回:

创建位置的 LocationArn。

引发:

AirflowException – 如果位置类型(从 location_uri 获取的前缀)无效。

返回类型:

str

get_location_arns(location_uri, case_sensitive=False, ignore_trailing_slash=True)[source]

返回匹配 LocationUri 的所有 LocationArn。

参数:
  • location_uri (str) – 要搜索的位置 URI,例如 s3://mybucket/mypath

  • case_sensitive (bool) – 对位置 URI 进行大小写敏感搜索。

  • ignore_trailing_slash (bool) – 匹配时忽略 URI 末尾的 /。

返回:

LocationArn 列表。

引发:

AirflowBadRequest – 如果 location_uri 为空

返回类型:

list[str]

create_task(source_location_arn, destination_location_arn, **create_task_kwargs)[source]

在指定的源 LocationArn 和目标 LocationArn 之间创建任务。

参数:
  • source_location_arn (str) – 源 LocationArn。必须已存在。

  • destination_location_arn (str) – 目标 LocationArn。必须已存在。

  • create_task_kwargs – 传递给 boto.create_task()。请参阅 AWS boto3 DataSync 文档。

返回:

创建的任务的 TaskArn

返回类型:

str

update_task(task_arn, **update_task_kwargs)[source]

更新任务。

参数:
  • task_arn (str) – 要更新的 TaskArn。

  • update_task_kwargs – 传递给 boto.update_task(),请参阅 AWS boto3 DataSync 文档。

delete_task(task_arn)[source]

删除任务。

参数:

task_arn (str) – 要删除的 TaskArn。

get_task_arns_for_location_arns(source_location_arns, destination_location_arns)[source]

返回同时使用指定源和目标 LocationArn 的 TaskArn 列表。

参数:
  • source_location_arns (list) – 源 LocationArn 列表。

  • destination_location_arns (list) – 目标 LocationArn 列表。

引发:

AirflowBadRequest – 如果 source_location_arnsdestination_location_arns 为空。

start_task_execution(task_arn, **kwargs)[source]

为指定的 task_arn 启动 TaskExecution。

每个任务最多只能有一个 TaskExecution。额外的关键字参数会发送给 start_task_execution boto3 方法。

参数:

task_arn (str) – TaskArn

返回:

TaskExecutionArn

引发:
  • ClientError – 如果此 task_arn 已有正在运行的 TaskExecution。

  • AirflowBadRequest – 如果 task_arn 为空。

返回类型:

str

cancel_task_execution(task_execution_arn)[source]

取消指定 task_execution_arn 的 TaskExecution。

参数:

task_execution_arn (str) – TaskExecutionArn。

引发:

AirflowBadRequest – 如果 task_execution_arn 为空。

get_task_description(task_arn)[source]

获取指定 task_arn 的描述。

参数:

task_arn (str) – TaskArn

返回:

关于任务的 AWS 元数据。

引发:

AirflowBadRequest – 如果 task_arn 为空。

返回类型:

dict

describe_task_execution(task_execution_arn)[source]

获取指定 task_execution_arn 的描述。

参数:

task_execution_arn (str) – TaskExecutionArn

返回:

关于任务执行的 AWS 元数据。

引发:

AirflowBadRequest – 如果 task_execution_arn 为空。

返回类型:

dict

get_current_task_execution_arn(task_arn)[source]

获取指定 task_arn 的当前 TaskExecutionArn(如果存在)。

参数:

task_arn (str) – TaskArn

返回:

task_arn 的当前 TaskExecutionArn 或 None。

引发:

AirflowBadRequest – 如果 task_arn 为空。

返回类型:

str | None

wait_for_task_execution(task_execution_arn, max_iterations=60)[source]

等待任务执行状态完成 (SUCCESS/ERROR)。

必须存在 task_execution_arn,否则将引发 boto3 ClientError。

参数:
  • task_execution_arn (str) – TaskExecutionArn

  • max_iterations (int) – 超时前的最大迭代次数。

返回:

任务执行结果。

引发:
  • AirflowTaskTimeout – 如果超过最大迭代次数。

  • AirflowBadRequest – 如果 task_execution_arn 为空。

返回类型:

bool

此条目是否有帮助?