airflow.providers.google.cloud.transfers.s3_to_gcs

S3ToGCSOperator

将 S3 键(可能是一个前缀)与 Google Cloud Storage 目标路径同步。

模块内容

class airflow.providers.google.cloud.transfers.s3_to_gcs.S3ToGCSOperator(*, bucket, prefix='', apply_gcs_prefix=False, delimiter='', aws_conn_id='aws_default', verify=None, gcp_conn_id='google_cloud_default', dest_gcs=None, replace=False, gzip=False, google_impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=10, **kwargs)[源代码]

Bases: airflow.providers.amazon.aws.operators.s3.S3ListOperator

将 S3 键(可能是一个前缀)与 Google Cloud Storage 目标路径同步。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 从 Amazon S3 传输数据到 Google Cloud Storage

参数:
  • bucket – 查找对象的 S3 存储桶。(模板化的)

  • prefix – 用于过滤对象名称以此前缀开头的字符串。(模板化的)

  • apply_gcs_prefix

    (可选)是否将源对象的路径替换为给定的 GCS 目标路径。如果 apply_gcs_prefix 为 False(默认),则 S3 中的对象将复制到 GCS 存储桶的给定 GCS 路径中,并且源路径将保留在其中。例如: =>

    如果 apply_gcs_prefix 为 True,则 S3 中的对象将复制到 GCS 存储桶的给定 GCS 路径中,并且源路径将被省略。例如: =>

  • delimiter – 分隔符,标记键的层级。(模板化的)

  • aws_conn_id – 源 S3 连接

  • verify

    是否验证 S3 连接的 SSL 证书。默认情况下会验证 SSL 证书。您可以提供以下值:

    • False:不验证 SSL 证书。SSL 仍会使用

      (除非 use_ssl 为 False),但 SSL 证书不会被验证。

    • path/to/cert/bundle.pem:要使用的 CA 证书捆绑包的文件名。

      如果要使用与 botocore 使用的不同的 CA 证书捆绑包,可以指定此参数。

  • gcp_conn_id – (可选) 用于连接到 Google Cloud 的连接 ID。

  • dest_gcs – 目标 Google Cloud Storage 存储桶和前缀,用于存储文件。(模板化的)

  • replace – 是否要替换现有的目标文件。

  • gzip – 上传时压缩文件的选项。在 deferrable 模式下忽略此参数。

  • google_impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的 Google 服务账号,用于使用短期凭据模拟身份,或者链式账号列表,用于获取列表中最后一个账号的 access_token,该 access_token 将用于请求中的身份模拟。如果设置为字符串,该账号必须授予发起账号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须授予直接前一个身份 Service Account Token Creator IAM 角色,列表中的第一个账号将此角色授予发起账号。(模板化的)

  • deferrable – 是否在 deferrable 模式下运行操作符

  • poll_interval (int) – 轮询作业完成之间的时间间隔(秒)。仅在 deferrable 模式下运行时考虑此值。必须大于 0。

示例:

s3_to_gcs_op = S3ToGCSOperator(
    task_id="s3_to_gcs_example",
    bucket="my-s3-bucket",
    prefix="data/customers-201804",
    gcp_conn_id="google_cloud_default",
    dest_gcs="gs://my.gcs.bucket/some/customers/",
    replace=False,
    gzip=True,
    dag=my_dag,
)

请注意,bucketprefixdelimiterdest_gcs 是模板化的,因此您可以在其中使用变量。

template_fields: collections.abc.Sequence[str] = ('bucket', 'prefix', 'delimiter', 'dest_gcs', 'google_impersonation_chain')[源代码]
ui_color = '#e09411'[源代码]
transfer_job_max_files_number = 1000[源代码]
apply_gcs_prefix = False[源代码]
gcp_conn_id = 'google_cloud_default'[源代码]
dest_gcs = None[源代码]
replace = False[源代码]
verify = None[源代码]
gzip = False[源代码]
google_impersonation_chain = None[源代码]
deferrable = True[源代码]
poll_interval = 10[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文是与渲染 jinja 模板时使用的相同的字典。

有关更多上下文信息,请参阅 get_template_context。

exclude_existing_objects(s3_objects, gcs_hook)[源代码]

从列表中排除已存在于 GCS 存储桶中的对象。

s3_to_gcs_object(s3_object)[源代码]

根据操作符的逻辑将 S3 路径转换为 GCS 路径。

如果 apply_gcs_prefix == True 则 <s3_prefix><content> => <gcs_prefix><content> 如果 apply_gcs_prefix == False 则 <s3_prefix><content> => <gcs_prefix><s3_prefix><content>

gcs_to_s3_object(gcs_object)[源代码]

根据操作符的逻辑将 GCS 路径转换为 S3 路径。

如果 apply_gcs_prefix == True 则 <gcs_prefix><content> => <s3_prefix><content> 如果 apply_gcs_prefix == False 则 <gcs_prefix><s3_prefix><content> => <s3_prefix><content>

transfer_files(s3_objects, gcs_hook, s3_hook)[源代码]
transfer_files_async(files, gcs_hook, s3_hook)[源代码]

提交 Google Cloud Storage Transfer Service 作业,将文件从 AWS S3 复制到 GCS。

submit_transfer_jobs(files, gcs_hook, s3_hook)[源代码]
execute_complete(context, event)[源代码]

立即返回并依赖触发器抛出成功事件。触发器的回调函数。

依赖触发器抛出异常,否则假定执行成功。

get_transfer_hook()[源代码]
get_openlineage_facets_on_start()[源代码]

此条目是否有帮助?