airflow.providers.google.cloud.hooks.bigquery

BigQuery Hook 以及一个非常基础的 BigQuery 的 PEP 249 实现。

属性

log

BigQueryJob

BigQueryHook

与 BigQuery 交互。

BigQueryConnection

BigQuery 连接。

BigQueryBaseCursor

BigQuery 游标。

BigQueryCursor

一个非常基础的 BigQuery 的 PEP 249 游标实现。

BigQueryAsyncHook

使用 gcloud-aio 库检索作业详情。

BigQueryTableAsyncHook

用于 BigQuery Table 的异步 Hook。

函数

split_tablename(table_input, default_project_id[, ...])

模块内容

airflow.providers.google.cloud.hooks.bigquery.log[source]
airflow.providers.google.cloud.hooks.bigquery.BigQueryJob[source]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryHook(use_legacy_sql=True, location=None, priority='INTERACTIVE', api_resource_configs=None, impersonation_scopes=None, labels=None, **kwargs)[source]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseHook, airflow.providers.common.sql.hooks.sql.DbApiHook

与 BigQuery 交互。

此 Hook 使用 Google Cloud 连接。

参数:
  • gcp_conn_id – 用于 GCP 凭据的 Airflow 连接。

  • use_legacy_sql (bool) – 这指定是否使用旧版 SQL 方言。

  • location (str | None) – BigQuery 资源的位置。

  • priority (str) – 指定查询的优先级。可能的值包括 INTERACTIVE 和 BATCH。默认值为 INTERACTIVE。

  • api_resource_configs (dict | None) – 这包含应用于 Google BigQuery 作业的参数配置。

  • impersonation_chain – 这是可选的服务账号,用于使用短期凭据进行模拟。

  • impersonation_scopes (str | collections.abc.Sequence[str] | None) – 被模拟账号的可选作用域列表。将覆盖连接中的作用域。

  • labels (dict | None) – BigQuery 资源标签。

conn_name_attr = 'gcp_conn_id'[source]
default_conn_name = 'google_cloud_bigquery_default'[source]
conn_type = 'gcpbigquery'[source]
hook_name = 'Google Bigquery'[source]
classmethod get_connection_form_widgets()[source]

返回要添加到连接表单的连接小部件。

classmethod get_ui_field_behaviour()[source]

返回自定义字段行为。

use_legacy_sql: bool[source]
location: str | None[source]
priority: str[source]
running_job_id: str | None = None[source]
api_resource_configs: dict[source]
labels[source]
impersonation_scopes: str | collections.abc.Sequence[str] | None = None[source]
get_conn()[source]

获取 BigQuery PEP 249 连接对象。

get_client(project_id=PROVIDE_PROJECT_ID, location=None)[source]

获取已认证的 BigQuery Client。

参数:
  • project_id (str) – 客户端代表其执行操作的项目 ID。

  • location (str | None) – 作业 / 数据集 / 表格的默认位置。

get_uri()[source]

覆盖自 DbApiHook 用于 get_sqlalchemy_engine()

get_sqlalchemy_engine(engine_kwargs=None)[source]

创建一个 SQLAlchemy 引擎对象。

参数:

engine_kwargs (dict | None) – create_engine() 中使用的 Kwargs。

get_records(sql, parameters=None)[source]

执行 sql 并返回一组记录。

参数:
  • sql – 要执行的 sql 语句(str)或要执行的 sql 语句列表

  • parameters – 用于渲染 SQL 查询的参数。

abstract insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[source]

插入行。

当前不支持插入。理论上,您可以使用 BigQuery 的流式 API 将行插入到表中,但这尚未实现。

get_pandas_df(sql, parameters=None, dialect=None, **kwargs)[source]

获取 BigQuery 结果的 Pandas DataFrame。

必须覆盖 DbApiHook 方法,因为 Pandas 不支持 PEP 249 连接,SQLite 除外。

参数:
  • sql (str) – 要执行的 BigQuery SQL。

  • parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – 用于渲染 SQL 查询的参数(未使用,保留以覆盖超类方法)

  • dialect (str | None) – BigQuery SQL 方言 – 旧版 SQL 或标准 SQL;如果未指定,则默认为使用 self.use_legacy_sql

  • kwargs – (可选) 传递到 pandas_gbq.read_gbq 方法中

