airflow.providers.google.cloud.hooks.bigquery

BigQuery Hook 和一个非常基本的 BigQuery PEP 249 实现。

模块内容

BigQueryHook

与 BigQuery 交互。

BigQueryConnection

BigQuery 连接。

BigQueryBaseCursor

BigQuery 光标。

BigQueryCursor

一个非常基本的 BigQuery PEP 249 光标实现。

BigQueryAsyncHook

使用 gcloud-aio 库检索作业详细信息。

BigQueryTableAsyncHook

BigQuery Table 的异步 hook。

函数

split_tablename(table_input, default_project_id[, ...])

属性

log

BigQueryJob

airflow.providers.google.cloud.hooks.bigquery.log[源代码]
airflow.providers.google.cloud.hooks.bigquery.BigQueryJob[源代码]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryHook(use_legacy_sql=True, location=None, priority='INTERACTIVE', api_resource_configs=None, impersonation_scopes=None, labels=None, **kwargs)[源代码]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseHook, airflow.providers.common.sql.hooks.sql.DbApiHook

与 BigQuery 交互。

此 hook 使用 Google Cloud 连接。

参数
  • gcp_conn_id – 用于 GCP 凭据的 Airflow 连接。

  • use_legacy_sql (bool) – 这指定是否使用旧版 SQL 方言。

  • location (str | None) – BigQuery 资源的位置。

  • priority (str) – 指定查询的优先级。可能的值包括 INTERACTIVE 和 BATCH。默认值为 INTERACTIVE。

  • api_resource_configs (dict | None) – 这包含应用于 Google BigQuery 作业的参数配置。

  • impersonation_chain – 这是使用短期凭据模拟的可选服务帐户。

  • impersonation_scopes (str | collections.abc.Sequence[str] | None) – 模拟帐户的可选范围列表。将覆盖连接中的范围。

  • labels (dict | None) – BigQuery 资源标签。

property scopes: collections.abc.Sequence[str][源代码]

返回 OAuth 2.0 范围。

返回

返回在 impersonation_scopes、连接配置或默认范围中定义的范围

返回类型

collections.abc.Sequence[str]

conn_name_attr = 'gcp_conn_id'[源代码]
default_conn_name = 'google_cloud_bigquery_default'[源代码]
conn_type = 'gcpbigquery'[源代码]
hook_name = 'Google Bigquery'[源代码]
classmethod get_connection_form_widgets()[源代码]

返回要添加到连接表单的连接小部件。

classmethod get_ui_field_behaviour()[源代码]

返回自定义字段行为。

get_conn()[源代码]

获取 BigQuery PEP 249 连接对象。

get_client(project_id=PROVIDE_PROJECT_ID, location=None)[源代码]

获取已验证的 BigQuery 客户端。

参数
  • project_id (str) – 客户端代表操作的项目 ID。

  • location (str | None) – 作业/数据集/表的默认位置。

get_uri()[源代码]

DbApiHook 重写,用于 get_sqlalchemy_engine()

get_sqlalchemy_engine(engine_kwargs=None)[源代码]

创建 SQLAlchemy 引擎对象。

参数

engine_kwargs (dict | None) – create_engine() 中使用的 Kwargs。

get_records(sql, parameters=None)[源代码]

执行 SQL 并返回记录集。

参数
  • sql – 要执行的 SQL 语句 (str) 或要执行的 SQL 语句列表

  • parameters – 用于渲染 SQL 查询的参数。

abstract insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[源代码]

插入行。

目前不支持插入。理论上,你可以使用 BigQuery 的流式 API 将行插入到表中,但尚未实现。

get_pandas_df(sql, parameters=None, dialect=None, **kwargs)[源代码]

获取 BigQuery 结果的 Pandas DataFrame。

必须重写 DbApiHook 方法,因为 Pandas 不支持 PEP 249 连接,除了 SQLite。

参数
  • sql (str) – 要执行的 BigQuery SQL。

  • parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – 用于渲染 SQL 查询的参数(未使用,留给重写超类方法)

  • dialect (str | None) – BigQuery SQL 的方言 – 传统 SQL 或标准 SQL,如果未指定,则默认为使用 self.use_legacy_sql

  • kwargs – (可选) 传递到 pandas_gbq.read_gbq 方法

