Presto 到 Google Cloud Storage 传输操作符¶
Presto 是一个开源的分布式 SQL 查询引擎,用于对从千兆字节到拍字节的各种大小数据源运行交互式分析查询。Presto 允许在数据所在位置进行查询,包括 Hive、Cassandra、关系型数据库甚至专有数据存储。单个 Presto 查询可以组合来自多个来源的数据,从而实现对整个组织的数据分析。
Google Cloud Storage 允许在全球范围内随时存储和检索任意量的数据。您可以使用它存储备份数据和归档数据,也可以作为BigQuery 的数据源。
数据传输¶
使用 PrestoToGCSOperator
操作符在 Presto 和 Google Storage 之间传输文件。
此操作符有 3 个必需参数
sql
- 要执行的 SQL。bucket
- 要上传到的存储桶。filename
- 上传到 Google Cloud Storage 时用作对象名称的文件名。在文件名中应指定{}
,以便操作符在文件因大小而分割时注入文件编号。
所有参数都在参考文档中进行了描述 - PrestoToGCSOperator
。
示例操作符调用可能如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
presto_to_gcs_basic = PrestoToGCSOperator(
task_id="presto_to_gcs_basic",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=BUCKET_NAME,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
)
数据格式选择¶
此操作符支持两种输出格式
json
- JSON Lines(默认)csv
您可以通过 export_format
参数指定这些选项。
如果您想创建 CSV 文件,您的操作符调用可能如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
presto_to_gcs_csv = PrestoToGCSOperator(
task_id="presto_to_gcs_csv",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=BUCKET_NAME,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.csv",
schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
export_format="csv",
)
生成 BigQuery schema¶
如果您设置 schema_filename
参数,一个包含表对应 BigQuery schema 字段的 .json
文件将从数据库导出并上传到存储桶。
如果您想创建 schema 文件,那么示例操作符调用可能如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
presto_to_gcs_multiple_types = PrestoToGCSOperator(
task_id="presto_to_gcs_multiple_types",
sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
bucket=BUCKET_NAME,
filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
gzip=False,
)
有关 BigQuery schema 的更多信息,请参阅 Big Query 文档中的 指定 schema。
将结果分割成多个文件¶
此操作符支持将大型结果分割成多个文件。`approx_max_file_size_bytes` 参数允许开发者指定分割文件的大小。默认情况下,文件大小不超过 1 900 000 000 字节(1900 MB)
查看 Google Cloud Storage 中的配额与限制,了解单个对象的最大允许文件大小。
如果您想创建 10 MB 的文件,您的代码可能如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
presto_to_gcs_many_chunks = PrestoToGCSOperator(
task_id="presto_to_gcs_many_chunks",
sql=f"select * from {SOURCE_CUSTOMER_TABLE}",
bucket=BUCKET_NAME,
filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}.{{}}.json",
schema_filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
approx_max_file_size_bytes=10_000_000,
gzip=False,
)
使用 BigQuery 查询数据¶
Google Cloud Storage 中的数据可供 BigQuery 使用。您可以将数据加载到 BigQuery,或在查询中直接引用 GCS 数据。有关将数据加载到 BigQuery 的信息,请查阅 BigQuery 文档中的 从 Cloud Storage 加载数据简介。有关查询 GCS 数据的信息,请查阅 BigQuery 文档中的 查询 Cloud Storage 数据。
Airflow 还提供了许多允许您使用 BigQuery 的操作符。例如,如果您想创建一个外部表,以便您可以创建直接从 GCS 读取数据的查询,那么您可以使用 BigQueryCreateExternalTableOperator
。使用此操作符的代码如下所示
tests/system/google/cloud/gcs/example_presto_to_gcs.py
create_external_table_multiple_types = BigQueryCreateTableOperator(
task_id="create_external_table_multiple_types",
dataset_id=DATASET_NAME,
table_id=f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
table_resource={
"tableReference": {
"projectId": PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
},
"schema": {
"fields": [
{"name": "name", "type": "STRING"},
{"name": "post_abbr", "type": "STRING"},
]
},
"externalDataConfiguration": {
"sourceFormat": "NEWLINE_DELIMITED_JSON",
"compression": "NONE",
"csvOptions": {"skipLeadingRows": 1},
"sourceUris": [f"gs://{BUCKET_NAME}/{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
},
},
gcs_schema_object=f"gs://{BUCKET_NAME}/{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
)
有关 Airflow 和 BigQuery 集成的更多信息,请查阅 Python API 参考 - bigquery
。
参考¶
欲了解更多信息,请查阅