table_exists(dataset_id, table_id, project_id)[source]

检查 Google BigQuery 中是否存在表。

参数:
  • project_id (str) – 要查找表的 Google Cloud 项目。提供给 Hook 的连接必须提供对指定项目的访问权限。

  • dataset_id (str) – 要查找表的所在数据集的名称。

  • table_id (str) – 要检查其是否存在的表的名称。

table_partition_exists(dataset_id, table_id, partition_id, project_id)[source]

检查 Google BigQuery 中是否存在分区。

参数:
  • project_id (str) – 要查找表的 Google Cloud 项目。提供给 Hook 的连接必须提供对指定项目的访问权限。

  • dataset_id (str) – 要查找表的所在数据集的名称。

  • table_id (str) – 要检查其是否存在的表的名称。

  • partition_id (str) – 要检查其是否存在的分区的名称。

create_empty_table(project_id=PROVIDE_PROJECT_ID, dataset_id=None, table_id=None, table_resource=None, schema_fields=None, time_partitioning=None, cluster_fields=None, labels=None, view=None, materialized_view=None, encryption_configuration=None, retry=DEFAULT_RETRY, location=None, exists_ok=True)[source]

在数据集中创建一个新的空表。

要创建一个由 SQL 查询定义的视图,请将字典解析到 view 参数。

参数:
返回值:

创建的表

返回类型:

google.cloud.bigquery.table.Table

create_table(dataset_id, table_id, table_resource, location=None, project_id=PROVIDE_PROJECT_ID, exists_ok=True, schema_fields=None, retry=DEFAULT_RETRY, timeout=None)[source]

在数据集中创建一个新的空表。

参数:
create_empty_dataset(dataset_id=None, project_id=PROVIDE_PROJECT_ID, location=None, dataset_reference=None, exists_ok=True)[source]

创建一个新的空数据集。

参数:
  • project_id (str) – 我们要在其中创建空数据集的项目的名称。如果 dataset_reference 中有 projectId,则无需提供此参数。

  • dataset_id (str | None) – 数据集的 ID。如果 dataset_reference 中有 datasetId,则无需提供此参数。

  • location (str | None) – (可选) 数据集应驻留的地理位置。没有默认值,但如果未提供任何值,数据集将在美国创建。

  • dataset_reference (dict[str, Any] | None) – 可随请求正文提供的 Dataset reference。更多信息请参阅: https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • exists_ok (bool) – 如果为 True,则在创建数据集时忽略“已存在”错误。

get_dataset_tables(dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, retry=DEFAULT_RETRY)[source]

获取指定数据集的表列表。

更多信息请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list

参数:
  • dataset_id (str) – 请求的数据集的 ID。

  • project_id (str) – (可选) 请求的数据集所属的项目。如果为 None,则使用 self.project_id。

  • max_results (int | None) – (可选) 要返回的最大表数。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。

返回值:

与数据集关联的表列表。

返回类型:

list[dict[str, Any]]

delete_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, retry=DEFAULT_RETRY)[source]

在您的项目中删除一个 BigQuery 数据集。

参数:
  • project_id (str) – 数据集所在的项目名称。

  • dataset_id (str) – 要删除的数据集。

  • delete_contents (bool) – 如果为 True,则删除数据集中的所有表。如果为 False 且数据集包含表,则请求将失败。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。

update_table(table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]

更改表的某些字段。

使用 fields 指定要更新的字段。必须至少提供一个字段。如果某个字段列在 fields 中,但在 table 中为 None,则该字段值将被删除。

如果 table.etag 不为 None,则只有当服务器上的表具有相同的 ETag 时,更新才会成功。因此,使用 get_table 读取表,更改其字段,然后将其传递给 update_table,将确保更改仅在自读取以来未发生对表的修改时才会保存。

参数:
  • project_id (str) – 要在其中创建表的项目。

  • dataset_id (str | None) – 要在其中创建表的数据集。

  • table_id (str | None) – 要创建的表的名称。

  • table_resource (dict[str, Any]) – 如文档中所述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 表必须包含 tableReference 或必须提供 project_iddataset_idtable_id

  • fields (list[str] | None) – 要更改的 table 字段,拼写与 Table 属性相同(例如,“friendly_name”)。

insert_all(project_id, dataset_id, table_id, rows, ignore_unknown_values=False, skip_invalid_rows=False, fail_on_error=False)[source]

无需加载作业,即可一次一条记录地将数据流式传输到 BigQuery 中。

参数:
  • project_id (str) – 表所在的项目名称

  • dataset_id (str) – 表所在的数据集名称

  • table_id (str) – 表名称

  • rows (list) –

    要插入的行

    rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
    

  • ignore_unknown_values (bool) – [可选] 接受包含与架构不匹配的值的行。未知值将被忽略。默认值为 false,将未知值视为错误。

  • skip_invalid_rows (bool) – [可选] 插入请求中的所有有效行,即使存在无效行。默认值为 false,如果存在任何无效行,将导致整个请求失败。

  • fail_on_error (bool) – [可选] 如果发生任何错误,强制任务失败。默认值为 false,表示即使发生任何插入错误,任务也不应失败。

update_dataset(fields, dataset_resource, dataset_id=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY)[source]

更改数据集的某些字段。

使用 fields 指定要更新的字段。必须至少提供一个字段。如果某个字段列在 fields 中,但在 dataset 中为 None,则该字段将被删除。

如果 dataset.etag 不为 None,则只有当服务器上的数据集具有相同的 ETag 时,更新才会成功。因此,使用 get_dataset 读取数据集,更改其字段,然后将其传递给 update_dataset,将确保更改仅在自读取以来未发生对数据集的修改时才会保存。

参数:
get_datasets_list(project_id=PROVIDE_PROJECT_ID, include_all=False, filter_=None, max_results=None, page_token=None, retry=DEFAULT_RETRY, return_iterator=False)[source]

获取当前项目中的所有 BigQuery 数据集。

更多信息请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list

参数:
  • project_id (str) – 您尝试获取所有数据集的 Google Cloud 项目

  • include_all (bool) – 如果结果包含隐藏数据集,则为 True。默认为 False。

  • filter – 按标签过滤结果的表达式。有关语法,请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#filter

  • filter – 字符串

  • max_results (int | None) – 要返回的最大数据集数。

  • max_results – 整型

  • page_token (str | None) – 表示数据集游标的令牌。如果未传递,API 将返回第一页数据集。该令牌标记要返回的迭代器的开头,并且可以通过 HTTPIteratornext_page_token 访问 page_token 的值。

  • page_token – 字符串

  • retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。

  • return_iterator (bool) – 返回 HTTPIterator 而不是 list[Row],HTTPIterator 可用于获取 next_page_token 属性。

get_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID)[source]

获取由 dataset_id 引用的数据集。

参数:
  • dataset_id (str) – BigQuery 数据集 ID

  • project_id (str) – Google Cloud 项目 ID

返回值:

数据集资源

返回类型:

google.cloud.bigquery.dataset.Dataset

另请参阅

更多信息请参阅 Dataset Resource 内容:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

run_grant_dataset_view_access(source_dataset, view_dataset, view_table, view_project=None, project_id=PROVIDE_PROJECT_ID)[source]

将数据集的授权视图访问权限授予视图表。

如果此视图已获得数据集的访问权限,则不做任何操作。此方法不是原子性的。运行它可能会破坏同时进行的更新。

参数:
  • source_dataset (str) – 源数据集

  • view_dataset (str) – 视图所在的数据集

  • view_table (str) – 视图表

  • project_id (str) – 源数据集所属的项目。如果为 None,则使用 self.project_id。

  • view_project (str | None) – 视图所在的项目。如果为 None,则使用 self.project_id。

返回值:

源数据集的数据集资源。

返回类型:

dict[str, Any]

run_table_upsert(dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID)[source]

如果表存在则更新,否则创建新表。

由于 BigQuery 本身不支持表 upsert,此操作不是原子性的。