table_exists(dataset_id, table_id, project_id)[源代码]

检查 Google BigQuery 中是否存在表。

参数
  • project_id (str) – 要在其中查找表的 Google 云项目。提供给钩子的连接必须提供对指定项目的访问权限。

  • dataset_id (str) – 要在其中查找表的数据集的名称。

  • table_id (str) – 要检查其存在性的表的名称。

table_partition_exists(dataset_id, table_id, partition_id, project_id)[源代码]

检查 Google BigQuery 中是否存在分区。

参数
  • project_id (str) – 要在其中查找表的 Google 云项目。提供给钩子的连接必须提供对指定项目的访问权限。

  • dataset_id (str) – 要在其中查找表的数据集的名称。

  • table_id (str) – 要检查其存在性的表的名称。

  • partition_id (str) – 要检查其存在性的分区的名称。

create_empty_table(project_id=PROVIDE_PROJECT_ID, dataset_id=None, table_id=None, table_resource=None, schema_fields=None, time_partitioning=None, cluster_fields=None, labels=None, view=None, materialized_view=None, encryption_configuration=None, retry=DEFAULT_RETRY, location=None, exists_ok=True)[源代码]

在数据集中创建一个新的空表。

要创建由 SQL 查询定义的视图,请将字典解析到 view 参数中。

参数
  • project_id (str) – 要在其中创建表的项目。

  • dataset_id (str | None) – 要在其中创建表的数据集。

  • table_id (str | None) – 要创建的表的名称。

  • table_resource (dict[str, Any] | None) – 文档中描述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,则忽略所有其他参数。

  • schema_fields (list | None) –

    如果设置,则架构字段列表定义如下: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    schema_fields = [
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ]
    

  • labels (dict | None) – 包含表的标签的字典,传递给 BigQuery

  • retry (google.api_core.retry.Retry) – 可选。如何重试 RPC。

  • time_partitioning (dict | None) –

    配置可选的时间分区字段,例如按照 API 规范设置分区字段、类型和过期时间。

  • cluster_fields (list[str] | None) – [可选] 用于聚类的字段。BigQuery 支持对分区表和非分区表进行聚类。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#clustering.fields

  • view (dict | None) –

    [可选] 包含视图定义的字典。如果设置,它将创建一个视图而不是表:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition

    view = {
        "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 1000",
        "useLegacySql": False,
    }
    

  • materialized_view (dict | None) – [可选] 物化视图定义。

  • encryption_configuration (dict | None) –

    [可选] 自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key",
    }
    

  • num_retries – 连接出现问题时的最大重试次数。

  • location (str | None) – (可选)表应驻留的地理位置。

  • exists_ok (bool) – 如果为 True,则在创建表时忽略“已存在”错误。

返回

创建的表

返回类型

google.cloud.bigquery.table.Table

create_empty_dataset(dataset_id=None, project_id=PROVIDE_PROJECT_ID, location=None, dataset_reference=None, exists_ok=True)[source]

创建一个新的空数据集。

参数
  • project_id (str) – 要在其中创建空数据集的项目的名称。如果 dataset_reference 中提供了 projectId,则无需提供。

  • dataset_id (str | None) – 数据集的 ID。如果 dataset_reference 中提供了 datasetId,则无需提供。

  • location (str | None) – (可选)数据集应驻留的地理位置。没有默认值,但如果未提供任何内容,则数据集将在美国创建。

  • dataset_reference (dict[str, Any] | None) – 可以与请求正文一起提供的数据集引用。更多信息:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • exists_ok (bool) – 如果为 True,则在创建数据集时忽略“已存在”错误。

get_dataset_tables(dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, retry=DEFAULT_RETRY)[source]

获取给定数据集的表列表。

更多信息,请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list

参数
  • dataset_id (str) – 所请求数据集的数据集 ID。

  • project_id (str) – (可选)所请求数据集的项目。如果为 None,则将使用 self.project_id。

  • max_results (int | None) – (可选)要返回的最大表数。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC。

返回

与数据集关联的表列表。

返回类型

list[dict[str, Any]]

delete_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, retry=DEFAULT_RETRY)[source]

删除项目中 BigQuery 的数据集。

参数
  • project_id (str) – 拥有数据集的项目的名称。

  • dataset_id (str) – 要删除的数据集。

  • delete_contents (bool) – 如果为 True,则删除数据集中的所有表。如果为 False 且数据集包含表,则请求将失败。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC。

update_table(table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]

更改表的某些字段。

使用 fields 指定要更新的字段。必须至少提供一个字段。如果 fields 中列出了某个字段,并且该字段在 table 中为 None,则该字段值将被删除。

如果 table.etag 不为 None,则只有当服务器上的表的 ETag 相同时,更新才会成功。因此,使用 get_table 读取表,更改其字段,然后将其传递给 update_table 将确保只有在自读取以来未对表进行修改的情况下才会保存更改。

参数
  • project_id (str) – 要在其中创建表的项目。

  • dataset_id (str | None) – 要在其中创建表的数据集。

  • table_id (str | None) – 要创建的表的名称。

  • table_resource (dict[str, Any]) – 文档中描述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 该表必须包含 tableReferenceproject_id,必须提供 dataset_idtable_id

  • fields (list[str] | None) – 要更改的 table 的字段,拼写为 Table 属性(例如,“friendly_name”)。

insert_all(project_id, dataset_id, table_id, rows, ignore_unknown_values=False, skip_invalid_rows=False, fail_on_error=False)[source]

无需加载作业,即可将数据流式传输到 BigQuery,一次一条记录。

参数
  • project_id (str) – 表所在的项目的名称。

  • dataset_id (str) – 表所在的数据集的名称。

  • table_id (str) – 表的名称。

  • rows (list) –

    要插入的行。

    rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
    

  • ignore_unknown_values (bool) – [可选] 接受包含与架构不匹配的值的行。未知值将被忽略。默认值为 false,这会将未知值视为错误。

  • skip_invalid_rows (bool) – [可选] 插入请求中的所有有效行,即使存在无效行也是如此。默认值为 false,如果存在任何无效行,则会导致整个请求失败。

  • fail_on_error (bool) – [可选] 如果发生任何错误,则强制任务失败。默认值为 false,这表示即使发生任何插入错误,任务也不应失败。

update_dataset(fields, dataset_resource, dataset_id=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY)[source]

更改数据集的某些字段。

使用 fields 指定要更新的字段。必须提供至少一个字段。如果 fields 中列出了某个字段,并且在 dataset 中为 None,则会将其删除。

如果 dataset.etag 不为 None,则只有当服务器上的数据集具有相同的 ETag 时,更新才会成功。因此,使用 get_dataset 读取数据集,更改其字段,然后将其传递给 update_dataset 将确保只有在自读取以来未对数据集进行任何修改的情况下才会保存更改。

参数
get_datasets_list(project_id=PROVIDE_PROJECT_ID, include_all=False, filter_=None, max_results=None, page_token=None, retry=DEFAULT_RETRY, return_iterator=False)[source]

获取当前项目中的所有 BigQuery 数据集。

更多信息,请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list

参数
  • project_id (str) – 您尝试获取所有数据集的 Google Cloud 项目

  • include_all (bool) – 如果结果包括隐藏数据集,则为 True。默认为 False。

  • filter – 用于按标签过滤结果的表达式。有关语法,请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#filter

  • filter – str

  • max_results (int | None) – 要返回的最大数据集数。

  • max_results – int

  • page_token (str | None) – 表示数据集游标的令牌。如果未传递,则 API 将返回数据集的第一页。该令牌标记要返回的迭代器的开头,并且可以在 HTTPIteratornext_page_token 处访问 page_token 的值。

  • page_token – str

  • retry (google.api_core.retry.Retry) – 如何重试 RPC。

  • return_iterator (bool) – 返回 HTTPIterator 而不是返回 list[Row],该迭代器可用于获取 next_page_token 属性。

get_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID)[source]

提取由 dataset_id 引用的数据集。

参数
  • dataset_id (str) – BigQuery 数据集 ID

  • project_id (str) – Google Cloud 项目 ID

返回

dataset_resource

返回类型

google.cloud.bigquery.dataset.Dataset

另请参阅

