airflow.providers.google.cloud.transfers.gcs_to_bigquery

此模块包含一个 Google Cloud Storage 到 BigQuery 的操作符。

模块内容

GCSToBigQueryOperator

将文件从 Google Cloud Storage 加载到 BigQuery。

属性

ALLOWED_FORMATS

airflow.providers.google.cloud.transfers.gcs_to_bigquery.ALLOWED_FORMATS = ['CSV', 'NEWLINE_DELIMITED_JSON', 'AVRO', 'GOOGLE_SHEETS', 'DATASTORE_BACKUP', 'PARQUET'][源代码]
class airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator(*, bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, schema_object_bucket=None, source_format='CSV', compression='NONE', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=None, write_disposition='WRITE_EMPTY', field_delimiter=',', max_bad_records=0, quote_character=None, ignore_unknown_values=False, allow_quoted_newlines=False, allow_jagged_rows=False, encoding='UTF-8', max_id_key=None, gcp_conn_id='google_cloud_default', schema_update_options=(), src_fmt_configs=None, external_table=False, time_partitioning=None, cluster_fields=None, autodetect=True, encryption_configuration=None, location=None, impersonation_chain=None, labels=None, description=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), result_retry=DEFAULT_RETRY, result_timeout=None, cancel_on_kill=True, job_id=None, force_rerun=True, reattach_states=None, project_id=PROVIDE_PROJECT_ID, force_delete=False, **kwargs)[源代码]

基类:airflow.models.BaseOperator

将文件从 Google Cloud Storage 加载到 BigQuery。

BigQuery 表的模式可以使用两种方式之一指定。您可以直接传入模式字段,也可以将操作符指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是一个包含模式字段的 JSON 文件。

另请参阅

有关如何使用此操作符的更多信息,请查看以下指南:操作符

参数
  • bucket – 要从中加载的存储桶。(模板化)

  • source_objects – 要从中加载的 Google Cloud Storage URI 的字符串或列表。(模板化)如果 source_format 为 'DATASTORE_BACKUP',则列表必须仅包含一个 URI。

  • destination_project_dataset_table – 要将数据加载到的点分隔的 (<project>.|<project>:)<dataset>.<table> BigQuery 表。如果未包含 <project>,则项目将是连接 json 中定义的项目。(模板化)

  • schema_fields – 如果设置,则模式字段列表,如此处定义:https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load 如果 source_format 为 'DATASTORE_BACKUP',则不应设置。如果 'schema_object' 为 null 且 autodetect 为 False,则必须定义参数。

  • schema_object – 如果设置,则指向包含表模式的 .json 文件的 GCS 对象路径。(模板化)如果 'schema_fields' 为 null 且 autodetect 为 False,则必须定义参数。

  • schema_object_bucket – [可选] 如果设置,则为存储模式对象模板的 GCS 存储桶。(模板化) (默认值: bucket 的值)

  • source_format – 要导出的文件格式。

  • compression – [可选] 数据源的压缩类型。可能的值包括 GZIP 和 NONE。默认值为 NONE。此设置对于 Google Cloud Bigtable、Google Cloud Datastore 备份和 Avro 格式将被忽略。

  • create_disposition – 如果表不存在,则创建处置。

  • skip_leading_rows – BigQuery 在加载数据时将跳过的 CSV 文件顶部的行数。当 autodetect 开启时,行为如下:skip_leading_rows 未指定 - Autodetect 尝试检测第一行中的标题。如果未检测到标题,则将该行读取为数据。否则,将从第二行开始读取数据。skip_leading_rows 为 0 - 指示 autodetect 没有标题,应从第一行开始读取数据。skip_leading_rows = N > 0 - Autodetect 跳过 N-1 行,并尝试检测第 N 行中的标题。如果未检测到标题,则仅跳过第 N 行。否则,使用第 N 行来提取检测到的模式的列名。默认值设置为 None,以便自动检测选项可以检测模式字段。

  • write_disposition – 如果表已存在,则写入处置。

  • field_delimiter – 从 CSV 加载时要使用的分隔符。

  • max_bad_records – BigQuery 在运行作业时可以忽略的最大错误记录数。

  • quote_character – 用于在 CSV 文件中引用数据部分的值。

  • ignore_unknown_values – [可选] 指示 BigQuery 是否应允许未在表模式中表示的额外值。如果为 true,则忽略额外的值。如果为 false,则将具有额外列的记录视为错误记录,并且如果错误记录过多,则会在作业结果中返回无效错误。

  • allow_quoted_newlines – 是否允许带引号的换行符(true)或不允许(false)。

  • allow_jagged_rows – 接受缺少尾部可选列的行。缺失的值将被视为 null 值。如果为 false,则将缺少尾部列的记录视为错误记录,并且如果错误记录过多,则会在作业结果中返回无效错误。仅适用于 CSV,对于其他格式将被忽略。

  • encoding – 数据的字符编码。请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).csvOptions.encoding https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding

  • max_id_key – 如果设置,则为要加载的 BigQuery 表中列的名称。这将被用于在加载发生后从 BigQuery 中选择 MAX 值。结果将由 execute() 命令返回,然后存储在 XCom 中供未来的运算符使用。这对于增量加载很有帮助——在未来的执行过程中,您可以从最大 ID 处继续。

  • schema_update_options – 允许作为加载作业的副作用更新目标表的架构。

  • src_fmt_configs – 配置特定于源格式的可选字段

  • external_table – 标志,用于指定目标表是否应为 BigQuery 外部表。默认值为 False。

  • time_partitioning – 配置可选的时间分区字段,例如按照 API 规范进行字段、类型和过期时间的分区。请注意,“field”在与 dataset.table$partition 并发时不可用。

  • cluster_fields – 请求将此加载的结果按一个或多个列排序存储。BigQuery 支持对分区表和非分区表进行聚类。给定的列顺序决定排序顺序。不适用于外部表。

  • autodetect – [可选] 指示是否应自动推断 CSV 和 JSON 源的选项和架构。(默认值:True)。如果未定义“schema_fields”和“schema_object”,则必须将此参数设置为 True。如果表是在 Airflow 之外创建的,建议将其设置为 True。如果 autodetect 为 None 且未提供架构(既没有通过 schema_fields,也没有通过 schema_object),则假定该表已存在。

  • encryption_configuration

    [可选] 自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key",
    }
    

  • location – [可选] 作业的地理位置。美国和欧盟以外的地区是必需的。有关详细信息,请参阅 https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,使用短期凭据模拟,或者获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予原始帐户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将 Service Account Token Creator IAM 角色授予直接前面的身份,列表中的第一个帐户将此角色授予原始帐户(模板化)。

  • labels – [可选] BiqQuery 表的标签。

  • description – [可选] BigQuery 表的描述。仅当新创建目标表时才会使用此项。如果表已存在且提供了与当前描述不同的值,则作业将失败。

  • deferrable (bool) – 在可延迟模式下运行运算符

  • force_delete (bool) – 如果目标表已存在,则强制删除目标表。

template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'schema_object_bucket',...[source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[source]
ui_color = '#f0eee4'[source]
execute(context)[source]

在创建运算符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

立即返回,并依赖触发器抛出成功事件。触发器的回调。

依赖触发器抛出异常,否则假定执行成功。

on_kill()[source]

覆盖此方法以在任务实例被终止时清理子进程。

在运算符中使用 threading、subprocess 或 multiprocessing 模块的任何情况都需要清理,否则它会留下“幽灵进程”。

get_openlineage_facets_on_complete(task_instance)[source]

实现 on_complete,因为我们将包括最终的 BQ 作业 ID。

此条目是否有帮助?