Presto 到 Google Cloud Storage 传输操作符

Presto 是一个开源的分布式 SQL 查询引擎,用于对从千兆字节到拍字节的各种大小数据源运行交互式分析查询。Presto 允许在数据所在位置进行查询,包括 Hive、Cassandra、关系型数据库甚至专有数据存储。单个 Presto 查询可以组合来自多个来源的数据,从而实现对整个组织的数据分析。

Google Cloud Storage 允许在全球范围内随时存储和检索任意量的数据。您可以使用它存储备份数据和归档数据,也可以作为BigQuery 的数据源

数据传输

使用 PrestoToGCSOperator 操作符在 Presto 和 Google Storage 之间传输文件。

此操作符有 3 个必需参数

  • sql - 要执行的 SQL。

  • bucket - 要上传到的存储桶。

  • filename - 上传到 Google Cloud Storage 时用作对象名称的文件名。在文件名中应指定 {},以便操作符在文件因大小而分割时注入文件编号。

所有参数都在参考文档中进行了描述 - PrestoToGCSOperator

示例操作符调用可能如下所示

tests/system/google/cloud/gcs/example_presto_to_gcs.py

presto_to_gcs_basic = PrestoToGCSOperator(
    task_id="presto_to_gcs_basic",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=BUCKET_NAME,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
)

数据格式选择

此操作符支持两种输出格式

  • json - JSON Lines(默认)

  • csv

您可以通过 export_format 参数指定这些选项。

如果您想创建 CSV 文件,您的操作符调用可能如下所示

tests/system/google/cloud/gcs/example_presto_to_gcs.py

presto_to_gcs_csv = PrestoToGCSOperator(
    task_id="presto_to_gcs_csv",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=BUCKET_NAME,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.csv",
    schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
    export_format="csv",
)

生成 BigQuery schema

如果您设置 schema_filename 参数,一个包含表对应 BigQuery schema 字段的 .json 文件将从数据库导出并上传到存储桶。

如果您想创建 schema 文件,那么示例操作符调用可能如下所示

tests/system/google/cloud/gcs/example_presto_to_gcs.py

presto_to_gcs_multiple_types = PrestoToGCSOperator(
    task_id="presto_to_gcs_multiple_types",
    sql=f"select * from {SOURCE_MULTIPLE_TYPES}",
    bucket=BUCKET_NAME,
    filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
    gzip=False,
)

有关 BigQuery schema 的更多信息,请参阅 Big Query 文档中的 指定 schema

将结果分割成多个文件

此操作符支持将大型结果分割成多个文件。`approx_max_file_size_bytes` 参数允许开发者指定分割文件的大小。默认情况下,文件大小不超过 1 900 000 000 字节(1900 MB)

查看 Google Cloud Storage 中的配额与限制,了解单个对象的最大允许文件大小。

如果您想创建 10 MB 的文件,您的代码可能如下所示

tests/system/google/cloud/gcs/example_presto_to_gcs.py

presto_to_gcs_many_chunks = PrestoToGCSOperator(
    task_id="presto_to_gcs_many_chunks",
    sql=f"select * from {SOURCE_CUSTOMER_TABLE}",
    bucket=BUCKET_NAME,
    filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_CUSTOMER_TABLE)}-schema.json",
    approx_max_file_size_bytes=10_000_000,
    gzip=False,
)

使用 BigQuery 查询数据

Google Cloud Storage 中的数据可供 BigQuery 使用。您可以将数据加载到 BigQuery,或在查询中直接引用 GCS 数据。有关将数据加载到 BigQuery 的信息,请查阅 BigQuery 文档中的 从 Cloud Storage 加载数据简介。有关查询 GCS 数据的信息,请查阅 BigQuery 文档中的 查询 Cloud Storage 数据

Airflow 还提供了许多允许您使用 BigQuery 的操作符。例如,如果您想创建一个外部表,以便您可以创建直接从 GCS 读取数据的查询,那么您可以使用 BigQueryCreateExternalTableOperator。使用此操作符的代码如下所示

tests/system/google/cloud/gcs/example_presto_to_gcs.py

create_external_table_multiple_types = BigQueryCreateTableOperator(
    task_id="create_external_table_multiple_types",
    dataset_id=DATASET_NAME,
    table_id=f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
    table_resource={
        "tableReference": {
            "projectId": PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": f"{safe_name(SOURCE_MULTIPLE_TYPES)}",
        },
        "schema": {
            "fields": [
                {"name": "name", "type": "STRING"},
                {"name": "post_abbr", "type": "STRING"},
            ]
        },
        "externalDataConfiguration": {
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "compression": "NONE",
            "csvOptions": {"skipLeadingRows": 1},
            "sourceUris": [f"gs://{BUCKET_NAME}/{safe_name(SOURCE_MULTIPLE_TYPES)}.*.json"],
        },
    },
    gcs_schema_object=f"gs://{BUCKET_NAME}/{safe_name(SOURCE_MULTIPLE_TYPES)}-schema.json",
)

有关 Airflow 和 BigQuery 集成的更多信息,请查阅 Python API 参考 - bigquery

参考

欲了解更多信息,请查阅

本条目是否有帮助?