airflow.providers.google.cloud.operators.bigquery¶
此模块包含 Google BigQuery 操作符。
属性¶
类¶
BigQuery 操作符的十六进制颜色。 |
|
如果资源存在要采取的操作。 |
|
对 BigQuery 执行检查。 |
|
使用 SQL 代码执行简单的值检查。 |
|
检查给定为 SQL 表达式的指标值是否在旧值的容差范围内。 |
|
子类化 SQLColumnCheckOperator,以便为 OpenLineage 提供可解析的作业 ID。 |
|
子类化 SQLTableCheckOperator,以便为 OpenLineage 提供可解析的作业 ID。 |
|
获取数据并返回,可以来自 BigQuery 表,也可以是查询作业的结果。 |
|
在指定的 BigQuery 数据集中创建一个新表,可选择带上模式。 |
|
在指定的 BigQuery 数据集中创建一个新表,可选择带上模式。 |
|
使用来自 Google Cloud Storage 的数据创建一个新的外部表。 |
|
从 BigQuery 中的项目删除现有数据集。 |
|
在 BigQuery 中的项目创建一个新数据集。 |
|
根据 ID 获取指定的数据集。 |
|
检索指定数据集中的表列表。 |
|
在 BigQuery 中的项目更新一个表。 |
|
在 BigQuery 中的项目更新一个数据集。 |
|
删除一个 BigQuery 表。 |
|
对 BigQuery 表进行 upsert 操作。 |
|
更新 BigQuery 表模式。 |
|
执行一个 BigQuery 作业。 |
模块内容¶
- airflow.providers.google.cloud.operators.bigquery.BIGQUERY_JOB_DETAILS_LINK_FMT = 'https://console.cloud.google.com/bigquery?j={job_id}'[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUIColors[source]¶
基类:
enum.Enum
BigQuery 操作符的十六进制颜色。
- class airflow.providers.google.cloud.operators.bigquery.IfExistAction[source]¶
基类:
enum.Enum
如果资源存在要采取的操作。
- class airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator(*, sql, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, query_params=None, **kwargs)[source]¶
基类:
_BigQueryDbHookMixin
,airflow.providers.common.sql.operators.sql.SQLCheckOperator
,_BigQueryOperatorsEncryptionConfigurationMixin
对 BigQuery 执行检查。
此操作符需要一个返回单行的 SQL 查询。该行上的每个值都将使用 Python
bool
强制转换进行评估。如果任一值为假值,检查将出错。另请参阅
有关如何使用此操作符的更多信息,请参阅指南: 检查查询结果是否有数据
请注意,Python bool 强制转换将以下内容评估为 False
False
0
空字符串 (
""
)空列表 (
[]
)空字典或集合 (
{}
)
给定一个类似
SELECT COUNT(*) FROM foo
的查询,只有当计数等于零时才会失败。您可以构建更复杂的查询,例如检查表是否与上游源表具有相同的行数,或者今天分区的计数是否大于昨天分区的计数,或者一组指标是否小于 7 天平均值的三倍标准差。此操作符可用作管道中的数据质量检查。根据您在 DAG 中的位置,您可以选择阻止关键路径,防止发布可疑数据,或者将其放在旁边并接收电子邮件警报而不停止 DAG 的进度。
- 参数:
sql (str) – 要执行的 SQL。
gcp_conn_id (str) – Google Cloud 的连接 ID。
use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。
location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据模拟,或者获取列表中最后一个账号(将在请求中被模拟)的访问令牌所需的链式账号列表。如果设置为字符串,该账号必须授予发起账号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须授予紧前身份 Service Account Token Creator IAM 角色,列表中第一个账号将此角色授予发起账号。(模板化)
labels (dict | None) – 包含表标签的字典,传递给 BigQuery。
encryption_configuration (dict | None) –
(可选)自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
deferrable (bool) – 以可延迟模式运行操作符。
poll_interval (float) – (仅限可延迟模式)检查作业状态的轮询周期(秒)。
query_params (list | None) – 包含查询参数类型和值的字典列表,传递给 BigQuery。字典的结构应类似于 Google BigQuery Jobs API 中的“queryParameters”:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs。例如,[{ ‘name’: ‘corpus’, ‘parameterType’: { ‘type’: ‘STRING’ }, ‘parameterValue’: { ‘value’: ‘romeoandjuliet’ } }]。(模板化)
- template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'impersonation_chain', 'labels', 'query_params')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryValueCheckOperator(*, sql, pass_value, tolerance=None, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[source]¶
基类:
_BigQueryDbHookMixin
,airflow.providers.common.sql.operators.sql.SQLValueCheckOperator
,_BigQueryOperatorsEncryptionConfigurationMixin
使用 SQL 代码执行简单的值检查。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南: 将查询结果与通过值进行比较
- 参数:
sql (str) – 要执行的 SQL。
use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。
encryption_configuration (dict | None) –
(可选)自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据模拟,或者获取列表中最后一个账号(将在请求中被模拟)的访问令牌所需的链式账号列表。如果设置为字符串,该账号必须授予发起账号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须授予紧前身份 Service Account Token Creator IAM 角色,列表中第一个账号将此角色授予发起账号。(模板化)
labels (dict | None) – 包含表标签的字典,传递给 BigQuery。
deferrable (bool) – 以可延迟模式运行操作符。
poll_interval (float) – (仅限可延迟模式)检查作业状态的轮询周期(秒)。
- template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'pass_value', 'impersonation_chain', 'labels')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, encryption_configuration=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, project_id=PROVIDE_PROJECT_ID, **kwargs)[源]¶
基类:
_BigQueryDbHookMixin
,airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator
,_BigQueryOperatorsEncryptionConfigurationMixin
检查给定为 SQL 表达式的指标值是否在旧值的容差范围内。
此方法构建查询如下
SELECT {metrics_threshold_dict_key} FROM {table} WHERE {date_filter_column}=<date>
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:比较随时间变化的指标
- 参数:
table (str) – 表名
days_back (SupportsAbs`[`int`]` ) – `ds` 与要对照检查的 `ds` 之间的天数。默认为 7 天
metrics_thresholds (dict) – 按指标索引的比例字典,例如 `'COUNT(*)'`: `1.5` 要求当前日期与前 `days_back` 天之间的差异小于或等于 50%。
use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。
encryption_configuration (dict | None) –
(可选)自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
labels (dict | None) – 包含表标签的字典,传递给 BigQuery
deferrable (bool) – 在 deferrable 模式下运行 Operator
poll_interval (float) – (仅在 deferrable 模式下) 检查作业状态的轮询周期,单位为秒。默认为 4 秒。
project_id (str) – 表示 BigQuery 项目ID 的字符串
- template_fields: collections.abc.Sequence[str] = ('table', 'gcp_conn_id', 'sql1', 'sql2', 'impersonation_chain', 'labels')[源]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryColumnCheckOperator(*, table, column_mapping, partition_clause=None, database=None, accept_none=True, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, **kwargs)[源]¶
基类:
_BigQueryDbHookMixin
,airflow.providers.common.sql.operators.sql.SQLColumnCheckOperator
,_BigQueryOperatorsEncryptionConfigurationMixin
子类化 SQLColumnCheckOperator,以便为 OpenLineage 提供可解析的作业 ID。
用法请参阅基类文档字符串。
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:使用预定义测试检查列
- 参数:
table (str) – 表名
column_mapping (dict) – 一个字典,关联列与其检查规则
partition_clause (str | None) – 一个字符串 SQL 语句,添加到 WHERE 子句以分区数据
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
encryption_configuration (dict | None) –
(可选)自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。
location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
labels (dict | None) – 包含表标签的字典,传递给 BigQuery
- template_fields: collections.abc.Sequence[str][源]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryTableCheckOperator(*, table, checks, partition_clause=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, **kwargs)[源]¶
基类:
_BigQueryDbHookMixin
,airflow.providers.common.sql.operators.sql.SQLTableCheckOperator
,_BigQueryOperatorsEncryptionConfigurationMixin
子类化 SQLTableCheckOperator,以便为 OpenLineage 提供可解析的作业 ID。
用法请参阅基类。
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:检查表级别的数据质量
- 参数:
table (str) – 表名
checks (dict) – 一个字典,包含检查名称和布尔型 SQL 语句
partition_clause (str | None) – 一个字符串 SQL 语句,添加到 WHERE 子句以分区数据
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。
location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
labels (dict | None) – 包含表标签的字典,传递给 BigQuery
encryption_configuration (dict | None) –
(可选)自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
- template_fields: collections.abc.Sequence[str][源]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDataOperator(*, dataset_id=None, table_id=None, table_project_id=None, job_id=None, job_project_id=None, project_id=PROVIDE_PROJECT_ID, max_results=100, selected_fields=None, gcp_conn_id='google_cloud_default', location=None, encryption_configuration=None, impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, as_dict=False, use_legacy_sql=True, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
,_BigQueryOperatorsEncryptionConfigurationMixin
获取数据并返回,可以来自 BigQuery 表,也可以是查询作业的结果。
可以按特定列缩小数据范围,或整体检索数据。根据“as_dict”的值,数据以以下两种格式之一返回:1. False(默认)- 一个 Python 列表的列表,嵌套列表的数量等于获取的行数。每个嵌套列表代表一行,其中的元素对应于该行的列值。
示例结果:
[['Tony', 10], ['Mike', 20]
2. True - 一个 Python 字典列表,其中每个字典代表一行。在每个字典中,键是列名,值是这些列对应的数值。
示例结果:
[{'name': 'Tony', 'age': 10}, {'name': 'Mike', 'age': 20}]
另请参阅
有关如何使用此运算符的更多信息,请参阅指南:从表中获取数据
注意
如果您在
selected_fields
中传递的字段顺序与 BQ 表格/作业中现有列的顺序不同,数据仍将按照 BQ 表格的顺序排列。例如,如果 BQ 表格有三列,顺序为[A,B,C]
,而您在selected_fields
中传递 'B,A',数据仍然会是'A,B'
的形式。注意
在非可延迟模式下使用作业 ID 时,作业应处于 DONE(完成)状态。
示例 - 使用表格从 BigQuery 检索数据:
get_data = BigQueryGetDataOperator( task_id="get_data_from_bq", dataset_id="test_dataset", table_id="Transaction_partitions", table_project_id="internal-gcp-project", max_results=100, selected_fields="DATE", gcp_conn_id="airflow-conn-id", )
示例 - 使用作业 ID 从 BigQuery 检索数据:
get_data = BigQueryGetDataOperator( job_id="airflow_8999918812727394_86a1cecc69c5e3028d28247affd7563", job_project_id="internal-gcp-project", max_results=100, selected_fields="DATE", gcp_conn_id="airflow-conn-id", )
- 参数:
dataset_id (str | None) – 请求的表格的数据集 ID。(模板化)
table_id (str | None) – 请求的表格的表格 ID。与 job_id 互斥。(模板化)
table_project_id (str | None) – (可选)请求的表格的项目 ID。如果为 None,将从 hook 的项目 ID 派生。(模板化)
job_id (str | None) – 从中检索查询结果的作业 ID。与 table_id 互斥。(模板化)
job_project_id (str | None) – (可选)作业运行所在的 Google Cloud 项目。如果为 None,将从 hook 的项目 ID 派生。(模板化)
project_id (str) – (已弃用)(可选)将从中返回数据的项目名称。如果为 None,将从 hook 的项目 ID 派生。(模板化)
max_results (int) – 从表中获取的最大记录数(行数)。(模板化)
selected_fields (str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。
encryption_configuration (dict | None) –
(可选)自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
location (str | None) – 用于操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
deferrable (bool) – 在 deferrable 模式下运行 Operator
poll_interval (float) – (仅在 deferrable 模式下) 检查作业状态的轮询周期,单位为秒。默认为 4 秒。
as_dict (bool) – 如果为 True,则结果返回为字典列表;否则返回为列表的列表(默认:False)。
use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_project_id', 'job_id', 'job_project_id', 'project_id',...[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateTableOperator(*, dataset_id, table_id, table_resource, project_id=PROVIDE_PROJECT_ID, location=None, gcs_schema_object=None, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', impersonation_chain=None, if_exists='log', retry=DEFAULT, timeout=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
在指定的 BigQuery 数据集中创建一个新表,可选择带上模式。
用于 BigQuery 表的 schema 可以通过两种方式指定。您可以直接传入 schema 字段,或者指向一个 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是一个包含 schema 字段的 JSON 文件。您也可以在没有 schema 的情况下创建表。
另请参阅
有关如何使用此运算符的更多信息,请参阅指南:创建表
- 参数:
project_id (str) – 可选。要在其中创建表的项目。
dataset_id (str) – 必需。要在其中创建表的数据集。
table_id (str) – 必需。要创建的表的名称。
table_resource (dict[str, Any] | google.cloud.bigquery.table.Table | google.cloud.bigquery.table.TableReference | google.cloud.bigquery.table.TableListItem) – 必需。表资源,如文档所述:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果
table
是引用,则将创建一个具有指定 ID 的空表。表所属的数据集必须已经存在。if_exists (str) – 可选。如果表存在,Airflow 应该做什么。如果设置为 log,则 TI 将成功,并记录一条错误消息。设置为 ignore 以忽略错误,设置为 fail 以使 TI 失败,设置为 skip 以跳过。
gcs_schema_object (str | None) – 可选。包含 schema 的 JSON 文件的完整路径。例如:
gs://test-bucket/dir1/dir2/employee_schema.json
gcp_conn_id (str) – 可选。用于连接到 Google Cloud 并与 Bigquery 服务交互的连接 ID。
google_cloud_storage_conn_id (str) – 可选。用于连接到 Google Cloud 并与 Google Cloud Storage 服务交互的连接 ID。
location (str | None) – 可选。用于操作的位置。
retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 可选。用于重试请求的重试对象。如果指定 None,则不会重试请求。
timeout (float | None) – 可选。等待请求完成的时间(秒)。请注意,如果指定了 retry,则超时应用于每个单独的尝试。
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选。使用短期凭据模拟的服务账号,或获取列表中最后一个账号的 access_token 所需的账号链,该账号将在请求中被模拟。如果设置为字符串,该账号必须授予原始账号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须授予直接前一个身份 Service Account Token Creator IAM 角色,列表中第一个账号将此角色授予原始账号。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_resource', 'project_id', 'gcs_schema_object', 'impersonation_chain')[source]¶
- 类 airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator(*, dataset_id, table_id, table_resource=None, project_id=PROVIDE_PROJECT_ID, schema_fields=None, gcs_schema_object=None, time_partitioning=None, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', labels=None, view=None, materialized_view=None, encryption_configuration=None, location=None, cluster_fields=None, impersonation_chain=None, if_exists='log', bigquery_conn_id=None, exists_ok=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
在指定的 BigQuery 数据集中创建一个新表,可选择带上模式。
用于 BigQuery 表的 schema 可以通过两种方式指定。您可以直接传入 schema 字段,或者指向一个 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是一个包含 schema 字段的 JSON 文件。您也可以在没有 schema 的情况下创建表。
另请参阅
有关如何使用此 operator 的更多信息,请参阅指南:Create native table
- 参数:
project_id (str) – 要创建表的项目。(templated)
dataset_id (str) – 要在其中创建表的数据集。(templated)
table_id (str) – 要创建的表的名称。(templated)
table_resource (dict[str, Any] | None) – 文档中描述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,则所有其他参数将被忽略。(templated)
schema_fields (list | None) –
如果设置,则为此处定义的 schema 字段列表:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
示例:
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
gcs_schema_object (str | None) – 包含 schema 的 JSON 文件的完整路径 (templated)。例如:
gs://test-bucket/dir1/dir2/employee_schema.json
time_partitioning (dict | None) –
根据 API 规范配置可选的时间分区字段,即按字段、类型和过期时间进行分区。
gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Bigquery 服务交互的连接 ID。
google_cloud_storage_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Google Cloud Storage 服务交互的连接 ID。
labels (dict | None) – 包含表标签的字典,传递给 BigQuery
示例 (schema JSON 在 GCS 中):
CreateTable = BigQueryCreateEmptyTableOperator( task_id="BigQueryCreateEmptyTableOperator_task", dataset_id="ODS", table_id="Employees", project_id="internal-gcp-project", gcs_schema_object="gs://schema-bucket/employee_schema.json", gcp_conn_id="airflow-conn-id", google_cloud_storage_conn_id="airflow-conn-id", )
对应的 Schema 文件 (
employee_schema.json
)[ {"mode": "NULLABLE", "name": "emp_name", "type": "STRING"}, {"mode": "REQUIRED", "name": "salary", "type": "INTEGER"}, ]
示例 (schema 在 DAG 中):
CreateTable = BigQueryCreateEmptyTableOperator( task_id="BigQueryCreateEmptyTableOperator_task", dataset_id="ODS", table_id="Employees", project_id="internal-gcp-project", schema_fields=[ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ], gcp_conn_id="airflow-conn-id-account", google_cloud_storage_conn_id="airflow-conn-id", )
- 参数:
view (dict | None) –
(可选) 包含 view 定义的字典。如果设置,它将创建一个 view 而不是表。
materialized_view (dict | None) – (可选) 具化视图 (materialized view) 定义。
encryption_configuration (dict | None) –
(可选)自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
location (str | None) – 用于操作的位置。
cluster_fields (list[str] | None) –
(可选) 用于聚簇 (clustering) 的字段。BigQuery 支持对分区表和非分区表进行聚簇。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
if_exists (str) – 如果表已存在,Airflow 应该怎么做。如果设置为 log,则 TI 将成功,并记录错误消息。设置为 ignore 则忽略错误,设置为 fail 则使 TI 失败,设置为 skip 则跳过。
exists_ok (bool | None) – 已弃用 - 请改用 if_exists=”ignore”。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_resource', 'project_id', 'gcs_schema_object', 'labels',...[source]¶
- 类 airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator(*, bucket=None, source_objects=None, destination_project_dataset_table=None, table_resource=None, schema_fields=None, schema_object=None, gcs_schema_bucket=None, source_format=None, autodetect=False, compression=None, skip_leading_rows=None, field_delimiter=None, max_bad_records=0, quote_character=None, allow_quoted_newlines=False, allow_jagged_rows=False, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', src_fmt_configs=None, labels=None, encryption_configuration=None, location=None, impersonation_chain=None, bigquery_conn_id=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
使用来自 Google Cloud Storage 的数据创建一个新的外部表。
要用于 BigQuery 表的 schema 可以通过两种方式指定。您可以直接传入 schema 字段,或者将 operator 指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是包含 schema 字段的 JSON 文件。
另请参阅
有关如何使用此 operator 的更多信息,请参阅指南:Create external table
- 参数:
bucket (str | None) – 外部表指向的存储桶 (bucket)。(templated)
source_objects (list[str] | None) – 外部表指向的 Google Cloud Storage URI 列表。如果 source_format 是 ‘DATASTORE_BACKUP’,则列表必须只包含一个 URI。
destination_project_dataset_table (str | None) – 要加载数据到的以点分隔的
(<project>.)<dataset>.<table>
BigQuery 表 (templated)。如果未包含<project>
,则项目将是连接 json 中定义的项目。schema_fields (list | None) –
如果设置,则为此处定义的 schema 字段列表:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
示例:
schema_fields = [ {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"}, {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}, ]
当 source_format 是 ‘DATASTORE_BACKUP’ 时不应设置此项。
table_resource (dict[str, Any] | None) – 文档中描述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,则所有其他参数将被忽略。来自对象的外部 schema 将被解析。
schema_object (str | None) – 如果设置,则为指向包含表 schema 的 .json 文件的 GCS 对象路径。(templated)
gcs_schema_bucket (str | None) – 存储 schema JSON 的 GCS 存储桶名称 (templated)。默认值是 self.bucket。
source_format (str | None) – 数据的文件格式。
autodetect (bool) – 尝试自动检测 schema 和格式选项。明确指定 schema_fields 和 schema_object 选项时,将优先使用这些选项。https://cloud.google.com/bigquery/docs/schema-detect#schema_auto-detection_for_external_data_sources
compression (str | None) – (可选) 数据源的压缩类型。可能的值包括 GZIP 和 NONE。默认值为 NONE。Google Cloud Bigtable、Google Cloud Datastore 备份和 Avro 格式会忽略此设置。
skip_leading_rows (int | None) – 从 CSV 加载时要跳过的行数。
field_delimiter (str | None) – CSV 中使用的分隔符。
max_bad_records (int) – BigQuery 在运行作业时可以忽略的最大错误记录数。
quote_character (str | None) – 用于引用 CSV 文件中数据部分的字符。
allow_quoted_newlines (bool) – 是否允许带引号的换行符 (true) 或不允许 (false)。
allow_jagged_rows (bool) – 接受缺少尾部可选列的行。缺失的值被视为 null。如果为 false,则缺少尾部列的记录被视为错误记录,如果错误记录过多,则在作业结果中返回无效错误。仅适用于 CSV,对其他格式无效。
gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Bigquery 服务交互的连接 ID。
google_cloud_storage_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Google Cloud Storage 服务交互的连接 ID。
src_fmt_configs (dict | None) – 配置特定于源格式的可选字段。
labels (dict | None) – 包含表标签的字典,传递给 BigQuery
encryption_configuration (dict | None) –
(可选)自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY", }
location (str | None) – 用于操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
- template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'gcs_schema_bucket',...[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
从 BigQuery 中的项目删除现有数据集。
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete
另请参阅
有关如何使用此算子的更多信息,请参阅指南:删除数据集
- 参数:
project_id (字符串) – 数据集的项目ID。
dataset_id (字符串) – 要删除的数据集。
delete_contents (布尔值) – (可选)即使数据集不为空,是否强制删除。如果设置为 True,将删除数据集中的所有表(如果有)。如果设置为 False 且数据集不为空,将抛出 HttpError 400:“{dataset_id} is still in use”。默认值为 False。
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
示例:
delete_temp_data = BigQueryDeleteDatasetOperator( dataset_id="temp-dataset", project_id="temp-project", delete_contents=True, # Force the deletion of the dataset as well as its tables (if any). gcp_conn_id="_my_gcp_conn_", task_id="Deletetemp", dag=dag, )
- template_fields: collections.abc.序列[字符串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator(*, dataset_id=None, project_id=PROVIDE_PROJECT_ID, dataset_reference=None, location=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, if_exists='log', exists_ok=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
在 BigQuery 中的项目创建一个新数据集。
https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
另请参阅
有关如何使用此算子的更多信息,请参阅指南:创建数据集
- 参数:
project_id (字符串) – 我们要创建数据集的项目名称。
dataset_id (字符串 | 无) – 数据集ID。如果 dataset_reference 中包含 datasetId,则无需提供。
location (字符串 | 无) – 数据集应驻留的地理位置。
dataset_reference (字典 | 无) – 可在请求正文中提供的数据集引用。更多信息:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
if_exists (字符串) –
如果数据集存在,Airflow 应执行什么操作。如果设置为 log,则 TI 将通过并记录一条错误消息。设置为 ignore 以忽略错误,设置为 fail 以使 TI 失败,设置为 skip 以跳过。示例
create_new_dataset = BigQueryCreateEmptyDatasetOperator( dataset_id='new-dataset', project_id='my-project', dataset_reference={"friendlyName": "New Dataset"} gcp_conn_id='_my_gcp_conn_', task_id='newDatasetCreator', dag=dag)
exists_ok (bool | None) – 已弃用 - 请改用 if_exists=”ignore”。
- template_fields: collections.abc.序列[字符串] = ('dataset_id', 'project_id', 'dataset_reference', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
根据 ID 获取指定的数据集。
另请参阅
有关如何使用此算子的更多信息,请参阅指南:获取数据集详情
- 参数:
dataset_id (字符串) – 数据集ID。如果 dataset_reference 中包含 datasetId,则无需提供。
project_id (字符串) – 我们要创建数据集的项目名称。如果 dataset_reference 中包含 projectId,则无需提供。
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
- template_fields: collections.abc.序列[字符串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
检索指定数据集中的表列表。
另请参阅
有关如何使用此算子的更多信息,请参阅指南:列出数据集中的表
- 参数:
dataset_id (字符串) – 请求的数据集的数据集ID。
project_id (字符串) – (可选)请求的数据集所属项目。如果为 None,将使用 self.project_id。
max_results (整数 | 无) – (可选)返回的最大表数。
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
- template_fields: collections.abc.序列[字符串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableOperator(*, table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
在 BigQuery 中的项目更新一个表。
使用
fields
指定要更新表的哪些字段。如果某个字段列在fields
中,并且在表中为None
,则该字段将被删除。另请参阅
有关如何使用此算子的更多信息,请参阅指南:更新表
- 参数:
dataset_id (字符串 | 无) – 数据集ID。如果 table_reference 中包含 datasetId,则无需提供。
table_id (字符串 | 无) – 表ID。如果 table_reference 中包含 tableId,则无需提供。
table_resource (dict[str, Any]) – 将随请求体提供的数据集资源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource
fields (list[str] | None) – 需要更改的
table
的字段,以表属性命名 (例如 “friendly_name”)。project_id (str) – 我们要创建表的项目名称。如果在 table_reference 中包含 projectId,则无需提供。
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateDatasetOperator(*, dataset_resource, fields=None, dataset_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
在 BigQuery 中的项目更新一个数据集。
使用
fields
指定要更新的数据集字段。如果字段列在fields
中且在数据集中为None
,则会将其删除。如果没有提供fields
,则将使用提供的dataset_resource
的所有字段。另请参阅
有关如何使用此 operator 的更多信息,请参阅指南:更新数据集
- 参数:
dataset_id (字符串 | 无) – 数据集ID。如果 dataset_reference 中包含 datasetId,则无需提供。
dataset_resource (dict[str, Any]) – 将随请求体提供的数据集资源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource
project_id (字符串) – 我们要创建数据集的项目名称。如果 dataset_reference 中包含 projectId,则无需提供。
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator(*, deletion_dataset_table, gcp_conn_id='google_cloud_default', ignore_if_missing=False, location=None, impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
删除一个 BigQuery 表。
另请参阅
有关如何使用此 operator 的更多信息,请参阅指南:删除表
- 参数:
deletion_dataset_table (str) – 指示要删除的表,格式为点分隔的
(<project>.|<project>:)<dataset>.<table>
。(模板化)gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
ignore_if_missing (bool) – 如果为 True,则即使请求的表不存在也返回成功。
location (str | None) – 用于操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
- template_fields: collections.abc.Sequence[str] = ('deletion_dataset_table', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUpsertTableOperator(*, dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', location=None, impersonation_chain=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
对 BigQuery 表进行 upsert 操作。
另请参阅
有关如何使用此 operator 的更多信息,请参阅指南:Upsert 表
- 参数:
dataset_id (str) – 指示要更新的数据集,格式为点分隔的
(<project>.|<project>:)<dataset>
。(模板化)table_resource (dict) – 一个表资源。参见 https://cloud.google.com/bigquery/docs/reference/v2/tables#resource
project_id (str) – 我们要更新数据集的项目名称。如果在 dataset_reference 中包含 projectId,则无需提供。
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
location (str | None) – 用于操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
- template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_resource', 'impersonation_chain', 'project_id')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableSchemaOperator(*, schema_fields_updates, dataset_id, table_id, include_policy_tags=False, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, location=None, **kwargs)[source]¶
Bases:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
更新 BigQuery 表模式。
根据提供的 schema_fields_updates 参数的内容更新表 schema 中的字段。提供的 schema 不需要完整,如果字段已存在于 schema 中,您只需提供要更新的项目的键和值,只需确保设置了“name”键。
另请参阅
有关如何使用此 operator 的更多信息,请参阅指南:更新表 schema
- 参数:
schema_fields_updates (list[dict[str, Any]]) –
部分模式资源。请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema
schema_fields_updates = [ {"name": "emp_name", "description": "Some New Description"}, { "name": "salary", "policyTags": {"names": ["some_new_policy_tag"]}, }, { "name": "departments", "fields": [ {"name": "name", "description": "Some New Description"}, {"name": "type", "description": "Some New Description"}, ], }, ]
include_policy_tags (bool) – (可选) 如果设置为 True,策略标签将包含在更新请求中,这需要特殊权限,即使标签未更改(默认为 False)。请参阅 https://cloud.google.com/bigquery/docs/column-level-security#roles
dataset_id (str) – 指示要更新的数据集,格式为点分隔的
(<project>.|<project>:)<dataset>
。(模板化)table_id (str) – 所请求表的表 ID。(支持模板)
project_id (str) – 我们要更新数据集的项目名称。如果在 dataset_reference 中包含 projectId,则无需提供。
gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。
location (str | None) – 用于操作的位置。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
- template_fields: collections.abc.Sequence[str] = ('schema_fields_updates', 'dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]¶
- class airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator(configuration, project_id=PROVIDE_PROJECT_ID, location=None, job_id=None, force_rerun=True, reattach_states=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, cancel_on_kill=True, result_retry=DEFAULT_RETRY, result_timeout=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[source]¶
基类:
airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator
,airflow.providers.google.cloud.openlineage.mixins._BigQueryInsertJobOperatorOpenLineageMixin
执行一个 BigQuery 作业。
等待作业完成并返回作业 ID。此 Operator 按以下方式工作:
如果
force_rerun
为 True,则使用作业配置或 uuid 计算作业的唯一哈希值。- 创建格式如下的
job_id
: [提供的_job_id | airflow_{dag_id}_{task_id}_{exec_date}]_{唯一性后缀}
- 创建格式如下的
使用
job_id
提交 BigQuery 作业。- 如果具有给定 ID 的作业已存在,则尝试重新连接到该作业,前提是该作业尚未完成且其
状态在
reattach_states
中。如果作业已完成,则 Operator 将引发AirflowException
。
使用
force_rerun
将每次提交一个新作业,而不重新连接到已存在的作业。作业定义请参阅此处:
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:执行 BigQuery 作业
- 参数:
configuration (dict[str, Any]) – 配置参数直接映射到作业对象中的 BigQuery 配置字段。有关更多详细信息,请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration
job_id (str | None) – 作业的 ID。除非
force_rerun
为 True,否则将以作业配置的哈希值作为后缀。ID 只能包含字母(a-z、A-Z)、数字(0-9)、下划线 (_) 或破折号 (-)。最大长度为 1,024 个字符。如果未提供,将生成 uuid。force_rerun (bool) – 如果为 True,则 Operator 将使用 uuid 的哈希值作为作业 ID 后缀。
reattach_states (set[str] | None) – 在这些状态下,我们应该重新连接到 BigQuery 作业。应该是非最终状态。
project_id (str) – 作业运行所在的 Google Cloud 项目
location (str | None) – 作业运行的位置
gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。
impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。
cancel_on_kill (bool) – 在调用 on_kill 时,指示是否取消 hook 作业的标志。
result_retry (google.api_core.retry.Retry) – 如何重试检索行的 result 调用。
result_timeout (float | None) – 在使用 result_retry 之前等待 result 方法的秒数。
deferrable (bool) – 在 deferrable 模式下运行 Operator
poll_interval (float) – (仅在 deferrable 模式下) 检查作业状态的轮询周期,单位为秒。默认为 4 秒。
- template_fields: collections.abc.Sequence[str] = ('configuration', 'job_id', 'impersonation_chain', 'project_id')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json', '.sql')[source]¶