Google Cloud BigQuery 操作符

BigQuery 是 Google 提供的一款完全托管、PB 级、低成本的分析型数据仓库。它是一款无需数据库管理员的无服务器软件即服务 (SaaS)。它允许用户专注于分析数据,使用熟悉的 SQL 找出有价值的洞察。

Airflow 提供操作符来管理数据集和表、运行查询和验证数据。

前提任务

要使用这些操作符,您必须执行以下几项操作

管理数据集

创建数据集

要在 BigQuery 数据库中创建一个空数据集,可以使用 BigQueryCreateEmptyDatasetOperator

tests/system/google/cloud/bigquery/example_bigquery_dataset.py

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)

获取数据集详情

要获取现有数据集的详细信息,可以使用 BigQueryGetDatasetOperator

此操作符返回一个 数据集资源

tests/system/google/cloud/bigquery/example_bigquery_dataset.py

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

列出数据集中的表

要检索给定数据集中的表列表,请使用 BigQueryGetDatasetTablesOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py

get_dataset_tables = BigQueryGetDatasetTablesOperator(
    task_id="get_dataset_tables", dataset_id=DATASET_NAME
)

更新表

要在 BigQuery 中更新表,可以使用 BigQueryUpdateTableOperator

update 方法会替换整个表资源,而 patch 方法只替换提交的表资源中提供的字段。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

update_table = BigQueryUpdateTableOperator(
    task_id="update_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    fields=["friendlyName", "description"],
    table_resource={
        "friendlyName": "Updated Table",
        "description": "Updated Table",
    },
)

更新数据集

要在 BigQuery 中更新数据集,可以使用 BigQueryUpdateDatasetOperator

update 方法会替换整个数据集资源,而 patch 方法只替换提交的数据集资源中提供的字段。

tests/system/google/cloud/bigquery/example_bigquery_dataset.py

update_dataset = BigQueryUpdateDatasetOperator(
    task_id="update_dataset",
    dataset_id=DATASET_NAME,
    dataset_resource={"description": "Updated dataset"},
)

删除数据集

要从 BigQuery 数据库中删除现有数据集,可以使用 BigQueryDeleteDatasetOperator

tests/system/google/cloud/bigquery/example_bigquery_dataset.py

delete_dataset = BigQueryDeleteDatasetOperator(
    task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)

管理表

创建表

要在数据集中使用 Google Cloud Storage 中的数据创建新表,可以通过在 table_resource 字段中提供表结构来使用 BigQueryCreateTableOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py

create_table = BigQueryCreateTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    table_resource={
        "schema": {
            "fields": [
                {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
                {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
            ],
        },
    },
)

您也可以指定 Google Cloud Storage 对象名称作为指定 schema 的方式。Google Cloud Storage 中的对象必须是包含 schema 字段的 JSON 文件。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

create_table_schema_json = BigQueryCreateTableOperator(
    task_id="create_table_schema_json",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
    table_resource={
        "tableReference": {
            "projectId": PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": "test_table",
        },
    },
)

您可以使用此操作符在现有表之上创建一个视图。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

create_view = BigQueryCreateTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    table_resource={
        "view": {
            "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
            "useLegacySql": False,
        },
    },
)

您还可以使用此操作符创建物化视图,该视图会定期缓存查询结果以提高性能和效率。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

create_materialized_view = BigQueryCreateTableOperator(
    task_id="create_materialized_view",
    dataset_id=DATASET_NAME,
    table_id="test_materialized_view",
    table_resource={
        "materializedView": {
            "query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
            "enableRefresh": True,
            "refreshIntervalMs": 600000,
        },
    },
)

创建原生表

警告

此操作符已弃用,并将在 2025 年 7 月 30 日后移除。请使用 BigQueryCreateTableOperator

要在给定的 BigQuery 数据集中创建一个新的空表,可以选择使用 BigQueryCreateEmptyTableOperator 来指定 schema。

创建外部表

警告

此操作符已弃用,并将在 2025 年 7 月 30 日后移除。请使用 BigQueryCreateTableOperator

要在数据集中使用 Google Cloud Storage 中的数据创建新的外部表,可以使用 BigQueryCreateExternalTableOperator

从表中获取数据

要从 BigQuery 表中获取数据,可以使用 BigQueryGetDataOperator 。或者,如果您将字段传递给 selected_fields,则可以获取所选列的数据。

此操作符的结果可以根据 as_dict 参数的值以两种不同的格式检索:False(默认)- 一个 Python 列表的列表,其中嵌套列表中的元素数量将等于获取的行数。嵌套中的每个元素将是一个嵌套列表,其中的元素代表该行的列值。True - 一个 Python 字典的列表,其中每个字典代表一行。在每个字典中,键是列名,值是这些列对应的数值。

tests/system/google/cloud/bigquery/example_bigquery_queries.py

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
)

下面的示例展示了如何在异步(可延迟)模式下使用 BigQueryGetDataOperator。请注意,可延迟任务需要在您的 Airflow 部署上运行 Triggerer。

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME_1,
    use_legacy_sql=False,
    max_results=10,
    selected_fields="value",
    location=LOCATION,
    deferrable=True,
)

Upsert 表

要 upsert 表,可以使用 BigQueryUpsertTableOperator

此操作符要么更新现有表,要么在给定数据集中创建一个新的空表。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)

更新表 Schema

要更新表的 schema,可以使用 BigQueryUpdateTableSchemaOperator

此操作符会更新提供的 schema 字段值,同时保留其余值不变。例如,这对于在现有表 schema 上设置新的字段描述非常有用。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)

删除表

要删除现有表,可以使用 BigQueryDeleteTableOperator

tests/system/google/cloud/bigquery/example_bigquery_tables.py

delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)

