Google Cloud Storage to BigQuery Transfer Operator

Google Cloud Storage (GCS) is a managed service for storing unstructured data. Google Cloud BigQuery is Google Cloud's serverless data warehouse service. This operator can be used to populate BigQuery tables with data from files stored in a Cloud Storage bucket.

Prerequisite Tasks

To use these operators, you must do a few things:

Operator

File transfer from GCS to BigQuery is performed with the GCSToBigQueryOperator operator.

Use Jinja templating with the bucket, source_objects, schema_object, schema_object_bucket, destination_project_dataset_table, impersonation_chain, and src_fmt_configs parameters to define values dynamically.

You can load multiple objects from a single bucket using the source_objects parameter. You can also define a schema, as well as other settings such as the compression format. For more information, please refer to the links above.
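As a minimal sketch of the two points above, the following combines a templated object path with multiple gzip-compressed source objects. The bucket, object paths, and dataset/table names here are placeholders, not taken from the example DAGs below.

```python
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

load_multiple_csv = GCSToBigQueryOperator(
    task_id="load_multiple_csv",
    bucket="my-bucket",  # placeholder; templated, so a Jinja expression also works
    source_objects=[
        # {{ ds }} is rendered by Airflow to the logical date at runtime
        "exports/{{ ds }}/part-000.csv.gz",
        "exports/{{ ds }}/part-001.csv.gz",
    ],
    destination_project_dataset_table="my_dataset.my_table",  # placeholder
    compression="GZIP",          # input files are gzip-compressed
    skip_leading_rows=1,         # skip the CSV header row
    write_disposition="WRITE_APPEND",
)
```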

Transferring files

The following operator transfers one or more files from GCS into a BigQuery table.

tests/system/google/cloud/gcs/example_gcs_to_bigquery.py

load_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
    ],
    write_disposition="WRITE_TRUNCATE",
)
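The operator can also define an external table instead of running a load job, by setting external_table=True; BigQuery then reads the data directly from GCS on each query. This is a hedged sketch: the dataset and table names are placeholders, not part of the example above.

```python
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import (
    GCSToBigQueryOperator,
)

create_external_table = GCSToBigQueryOperator(
    task_id="create_external_table",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    # placeholder destination; data stays in GCS, only table metadata is created
    destination_project_dataset_table="my_dataset.us_states_external",
    external_table=True,
    schema_fields=[
        {"name": "name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
    ],
)
```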

You can also use GCSToBigQueryOperator in deferrable mode:

tests/system/google/cloud/gcs/example_gcs_to_bigquery_async.py

load_string_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_str_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_STR}.{TABLE_NAME_STR}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key="string_field_0",
    deferrable=True,
)

load_date_based_csv = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_csv_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states-by-date.csv"],
    destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_DATE,
    deferrable=True,
)

load_json = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_date_json_async",
    bucket="cloud-samples-data",
    source_objects=["bigquery/us-states/us-states.json"],
    source_format="NEWLINE_DELIMITED_JSON",
    destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    max_id_key=MAX_ID_STR,
    deferrable=True,
)

load_csv_delimiter = GCSToBigQueryOperator(
    task_id="gcs_to_bigquery_example_delimiter_async",
    bucket="big-query-samples",
    source_objects=["employees-tabular.csv"],
    source_format="csv",
    destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
    write_disposition="WRITE_TRUNCATE",
    external_table=False,
    autodetect=True,
    field_delimiter="\t",
    quote_character="",
    max_id_key=MAX_ID_STR,
    deferrable=True,
)

Reference

For further information, look at:
