Google Cloud BigQuery 操作符¶
BigQuery 是 Google 提供的一款完全托管、PB 级、低成本的分析型数据仓库。它是一款无需数据库管理员的无服务器软件即服务 (SaaS)。它允许用户专注于分析数据,使用熟悉的 SQL 找出有价值的洞察。
Airflow 提供操作符来管理数据集和表、运行查询和验证数据。
前提任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
为您的项目启用结算功能,如Google Cloud 文档中所述。
启用 API,如Cloud Console 文档中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关安装的详细信息可在以下链接找到。
管理数据集¶
创建数据集¶
要在 BigQuery 数据库中创建一个空数据集,可以使用 BigQueryCreateEmptyDatasetOperator
。
tests/system/google/cloud/bigquery/example_bigquery_dataset.py
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create_dataset", dataset_id=DATASET_NAME)
获取数据集详情¶
要获取现有数据集的详细信息,可以使用 BigQueryGetDatasetOperator
。
此操作符返回一个 数据集资源。
tests/system/google/cloud/bigquery/example_bigquery_dataset.py
get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
列出数据集中的表¶
要检索给定数据集中的表列表,请使用 BigQueryGetDatasetTablesOperator
。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
get_dataset_tables = BigQueryGetDatasetTablesOperator(
task_id="get_dataset_tables", dataset_id=DATASET_NAME
)
更新表¶
要在 BigQuery 中更新表,可以使用 BigQueryUpdateTableOperator
。
update 方法会替换整个表资源,而 patch 方法只替换提交的表资源中提供的字段。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
update_table = BigQueryUpdateTableOperator(
task_id="update_table",
dataset_id=DATASET_NAME,
table_id="test_table",
fields=["friendlyName", "description"],
table_resource={
"friendlyName": "Updated Table",
"description": "Updated Table",
},
)
更新数据集¶
要在 BigQuery 中更新数据集,可以使用 BigQueryUpdateDatasetOperator
。
update 方法会替换整个数据集资源,而 patch 方法只替换提交的数据集资源中提供的字段。
tests/system/google/cloud/bigquery/example_bigquery_dataset.py
update_dataset = BigQueryUpdateDatasetOperator(
task_id="update_dataset",
dataset_id=DATASET_NAME,
dataset_resource={"description": "Updated dataset"},
)
删除数据集¶
要从 BigQuery 数据库中删除现有数据集,可以使用 BigQueryDeleteDatasetOperator
。
tests/system/google/cloud/bigquery/example_bigquery_dataset.py
delete_dataset = BigQueryDeleteDatasetOperator(
task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True
)
管理表¶
创建表¶
要在数据集中使用 Google Cloud Storage 中的数据创建新表,可以通过在 table_resource
字段中提供表结构来使用 BigQueryCreateTableOperator
。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
create_table = BigQueryCreateTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id="test_table",
table_resource={
"schema": {
"fields": [
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
},
},
)
您也可以指定 Google Cloud Storage 对象名称作为指定 schema 的方式。Google Cloud Storage 中的对象必须是包含 schema 字段的 JSON 文件。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
create_table_schema_json = BigQueryCreateTableOperator(
task_id="create_table_schema_json",
dataset_id=DATASET_NAME,
table_id="test_table",
gcs_schema_object=GCS_PATH_TO_SCHEMA_JSON,
table_resource={
"tableReference": {
"projectId": PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": "test_table",
},
},
)
您可以使用此操作符在现有表之上创建一个视图。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
create_view = BigQueryCreateTableOperator(
task_id="create_view",
dataset_id=DATASET_NAME,
table_id="test_view",
table_resource={
"view": {
"query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"useLegacySql": False,
},
},
)
您还可以使用此操作符创建物化视图,该视图会定期缓存查询结果以提高性能和效率。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
create_materialized_view = BigQueryCreateTableOperator(
task_id="create_materialized_view",
dataset_id=DATASET_NAME,
table_id="test_materialized_view",
table_resource={
"materializedView": {
"query": f"SELECT SUM(salary) AS sum_salary FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
"enableRefresh": True,
"refreshIntervalMs": 600000,
},
},
)
创建原生表¶
警告
此操作符已弃用,并将在 2025 年 7 月 30 日后移除。请使用 BigQueryCreateTableOperator
。
要在给定的 BigQuery 数据集中创建一个新的空表,可以选择使用 BigQueryCreateEmptyTableOperator
来指定 schema。
创建外部表¶
警告
此操作符已弃用,并将在 2025 年 7 月 30 日后移除。请使用 BigQueryCreateTableOperator
。
要在数据集中使用 Google Cloud Storage 中的数据创建新的外部表,可以使用 BigQueryCreateExternalTableOperator
。
从表中获取数据¶
要从 BigQuery 表中获取数据,可以使用 BigQueryGetDataOperator
。或者,如果您将字段传递给 selected_fields
,则可以获取所选列的数据。
此操作符的结果可以根据 as_dict
参数的值以两种不同的格式检索:False
(默认)- 一个 Python 列表的列表,其中嵌套列表中的元素数量将等于获取的行数。嵌套中的每个元素将是一个嵌套列表,其中的元素代表该行的列值。True
- 一个 Python 字典的列表,其中每个字典代表一行。在每个字典中,键是列名,值是这些列对应的数值。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_1,
max_results=10,
selected_fields="value,name",
)
下面的示例展示了如何在异步(可延迟)模式下使用 BigQueryGetDataOperator
。请注意,可延迟任务需要在您的 Airflow 部署上运行 Triggerer。
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME_1,
use_legacy_sql=False,
max_results=10,
selected_fields="value",
location=LOCATION,
deferrable=True,
)
Upsert 表¶
要 upsert 表,可以使用 BigQueryUpsertTableOperator
。
此操作符要么更新现有表,要么在给定数据集中创建一个新的空表。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
upsert_table = BigQueryUpsertTableOperator(
task_id="upsert_table",
dataset_id=DATASET_NAME,
table_resource={
"tableReference": {"tableId": "test_table_id"},
"expirationTime": (int(time.time()) + 300) * 1000,
},
)
更新表 Schema¶
要更新表的 schema,可以使用 BigQueryUpdateTableSchemaOperator
。
此操作符会更新提供的 schema 字段值,同时保留其余值不变。例如,这对于在现有表 schema 上设置新的字段描述非常有用。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
update_table_schema = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields_updates=[
{"name": "emp_name", "description": "Name of employee"},
{"name": "salary", "description": "Monthly salary in USD"},
],
)
删除表¶
要删除现有表,可以使用 BigQueryDeleteTableOperator
。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)
您也可以使用此操作符删除视图。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
delete_view = BigQueryDeleteTableOperator(
task_id="delete_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_view",
)
您也可以使用此操作符删除物化视图。
tests/system/google/cloud/bigquery/example_bigquery_tables.py
delete_materialized_view = BigQueryDeleteTableOperator(
task_id="delete_materialized_view",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_materialized_view",
)
执行 BigQuery 作业¶
假设您想执行以下查询。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
INSERT_ROWS_QUERY = (
f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
f"(42, 'monty python', '{INSERT_DATE}'), "
f"(42, 'fishy fish', '{INSERT_DATE}');"
)
要在特定的 BigQuery 数据库中执行 SQL 查询,可以使用 BigQueryInsertJobOperator
,并配置适当的查询作业配置,该配置可以使用 Jinja 模板化。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
)
下面的示例展示了如何在异步(可延迟)模式下使用 BigQueryInsertJobOperator
。请注意,可延迟任务需要在您的 Airflow 部署上运行 Triggerer。
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
有关 BigQuery 作业类型的更多信息,请查看文档。
如果您想在配置中包含一些文件,可以使用 Jinja 模板语言的 include
子句,如下所示
tests/system/google/cloud/bigquery/example_bigquery_queries.py
select_query_job = BigQueryInsertJobOperator(
task_id="select_query_job",
configuration={
"query": {
"query": "{% include QUERY_SQL_PATH %}",
"useLegacySql": False,
}
},
)
包含的文件也可以使用 Jinja 模板,这对于 .sql
文件非常有用。
此外,您可以使用 BigQueryInsertJobOperator
的 job_id
参数来提高幂等性。如果未传递此参数,则将使用 uuid 作为 job_id
。如果提供了此参数,则操作符将尝试使用此 job_id`
提交新作业。如果已存在具有此类 job_id
的作业,则它将重新连接到现有作业。
此外,对于所有这些操作,您都可以在可延迟模式下使用操作符
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
"priority": "BATCH",
}
},
location=LOCATION,
deferrable=True,
)
验证数据¶
检查查询结果是否包含数据¶
要对 BigQuery 执行检查,可以使用 BigQueryCheckOperator
此操作符需要一个返回单行的 sql 查询。该第一行的每个值都使用 Python 的 bool
强制类型转换进行评估。如果任何值返回 False
,则检查失败并报错。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
use_legacy_sql=False,
)
您也可以在此操作符中使用可延迟模式
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
将查询结果与通过值进行比较¶
要使用 sql 代码执行简单的值检查,可以使用 BigQueryValueCheckOperator
这些操作符需要一个返回单行的 sql 查询。该第一行的每个值都会与 pass_value
进行比较,pass_value
可以是字符串或数值。如果为数值,您还可以指定 tolerance
。
tests/system/google/cloud/bigquery/example_bigquery_queries.py
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
pass_value=4,
use_legacy_sql=False,
)
您也可以在此操作符中使用可延迟模式
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_NAME_1}",
pass_value=2,
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
比较随时间变化的指标¶
要检查作为 SQL 表达式给出的指标值是否在与 days_back
之前的值的特定 tolerance 范围内,您可以使用 BigQueryIntervalCheckOperator
或 BigQueryIntervalCheckAsyncOperator
tests/system/google/cloud/bigquery/example_bigquery_queries.py
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET_NAME}.{TABLE_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
)
您也可以在此操作符中使用可延迟模式
tests/system/google/cloud/bigquery/example_bigquery_queries_async.py
check_interval = BigQueryIntervalCheckOperator(
task_id="check_interval",
table=f"{DATASET_NAME}.{TABLE_NAME_1}",
days_back=1,
metrics_thresholds={"COUNT(*)": 1.5},
use_legacy_sql=False,
location=LOCATION,
deferrable=True,
)
使用预定义测试检查列¶
要检查列是否通过用户可配置的测试,可以使用 BigQueryColumnCheckOperator
tests/system/google/cloud/bigquery/example_bigquery_queries.py
column_check = BigQueryColumnCheckOperator(
task_id="column_check",
table=f"{DATASET_NAME}.{TABLE_1}",
column_mapping={"value": {"null_check": {"equal_to": 0}}},
)
检查表级别数据质量¶
要检查表是否通过用户定义的测试,可以使用 BigQueryTableCheckOperator
tests/system/google/cloud/bigquery/example_bigquery_queries.py
table_check = BigQueryTableCheckOperator(
task_id="table_check",
table=f"{DATASET_NAME}.{TABLE_1}",
checks={"row_count_check": {"check_statement": "COUNT(*) = 4"}},
)
传感器¶
检查表是否存在¶
要检查表是否存在,您可以定义一个传感器操作符。这允许延迟下游操作符的执行,直到表存在为止。如果表按日期分片,例如,您可以使用 {{ ds_nodash }}
宏作为表名后缀。
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_exists = BigQueryTableExistenceSensor(
task_id="check_table_exists", project_id=PROJECT_ID, dataset_id=DATASET_NAME, table_id=TABLE_NAME
)
如果您想在传感器运行时释放工作节点槽位,您也可以在此操作符中使用可延迟模式。
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_exists_def = BigQueryTableExistenceSensor(
task_id="check_table_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
deferrable=True,
)
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_exists_async = BigQueryTableExistenceSensor(
task_id="check_table_exists_async",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)
检查表分区是否存在¶
要检查表是否存在并且包含分区,可以使用 BigQueryTablePartitionExistenceSensor
。
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_partition_exists = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
)
对于按 DAY 分区的表,partition_id
参数是一个格式为“%Y%m%d”的字符串
如果您想在传感器运行时释放工作节点槽位,您也可以在此操作符中使用可延迟模式。
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_partition_exists_def = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_def",
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
partition_id=PARTITION_NAME,
deferrable=True,
)
tests/system/google/cloud/bigquery/example_bigquery_sensors.py
check_table_partition_exists_async = BigQueryTablePartitionExistenceSensor(
task_id="check_table_partition_exists_async",
partition_id=PARTITION_NAME,
project_id=PROJECT_ID,
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
)
参考资料¶
更多信息,请参阅