airflow.providers.google.cloud.transfers.gcs_to_bigquery

此模块包含一个 Google Cloud Storage 到 BigQuery 的操作符。

属性

ALLOWED_FORMATS

GCSToBigQueryOperator

将文件从 Google Cloud Storage 加载到 BigQuery 中。

模块内容

airflow.providers.google.cloud.transfers.gcs_to_bigquery.ALLOWED_FORMATS = ['CSV', 'NEWLINE_DELIMITED_JSON', 'AVRO', 'GOOGLE_SHEETS', 'DATASTORE_BACKUP', 'PARQUET', 'ORC'][源码]
class airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator(*, bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, schema_object_bucket=None, source_format='CSV', compression='NONE', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=None, write_disposition='WRITE_EMPTY', field_delimiter=',', max_bad_records=0, quote_character=None, ignore_unknown_values=False, allow_quoted_newlines=False, allow_jagged_rows=False, encoding='UTF-8', max_id_key=None, gcp_conn_id='google_cloud_default', schema_update_options=(), src_fmt_configs=None, external_table=False, time_partitioning=None, cluster_fields=None, autodetect=True, encryption_configuration=None, location=None, impersonation_chain=None, labels=None, description=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), result_retry=DEFAULT_RETRY, result_timeout=None, cancel_on_kill=True, job_id=None, force_rerun=True, reattach_states=None, project_id=PROVIDE_PROJECT_ID, force_delete=False, **kwargs)[源码]

Bases: airflow.models.BaseOperator

将文件从 Google Cloud Storage 加载到 BigQuery 中。

用于 BigQuery 表的模式可以通过两种方式指定。你可以直接传入模式字段,或者指向 Google Cloud Storage 中的对象名称。Google Cloud Storage 中的对象必须是包含模式字段的 JSON 文件。

另请参阅

有关如何使用此操作符的更多信息,请查阅指南:操作符

参数:
  • bucket – 要从中加载的存储桶。(模板化)

  • source_objects – 要从中加载的 Google Cloud Storage URI 字符串或列表。(模板化) 如果 source_format 是 ‘DATASTORE_BACKUP’,列表必须只包含一个 URI。

  • destination_project_dataset_table – 用于加载数据的带有 (<project>.|<project>:)<dataset>.<table> 点号格式的 BigQuery 表。如果未包含 <project>,则项目将是连接 json 中定义的项目。(模板化)

  • schema_fields – 如果设置,则为此处定义的模式字段列表:https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load 当 source_format 为 ‘DATASTORE_BACKUP’ 时不应设置此项。如果 ‘schema_object’ 为 null 且 autodetect 为 False,则必须定义此参数。

  • schema_object – 如果设置,则为指向包含表模式的 .json 文件的 GCS 对象路径。(模板化) 如果 ‘schema_fields’ 为 null 且 autodetect 为 False,则必须定义此参数。

  • schema_object_bucket – [可选] 如果设置,则为存储模式对象模板的 GCS 存储桶。(模板化) (默认值: bucket 的值)

  • source_format – 要导出的文件格式。

  • compression – [可选] 数据源的压缩类型。可能的值包括 GZIP 和 NONE。默认值为 NONE。对于 Google Cloud Bigtable、Google Cloud Datastore 备份和 Avro 格式,此设置将被忽略。

  • create_disposition – 如果表不存在时的创建处置。

  • skip_leading_rows – 加载数据时 BigQuery 将跳过的 CSV 文件顶部的行数。当 autodetect 开启时,行为如下:skip_leading_rows 未指定 - 自动检测尝试检测第一行中的标题。如果未检测到,则该行作为数据读取。否则,数据将从第二行开始读取。skip_leading_rows 为 0 - 指示自动检测没有标题,数据应从第一行开始读取。skip_leading_rows = N > 0 - 自动检测跳过 N-1 行,并尝试在第 N 行检测标题。如果未检测到标题,则跳过第 N 行。否则,第 N 行用于提取检测到的模式的列名。默认值设置为 None,以便 autodetect 选项可以检测模式字段。

  • write_disposition – 如果表已存在时的写入处置。

  • field_delimiter – 从 CSV 加载时使用的分隔符。

  • max_bad_records – BigQuery 在运行作业时可以忽略的最大错误记录数。

  • quote_character – 用于引用 CSV 文件中数据部分的字符。

  • ignore_unknown_values – [可选] 指示 BigQuery 是否应允许表中模式未表示的额外值。如果为 true,则忽略额外值。如果为 false,则带有额外列的记录将被视为错误记录,如果错误记录过多,则在作业结果中返回无效错误。

  • allow_quoted_newlines – 是否允许带引号的换行符 (true) 或不允许 (false)。

  • allow_jagged_rows – 接受缺少尾随可选列的行。缺失值将被视为 null。如果为 false,则缺少尾随列的记录将被视为错误记录,如果错误记录过多,则在作业结果中返回无效错误。仅适用于 CSV,对于其他格式将被忽略。

  • encoding – 数据的字符编码。参见:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).csvOptions.encoding https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding

  • max_id_key – 如果设置,则为要加载的 BigQuery 表中列的名称。加载完成后,此名称将用于从 BigQuery 中选择 MAX 值。结果将由 execute() 命令返回,并存储在 XCom 中供后续操作符使用。这对于增量加载很有帮助——在未来的执行中,你可以从最大 ID 开始。

  • schema_update_options – 允许将目标表的模式作为加载作业的副作用进行更新。

  • src_fmt_configs – 配置特定于源格式的可选字段

  • external_table – 标志,用于指定目标表是否应为 BigQuery 外部表。默认值为 False。

  • time_partitioning – 配置可选的时间分区字段,即根据 API 规范按字段、类型和过期时间进行分区。请注意,‘field’ 不能与 dataset.table$partition 同时使用。

  • cluster_fields – 请求将此加载的结果按一个或多个列排序存储。BigQuery 支持对分区表和非分区表进行聚类。给定的列顺序决定了排序顺序。不适用于外部表。

  • autodetect – [可选] 指示是否应自动推断 CSV 和 JSON 源的选项和模式。(默认值: True)。如果未定义 ‘schema_fields’ 和 ‘schema_object’,则必须将参数设置为 True。如果在 Airflow 外部创建表,建议将其设置为 True。如果 autodetect 为 None 且未提供模式(无论是通过 schema_fields 还是 schema_object),则假定表已存在。

  • encryption_configuration

    [可选] 自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key",
    }
    

  • location – [可选] 作业的地理位置。除美国和欧盟外,必填。详细信息请参见 https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者获取列表中最后一个账号的 access_token 所需的账号链列表,该账号将在请求中被模拟。如果设置为字符串,则该账号必须授予发起账号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中身份必须授予 Service Account Token Creator IAM 角色给直接前一个身份,列表中第一个账号将此角色授予发起账号(模板化)。

  • labels – [可选] BigQuery 表的标签。

  • description – [可选] BigQuery 表的描述。仅在新建目标表时使用。如果表已存在且提供了与当前描述不同的值,则作业将失败。

  • deferrable (bool) – 以可延迟模式运行操作符

  • force_delete (bool) – 如果目标表已存在,则强制删除它。

template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'schema_object_bucket',...[源码]
template_ext: collections.abc.Sequence[str] = ('.sql',)[源码]
ui_color = '#f0eee4'[源码]
hook: airflow.providers.google.cloud.hooks.bigquery.BigQueryHook | None = None[源码]
configuration: dict[str, Any][源码]
bucket[源码]
source_objects[源码]
schema_object = None[源码]
schema_object_bucket = None[源码]
destination_project_dataset_table[源码]
project_id = None[源码]
schema_fields = None[源码]
source_format = ''[源码]
compression = 'NONE'[源码]
create_disposition = 'CREATE_IF_NEEDED'[源码]
skip_leading_rows = None[源码]
write_disposition = 'WRITE_EMPTY'[源码]
field_delimiter = ','[源码]
max_bad_records = 0[源码]
quote_character = None[源码]
ignore_unknown_values = False[源码]
allow_quoted_newlines = False[源码]
allow_jagged_rows = False[源码]
external_table = False[源码]
encoding : str = 'UTF-8'[源码]
max_id_key = None[源码]
gcp_conn_id = 'google_cloud_default'[源码]
schema_update_options = ()[源码]
src_fmt_configs = None[源码]
time_partitioning = None[源码]
cluster_fields = None[源码]
autodetect : bool | None = True[源码]
encryption_configuration = None[source]
location = None[source]
impersonation_chain = None[source]
labels = None[source]
description = None[source]
job_id = None[source]
deferrable = True[source]
result_retry[source]
result_timeout = None[source]
force_rerun = True[source]
reattach_states: set[str][source]
cancel_on_kill = True[source]
force_delete = False[source]
source_uris: list[str] = [][source]
execute(context)[source]

在创建 Operator 时派生。

Context 是与渲染 Jinja 模板时使用的相同的字典。

请参阅 get_template_context 以获取更多上下文信息。

execute_complete(context, event)[source]

立即返回,并依赖 trigger 抛出成功事件。trigger 的回调函数。

依赖 trigger 抛出异常,否则假定执行成功。

on_kill()[source]

重写此方法,以便在 task instance 被终止时清理子进程。

Operator 内使用 threading、subprocess 或 multiprocessing 模块的任何地方都需要清理,否则会留下僵尸进程。

get_openlineage_facets_on_complete(task_instance)[source]

实现 on_complete,因为我们将包含最终的 BQ 作业 ID。

此条目有帮助吗?