有关数据集资源内容的更多信息,请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

run_grant_dataset_view_access(source_dataset, view_dataset, view_table, view_project=None, project_id=PROVIDE_PROJECT_ID)[source]

授予数据集的授权视图访问权限给视图表。

如果此视图已被授予对数据集的访问权限,则不执行任何操作。此方法不是原子的。运行它可能会覆盖同时发生的更新。

参数
  • source_dataset (str) – 源数据集

  • view_dataset (str) – 视图所在的数据集

  • view_table (str) – 视图的表

  • project_id (str) – 源数据集的项目。如果为 None,则将使用 self.project_id。

  • view_project (str | None) – 视图所在的项目。如果为 None,则将使用 self.project_id。

返回

源数据集的数据集资源。

返回类型

dict[str, Any]

run_table_upsert(dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID)[source]

如果表存在则更新表,否则创建新表。

由于 BigQuery 本身不允许表更新插入,因此这不是原子操作。

参数
delete_table(table_id, not_found_ok=True, project_id=PROVIDE_PROJECT_ID)[source]

从数据集中删除现有表。

如果表不存在,则返回错误,除非 not_found_ok 设置为 True。

参数
  • table_id (str) – 一个点分隔的 (<project>.|<project>:)<dataset>.<table>,指示将删除哪个表。

  • not_found_ok (bool) – 如果为 True,则即使请求的表不存在,也返回成功。

  • project_id (str) – 用于执行请求的项目。

list_rows(dataset_id, table_id, max_results=None, selected_fields=None, page_token=None, start_index=None, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY, return_iterator=False)[source]

列出表中的行。

请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list

参数
  • dataset_id (str) – 请求的表的 dataset ID。

  • table_id (str) – 请求的表的 table ID。

  • max_results (int | None) – 返回的最大结果数。

  • selected_fields (list[str] | str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。

  • page_token (str | None) – 从先前调用返回的页面令牌,用于标识结果集。

  • start_index (int | None) – 要读取的起始行的从零开始的索引。

  • project_id (str) – 客户端代表操作的项目 ID。

  • location (str | None) – 作业的默认位置。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC。

  • return_iterator (bool) – 不是返回 list[Row],而是返回 RowIterator,可用于获取 next_page_token 属性。

返回

行列表

返回类型

list[google.cloud.bigquery.table.Row] | google.cloud.bigquery.table.RowIterator

get_schema(dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]

获取给定数据集和表的架构。

参数
  • dataset_id (str) – 请求的表的 dataset ID。

  • table_id (str) – 请求的表的 table ID。

  • project_id (str) – 请求的表的可选 project ID。如果未提供,则将使用连接器配置的项目。

返回

表架构

返回类型

dict

update_table_schema(schema_fields_updates, include_policy_tags, dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]

更新给定数据集和表的架构中的字段。

请注意,架构中的某些字段是不可变的;尝试更改它们会导致异常。

如果包含新字段,则将插入该字段,这需要设置所有必需的字段。

参数
  • include_policy_tags (bool) – 如果设置为 True,则策略标签将包含在更新请求中,这需要特殊权限,即使未更改,请参阅 https://cloud.google.com/bigquery/docs/column-level-security#roles

  • dataset_id (str) – 要更新的请求表的 dataset ID。

  • table_id (str) – 要更新的表的 table ID。

  • schema_fields_updates (list[dict[str, Any]]) –

    部分架构资源。请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema

    schema_fields_updates = [
        {"name": "emp_name", "description": "Some New Description"},
        {"name": "salary", "description": "Some New Description"},
        {
            "name": "departments",
            "fields": [
                {"name": "name", "description": "Some New Description"},
                {"name": "type", "description": "Some New Description"},
            ],
        },
    ]
    

  • project_id (str) – 我们要更新表的项目的名称。

poll_job_complete(job_id, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY)[source]

检查作业是否已完成。

参数
  • job_id (str) – 作业的 ID。

  • project_id (str) – 运行作业的 Google Cloud 项目

  • location (str | None) – 作业运行的位置

  • retry (google.api_core.retry.Retry) – 如何重试 RPC。

cancel_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]

取消作业并等待取消完成。

