Google Cloud Storage 到 BigQuery 传输 Operator¶
Google Cloud Storage (GCS) 是一种用于存储非结构化数据的托管服务。Google Cloud BigQuery 是 Google Cloud 的无服务器数据仓库服务。此 Operator 可用于将存储在 Cloud Storage 存储桶中的文件数据填充到 BigQuery 表中。
先决条件任务¶
要使用这些 Operator,您必须完成以下事项
使用 Cloud 控制台选择或创建一个 Cloud Platform 项目。
为您的项目启用结算功能,具体请参阅 Google Cloud 文档中的描述。
启用 API,具体请参阅 Cloud 控制台文档中的描述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装的详细信息,请参阅。
Operator¶
从 GCS 到 BigQuery 的文件传输由 GCSToBigQueryOperator
Operator 执行。
使用 Jinja 模板化,并通过 bucket
, source_objects
, schema_object
, schema_object_bucket
, destination_project_dataset_table
, impersonation_chain
, src_fmt_configs
参数动态定义值。
您可以使用 source_objects
参数从单个存储桶加载多个对象。您还可以定义 schema 以及其他设置,例如压缩格式。有关更多信息,请参阅上方链接。
传输文件¶
以下 Operator 将一个或多个文件从 GCS 传输到 BigQuery 表中。
tests/system/google/cloud/gcs/example_gcs_to_bigquery.py
load_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.csv"],
destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
schema_fields=[
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
],
write_disposition="WRITE_TRUNCATE",
)
您也可以在可延迟模式下使用 GCSToBigQueryOperator
tests/system/google/cloud/gcs/example_gcs_to_bigquery_async.py
load_string_based_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_str_csv_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.csv"],
destination_project_dataset_table=f"{DATASET_NAME_STR}.{TABLE_NAME_STR}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key="string_field_0",
deferrable=True,
)
load_date_based_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_date_csv_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states-by-date.csv"],
destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key=MAX_ID_DATE,
deferrable=True,
)
load_json = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_date_json_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.json"],
source_format="NEWLINE_DELIMITED_JSON",
destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key=MAX_ID_STR,
deferrable=True,
)
load_csv_delimiter = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_delimiter_async",
bucket="big-query-samples",
source_objects=["employees-tabular.csv"],
source_format="csv",
destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
field_delimiter="\t",
quote_character="",
max_id_key=MAX_ID_STR,
deferrable=True,
)
参考资料¶
如需更多信息,请参阅