参数:
delete_table(table_id, not_found_ok=True, project_id=PROVIDE_PROJECT_ID)[source]

从数据集中删除现有表。

如果表不存在,则除非将 not_found_ok 设置为 True,否则返回错误。

参数:
  • table_id (str) – 用点号分隔的 (<project>.|<project>:)<dataset>.<table> 格式,指示要删除哪个表。

  • not_found_ok (bool) – 如果为 True,则即使请求的表不存在也返回成功。

  • project_id (str) – 用于执行请求的项目

list_rows(dataset_id, table_id, max_results=None, selected_fields=None, page_token=None, start_index=None, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY, return_iterator=False)[source]

列出表中的行。

请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list

参数:
  • dataset_id (str) – 请求的表的数据集 ID。

  • table_id (str) – 请求的表的表 ID。

  • max_results (int | None) – 要返回的最大结果数。

  • selected_fields (list[str] | str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。

  • page_token (str | None) – 分页令牌,由先前的调用返回,用于标识结果集。

  • start_index (int | None) – 要读取的起始行的从零开始的索引。

  • project_id (str) – 客户端代表其执行操作的项目 ID。

  • location (str | None) – 作业的默认位置。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。

  • return_iterator (bool) – 返回 RowIterator 而不是 list[Row],RowIterator 可用于获取 next_page_token 属性。

返回值:

行列表

返回类型:

list[google.cloud.bigquery.table.Row] | google.cloud.bigquery.table.RowIterator

get_schema(dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]

获取指定数据集和表的架构。

参数:
  • dataset_id (str) – 请求的表的数据集 ID

  • table_id (str) – 请求的表的表 ID

  • project_id (str) – 请求的表的可选项目 ID。如果未提供,将使用连接器配置的项目。

返回值:

表架构

返回类型:

dict

update_table_schema(schema_fields_updates, include_policy_tags, dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]

更新指定数据集和表的架构中的字段。

请注意,架构中的某些字段是不可变的;尝试更改它们将导致异常。

如果包含新字段,则会插入该字段,这要求设置所有必填字段。

参数:
  • include_policy_tags (bool) – 如果设置为 True,策略标签将包含在更新请求中,即使未更改,这也需要特殊权限,请参阅 https://cloud.google.com/bigquery/docs/column-level-security#roles

  • dataset_id (str) – 要更新的表的 数据集 ID

  • table_id (str) – 要更新的表的 表 ID

  • schema_fields_updates (list[dict[str, Any]]) –

    部分架构资源。请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema

    schema_fields_updates = [
        {"name": "emp_name", "description": "Some New Description"},
        {"name": "salary", "description": "Some New Description"},
        {
            "name": "departments",
            "fields": [
                {"name": "name", "description": "Some New Description"},
                {"name": "type", "description": "Some New Description"},
            ],
        },
    ]
    

  • project_id (str) – 要更新表的项目名称。

poll_job_complete(job_id, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY)[source]

检查作业是否已完成。

参数:
  • job_id (str) – 作业 ID。

  • project_id (str) – 作业运行所在的 Google Cloud 项目

  • location (str | None) – 作业运行所在的位置

  • retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。

cancel_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]

取消作业并等待取消完成。

参数:
  • job_id (str) – 作业 ID。

  • project_id (str) – 作业运行所在的 Google Cloud 项目

  • location (str | None) – 作业运行所在的位置

get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]

获取 BigQuery 作业。

参数:
  • job_id (str) – 作业的 ID。ID 必须仅包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_ 或短划线 (-)。最大长度为 1,024 个字符。

  • project_id (str) – 作业运行所在的 Google Cloud 项目。

  • location (str | None) – 作业运行所在的位置。

insert_job(configuration, job_id=None, project_id=PROVIDE_PROJECT_ID, location=None, nowait=False, retry=DEFAULT_RETRY, timeout=None)[source]

执行 BigQuery 作业并等待其完成。

参数:
  • configuration (dict) – configuration 参数直接映射到 BigQuery 作业对象中的 configuration 字段。有关详细信息,请参阅 https://cloud.google.com/bigquery/docs/reference/v2/jobs

  • job_id (str | None) – 作业的 ID。ID 必须仅包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 或短划线 (-)。最大长度为 1,024 个字符。如果未提供,则将生成 uuid。

  • project_id (str) – 作业运行所在的 Google Cloud 项目。

  • location (str | None) – 作业运行的位置。

  • nowait (bool) – 是否在不等待结果的情况下插入作业。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。

  • timeout (float | None) – 在使用 retry 之前等待底层 HTTP 传输的秒数。

返回值:

作业 ID。

返回类型:

BigQueryJob

generate_job_id(job_id, dag_id, task_id, logical_date, configuration, force_rerun=False)[source]
split_tablename(table_input, default_project_id, var_name=None)[source]
get_query_results(job_id, location, max_results=None, selected_fields=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY)[source]

获取给定 job_id 的查询结果。

参数:
  • job_id (str) – 作业的 ID。ID 必须仅包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_ 或短划线 (-)。最大长度为 1,024 个字符。

  • location (str) – 操作使用的位置。

  • selected_fields (list[str] | str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。

  • max_results (int | None) – 从表中获取的最大记录(行)数。

  • project_id (str) – 作业运行所在的 Google Cloud 项目。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC 调用。

  • job_retry (google.api_core.retry.Retry) – 如何重试失败的作业。

返回值:

如果指定了 selected fields,则返回按这些字段过滤的行列表

抛出:

AirflowException

返回类型:

list[dict[str, Any]]

property scopes: collections.abc.Sequence[str][source]

返回 OAuth 2.0 scopes。

返回值:

返回 impersonation_scopes 中定义的 scopes、连接配置中的 scopes 或默认 scopes。

返回类型:

collections.abc.Sequence[str]

class airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection(*args, **kwargs)[source]

BigQuery 连接。

BigQuery 没有持久连接的概念。因此,这些对象是游标的无状态小型工厂,所有实际工作都由游标完成。

close()[source]

不执行任何操作。BigQueryConnection 不需要此操作。

commit()[source]

不执行任何操作。BigQueryConnection 不支持事务。

cursor()[source]

使用连接返回一个新的 Cursor 对象。

abstract rollback()[source]

不执行任何操作。BigQueryConnection 不支持事务。

class airflow.providers.google.cloud.hooks.bigquery.BigQueryBaseCursor(service, project_id, hook, use_legacy_sql=True, api_resource_configs=None, location=None, num_retries=5, labels=None)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

BigQuery 游标。

BigQuery 基本游标包含用于针对 BigQuery 执行查询的辅助方法。在不需要 PEP 249 游标的情况下,运算符可以直接使用这些方法。

service[source]
project_id[source]
use_legacy_sql = True[source]
api_resource_configs: dict[source]
running_job_id: str | None = None[source]
location = None[source]
num_retries = 5[source]
labels = None[source]
hook[source]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryCursor(service, project_id, hook, use_legacy_sql=True, location=None, num_retries=5)[source]

Bases: BigQueryBaseCursor

一个非常基础的 BigQuery 的 PEP 249 游标实现。

参考使用了 PyHive PEP 249 实现

https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py

buffersize: int | None = None[source]
page_token: str | None = None[source]
job_id: str | None = None[source]
buffer: list = [][source]
all_pages_loaded: bool = False[source]
property description: list[source]

返回游标描述。

close()[source]

默认情况下,不执行任何操作。

property rowcount: int[source]

默认情况下,返回 -1 表示不支持此功能。

execute(operation, parameters=None)[source]

执行 BigQuery 查询,并更新 BigQueryCursor 描述。

参数:
  • operation (str) – 要执行的查询。

  • parameters (dict | None) – 要替换到查询中的参数。

executemany(operation, seq_of_parameters)[source]

使用不同的参数多次执行 BigQuery 查询。

参数:
  • operation (str) – 要执行的查询。

  • seq_of_parameters (list) – 要替换到查询中的字典参数列表。

flush_results()[source]

刷新与结果相关的游标属性。

fetchone()[source]

获取查询结果集的下一行。

next()[source]

从缓冲区返回下一行。

fetchone 的辅助方法。

如果缓冲区为空,则尝试通过结果集分页获取下一页,并将其加载到缓冲区中。

fetchmany(size=None)[source]

获取查询结果的下一组行。

这将返回一个序列的序列(例如,一个元组列表)。当没有更多行可用时,返回一个空序列。

每次调用获取的行数由参数指定。如果未给定,则由游标的 arraysize 确定要获取的行数。

此方法尝试获取由 size 参数指示的尽可能多的行。如果由于指定的行数不可用而无法做到,则可能会返回较少的行。

如果之前调用 execute() 未产生任何结果集,或者尚未发出任何调用,则会引发 Error(或其子类)异常。

fetchall()[source]

获取查询结果的所有(剩余)行。

返回一个序列的序列(例如,一个元组列表)。

get_arraysize()[source]

获取每次获取的行数。

另请参阅

fetchmany()

set_arraysize(arraysize)[source]

设置每次获取的行数。

另请参阅

fetchmany()

arraysize[source]
setinputsizes(sizes)[source]

默认情况下,不执行任何操作。

setoutputsize(size, column=None)[source]

默认情况下,不执行任何操作。

airflow.providers.google.cloud.hooks.bigquery.split_tablename(table_input, default_project_id, var_name=None)[source]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

使用 gcloud-aio 库检索作业详情。

sync_hook_class[source]
async get_job_instance(project_id, job_id, session)[source]

根据 job ID 和 project ID 获取指定的作业资源。

async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]
async get_job_output(job_id, project_id=PROVIDE_PROJECT_ID)[source]