参数
  • job_id (str) – 作业的 ID。

  • project_id (str) – 运行作业的 Google Cloud 项目

  • location (str | None) – 作业运行的位置

get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]

检索 BigQuery 作业。

参数
  • job_id (str) – 作业的 ID。ID 只能包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 或破折号 (-)。最大长度为 1,024 个字符。

  • project_id (str) – 运行作业的 Google Cloud 项目。

  • location (str | None) – 作业运行的位置。

insert_job(configuration, job_id=None, project_id=PROVIDE_PROJECT_ID, location=None, nowait=False, retry=DEFAULT_RETRY, timeout=None)[source]

执行 BigQuery 作业并等待其完成。

参数
  • configuration (dict) – configuration 参数直接映射到作业对象中的 BigQuery 的配置字段。 有关详细信息,请参阅 https://cloud.google.com/bigquery/docs/reference/v2/jobs

  • job_id (str | None) – 作业的 ID。ID 只能包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 或破折号 (-)。最大长度为 1,024 个字符。如果未提供,则将生成 uuid。

  • project_id (str) – 运行作业的 Google Cloud 项目。

  • location (str | None) – 作业运行的位置。

  • nowait (bool) – 是否插入作业而不等待结果。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC。

  • timeout (float | None) – 在使用 retry 之前,等待底层 HTTP 传输的秒数。

返回

作业 ID。

返回类型

BigQueryJob

generate_job_id(job_id, dag_id, task_id, logical_date, configuration, force_rerun=False)[source]
split_tablename(table_input, default_project_id, var_name=None)[source]
get_query_results(job_id, location, max_results=None, selected_fields=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY)[source]

获取给定 job_id 的查询结果。

