airflow.providers.google.cloud.hooks.bigquery¶
BigQuery Hook 以及一个非常基础的 BigQuery 的 PEP 249 实现。
属性¶
类¶
与 BigQuery 交互。 |
|
BigQuery 连接。 |
|
BigQuery 游标。 |
|
一个非常基础的 BigQuery 的 PEP 249 游标实现。 |
|
使用 gcloud-aio 库检索作业详情。 |
|
用于 BigQuery Table 的异步 Hook。 |
函数¶
|
模块内容¶
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryHook(use_legacy_sql=True, location=None, priority='INTERACTIVE', api_resource_configs=None, impersonation_scopes=None, labels=None, **kwargs)[source]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook
,airflow.providers.common.sql.hooks.sql.DbApiHook
与 BigQuery 交互。
此 Hook 使用 Google Cloud 连接。
- 参数:
gcp_conn_id – 用于 GCP 凭据的 Airflow 连接。
use_legacy_sql (bool) – 这指定是否使用旧版 SQL 方言。
location (str | None) – BigQuery 资源的位置。
priority (str) – 指定查询的优先级。可能的值包括 INTERACTIVE 和 BATCH。默认值为 INTERACTIVE。
api_resource_configs (dict | None) – 这包含应用于 Google BigQuery 作业的参数配置。
impersonation_chain – 这是可选的服务账号,用于使用短期凭据进行模拟。
impersonation_scopes (str | collections.abc.Sequence[str] | None) – 被模拟账号的可选作用域列表。将覆盖连接中的作用域。
labels (dict | None) – BigQuery 资源标签。
- get_sqlalchemy_engine(engine_kwargs=None)[source]¶
创建一个 SQLAlchemy 引擎对象。
- 参数:
engine_kwargs (dict | None) –
create_engine()
中使用的 Kwargs。
- get_records(sql, parameters=None)[source]¶
执行 sql 并返回一组记录。
- 参数:
sql – 要执行的 sql 语句(str)或要执行的 sql 语句列表
parameters – 用于渲染 SQL 查询的参数。
- abstract insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[source]¶
插入行。
当前不支持插入。理论上,您可以使用 BigQuery 的流式 API 将行插入到表中,但这尚未实现。
- get_pandas_df(sql, parameters=None, dialect=None, **kwargs)[source]¶
获取 BigQuery 结果的 Pandas DataFrame。
必须覆盖 DbApiHook 方法,因为 Pandas 不支持 PEP 249 连接,SQLite 除外。
另请参阅
https://github.com/pandas-dev/pandas/blob/055d008615272a1ceca9720dc365a2abd316f353/pandas/io/sql.py#L415 https://github.com/pandas-dev/pandas/issues/6900
- 参数:
sql (str) – 要执行的 BigQuery SQL。
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – 用于渲染 SQL 查询的参数(未使用,保留以覆盖超类方法)
dialect (str | None) – BigQuery SQL 方言 – 旧版 SQL 或标准 SQL;如果未指定,则默认为使用 self.use_legacy_sql
kwargs – (可选) 传递到 pandas_gbq.read_gbq 方法中
- table_partition_exists(dataset_id, table_id, partition_id, project_id)[source]¶
检查 Google BigQuery 中是否存在分区。
- create_empty_table(project_id=PROVIDE_PROJECT_ID, dataset_id=None, table_id=None, table_resource=None, schema_fields=None, time_partitioning=None, cluster_fields=None, labels=None, view=None, materialized_view=None, encryption_configuration=None, retry=DEFAULT_RETRY, location=None, exists_ok=True)[source]¶
在数据集中创建一个新的空表。
要创建一个由 SQL 查询定义的视图,请将字典解析到 view 参数。
- 参数:
project_id (str) – 要在其中创建表的项目。
dataset_id (str | None) – 要在其中创建表的数据集。
table_id (str | None) – 要创建的表的名称。
table_resource (dict[str, Any] | None) – 文档中描述的表资源: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供此参数,将忽略所有其他参数。
schema_fields (list | None) –
如果设置,此处定义的 schema 字段列表: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
labels (dict | None) – 包含表标签的字典,传递给 BigQuery
retry (google.api_core.retry.Retry) – 可选。如何重试 RPC。
time_partitioning (dict | None) –
配置可选的时间分区字段,即根据 API 规范按字段、类型和过期时间进行分区。
cluster_fields (list[str] | None) – [可选] 用于聚簇的字段。BigQuery 支持对分区表和非分区表进行聚簇。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#clustering.fields
view (dict | None) –
[可选] 包含视图定义的字典。如果设置,它将创建一个视图而不是表: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
view = { "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 1000", "useLegacySql": False, }
materialized_view (dict | None) – [可选] 物化视图定义。
encryption_configuration (dict | None) –
[可选] 自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key", }
num_retries – 连接问题时的最大重试次数。
location (str | None) – (可选) 表应驻留的地理位置。
exists_ok (bool) – 如果为
True
,创建表时忽略“已存在”错误。
- 返回值:
创建的表
- 返回类型:
- create_table(dataset_id, table_id, table_resource, location=None, project_id=PROVIDE_PROJECT_ID, exists_ok=True, schema_fields=None, retry=DEFAULT_RETRY, timeout=None)[source]¶
在数据集中创建一个新的空表。
- 参数:
project_id (str) – 可选。要在其中创建表的项目。
dataset_id (str) – 必需。要在其中创建表的数据集。
table_id (str) – 必需。要创建的表的名称。
table_resource (dict[str, Any] | google.cloud.bigquery.table.Table | google.cloud.bigquery.table.TableReference | google.cloud.bigquery.table.TableListItem) – 必需。文档中描述的表资源: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果
table
是一个引用,将创建一个具有指定 ID 的空表。表所属的数据集必须已存在。schema_fields (list | None) –
可选。如果设置,此处定义的 schema 字段列表: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
location (str | None) – 可选。操作使用的位置。
exists_ok (bool) – 可选。如果为
True
,创建表时忽略“已存在”错误。retry (google.api_core.retry.Retry) – 可选。用于重试请求的重试对象。如果指定 None,请求将不会被重试。
timeout (float | None) – 可选。等待请求完成的时间量,以秒为单位。请注意,如果指定了 retry,则超时适用于每次单独的尝试。
- create_empty_dataset(dataset_id=None, project_id=PROVIDE_PROJECT_ID, location=None, dataset_reference=None, exists_ok=True)[source]¶
创建一个新的空数据集。
- 参数:
project_id (str) – 我们要在其中创建空数据集的项目的名称。如果 dataset_reference 中有 projectId,则无需提供此参数。
dataset_id (str | None) – 数据集的 ID。如果 dataset_reference 中有 datasetId,则无需提供此参数。
location (str | None) – (可选) 数据集应驻留的地理位置。没有默认值,但如果未提供任何值,数据集将在美国创建。
dataset_reference (dict[str, Any] | None) – 可随请求正文提供的 Dataset reference。更多信息请参阅: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
exists_ok (bool) – 如果为
True
,则在创建数据集时忽略“已存在”错误。
- get_dataset_tables(dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, retry=DEFAULT_RETRY)[source]¶
获取指定数据集的表列表。
更多信息请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
- delete_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, retry=DEFAULT_RETRY)[source]¶
在您的项目中删除一个 BigQuery 数据集。
- 参数:
project_id (str) – 数据集所在的项目名称。
dataset_id (str) – 要删除的数据集。
delete_contents (bool) – 如果为 True,则删除数据集中的所有表。如果为 False 且数据集包含表,则请求将失败。
retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。
- update_table(table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]¶
更改表的某些字段。
使用
fields
指定要更新的字段。必须至少提供一个字段。如果某个字段列在fields
中,但在table
中为None
,则该字段值将被删除。如果
table.etag
不为None
,则只有当服务器上的表具有相同的 ETag 时,更新才会成功。因此,使用get_table
读取表,更改其字段,然后将其传递给update_table
,将确保更改仅在自读取以来未发生对表的修改时才会保存。- 参数:
project_id (str) – 要在其中创建表的项目。
dataset_id (str | None) – 要在其中创建表的数据集。
table_id (str | None) – 要创建的表的名称。
table_resource (dict[str, Any]) – 如文档中所述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 表必须包含
tableReference
或必须提供project_id
、dataset_id
和table_id
。fields (list[str] | None) – 要更改的
table
字段,拼写与 Table 属性相同(例如,“friendly_name”)。
- insert_all(project_id, dataset_id, table_id, rows, ignore_unknown_values=False, skip_invalid_rows=False, fail_on_error=False)[source]¶
无需加载作业,即可一次一条记录地将数据流式传输到 BigQuery 中。
- 参数:
project_id (str) – 表所在的项目名称
dataset_id (str) – 表所在的数据集名称
table_id (str) – 表名称
rows (list) –
要插入的行
rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
ignore_unknown_values (bool) – [可选] 接受包含与架构不匹配的值的行。未知值将被忽略。默认值为 false,将未知值视为错误。
skip_invalid_rows (bool) – [可选] 插入请求中的所有有效行,即使存在无效行。默认值为 false,如果存在任何无效行,将导致整个请求失败。
fail_on_error (bool) – [可选] 如果发生任何错误,强制任务失败。默认值为 false,表示即使发生任何插入错误,任务也不应失败。
- update_dataset(fields, dataset_resource, dataset_id=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY)[source]¶
更改数据集的某些字段。
使用
fields
指定要更新的字段。必须至少提供一个字段。如果某个字段列在fields
中,但在dataset
中为None
,则该字段将被删除。如果
dataset.etag
不为None
,则只有当服务器上的数据集具有相同的 ETag 时,更新才会成功。因此,使用get_dataset
读取数据集,更改其字段,然后将其传递给update_dataset
,将确保更改仅在自读取以来未发生对数据集的修改时才会保存。- 参数:
dataset_resource (dict[str, Any]) – 将在请求正文中提供的数据集资源。https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
dataset_id (str | None) – 数据集的 ID。
fields (collections.abc.Sequence[str]) – 要更改的
dataset
属性(例如,“friendly_name”)。project_id (str) – Google Cloud 项目 ID
retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。
- get_datasets_list(project_id=PROVIDE_PROJECT_ID, include_all=False, filter_=None, max_results=None, page_token=None, retry=DEFAULT_RETRY, return_iterator=False)[source]¶
获取当前项目中的所有 BigQuery 数据集。
更多信息请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list
- 参数:
project_id (str) – 您尝试获取所有数据集的 Google Cloud 项目
include_all (bool) – 如果结果包含隐藏数据集,则为 True。默认为 False。
filter – 按标签过滤结果的表达式。有关语法,请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#filter。
filter – 字符串
max_results (int | None) – 要返回的最大数据集数。
max_results – 整型
page_token (str | None) – 表示数据集游标的令牌。如果未传递,API 将返回第一页数据集。该令牌标记要返回的迭代器的开头,并且可以通过
HTTPIterator
的next_page_token
访问page_token
的值。page_token – 字符串
retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。
return_iterator (bool) – 返回 HTTPIterator 而不是 list[Row],HTTPIterator 可用于获取 next_page_token 属性。
- get_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID)[source]¶
获取由 dataset_id 引用的数据集。
- 参数:
- 返回值:
数据集资源
- 返回类型:
另请参阅
更多信息请参阅 Dataset Resource 内容:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
- run_grant_dataset_view_access(source_dataset, view_dataset, view_table, view_project=None, project_id=PROVIDE_PROJECT_ID)[source]¶
将数据集的授权视图访问权限授予视图表。
如果此视图已获得数据集的访问权限,则不做任何操作。此方法不是原子性的。运行它可能会破坏同时进行的更新。
- run_table_upsert(dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID)[source]¶
如果表存在则更新,否则创建新表。
由于 BigQuery 本身不支持表 upsert,此操作不是原子性的。
- 参数:
dataset_id (str) – 要将表 upsert 到其中的数据集。
table_resource (dict[str, Any]) – 表资源。请参阅 https://cloud.google.com/bigquery/docs/reference/v2/tables#resource
project_id (str) – 要将表 upsert 到其中的项目。如果为 None,则使用 self.project_id。
- delete_table(table_id, not_found_ok=True, project_id=PROVIDE_PROJECT_ID)[source]¶
从数据集中删除现有表。
如果表不存在,则除非将 not_found_ok 设置为 True,否则返回错误。
- list_rows(dataset_id, table_id, max_results=None, selected_fields=None, page_token=None, start_index=None, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY, return_iterator=False)[source]¶
列出表中的行。
请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list
- 参数:
dataset_id (str) – 请求的表的数据集 ID。
table_id (str) – 请求的表的表 ID。
max_results (int | None) – 要返回的最大结果数。
selected_fields (list[str] | str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。
page_token (str | None) – 分页令牌,由先前的调用返回,用于标识结果集。
start_index (int | None) – 要读取的起始行的从零开始的索引。
project_id (str) – 客户端代表其执行操作的项目 ID。
location (str | None) – 作业的默认位置。
retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。
return_iterator (bool) – 返回 RowIterator 而不是 list[Row],RowIterator 可用于获取 next_page_token 属性。
- 返回值:
行列表
- 返回类型:
list[google.cloud.bigquery.table.Row] | google.cloud.bigquery.table.RowIterator
- update_table_schema(schema_fields_updates, include_policy_tags, dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]¶
更新指定数据集和表的架构中的字段。
请注意,架构中的某些字段是不可变的;尝试更改它们将导致异常。
如果包含新字段,则会插入该字段,这要求设置所有必填字段。
- 参数:
include_policy_tags (bool) – 如果设置为 True,策略标签将包含在更新请求中,即使未更改,这也需要特殊权限,请参阅 https://cloud.google.com/bigquery/docs/column-level-security#roles
dataset_id (str) – 要更新的表的 数据集 ID
table_id (str) – 要更新的表的 表 ID
schema_fields_updates (list[dict[str, Any]]) –
部分架构资源。请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
schema_fields_updates = [ {"name": "emp_name", "description": "Some New Description"}, {"name": "salary", "description": "Some New Description"}, { "name": "departments", "fields": [ {"name": "name", "description": "Some New Description"}, {"name": "type", "description": "Some New Description"}, ], }, ]
project_id (str) – 要更新表的项目名称。
- poll_job_complete(job_id, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY)[source]¶
检查作业是否已完成。
- 参数:
job_id (str) – 作业 ID。
project_id (str) – 作业运行所在的 Google Cloud 项目
location (str | None) – 作业运行所在的位置
retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。
- insert_job(configuration, job_id=None, project_id=PROVIDE_PROJECT_ID, location=None, nowait=False, retry=DEFAULT_RETRY, timeout=None)[source]¶
执行 BigQuery 作业并等待其完成。
- 参数:
configuration (dict) – configuration 参数直接映射到 BigQuery 作业对象中的 configuration 字段。有关详细信息,请参阅 https://cloud.google.com/bigquery/docs/reference/v2/jobs。
job_id (str | None) – 作业的 ID。ID 必须仅包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 或短划线 (-)。最大长度为 1,024 个字符。如果未提供,则将生成 uuid。
project_id (str) – 作业运行所在的 Google Cloud 项目。
location (str | None) – 作业运行的位置。
nowait (bool) – 是否在不等待结果的情况下插入作业。
retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。
timeout (float | None) – 在使用
retry
之前等待底层 HTTP 传输的秒数。
- 返回值:
作业 ID。
- 返回类型:
BigQueryJob
- get_query_results(job_id, location, max_results=None, selected_fields=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY)[source]¶
获取给定 job_id 的查询结果。
- 参数:
job_id (str) – 作业的 ID。ID 必须仅包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_ 或短划线 (-)。最大长度为 1,024 个字符。
location (str) – 操作使用的位置。
selected_fields (list[str] | str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。
max_results (int | None) – 从表中获取的最大记录(行)数。
project_id (str) – 作业运行所在的 Google Cloud 项目。
retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。
job_retry (google.api_core.retry.Retry) – 如何重试失败的作业。
- 返回值:
如果指定了 selected fields,则返回按这些字段过滤的行列表
- 抛出:
AirflowException
- 返回类型:
- property scopes: collections.abc.Sequence[str][source]¶
返回 OAuth 2.0 scopes。
- 返回值:
返回 impersonation_scopes 中定义的 scopes、连接配置中的 scopes 或默认 scopes。
- 返回类型:
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection(*args, **kwargs)[source]¶
BigQuery 连接。
BigQuery 没有持久连接的概念。因此,这些对象是游标的无状态小型工厂,所有实际工作都由游标完成。
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryBaseCursor(service, project_id, hook, use_legacy_sql=True, api_resource_configs=None, location=None, num_retries=5, labels=None)[source]¶
Bases:
airflow.utils.log.logging_mixin.LoggingMixin
BigQuery 游标。
BigQuery 基本游标包含用于针对 BigQuery 执行查询的辅助方法。在不需要 PEP 249 游标的情况下,运算符可以直接使用这些方法。
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryCursor(service, project_id, hook, use_legacy_sql=True, location=None, num_retries=5)[source]¶
Bases:
BigQueryBaseCursor
一个非常基础的 BigQuery 的 PEP 249 游标实现。
参考使用了 PyHive PEP 249 实现
https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py
- airflow.providers.google.cloud.hooks.bigquery.split_tablename(table_input, default_project_id, var_name=None)[source]¶
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
使用 gcloud-aio 库检索作业详情。
- async create_job_for_partition_get(dataset_id, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]¶
使用 gcloud-aio 创建新作业并获取 job_id。
- value_check(sql, pass_value, records, tolerance=None)[来源]¶
将单行查询结果和容差与 pass_value 进行匹配。
- 抛出:
AirflowException – 如果匹配失败
- interval_check(row1, row2, metrics_thresholds, ignore_zero, ratio_formula)[来源]¶
检查指标(SQL 表达式)的值是否在一定的容差范围内。
- 参数:
row1 (str | None) – 第一个 SQL 查询执行任务的第一个结果行
row2 (str | None) – 第二个 SQL 查询执行任务的第一个结果行
metrics_thresholds (dict[str, Any]) – 一个以指标为键的比例字典,例如 'COUNT(*)': 1.5 要求当前日期的数据与 days_back 天之前的数据之间的差异小于或等于 50%。
ignore_zero (bool) – 是否应该忽略值为零的指标
ratio_formula (str) – 用于计算两个指标之间比例的公式。假设 cur 是今天的指标,ref 是 days_back 天前的指标。 max_over_min: 计算 max(cur, ref) / min(cur, ref) relative_diff: 计算 abs(cur-ref) / ref
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[来源]¶
Bases:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
用于 BigQuery Table 的异步 Hook。