Trino 到 Google Cloud Storage 传输操作符

Trino 是一个开源、快速、分布式的 SQL 查询引擎,用于对从吉字节到拍字节的各种大小数据源运行交互式分析查询。Trino 允许在数据所在位置进行查询,包括 Hive、Cassandra、关系数据库甚至专有数据存储。一个 Trino 查询可以合并来自多个源的数据,从而实现对整个组织的分析。

Google Cloud Storage 允许在任何时间在全球范围内存储和检索任何数量的数据。您可以使用它来存储备份和归档数据,也可以作为 BigQuery 的数据源

数据传输

Trino 和 Google Storage 之间的数据传输通过 TrinoToGCSOperator 操作符执行。

此操作符有 3 个必需参数

  • sql - 要执行的 SQL。

  • bucket - 要上传到的 bucket。

  • filename - 上传到 Google Cloud Storage 时用作对象名称的文件名。文件名中应指定 {},以便操作符在文件因大小而被分割的情况下注入文件编号。

所有参数都在参考文档中描述 - TrinoToGCSOperator

一个示例操作符调用可能如下所示

tests/system/google/cloud/gcs/example_trino_to_gcs.py

trino_to_gcs_basic = TrinoToGCSOperator(  # TODO
    task_id="trino_to_gcs_basic",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
)

数据格式选择

操作符支持两种输出格式

  • json - JSON Lines(默认)

  • csv

您可以通过 export_format 参数指定这些选项。

如果您希望创建 CSV 文件,您的操作符调用可能如下所示

tests/system/google/cloud/gcs/example_trino_to_gcs.py

trino_to_gcs_csv = TrinoToGCSOperator(
    task_id="trino_to_gcs_csv",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.csv",
    schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
    export_format="csv",
)

生成 BigQuery schema

如果您设置了 schema_filename 参数,将从数据库导出包含表 BigQuery schema 字段的 .json 文件并上传到 bucket。

如果您想创建 schema 文件,则示例操作符调用可能如下所示

tests/system/google/cloud/gcs/example_trino_to_gcs.py

trino_to_gcs_multiple_types = TrinoToGCSOperator(  # TODO
    task_id="trino_to_gcs_multiple_types",
    sql=f"select * from {SOURCE_SCHEMA_COLUMNS}",
    bucket=GCS_BUCKET,
    filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}.{{}}.json",
    schema_filename=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
    gzip=False,
)

有关 BigQuery schema 的更多信息,请参阅 Big Query 文档中的指定 schema

将结果分割成多个文件

此操作符支持将大型结果分割成多个文件的功能。approx_max_file_size_bytes 参数允许开发者指定分割文件的文件大小。默认情况下,文件大小不超过 1 900 000 000 字节(1900 MB)。

查看 Google Cloud Storage 中的配额与限制,了解单个对象允许的最大文件大小。

如果您想创建 10 MB 的文件,您的代码可能如下所示

tests/system/google/cloud/gcs/example_trino_to_gcs.py

read_data_from_gcs_many_chunks = BigQueryInsertJobOperator(
    task_id="read_data_from_gcs_many_chunks",
    configuration={
        "query": {
            "query": f"SELECT COUNT(*) FROM `{GCP_PROJECT_ID}.{DATASET_NAME}."
            f"{safe_name(SOURCE_CUSTOMER_TABLE)}`",
            "useLegacySql": False,
        }
    },
)

使用 BigQuery 查询数据

Google Cloud Storage 中可用的数据可以由 BigQuery 使用。您可以将数据加载到 BigQuery,或在查询中直接引用 GCS 数据。有关将数据加载到 BigQuery 的信息,请参阅 BigQuery 文档中的从 Cloud Storage 加载数据简介。有关查询 GCS 数据的信息,请参阅 BigQuery 文档中的查询 Cloud Storage 数据

Airflow 也有许多操作符允许您使用 BigQuery。例如,如果您想创建一个外部表,以便您可以创建直接读取 GCS 数据的查询,那么您可以使用 BigQueryCreateExternalTableOperator。使用此操作符如下所示

tests/system/google/cloud/gcs/example_trino_to_gcs.py

create_external_table_multiple_types = BigQueryCreateTableOperator(
    task_id="create_external_table_multiple_types",
    dataset_id=DATASET_NAME,
    table_id=f"{safe_name(SOURCE_SCHEMA_COLUMNS)}",
    table_resource={
        "schema": {
            "fields": [
                {"name": "table_catalog", "type": "STRING"},
                {"name": "table_schema", "type": "STRING"},
                {"name": "table_name", "type": "STRING"},
                {"name": "column_name", "type": "STRING"},
                {"name": "ordinal_position", "type": "INT64"},
                {"name": "column_default", "type": "STRING"},
                {"name": "is_nullable", "type": "STRING"},
                {"name": "data_type", "type": "STRING"},
            ],
        },
        "externalDataConfiguration": {
            "sourceFormat": "NEWLINE_DELIMITED_JSON",
            "compression": "NONE",
            "sourceUris": [f"gs://{GCS_BUCKET}/{safe_name(SOURCE_SCHEMA_COLUMNS)}.*.json"],
        },
    },
    gcs_schema_object=f"gs://{GCS_BUCKET}/{safe_name(SOURCE_SCHEMA_COLUMNS)}-schema.json",
)

有关 Airflow 和 BigQuery 集成的更多信息,请参阅 Python API 参考 - bigquery

参考

有关更多信息,请参阅

此条目有帮助吗?