异步获取给定 job ID 的 BigQuery 作业输出。

async create_job_for_partition_get(dataset_id, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]

使用 gcloud-aio 创建新作业并获取 job_id。

async cancel_job(job_id, project_id, location)[source]

取消一个 BigQuery 作业。

参数:
  • job_id (str) – 要取消的任务的 ID。

  • project_id (str | None) – 任务运行所在的 Google Cloud 项目。

  • location (str | None) – 任务运行所在的区域。

get_records(query_results, as_dict=False, selected_fields=None)[来源]

将 BigQuery 的响应转换为记录。

参数:
  • query_results (dict[str, Any]) – SQL 查询的结果

  • as_dict (bool) – 如果为 True,则将结果作为字典列表返回;否则作为列表的列表返回。

  • selected_fields (str | list[str] | None)

value_check(sql, pass_value, records, tolerance=None)[来源]

将单行查询结果和容差与 pass_value 进行匹配。

抛出:

AirflowException – 如果匹配失败

interval_check(row1, row2, metrics_thresholds, ignore_zero, ratio_formula)[来源]

检查指标(SQL 表达式)的值是否在一定的容差范围内。

参数:
  • row1 (str | None) – 第一个 SQL 查询执行任务的第一个结果行

  • row2 (str | None) – 第二个 SQL 查询执行任务的第一个结果行

  • metrics_thresholds (dict[str, Any]) – 一个以指标为键的比例字典,例如 'COUNT(*)': 1.5 要求当前日期的数据与 days_back 天之前的数据之间的差异小于或等于 50%。

  • ignore_zero (bool) – 是否应该忽略值为零的指标

  • ratio_formula (str) – 用于计算两个指标之间比例的公式。假设 cur 是今天的指标,ref 是 days_back 天前的指标。 max_over_min: 计算 max(cur, ref) / min(cur, ref) relative_diff: 计算 abs(cur-ref) / ref

class airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[来源]

Bases: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

用于 BigQuery Table 的异步 Hook。

sync_hook_class[来源]
async get_table_client(dataset, table_id, project_id, session)[来源]

获取 Google Big Query 表对象。

参数:
  • dataset (str) – 查找表存储桶的数据集名称。

  • table_id (str) – 要检查其是否存在的表的名称。

  • project_id (str) – 要查找表的 Google Cloud 项目。提供给 Hook 的连接必须提供对指定项目的访问权限。

  • session (aiohttp.ClientSession) – aiohttp ClientSession

此条目有帮助吗?