Trino 到 Google Cloud Storage 传输操作符¶
Trino 是一个开源、快速、分布式的 SQL 查询引擎,用于对从千兆字节到拍字节的各种大小的数据源运行交互式分析查询。Trino 允许查询数据所在的位置,包括 Hive、Cassandra、关系数据库,甚至专有的数据存储。一个 Trino 查询可以组合来自多个来源的数据,从而允许在您的整个组织中进行分析。
Google Cloud Storage 允许随时随地存储和检索任何数量的数据。您可以使用它来存储备份和 存档数据,以及作为 BigQuery 的数据源。
数据传输¶
Trino 和 Google Storage 之间的文件传输使用 TrinoToGCSOperator
操作符执行。
此操作符有 3 个必需的参数
sql
- 要执行的 SQL。bucket
- 要上传到的存储桶。filename
- 上传到 Google Cloud Storage 时用作对象名称的文件名。应在文件名中指定一个{}
,以便在文件因大小而拆分的情况下,操作符可以注入文件编号。
所有参数都在参考文档中描述 - TrinoToGCSOperator
。
一个示例操作符调用可能如下所示
trino_to_gcs_basic = TrinoToGCSOperator(
task_id="trino_to_gcs_basic",
sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
)
数据格式的选择¶
操作符支持两种输出格式
json
- JSON Lines (默认)csv
您可以通过 export_format
参数指定这些选项。
如果您想创建一个 CSV 文件,您的操作符调用可能如下所示
trino_to_gcs_csv = TrinoToGCSOperator(
task_id="trino_to_gcs_csv",
sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.csv",
schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
export_format="csv",
)
生成 BigQuery 架构¶
如果您设置 schema_filename
参数,将从数据库中转储一个包含该表 BigQuery 架构字段的 .json
文件,并上传到存储桶。
如果您想创建一个架构文件,那么一个示例操作符调用可能如下所示
trino_to_gcs_multiple_types = TrinoToGCSOperator(
task_id="trino_to_gcs_multiple_types",
sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
bucket=GCS_BUCKET,
filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
gzip=False,
)
有关 BigQuery 架构的更多信息,请查看 BigQuery 文档中的 指定架构。
将结果拆分为多个文件¶
此操作符支持将大型结果拆分为多个文件的功能。approx_max_file_size_bytes
参数允许开发人员指定拆分的文件大小。默认情况下,该文件不超过 1,900,000,000 字节(1900 MB)
请查看 Google Cloud Storage 中的配额和限制,以了解单个对象允许的最大文件大小。
如果您想创建 10 MB 的文件,您的代码可能如下所示
read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
task_id="read_data_from_gcs_many_chunks",
configuration={
"query": {
"query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
"useLegacySql": False,
}
},
)
使用 BigQuery 查询数据¶
Google Cloud Storage 中可用的数据可以被 BigQuery 使用。您可以将数据加载到 BigQuery,也可以在查询中直接引用 GCS 数据。有关将数据加载到 BigQuery 的信息,请查看 BigQuery 文档中的 从 Cloud Storage 加载数据简介。有关查询 GCS 数据的信息,请查看 BigQuery 文档中的 查询 Cloud Storage 数据。
Airflow 也有许多操作符允许您创建对 BigQuery 的使用。例如,如果您想创建一个外部表,允许您创建直接从 GCS 读取数据的查询,那么您可以使用 BigQueryCreateExternalTableOperator
。使用此操作符如下所示
create_external_table_multiple_types = BigQueryCreateExternalTableOperator(
task_id="create_external_table_multiple_types",
bucket=GCS_BUCKET,
table_resource={
"tableReference": {
"projectId": GCP_PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": f"{safe_name(SOURCE_SCHEMA_COLUMNS)}",
},
"schema": {
"fields": [
{"name": "table_catalog", "type": "STRING"},
{"name": "table_schema", "type": "STRING"},
{"name": "table_name", "type": "STRING"},
{"name": "column_name", "type": "STRING"},
{"name": "ordinal_position", "type": "INT64"},
{"name": "column_default", "type": "STRING"},
{"name": "is_nullable", "type": "STRING"},
{"name": "data_type", "type": "STRING"},
],
},
"externalDataConfiguration": {
"sourceFormat": "NEWLINE_DELIMITED_JSON",
"compression": "NONE",
"sourceUris": [f"gs://{GCS_BUCKET}/{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
},
},
source_objects=[f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
schema_object=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
)
有关 Airflow 和 BigQuery 集成的更多信息,请查看 Python API 参考 - bigquery
。