Google Cloud Storage 传输算子到 BigQuery¶
Google Cloud Storage (GCS) 是一项用于存储非结构化数据的托管服务。 Google Cloud BigQuery 是 Google Cloud 的无服务器数据仓库产品。此算子可用于使用存储在 Cloud Storage 存储桶中的文件中的数据填充 BigQuery 表。
先决条件任务¶
要使用这些算子,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
为您的项目启用结算,如 Google Cloud 文档中所述。
启用 API,如 Cloud Console 文档中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
算子¶
使用 GCSToBigQueryOperator
算子执行从 GCS 到 BigQuery 的文件传输。
使用 bucket
、source_objects
、schema_object
、schema_object_bucket
、destination_project_dataset_table
、impersonation_chain
、src_fmt_configs
和 Jinja 模板 动态定义值。
您可以使用 source_objects
参数从单个存储桶加载多个对象。您还可以定义架构以及其他设置,例如压缩格式。有关更多信息,请参阅上面的链接。
传输文件¶
以下算子将一个或多个文件从 GCS 传输到 BigQuery 表中。
load_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.csv"],
destination_project_dataset_table=f"{DATASET_NAME}.{TABLE_NAME}",
schema_fields=[
{"name": "name", "type": "STRING", "mode": "NULLABLE"},
{"name": "post_abbr", "type": "STRING", "mode": "NULLABLE"},
],
write_disposition="WRITE_TRUNCATE",
)
您还可以在可延迟模式下使用 GCSToBigQueryOperator
load_string_based_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_str_csv_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.csv"],
destination_project_dataset_table=f"{DATASET_NAME_STR}.{TABLE_NAME_STR}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key="string_field_0",
deferrable=True,
)
load_date_based_csv = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_date_csv_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states-by-date.csv"],
destination_project_dataset_table=f"{DATASET_NAME_DATE}.{TABLE_NAME_DATE}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key=MAX_ID_DATE,
deferrable=True,
)
load_json = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_date_json_async",
bucket="cloud-samples-data",
source_objects=["bigquery/us-states/us-states.json"],
source_format="NEWLINE_DELIMITED_JSON",
destination_project_dataset_table=f"{DATASET_NAME_JSON}.{TABLE_NAME_JSON}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
max_id_key=MAX_ID_STR,
deferrable=True,
)
load_csv_delimiter = GCSToBigQueryOperator(
task_id="gcs_to_bigquery_example_delimiter_async",
bucket="big-query-samples",
source_objects=["employees-tabular.csv"],
source_format="csv",
destination_project_dataset_table=f"{DATASET_NAME_DELIMITER}.{TABLE_NAME_DELIMITER}",
write_disposition="WRITE_TRUNCATE",
external_table=False,
autodetect=True,
field_delimiter="\t",
quote_character="",
max_id_key=MAX_ID_STR,
deferrable=True,
)