参数
  • job_id (str) – 作业的 ID。ID 只能包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 或破折号 (-)。最大长度为 1,024 个字符。

  • location (str) – 用于操作的位置。

  • selected_fields (list[str] | str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。

  • max_results (int | None) – 要从表中获取的最大记录(行)数。

  • project_id (str) – 运行作业的 Google Cloud 项目。

  • retry (google.api_core.retry.Retry) – 如何重试 RPC。

  • job_retry (google.api_core.retry.Retry) – 如何重试失败的作业。

返回

如果给定,则列出按选定字段筛选的列的行。

引发

AirflowException

返回类型

list[dict[str, Any]]

class airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection(*args, **kwargs)[source]

BigQuery 连接。

BigQuery 没有持久连接的概念。因此,这些对象是游标的小型无状态工厂,它们完成所有实际工作。

close()[source]

什么也不做。BigQueryConnection 不需要。

commit()[source]

什么也不做。BigQueryConnection 不支持事务。

cursor()[source]

使用连接返回一个新的 Cursor 对象。

abstract rollback()[source]

什么也不做。BigQueryConnection 不支持事务。

class airflow.providers.google.cloud.hooks.bigquery.BigQueryBaseCursor(service, project_id, hook, use_legacy_sql=True, api_resource_configs=None, location=None, num_retries=5, labels=None)[source]

基类:airflow.utils.log.logging_mixin.LoggingMixin

BigQuery 光标。

BigQuery 基本游标包含针对 BigQuery 执行查询的辅助方法。在不需要 PEP 249 游标的情况下,运算符可以直接使用这些方法。

class airflow.providers.google.cloud.hooks.bigquery.BigQueryCursor(service, project_id, hook, use_legacy_sql=True, location=None, num_retries=5)[source]

基类: BigQueryBaseCursor

一个非常基本的 BigQuery PEP 249 光标实现。

PyHive PEP 249 的实现被用作参考

https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py

property description: list[source]

返回游标描述。

property rowcount: int[source]

默认返回 -1,表示不支持。

arraysize[source]
close()[source]

默认情况下,不执行任何操作。

execute(operation, parameters=None)[source]

执行 BigQuery 查询,并更新 BigQueryCursor 描述。

参数
  • operation (str) – 要执行的查询。

  • parameters (dict | None) – 要替换到查询中的参数。

executemany(operation, seq_of_parameters)[source]

使用不同的参数多次执行 BigQuery 查询。

参数
  • operation (str) – 要执行的查询。

  • seq_of_parameters (list) – 要替换到查询中的字典参数列表。

flush_results()[source]

刷新与游标属性相关的结果。

fetchone()[source]

获取查询结果集的下一行。

next()[source]

从缓冲区返回下一行。

fetchone的辅助方法。

如果缓冲区为空,则尝试分页遍历结果集以获取下一页,并将其加载到缓冲区中。

fetchmany(size=None)[source]

获取查询结果的下一组行。

这将返回一个序列的序列(例如,元组列表)。当没有更多行可用时,将返回一个空序列。

每次调用要获取的行数由参数指定。如果未给出,则游标的 arraysize 确定要获取的行数。

此方法尝试获取 size 参数指示的尽可能多的行。如果由于指定的行数不可用而无法实现,则可能会返回较少的行。

如果先前调用 execute() 没有产生任何结果集,或者尚未发出调用,则会引发 Error(或子类)异常。

fetchall()[source]

获取查询结果的所有(剩余)行。

返回一个序列的序列(例如,元组列表)。

get_arraysize()[source]

获取每次要获取的行数。

另请参阅

fetchmany()

set_arraysize(arraysize)[source]

设置每次要获取的行数。

另请参阅

fetchmany()

setinputsizes(sizes)[source]

默认情况下不执行任何操作。

setoutputsize(size, column=None)[source]

默认情况下不执行任何操作。

airflow.providers.google.cloud.hooks.bigquery.split_tablename(table_input, default_project_id, var_name=None)[source]
class airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

使用 gcloud-aio 库检索作业详细信息。

sync_hook_class[source]
async get_job_instance(project_id, job_id, session)[source]

通过作业 ID 和项目 ID 获取指定的作业资源。

async get_job_status(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]
async get_job_output(job_id, project_id=PROVIDE_PROJECT_ID)[source]

异步获取给定作业 ID 的 BigQuery 作业输出。

async create_job_for_partition_get(dataset_id, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]

创建一个新作业并使用 gcloud-aio 获取 job_id。

async cancel_job(job_id, project_id, location)[source]

取消 BigQuery 作业。

参数
  • job_id (str) – 要取消的作业的 ID。

  • project_id (str | None) – 运行作业的 Google Cloud 项目。

  • location (str | None) – 运行作业的位置。

get_records(query_results, as_dict=False, selected_fields=None)[source]

将 BigQuery 的响应转换为记录。

参数
  • query_results (dict[str, Any]) – SQL 查询的结果

  • as_dict (bool) – 如果为 True,则将结果作为字典列表返回,否则作为列表的列表返回。

  • selected_fields (str | list[str] | None) –

value_check(sql, pass_value, records, tolerance=None)[source]

将单个查询结果行和容差与 pass_value 进行匹配。

引发

AirflowException – 如果匹配失败

interval_check(row1, row2, metrics_thresholds, ignore_zero, ratio_formula)[source]

检查指标(SQL 表达式)的值是否在一定的容差范围内。

参数
  • row1 (str | None) – 第一个 SQL 查询的查询执行作业的第一个结果行

  • row2 (str | None) – 第二个 SQL 查询的查询执行作业的第一个结果行

  • metrics_thresholds (dict[str, Any]) – 一个按指标索引的比率字典,例如 'COUNT(*)': 1.5 将要求当前日期与先前 days_back 之间的差异小于等于 50%。

  • ignore_zero (bool) – 我们是否应忽略零指标

  • ratio_formula (str) – 用于计算两个指标之间比率的公式。假设 cur 是今天的指标,ref 是今天的指标 - days_back。max_over_min:计算 max(cur, ref) / min(cur, ref) relative_diff:计算 abs(cur-ref) / ref

class airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook

BigQuery Table 的异步 hook。

sync_hook_class[source]
async get_table_client(dataset, table_id, project_id, session)[source]

获取 Google Big Query 表格对象。

参数
  • dataset (str) – 要在其中查找表存储桶的数据集的名称。

  • table_id (str) – 要检查其存在性的表的名称。

  • project_id (str) – 要在其中查找表的 Google 云项目。提供给钩子的连接必须提供对指定项目的访问权限。

  • session (aiohttp.ClientSession) – aiohttp ClientSession

此条目是否有帮助?