您也可以使用此操作符删除视图。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

delete_view = BigQueryDeleteTableOperator(
    task_id="delete_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)

您也可以使用此操作符删除物化视图。

tests/system/google/cloud/bigquery/example_bigquery_tables.py

delete_materialized_view = BigQueryDeleteTableOperator(
    task_id="delete_materialized_view",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)

执行 BigQuery 作业

假设您想执行以下查询。

tests/system/google/cloud/bigquery/example_bigquery_queries.py

INSERT_ROWS_QUERY = (
    f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
    f"(42, 'monty python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)

要在特定的 BigQuery 数据库中执行 SQL 查询,可以使用 BigQueryInsertJobOperator,并配置适当的查询作业配置,该配置可以使用 Jinja 模板化。

tests/system/google/cloud/bigquery/example_bigquery_queries.py

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
)

下面的示例展示了如何在异步(可延迟)模式下使用 BigQueryInsertJobOperator。请注意,可延迟任务需要在您的 Airflow 部署上运行 Triggerer。

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

有关 BigQuery 作业类型的更多信息,请查看文档

如果您想在配置中包含一些文件,可以使用 Jinja 模板语言的 include 子句,如下所示

tests/system/google/cloud/bigquery/example_bigquery_queries.py

select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include QUERY_SQL_PATH %}",
            "useLegacySql": False,
        }
    },
)

包含的文件也可以使用 Jinja 模板,这对于 .sql 文件非常有用。

此外,您可以使用 BigQueryInsertJobOperatorjob_id 参数来提高幂等性。如果未传递此参数,则将使用 uuid 作为 job_id。如果提供了此参数,则操作符将尝试使用此 job_id` 提交新作业。如果已存在具有此类 job_id 的作业,则它将重新连接到现有作业。

此外,对于所有这些操作,您都可以在可延迟模式下使用操作符

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
            "priority": "BATCH",
        }
    },
    location=LOCATION,
    deferrable=True,
)

验证数据

检查查询结果是否包含数据

要对 BigQuery 执行检查,可以使用 BigQueryCheckOperator

此操作符需要一个返回单行的 sql 查询。该第一行的每个值都使用 Python 的 bool 强制类型转换进行评估。如果任何值返回 False,则检查失败并报错。

tests/system/google/cloud/bigquery/example_bigquery_queries.py

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    use_legacy_sql=False,
)

您也可以在此操作符中使用可延迟模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

将查询结果与通过值进行比较

要使用 sql 代码执行简单的值检查,可以使用 BigQueryValueCheckOperator

这些操作符需要一个返回单行的 sql 查询。该第一行的每个值都会与 pass_value 进行比较,pass_value 可以是字符串或数值。如果为数值,您还可以指定 tolerance

tests/system/google/cloud/bigquery/example_bigquery_queries.py

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
)

您也可以在此操作符中使用可延迟模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
    pass_value=2,
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

比较随时间变化的指标

要检查作为 SQL 表达式给出的指标值是否在与 days_back 之前的值的特定 tolerance 范围内,您可以使用 BigQueryIntervalCheckOperatorBigQueryIntervalCheckAsyncOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET_NAME}.{TABLE_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
)

您也可以在此操作符中使用可延迟模式

tests/system/google/cloud/bigquery/example_bigquery_queries_async.py

check_interval = BigQueryIntervalCheckOperator(
    task_id="check_interval",
    table=f"{DATASET_NAME}.{TABLE_NAME_1}",
    days_back=1,
    metrics_thresholds={"COUNT(*)": 1.5},
    use_legacy_sql=False,
    location=LOCATION,
    deferrable=True,
)

使用预定义测试检查列

要检查列是否通过用户可配置的测试,可以使用 BigQueryColumnCheckOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py

column_check = BigQueryColumnCheckOperator(
    task_id="column_check",
    table=f"{DATASET_NAME}.{TABLE_1}",
    column_mapping={"value": {"null_check": {"equal_to": 0}}},
)

检查表级别数据质量

要检查表是否通过用户定义的测试,可以使用 BigQueryTableCheckOperator

tests/system/google/cloud/bigquery/example_bigquery_queries.py

table_check = BigQueryTableCheckOperator(
    task_id="table_check",
    table=f"{DATASET_NAME}.{TABLE_1}",
    checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
)

传感器

检查表是否存在

要检查表是否存在,您可以定义一个传感器操作符。这允许延迟下游操作符的执行,直到表存在为止。如果表按日期分片,例如,您可以使用 {{ ds_nodash }} 宏作为表名后缀。

BigQueryTableExistenceSensor.

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_exists = BigQueryTableExistenceSensor(
    task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)

如果您想在传感器运行时释放工作节点槽位,您也可以在此操作符中使用可延迟模式。

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_exists_def = BigQueryTableExistenceSensor(
    task_id="check_table_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    deferrable=True,
)

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_exists_async = BigQueryTableExistenceSensor(
    task_id="check_table_exists_async",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

检查表分区是否存在

要检查表是否存在并且包含分区,可以使用 BigQueryTablePartitionExistenceSensor

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
)

对于按 DAY 分区的表,partition_id 参数是一个格式为“%Y%m%d”的字符串

如果您想在传感器运行时释放工作节点槽位,您也可以在此操作符中使用可延迟模式。

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_def",
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
    partition_id=PARTITION_NAME,
    deferrable=True,
)

tests/system/google/cloud/bigquery/example_bigquery_sensors.py

check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
    task_id="check_table_partition_exists_async",
    partition_id=PARTITION_NAME,
    project_id=PROJECT_ID,
    dataset_id=DATASET_NAME,
    table_id=TABLE_NAME,
)

参考资料

更多信息,请参阅

此条目是否有帮助?