airflow.providers.google.cloud.hooks.bigquery
¶
BigQuery Hook 和一个非常基本的 BigQuery PEP 249 实现。
模块内容¶
类¶
与 BigQuery 交互。 |
|
BigQuery 连接。 |
|
BigQuery 光标。 |
|
一个非常基本的 BigQuery PEP 249 光标实现。 |
|
使用 gcloud-aio 库检索作业详细信息。 |
|
BigQuery Table 的异步 hook。 |
函数¶
|
属性¶
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryHook(use_legacy_sql=True, location=None, priority='INTERACTIVE', api_resource_configs=None, impersonation_scopes=None, labels=None, **kwargs)[源代码]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseHook
,airflow.providers.common.sql.hooks.sql.DbApiHook
与 BigQuery 交互。
此 hook 使用 Google Cloud 连接。
- 参数
gcp_conn_id – 用于 GCP 凭据的 Airflow 连接。
use_legacy_sql (
bool
) – 这指定是否使用旧版 SQL 方言。location (
str
|None
) – BigQuery 资源的位置。priority (
str
) – 指定查询的优先级。可能的值包括 INTERACTIVE 和 BATCH。默认值为 INTERACTIVE。api_resource_configs (
dict
|None
) – 这包含应用于 Google BigQuery 作业的参数配置。impersonation_chain – 这是使用短期凭据模拟的可选服务帐户。
impersonation_scopes (
str
|collections.abc.Sequence
[str
] |None
) – 模拟帐户的可选范围列表。将覆盖连接中的范围。labels (
dict
|None
) – BigQuery 资源标签。
- property scopes: collections.abc.Sequence[str][源代码]¶
返回 OAuth 2.0 范围。
- 返回
返回在 impersonation_scopes、连接配置或默认范围中定义的范围
- 返回类型
- get_client(project_id=PROVIDE_PROJECT_ID, location=None)[源代码]¶
获取已验证的 BigQuery 客户端。
- 参数
project_id (str) – 客户端代表操作的项目 ID。
location (str | None) – 作业/数据集/表的默认位置。
- get_sqlalchemy_engine(engine_kwargs=None)[源代码]¶
创建 SQLAlchemy 引擎对象。
- 参数
engine_kwargs (dict | None) –
create_engine()
中使用的 Kwargs。
- get_records(sql, parameters=None)[源代码]¶
执行 SQL 并返回记录集。
- 参数
sql – 要执行的 SQL 语句 (str) 或要执行的 SQL 语句列表
parameters – 用于渲染 SQL 查询的参数。
- abstract insert_rows(table, rows, target_fields=None, commit_every=1000, replace=False, **kwargs)[源代码]¶
插入行。
目前不支持插入。理论上,你可以使用 BigQuery 的流式 API 将行插入到表中,但尚未实现。
- get_pandas_df(sql, parameters=None, dialect=None, **kwargs)[源代码]¶
获取 BigQuery 结果的 Pandas DataFrame。
必须重写 DbApiHook 方法,因为 Pandas 不支持 PEP 249 连接,除了 SQLite。
另请参阅
https://github.com/pandas-dev/pandas/blob/055d008615272a1ceca9720dc365a2abd316f353/pandas/io/sql.py#L415 https://github.com/pandas-dev/pandas/issues/6900
- 参数
sql (str) – 要执行的 BigQuery SQL。
parameters (collections.abc.Iterable | collections.abc.Mapping[str, Any] | None) – 用于渲染 SQL 查询的参数(未使用,留给重写超类方法)
dialect (str | None) – BigQuery SQL 的方言 – 传统 SQL 或标准 SQL,如果未指定,则默认为使用 self.use_legacy_sql
kwargs – (可选) 传递到 pandas_gbq.read_gbq 方法
- table_exists(dataset_id, table_id, project_id)[源代码]¶
检查 Google BigQuery 中是否存在表。
- 参数
project_id (str) – 要在其中查找表的 Google 云项目。提供给钩子的连接必须提供对指定项目的访问权限。
dataset_id (str) – 要在其中查找表的数据集的名称。
table_id (str) – 要检查其存在性的表的名称。
- table_partition_exists(dataset_id, table_id, partition_id, project_id)[源代码]¶
检查 Google BigQuery 中是否存在分区。
- 参数
project_id (str) – 要在其中查找表的 Google 云项目。提供给钩子的连接必须提供对指定项目的访问权限。
dataset_id (str) – 要在其中查找表的数据集的名称。
table_id (str) – 要检查其存在性的表的名称。
partition_id (str) – 要检查其存在性的分区的名称。
- create_empty_table(project_id=PROVIDE_PROJECT_ID, dataset_id=None, table_id=None, table_resource=None, schema_fields=None, time_partitioning=None, cluster_fields=None, labels=None, view=None, materialized_view=None, encryption_configuration=None, retry=DEFAULT_RETRY, location=None, exists_ok=True)[源代码]¶
在数据集中创建一个新的空表。
要创建由 SQL 查询定义的视图,请将字典解析到 view 参数中。
- 参数
project_id (str) – 要在其中创建表的项目。
dataset_id (str | None) – 要在其中创建表的数据集。
table_id (str | None) – 要创建的表的名称。
table_resource (dict[str, Any] | None) – 文档中描述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,则忽略所有其他参数。
schema_fields (list | None) –
如果设置,则架构字段列表定义如下: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
labels (dict | None) – 包含表的标签的字典,传递给 BigQuery
retry (google.api_core.retry.Retry) – 可选。如何重试 RPC。
time_partitioning (dict | None) –
配置可选的时间分区字段,例如按照 API 规范设置分区字段、类型和过期时间。
cluster_fields (list[str] | None) – [可选] 用于聚类的字段。BigQuery 支持对分区表和非分区表进行聚类。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#clustering.fields
view (dict | None) –
[可选] 包含视图定义的字典。如果设置,它将创建一个视图而不是表:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#ViewDefinition
view = { "query": "SELECT * FROM `test-project-id.test_dataset_id.test_table_prefix*` LIMIT 1000", "useLegacySql": False, }
materialized_view (dict | None) – [可选] 物化视图定义。
encryption_configuration (dict | None) –
[可选] 自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key", }
num_retries – 连接出现问题时的最大重试次数。
location (str | None) – (可选)表应驻留的地理位置。
exists_ok (bool) – 如果为
True
,则在创建表时忽略“已存在”错误。
- 返回
创建的表
- 返回类型
- create_empty_dataset(dataset_id=None, project_id=PROVIDE_PROJECT_ID, location=None, dataset_reference=None, exists_ok=True)[source]¶
创建一个新的空数据集。
- 参数
project_id (str) – 要在其中创建空数据集的项目的名称。如果 dataset_reference 中提供了 projectId,则无需提供。
dataset_id (str | None) – 数据集的 ID。如果 dataset_reference 中提供了 datasetId,则无需提供。
location (str | None) – (可选)数据集应驻留的地理位置。没有默认值,但如果未提供任何内容,则数据集将在美国创建。
dataset_reference (dict[str, Any] | None) – 可以与请求正文一起提供的数据集引用。更多信息:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
exists_ok (bool) – 如果为
True
,则在创建数据集时忽略“已存在”错误。
- get_dataset_tables(dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, retry=DEFAULT_RETRY)[source]¶
获取给定数据集的表列表。
更多信息,请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/list
- 参数
dataset_id (str) – 所请求数据集的数据集 ID。
project_id (str) – (可选)所请求数据集的项目。如果为 None,则将使用 self.project_id。
max_results (int | None) – (可选)要返回的最大表数。
retry (google.api_core.retry.Retry) – 如何重试 RPC。
- 返回
与数据集关联的表列表。
- 返回类型
- delete_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, retry=DEFAULT_RETRY)[source]¶
删除项目中 BigQuery 的数据集。
- 参数
project_id (str) – 拥有数据集的项目的名称。
dataset_id (str) – 要删除的数据集。
delete_contents (bool) – 如果为 True,则删除数据集中的所有表。如果为 False 且数据集包含表,则请求将失败。
retry (google.api_core.retry.Retry) – 如何重试 RPC。
- update_table(table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]¶
更改表的某些字段。
使用
fields
指定要更新的字段。必须至少提供一个字段。如果fields
中列出了某个字段,并且该字段在table
中为None
,则该字段值将被删除。如果
table.etag
不为None
,则只有当服务器上的表的 ETag 相同时,更新才会成功。因此,使用get_table
读取表,更改其字段,然后将其传递给update_table
将确保只有在自读取以来未对表进行修改的情况下才会保存更改。- 参数
project_id (str) – 要在其中创建表的项目。
dataset_id (str | None) – 要在其中创建表的数据集。
table_id (str | None) – 要创建的表的名称。
table_resource (dict[str, Any]) – 文档中描述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 该表必须包含
tableReference
或project_id
,必须提供dataset_id
和table_id
。fields (list[str] | None) – 要更改的
table
的字段,拼写为 Table 属性(例如,“friendly_name”)。
- insert_all(project_id, dataset_id, table_id, rows, ignore_unknown_values=False, skip_invalid_rows=False, fail_on_error=False)[source]¶
无需加载作业,即可将数据流式传输到 BigQuery,一次一条记录。
- 参数
project_id (str) – 表所在的项目的名称。
dataset_id (str) – 表所在的数据集的名称。
table_id (str) – 表的名称。
rows (list) –
要插入的行。
rows = [{"json": {"a_key": "a_value_0"}}, {"json": {"a_key": "a_value_1"}}]
ignore_unknown_values (bool) – [可选] 接受包含与架构不匹配的值的行。未知值将被忽略。默认值为 false,这会将未知值视为错误。
skip_invalid_rows (bool) – [可选] 插入请求中的所有有效行,即使存在无效行也是如此。默认值为 false,如果存在任何无效行,则会导致整个请求失败。
fail_on_error (bool) – [可选] 如果发生任何错误,则强制任务失败。默认值为 false,这表示即使发生任何插入错误,任务也不应失败。
- update_dataset(fields, dataset_resource, dataset_id=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY)[source]¶
更改数据集的某些字段。
使用
fields
指定要更新的字段。必须提供至少一个字段。如果fields
中列出了某个字段,并且在dataset
中为None
,则会将其删除。如果
dataset.etag
不为None
,则只有当服务器上的数据集具有相同的 ETag 时,更新才会成功。因此,使用get_dataset
读取数据集,更改其字段,然后将其传递给update_dataset
将确保只有在自读取以来未对数据集进行任何修改的情况下才会保存更改。- 参数
dataset_resource (dict[str, Any]) – 将在请求正文中提供的数据集资源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
dataset_id (str | None) – 数据集的 ID。
fields (collections.abc.Sequence[str]) – 要更改的
dataset
的属性(例如,“friendly_name”)。project_id (str) – Google Cloud 项目 ID
retry (google.api_core.retry.Retry) – 如何重试 RPC。
- get_datasets_list(project_id=PROVIDE_PROJECT_ID, include_all=False, filter_=None, max_results=None, page_token=None, retry=DEFAULT_RETRY, return_iterator=False)[source]¶
获取当前项目中的所有 BigQuery 数据集。
更多信息,请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list
- 参数
project_id (str) – 您尝试获取所有数据集的 Google Cloud 项目
include_all (bool) – 如果结果包括隐藏数据集,则为 True。默认为 False。
filter – 用于按标签过滤结果的表达式。有关语法,请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/list#filter。
filter – str
max_results (int | None) – 要返回的最大数据集数。
max_results – int
page_token (str | None) – 表示数据集游标的令牌。如果未传递,则 API 将返回数据集的第一页。该令牌标记要返回的迭代器的开头,并且可以在
HTTPIterator
的next_page_token
处访问page_token
的值。page_token – str
retry (google.api_core.retry.Retry) – 如何重试 RPC。
return_iterator (bool) – 返回 HTTPIterator 而不是返回 list[Row],该迭代器可用于获取 next_page_token 属性。
- get_dataset(dataset_id, project_id=PROVIDE_PROJECT_ID)[source]¶
提取由 dataset_id 引用的数据集。
- 参数
dataset_id (str) – BigQuery 数据集 ID
project_id (str) – Google Cloud 项目 ID
- 返回
dataset_resource
- 返回类型
另请参阅
有关数据集资源内容的更多信息,请参阅:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
- run_grant_dataset_view_access(source_dataset, view_dataset, view_table, view_project=None, project_id=PROVIDE_PROJECT_ID)[source]¶
授予数据集的授权视图访问权限给视图表。
如果此视图已被授予对数据集的访问权限,则不执行任何操作。此方法不是原子的。运行它可能会覆盖同时发生的更新。
- run_table_upsert(dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID)[source]¶
如果表存在则更新表,否则创建新表。
由于 BigQuery 本身不允许表更新插入,因此这不是原子操作。
- 参数
dataset_id (str) – 要将表更新/插入到的数据集。
table_resource (dict[str, Any]) – 表资源。 请参阅 https://cloud.google.com/bigquery/docs/reference/v2/tables#resource
project_id (str) – 要将表更新/插入到的项目。如果为 None,则项目将为 self.project_id。
- delete_table(table_id, not_found_ok=True, project_id=PROVIDE_PROJECT_ID)[source]¶
从数据集中删除现有表。
如果表不存在,则返回错误,除非 not_found_ok 设置为 True。
- list_rows(dataset_id, table_id, max_results=None, selected_fields=None, page_token=None, start_index=None, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY, return_iterator=False)[source]¶
列出表中的行。
请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list
- 参数
dataset_id (str) – 请求的表的 dataset ID。
table_id (str) – 请求的表的 table ID。
max_results (int | None) – 返回的最大结果数。
selected_fields (list[str] | str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。
page_token (str | None) – 从先前调用返回的页面令牌,用于标识结果集。
start_index (int | None) – 要读取的起始行的从零开始的索引。
project_id (str) – 客户端代表操作的项目 ID。
location (str | None) – 作业的默认位置。
retry (google.api_core.retry.Retry) – 如何重试 RPC。
return_iterator (bool) – 不是返回 list[Row],而是返回 RowIterator,可用于获取 next_page_token 属性。
- 返回
行列表
- 返回类型
list[google.cloud.bigquery.table.Row] | google.cloud.bigquery.table.RowIterator
- update_table_schema(schema_fields_updates, include_policy_tags, dataset_id, table_id, project_id=PROVIDE_PROJECT_ID)[source]¶
更新给定数据集和表的架构中的字段。
请注意,架构中的某些字段是不可变的;尝试更改它们会导致异常。
如果包含新字段,则将插入该字段,这需要设置所有必需的字段。
- 参数
include_policy_tags (bool) – 如果设置为 True,则策略标签将包含在更新请求中,这需要特殊权限,即使未更改,请参阅 https://cloud.google.com/bigquery/docs/column-level-security#roles
dataset_id (str) – 要更新的请求表的 dataset ID。
table_id (str) – 要更新的表的 table ID。
schema_fields_updates (list[dict[str, Any]]) –
部分架构资源。请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
schema_fields_updates = [ {"name": "emp_name", "description": "Some New Description"}, {"name": "salary", "description": "Some New Description"}, { "name": "departments", "fields": [ {"name": "name", "description": "Some New Description"}, {"name": "type", "description": "Some New Description"}, ], }, ]
project_id (str) – 我们要更新表的项目的名称。
- poll_job_complete(job_id, project_id=PROVIDE_PROJECT_ID, location=None, retry=DEFAULT_RETRY)[source]¶
检查作业是否已完成。
- 参数
job_id (str) – 作业的 ID。
project_id (str) – 运行作业的 Google Cloud 项目
location (str | None) – 作业运行的位置
retry (google.api_core.retry.Retry) – 如何重试 RPC。
- get_job(job_id, project_id=PROVIDE_PROJECT_ID, location=None)[source]¶
检索 BigQuery 作业。
- 参数
job_id (str) – 作业的 ID。ID 只能包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 或破折号 (-)。最大长度为 1,024 个字符。
project_id (str) – 运行作业的 Google Cloud 项目。
location (str | None) – 作业运行的位置。
- insert_job(configuration, job_id=None, project_id=PROVIDE_PROJECT_ID, location=None, nowait=False, retry=DEFAULT_RETRY, timeout=None)[source]¶
执行 BigQuery 作业并等待其完成。
- 参数
configuration (dict) – configuration 参数直接映射到作业对象中的 BigQuery 的配置字段。 有关详细信息,请参阅 https://cloud.google.com/bigquery/docs/reference/v2/jobs。
job_id (str | None) – 作业的 ID。ID 只能包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 或破折号 (-)。最大长度为 1,024 个字符。如果未提供,则将生成 uuid。
project_id (str) – 运行作业的 Google Cloud 项目。
location (str | None) – 作业运行的位置。
nowait (bool) – 是否插入作业而不等待结果。
retry (google.api_core.retry.Retry) – 如何重试 RPC。
timeout (float | None) – 在使用
retry
之前,等待底层 HTTP 传输的秒数。
- 返回
作业 ID。
- 返回类型
BigQueryJob
- get_query_results(job_id, location, max_results=None, selected_fields=None, project_id=PROVIDE_PROJECT_ID, retry=DEFAULT_RETRY, job_retry=DEFAULT_JOB_RETRY)[source]¶
获取给定 job_id 的查询结果。
- 参数
job_id (str) – 作业的 ID。ID 只能包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 或破折号 (-)。最大长度为 1,024 个字符。
location (str) – 用于操作的位置。
selected_fields (list[str] | str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。
max_results (int | None) – 要从表中获取的最大记录(行)数。
project_id (str) – 运行作业的 Google Cloud 项目。
retry (google.api_core.retry.Retry) – 如何重试 RPC。
job_retry (google.api_core.retry.Retry) – 如何重试失败的作业。
- 返回
如果给定,则列出按选定字段筛选的列的行。
- 引发
AirflowException
- 返回类型
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryConnection(*args, **kwargs)[source]¶
BigQuery 连接。
BigQuery 没有持久连接的概念。因此,这些对象是游标的小型无状态工厂,它们完成所有实际工作。
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryBaseCursor(service, project_id, hook, use_legacy_sql=True, api_resource_configs=None, location=None, num_retries=5, labels=None)[source]¶
基类:
airflow.utils.log.logging_mixin.LoggingMixin
BigQuery 光标。
BigQuery 基本游标包含针对 BigQuery 执行查询的辅助方法。在不需要 PEP 249 游标的情况下,运算符可以直接使用这些方法。
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryCursor(service, project_id, hook, use_legacy_sql=True, location=None, num_retries=5)[source]¶
-
一个非常基本的 BigQuery PEP 249 光标实现。
PyHive PEP 249 的实现被用作参考
https://github.com/dropbox/PyHive/blob/master/pyhive/presto.py https://github.com/dropbox/PyHive/blob/master/pyhive/common.py
- airflow.providers.google.cloud.hooks.bigquery.split_tablename(table_input, default_project_id, var_name=None)[source]¶
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
使用 gcloud-aio 库检索作业详细信息。
- async create_job_for_partition_get(dataset_id, table_id=None, project_id=PROVIDE_PROJECT_ID)[source]¶
创建一个新作业并使用 gcloud-aio 获取 job_id。
- async cancel_job(job_id, project_id, location)[source]¶
取消 BigQuery 作业。
- 参数
job_id (str) – 要取消的作业的 ID。
project_id (str | None) – 运行作业的 Google Cloud 项目。
location (str | None) – 运行作业的位置。
- get_records(query_results, as_dict=False, selected_fields=None)[source]¶
将 BigQuery 的响应转换为记录。
- 参数
query_results (dict[str, Any]) – SQL 查询的结果
as_dict (bool) – 如果为 True,则将结果作为字典列表返回,否则作为列表的列表返回。
selected_fields (str | list[str] | None) –
- value_check(sql, pass_value, records, tolerance=None)[source]¶
将单个查询结果行和容差与 pass_value 进行匹配。
- 引发
AirflowException – 如果匹配失败
- interval_check(row1, row2, metrics_thresholds, ignore_zero, ratio_formula)[source]¶
检查指标(SQL 表达式)的值是否在一定的容差范围内。
- 参数
row1 (str | None) – 第一个 SQL 查询的查询执行作业的第一个结果行
row2 (str | None) – 第二个 SQL 查询的查询执行作业的第一个结果行
metrics_thresholds (dict[str, Any]) – 一个按指标索引的比率字典,例如 'COUNT(*)': 1.5 将要求当前日期与先前 days_back 之间的差异小于等于 50%。
ignore_zero (bool) – 我们是否应忽略零指标
ratio_formula (str) – 用于计算两个指标之间比率的公式。假设 cur 是今天的指标,ref 是今天的指标 - days_back。max_over_min:计算 max(cur, ref) / min(cur, ref) relative_diff:计算 abs(cur-ref) / ref
- class airflow.providers.google.cloud.hooks.bigquery.BigQueryTableAsyncHook(gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
基类:
airflow.providers.google.common.hooks.base_google.GoogleBaseAsyncHook
BigQuery Table 的异步 hook。