airflow.providers.google.cloud.operators.bigquery

此模块包含 Google BigQuery 操作符。

属性

BIGQUERY_JOB_DETAILS_LINK_FMT

LABEL_REGEX

BigQueryUIColors

BigQuery 操作符的十六进制颜色。

IfExistAction

如果资源存在要采取的操作。

BigQueryCheckOperator

对 BigQuery 执行检查。

BigQueryValueCheckOperator

使用 SQL 代码执行简单的值检查。

BigQueryIntervalCheckOperator

检查给定为 SQL 表达式的指标值是否在旧值的容差范围内。

BigQueryColumnCheckOperator

子类化 SQLColumnCheckOperator,以便为 OpenLineage 提供可解析的作业 ID。

BigQueryTableCheckOperator

子类化 SQLTableCheckOperator,以便为 OpenLineage 提供可解析的作业 ID。

BigQueryGetDataOperator

获取数据并返回,可以来自 BigQuery 表,也可以是查询作业的结果。

BigQueryCreateTableOperator

在指定的 BigQuery 数据集中创建一个新表,可选择带上模式。

BigQueryCreateEmptyTableOperator

在指定的 BigQuery 数据集中创建一个新表,可选择带上模式。

BigQueryCreateExternalTableOperator

使用来自 Google Cloud Storage 的数据创建一个新的外部表。

BigQueryDeleteDatasetOperator

从 BigQuery 中的项目删除现有数据集。

BigQueryCreateEmptyDatasetOperator

在 BigQuery 中的项目创建一个新数据集。

BigQueryGetDatasetOperator

根据 ID 获取指定的数据集。

BigQueryGetDatasetTablesOperator

检索指定数据集中的表列表。

BigQueryUpdateTableOperator

在 BigQuery 中的项目更新一个表。

BigQueryUpdateDatasetOperator

在 BigQuery 中的项目更新一个数据集。

BigQueryDeleteTableOperator

删除一个 BigQuery 表。

BigQueryUpsertTableOperator

对 BigQuery 表进行 upsert 操作。

BigQueryUpdateTableSchemaOperator

更新 BigQuery 表模式。

BigQueryInsertJobOperator

执行一个 BigQuery 作业。

模块内容

airflow.providers.google.cloud.operators.bigquery.LABEL_REGEX[source]
class airflow.providers.google.cloud.operators.bigquery.BigQueryUIColors[source]

基类: enum.Enum

BigQuery 操作符的十六进制颜色。

CHECK = '#C0D7FF'[source]
QUERY = '#A1BBFF'[source]
TABLE = '#81A0FF'[source]
DATASET = '#5F86FF'[source]
class airflow.providers.google.cloud.operators.bigquery.IfExistAction[source]

基类: enum.Enum

如果资源存在要采取的操作。

IGNORE = 'ignore'[source]
LOG = 'log'[source]
FAIL = 'fail'[source]
SKIP = 'skip'[source]
class airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator(*, sql, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, query_params=None, **kwargs)[source]

基类: _BigQueryDbHookMixin, airflow.providers.common.sql.operators.sql.SQLCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin

对 BigQuery 执行检查。

此操作符需要一个返回单行的 SQL 查询。该行上的每个值都将使用 Python bool 强制转换进行评估。如果任一值为假值,检查将出错。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 检查查询结果是否有数据

请注意,Python bool 强制转换将以下内容评估为 False

  • False

  • 0

  • 空字符串 ("")

  • 空列表 ([])

  • 空字典或集合 ({})

给定一个类似 SELECT COUNT(*) FROM foo 的查询,只有当计数等于零时才会失败。您可以构建更复杂的查询,例如检查表是否与上游源表具有相同的行数,或者今天分区的计数是否大于昨天分区的计数,或者一组指标是否小于 7 天平均值的三倍标准差。

此操作符可用作管道中的数据质量检查。根据您在 DAG 中的位置,您可以选择阻止关键路径,防止发布可疑数据,或者将其放在旁边并接收电子邮件警报而不停止 DAG 的进度。

参数:
  • sql (str) – 要执行的 SQL。

  • gcp_conn_id (str) – Google Cloud 的连接 ID。

  • use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。

  • location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据模拟,或者获取列表中最后一个账号(将在请求中被模拟)的访问令牌所需的链式账号列表。如果设置为字符串,该账号必须授予发起账号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须授予紧前身份 Service Account Token Creator IAM 角色,列表中第一个账号将此角色授予发起账号。(模板化)

  • labels (dict | None) – 包含表标签的字典,传递给 BigQuery。

  • encryption_configuration (dict | None) –

    (可选)自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • deferrable (bool) – 以可延迟模式运行操作符。

  • poll_interval (float) – (仅限可延迟模式)检查作业状态的轮询周期(秒)。

  • query_params (list | None) – 包含查询参数类型和值的字典列表,传递给 BigQuery。字典的结构应类似于 Google BigQuery Jobs API 中的“queryParameters”:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs。例如,[{ ‘name’: ‘corpus’, ‘parameterType’: { ‘type’: ‘STRING’ }, ‘parameterValue’: { ‘value’: ‘romeoandjuliet’ } }]。(模板化)

template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'impersonation_chain', 'labels', 'query_params')[source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[source]
ui_color = '#C0D7FF'[source]
conn_id_field = 'gcp_conn_id'[source]
gcp_conn_id = 'google_cloud_default'[source]
use_legacy_sql = True[source]
location = None[source]
impersonation_chain = None[source]
labels = None[source]
encryption_configuration = None[source]
deferrable = True[source]
poll_interval = 4.0[source]
query_params = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

在触发器触发时作为回调函数执行。

此方法立即返回。它依赖于触发器抛出异常,否则假定执行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryValueCheckOperator(*, sql, pass_value, tolerance=None, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[source]

基类: _BigQueryDbHookMixin, airflow.providers.common.sql.operators.sql.SQLValueCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin

使用 SQL 代码执行简单的值检查。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南: 将查询结果与通过值进行比较

参数:
  • sql (str) – 要执行的 SQL。

  • use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。

  • encryption_configuration (dict | None) –

    (可选)自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据模拟,或者获取列表中最后一个账号(将在请求中被模拟)的访问令牌所需的链式账号列表。如果设置为字符串,该账号必须授予发起账号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须授予紧前身份 Service Account Token Creator IAM 角色,列表中第一个账号将此角色授予发起账号。(模板化)

  • labels (dict | None) – 包含表标签的字典,传递给 BigQuery。

  • deferrable (bool) – 以可延迟模式运行操作符。

  • poll_interval (float) – (仅限可延迟模式)检查作业状态的轮询周期(秒)。

template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'pass_value', 'impersonation_chain', 'labels')[source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[source]
ui_color = '#C0D7FF'[source]
conn_id_field = 'gcp_conn_id'[source]
location = None[source]
gcp_conn_id = 'google_cloud_default'[source]
use_legacy_sql = True[source]
encryption_configuration = None[source]
impersonation_chain = None[source]
labels = None[source]
deferrable = True[source]
poll_interval = 4.0[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

在触发器触发时作为回调函数执行。

此方法立即返回。它依赖于触发器抛出异常,否则假定执行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, encryption_configuration=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, project_id=PROVIDE_PROJECT_ID, **kwargs)[源]

基类:_BigQueryDbHookMixinairflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator_BigQueryOperatorsEncryptionConfigurationMixin

检查给定为 SQL 表达式的指标值是否在旧值的容差范围内。

此方法构建查询如下

SELECT {metrics_threshold_dict_key} FROM {table}
WHERE {date_filter_column}=<date>

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南:比较随时间变化的指标

参数:
  • table (str) – 表名

  • days_back (SupportsAbs`[`int`]` ) – `ds` 与要对照检查的 `ds` 之间的天数。默认为 7 天

  • metrics_thresholds (dict) – 按指标索引的比例字典,例如 `'COUNT(*)'`: `1.5` 要求当前日期与前 `days_back` 天之间的差异小于或等于 50%。

  • use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。

  • encryption_configuration (dict | None) –

    (可选)自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

  • labels (dict | None) – 包含表标签的字典,传递给 BigQuery

  • deferrable (bool) – 在 deferrable 模式下运行 Operator

  • poll_interval (float) – (仅在 deferrable 模式下) 检查作业状态的轮询周期,单位为秒。默认为 4 秒。

  • project_id (str) – 表示 BigQuery 项目ID 的字符串

template_fields: collections.abc.Sequence[str] = ('table', 'gcp_conn_id', 'sql1', 'sql2', 'impersonation_chain', 'labels')[源]
ui_color = '#C0D7FF'[源]
conn_id_field = 'gcp_conn_id'[源]
gcp_conn_id = 'google_cloud_default'[源]
use_legacy_sql = True[源]
location = None[源]
encryption_configuration = None[源]
impersonation_chain = None[源]
labels = None[源]
project_id = None[源]
deferrable = True[源]
poll_interval = 4.0[源]
execute(context)[源]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[源]

在触发器触发时作为回调函数执行。

此方法立即返回。它依赖于触发器抛出异常,否则假定执行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryColumnCheckOperator(*, table, column_mapping, partition_clause=None, database=None, accept_none=True, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, **kwargs)[源]

基类:_BigQueryDbHookMixinairflow.providers.common.sql.operators.sql.SQLColumnCheckOperator_BigQueryOperatorsEncryptionConfigurationMixin

子类化 SQLColumnCheckOperator,以便为 OpenLineage 提供可解析的作业 ID。

用法请参阅基类文档字符串。

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南:使用预定义测试检查列

参数:
  • table (str) – 表名

  • column_mapping (dict) – 一个字典,关联列与其检查规则

  • partition_clause (str | None) – 一个字符串 SQL 语句,添加到 WHERE 子句以分区数据

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • encryption_configuration (dict | None) –

    (可选)自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。

  • location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

  • labels (dict | None) – 包含表标签的字典,传递给 BigQuery

template_fields: collections.abc.Sequence[str][源]
conn_id_field = 'gcp_conn_id'[源]
table[源]
column_mapping[源]
partition_clause = None[源]
database = None[源]
accept_none = True[源]
gcp_conn_id = 'google_cloud_default'[源]
encryption_configuration = None[源]
use_legacy_sql = True[源]
location = None[源]
impersonation_chain = None[源]
labels = None[源]
execute(context=None)[源]

对给定的列执行检查。

class airflow.providers.google.cloud.operators.bigquery.BigQueryTableCheckOperator(*, table, checks, partition_clause=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, **kwargs)[源]

基类:_BigQueryDbHookMixinairflow.providers.common.sql.operators.sql.SQLTableCheckOperator_BigQueryOperatorsEncryptionConfigurationMixin

子类化 SQLTableCheckOperator,以便为 OpenLineage 提供可解析的作业 ID。

用法请参阅基类。

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南:检查表级别的数据质量

参数:
  • table (str) – 表名

  • checks (dict) – 一个字典,包含检查名称和布尔型 SQL 语句

  • partition_clause (str | None) – 一个字符串 SQL 语句,添加到 WHERE 子句以分区数据

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。

  • location (str | None) – 作业的地理位置。详情请参见: https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

  • labels (dict | None) – 包含表标签的字典,传递给 BigQuery

  • encryption_configuration (dict | None) –

    (可选)自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

template_fields: collections.abc.Sequence[str][源]
conn_id_field = 'gcp_conn_id'[源]
gcp_conn_id = 'google_cloud_default'[源]
use_legacy_sql = True[源]
location = None[源]
impersonation_chain = None[源]
labels = None[源]
encryption_configuration = None[源]
execute(context=None)[源]

对表执行给定的检查。

class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDataOperator(*, dataset_id=None, table_id=None, table_project_id=None, job_id=None, job_project_id=None, project_id=PROVIDE_PROJECT_ID, max_results=100, selected_fields=None, gcp_conn_id='google_cloud_default', location=None, encryption_configuration=None, impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, as_dict=False, use_legacy_sql=True, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator, _BigQueryOperatorsEncryptionConfigurationMixin

获取数据并返回,可以来自 BigQuery 表,也可以是查询作业的结果。

可以按特定列缩小数据范围,或整体检索数据。根据“as_dict”的值,数据以以下两种格式之一返回:1. False(默认)- 一个 Python 列表的列表,嵌套列表的数量等于获取的行数。每个嵌套列表代表一行,其中的元素对应于该行的列值。

示例结果: [['Tony', 10], ['Mike', 20]

2. True - 一个 Python 字典列表,其中每个字典代表一行。在每个字典中,键是列名,值是这些列对应的数值。

示例结果: [{'name': 'Tony', 'age': 10}, {'name': 'Mike', 'age': 20}]

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:从表中获取数据

注意

如果您在 selected_fields 中传递的字段顺序与 BQ 表格/作业中现有列的顺序不同,数据仍将按照 BQ 表格的顺序排列。例如,如果 BQ 表格有三列,顺序为 [A,B,C],而您在 selected_fields 中传递 'B,A',数据仍然会是 'A,B' 的形式。

注意

在非可延迟模式下使用作业 ID 时,作业应处于 DONE(完成)状态。

示例 - 使用表格从 BigQuery 检索数据:

get_data = BigQueryGetDataOperator(
    task_id="get_data_from_bq",
    dataset_id="test_dataset",
    table_id="Transaction_partitions",
    table_project_id="internal-gcp-project",
    max_results=100,
    selected_fields="DATE",
    gcp_conn_id="airflow-conn-id",
)

示例 - 使用作业 ID 从 BigQuery 检索数据:

get_data = BigQueryGetDataOperator(
    job_id="airflow_8999918812727394_86a1cecc69c5e3028d28247affd7563",
    job_project_id="internal-gcp-project",
    max_results=100,
    selected_fields="DATE",
    gcp_conn_id="airflow-conn-id",
)
参数:
  • dataset_id (str | None) – 请求的表格的数据集 ID。(模板化)

  • table_id (str | None) – 请求的表格的表格 ID。与 job_id 互斥。(模板化)

  • table_project_id (str | None) – (可选)请求的表格的项目 ID。如果为 None,将从 hook 的项目 ID 派生。(模板化)

  • job_id (str | None) – 从中检索查询结果的作业 ID。与 table_id 互斥。(模板化)

  • job_project_id (str | None) – (可选)作业运行所在的 Google Cloud 项目。如果为 None,将从 hook 的项目 ID 派生。(模板化)

  • project_id (str) – (已弃用)(可选)将从中返回数据的项目名称。如果为 None,将从 hook 的项目 ID 派生。(模板化)

  • max_results (int) – 从表中获取的最大记录数(行数)。(模板化)

  • selected_fields (str | None) – 要返回的字段列表(逗号分隔)。如果未指定,则返回所有字段。

  • encryption_configuration (dict | None) –

    (可选)自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

  • deferrable (bool) – 在 deferrable 模式下运行 Operator

  • poll_interval (float) – (仅在 deferrable 模式下) 检查作业状态的轮询周期,单位为秒。默认为 4 秒。

  • as_dict (bool) – 如果为 True,则结果返回为字典列表;否则返回为列表的列表(默认:False)。

  • use_legacy_sql (bool) – 是否使用传统 SQL (true) 或标准 SQL (false)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_project_id', 'job_id', 'job_project_id', 'project_id',...[source]
ui_color = '#A1BBFF'[source]
table_project_id = None[source]
dataset_id = None[source]
table_id = None[source]
job_project_id = None[source]
job_id = None[source]
max_results = 100[source]
selected_fields = None[source]
gcp_conn_id = 'google_cloud_default'[source]
location = None[source]
impersonation_chain = None[source]
encryption_configuration = None[source]
project_id = None[source]
deferrable = True[source]
poll_interval = 4.0[source]
as_dict = False[source]
use_legacy_sql = True[source]
generate_query(hook)[source]

为给定的数据集和表 ID 生成 SELECT 查询。

execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

在触发器触发时作为回调函数执行。

此方法立即返回。它依赖于触发器抛出异常,否则假定执行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateTableOperator(*, dataset_id, table_id, table_resource, project_id=PROVIDE_PROJECT_ID, location=None, gcs_schema_object=None, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', impersonation_chain=None, if_exists='log', retry=DEFAULT, timeout=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的 BigQuery 数据集中创建一个新表,可选择带上模式。

用于 BigQuery 表的 schema 可以通过两种方式指定。您可以直接传入 schema 字段,或者指向一个 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是一个包含 schema 字段的 JSON 文件。您也可以在没有 schema 的情况下创建表。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:创建表

参数:
  • project_id (str) – 可选。要在其中创建表的项目。

  • dataset_id (str) – 必需。要在其中创建表的数据集。

  • table_id (str) – 必需。要创建的表的名称。

  • table_resource (dict[str, Any] | google.cloud.bigquery.table.Table | google.cloud.bigquery.table.TableReference | google.cloud.bigquery.table.TableListItem) – 必需。表资源,如文档所述:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果 table 是引用,则将创建一个具有指定 ID 的空表。表所属的数据集必须已经存在。

  • if_exists (str) – 可选。如果表存在,Airflow 应该做什么。如果设置为 log,则 TI 将成功,并记录一条错误消息。设置为 ignore 以忽略错误,设置为 fail 以使 TI 失败,设置为 skip 以跳过。

  • gcs_schema_object (str | None) – 可选。包含 schema 的 JSON 文件的完整路径。例如:gs://test-bucket/dir1/dir2/employee_schema.json

  • gcp_conn_id (str) – 可选。用于连接到 Google Cloud 并与 Bigquery 服务交互的连接 ID。

  • google_cloud_storage_conn_id (str) – 可选。用于连接到 Google Cloud 并与 Google Cloud Storage 服务交互的连接 ID。

  • location (str | None) – 可选。用于操作的位置。

  • retry (google.api_core.retry.Retry | google.api_core.gapic_v1.method._MethodDefault) – 可选。用于重试请求的重试对象。如果指定 None,则不会重试请求。

  • timeout (float | None) – 可选。等待请求完成的时间(秒)。请注意,如果指定了 retry,则超时应用于每个单独的尝试。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选。使用短期凭据模拟的服务账号,或获取列表中最后一个账号的 access_token 所需的账号链,该账号将在请求中被模拟。如果设置为字符串,该账号必须授予原始账号 Service Account Token Creator IAM 角色。如果设置为序列,列表中的身份必须授予直接前一个身份 Service Account Token Creator IAM 角色,列表中第一个账号将此角色授予原始账号。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_resource', 'project_id', 'gcs_schema_object', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
project_id = None[source]
location = None[source]
dataset_id[source]
table_id[source]
table_resource[source]
if_exists[source]
gcs_schema_object = None[source]
gcp_conn_id = 'google_cloud_default'[source]
google_cloud_storage_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
retry[source]
timeout = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(_)[source]

实现 _on_complete,因为我们将使用 create 方法返回的表资源。

airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator(*, dataset_id, table_id, table_resource=None, project_id=PROVIDE_PROJECT_ID, schema_fields=None, gcs_schema_object=None, time_partitioning=None, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', labels=None, view=None, materialized_view=None, encryption_configuration=None, location=None, cluster_fields=None, impersonation_chain=None, if_exists='log', bigquery_conn_id=None, exists_ok=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的 BigQuery 数据集中创建一个新表,可选择带上模式。

用于 BigQuery 表的 schema 可以通过两种方式指定。您可以直接传入 schema 字段,或者指向一个 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是一个包含 schema 字段的 JSON 文件。您也可以在没有 schema 的情况下创建表。

另请参阅

有关如何使用此 operator 的更多信息,请参阅指南:Create native table

参数:
  • project_id (str) – 要创建表的项目。(templated)

  • dataset_id (str) – 要在其中创建表的数据集。(templated)

  • table_id (str) – 要创建的表的名称。(templated)

  • table_resource (dict[str, Any] | None) – 文档中描述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,则所有其他参数将被忽略。(templated)

  • schema_fields (list | None) –

    如果设置,则为此处定义的 schema 字段列表:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    示例:

    schema_fields = [
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ]
    

  • gcs_schema_object (str | None) – 包含 schema 的 JSON 文件的完整路径 (templated)。例如:gs://test-bucket/dir1/dir2/employee_schema.json

  • time_partitioning (dict | None) –

    根据 API 规范配置可选的时间分区字段,即按字段、类型和过期时间进行分区。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Bigquery 服务交互的连接 ID。

  • google_cloud_storage_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Google Cloud Storage 服务交互的连接 ID。

  • labels (dict | None) – 包含表标签的字典,传递给 BigQuery

示例 (schema JSON 在 GCS 中):

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id="BigQueryCreateEmptyTableOperator_task",
    dataset_id="ODS",
    table_id="Employees",
    project_id="internal-gcp-project",
    gcs_schema_object="gs://schema-bucket/employee_schema.json",
    gcp_conn_id="airflow-conn-id",
    google_cloud_storage_conn_id="airflow-conn-id",
)

对应的 Schema 文件 (employee_schema.json)

[
    {"mode": "NULLABLE", "name": "emp_name", "type": "STRING"},
    {"mode": "REQUIRED", "name": "salary", "type": "INTEGER"},
]

示例 (schema 在 DAG 中):

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id="BigQueryCreateEmptyTableOperator_task",
    dataset_id="ODS",
    table_id="Employees",
    project_id="internal-gcp-project",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
    gcp_conn_id="airflow-conn-id-account",
    google_cloud_storage_conn_id="airflow-conn-id",
)
参数:
  • view (dict | None) –

    (可选) 包含 view 定义的字典。如果设置,它将创建一个 view 而不是表。

  • materialized_view (dict | None) – (可选) 具化视图 (materialized view) 定义。

  • encryption_configuration (dict | None) –

    (可选)自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • location (str | None) – 用于操作的位置。

  • cluster_fields (list[str] | None) –

    (可选) 用于聚簇 (clustering) 的字段。BigQuery 支持对分区表和非分区表进行聚簇。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

  • if_exists (str) – 如果表已存在,Airflow 应该怎么做。如果设置为 log,则 TI 将成功,并记录错误消息。设置为 ignore 则忽略错误,设置为 fail 则使 TI 失败,设置为 skip 则跳过。

  • exists_ok (bool | None) – 已弃用 - 请改用 if_exists=”ignore”

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_resource', 'project_id', 'gcs_schema_object', 'labels',...[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
project_id = None[source]
dataset_id[source]
table_id[source]
schema_fields = None[source]
gcs_schema_object = None[source]
gcp_conn_id = 'google_cloud_default'[source]
google_cloud_storage_conn_id = 'google_cloud_default'[source]
time_partitioning[source]
labels = None[source]
view = None[source]
materialized_view = None[source]
encryption_configuration : dict | None = None[source]
location = None[source]
cluster_fields = None[source]
table_resource = None[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(_)[source]

实现 _on_complete,因为我们将使用 create 方法返回的表资源。

airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator(*, bucket=None, source_objects=None, destination_project_dataset_table=None, table_resource=None, schema_fields=None, schema_object=None, gcs_schema_bucket=None, source_format=None, autodetect=False, compression=None, skip_leading_rows=None, field_delimiter=None, max_bad_records=0, quote_character=None, allow_quoted_newlines=False, allow_jagged_rows=False, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', src_fmt_configs=None, labels=None, encryption_configuration=None, location=None, impersonation_chain=None, bigquery_conn_id=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

使用来自 Google Cloud Storage 的数据创建一个新的外部表。

要用于 BigQuery 表的 schema 可以通过两种方式指定。您可以直接传入 schema 字段,或者将 operator 指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是包含 schema 字段的 JSON 文件。

另请参阅

有关如何使用此 operator 的更多信息,请参阅指南:Create external table

参数:
  • bucket (str | None) – 外部表指向的存储桶 (bucket)。(templated)

  • source_objects (list[str] | None) – 外部表指向的 Google Cloud Storage URI 列表。如果 source_format 是 ‘DATASTORE_BACKUP’,则列表必须只包含一个 URI。

  • destination_project_dataset_table (str | None) – 要加载数据到的以点分隔的 (<project>.)<dataset>.<table> BigQuery 表 (templated)。如果未包含 <project>,则项目将是连接 json 中定义的项目。

  • schema_fields (list | None) –

    如果设置,则为此处定义的 schema 字段列表:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    示例:

    schema_fields = [
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ]
    

    当 source_format 是 ‘DATASTORE_BACKUP’ 时不应设置此项。

  • table_resource (dict[str, Any] | None) – 文档中描述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供,则所有其他参数将被忽略。来自对象的外部 schema 将被解析。

  • schema_object (str | None) – 如果设置,则为指向包含表 schema 的 .json 文件的 GCS 对象路径。(templated)

  • gcs_schema_bucket (str | None) – 存储 schema JSON 的 GCS 存储桶名称 (templated)。默认值是 self.bucket。

  • source_format (str | None) – 数据的文件格式。

  • autodetect (bool) – 尝试自动检测 schema 和格式选项。明确指定 schema_fields 和 schema_object 选项时,将优先使用这些选项。https://cloud.google.com/bigquery/docs/schema-detect#schema_auto-detection_for_external_data_sources

  • compression (str | None) – (可选) 数据源的压缩类型。可能的值包括 GZIP 和 NONE。默认值为 NONE。Google Cloud Bigtable、Google Cloud Datastore 备份和 Avro 格式会忽略此设置。

  • skip_leading_rows (int | None) – 从 CSV 加载时要跳过的行数。

  • field_delimiter (str | None) – CSV 中使用的分隔符。

  • max_bad_records (int) – BigQuery 在运行作业时可以忽略的最大错误记录数。

  • quote_character (str | None) – 用于引用 CSV 文件中数据部分的字符。

  • allow_quoted_newlines (bool) – 是否允许带引号的换行符 (true) 或不允许 (false)。

  • allow_jagged_rows (bool) – 接受缺少尾部可选列的行。缺失的值被视为 null。如果为 false,则缺少尾部列的记录被视为错误记录,如果错误记录过多,则在作业结果中返回无效错误。仅适用于 CSV,对其他格式无效。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Bigquery 服务交互的连接 ID。

  • google_cloud_storage_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Google Cloud Storage 服务交互的连接 ID。

  • src_fmt_configs (dict | None) – 配置特定于源格式的可选字段。

  • labels (dict | None) – 包含表标签的字典,传递给 BigQuery

  • encryption_configuration (dict | None) –

    (可选)自定义加密配置(例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'gcs_schema_bucket',...[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
table_resource = None[source]
bucket = ''[source]
source_objects = [][source]
schema_object = None[source]
gcs_schema_bucket = ''[source]
destination_project_dataset_table = ''[source]
max_bad_records = 0[source]
quote_character = None[source]
allow_quoted_newlines = False[source]
allow_jagged_rows = False[source]
gcp_conn_id = 'google_cloud_default'[source]
google_cloud_storage_conn_id = 'google_cloud_default'[source]
autodetect = False[source]
src_fmt_configs[source]
labels = None[source]
encryption_configuration = None[source]
location = None[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(_)[source]

实现 _on_complete,因为我们将使用 create 方法返回的表资源。

class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

从 BigQuery 中的项目删除现有数据集。

https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete

另请参阅

有关如何使用此算子的更多信息,请参阅指南:删除数据集

参数:
  • project_id (字符串) – 数据集的项目ID。

  • dataset_id (字符串) – 要删除的数据集。

  • delete_contents (布尔值) – (可选)即使数据集不为空,是否强制删除。如果设置为 True,将删除数据集中的所有表(如果有)。如果设置为 False 且数据集不为空,将抛出 HttpError 400:“{dataset_id} is still in use”。默认值为 False。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

示例:

delete_temp_data = BigQueryDeleteDatasetOperator(
    dataset_id="temp-dataset",
    project_id="temp-project",
    delete_contents=True,  # Force the deletion of the dataset as well as its tables (if any).
    gcp_conn_id="_my_gcp_conn_",
    task_id="Deletetemp",
    dag=dag,
)
template_fields: collections.abc.序列[字符串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
ui_color = '#5F86FF'[source]
dataset_id[source]
project_id = None[source]
delete_contents = False[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator(*, dataset_id=None, project_id=PROVIDE_PROJECT_ID, dataset_reference=None, location=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, if_exists='log', exists_ok=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 BigQuery 中的项目创建一个新数据集。

https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

另请参阅

有关如何使用此算子的更多信息,请参阅指南:创建数据集

参数:
  • project_id (字符串) – 我们要创建数据集的项目名称。

  • dataset_id (字符串 | ) – 数据集ID。如果 dataset_reference 中包含 datasetId,则无需提供。

  • location (字符串 | ) – 数据集应驻留的地理位置。

  • dataset_reference (字典 | ) – 可在请求正文中提供的数据集引用。更多信息:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

  • if_exists (字符串) –

    如果数据集存在,Airflow 应执行什么操作。如果设置为 log,则 TI 将通过并记录一条错误消息。设置为 ignore 以忽略错误,设置为 fail 以使 TI 失败,设置为 skip 以跳过。示例

    create_new_dataset = BigQueryCreateEmptyDatasetOperator(
        dataset_id='new-dataset',
        project_id='my-project',
        dataset_reference={"friendlyName": "New Dataset"}
        gcp_conn_id='_my_gcp_conn_',
        task_id='newDatasetCreator',
        dag=dag)
    

  • exists_ok (bool | None) – 已弃用 - 请改用 if_exists=”ignore”

template_fields: collections.abc.序列[字符串] = ('dataset_id', 'project_id', 'dataset_reference', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#5F86FF'[source]
dataset_id = None[source]
project_id = None[source]
location = None[source]
gcp_conn_id = 'google_cloud_default'[source]
dataset_reference[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

根据 ID 获取指定的数据集。

另请参阅

有关如何使用此算子的更多信息,请参阅指南:获取数据集详情

参数:
  • dataset_id (字符串) – 数据集ID。如果 dataset_reference 中包含 datasetId,则无需提供。

  • project_id (字符串) – 我们要创建数据集的项目名称。如果 dataset_reference 中包含 projectId,则无需提供。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

template_fields: collections.abc.序列[字符串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
ui_color = '#5F86FF'[source]
dataset_id[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

检索指定数据集中的表列表。

另请参阅

有关如何使用此算子的更多信息,请参阅指南:列出数据集中的表

参数:
  • dataset_id (字符串) – 请求的数据集的数据集ID。

  • project_id (字符串) – (可选)请求的数据集所属项目。如果为 None,将使用 self.project_id。

  • max_results (整数 | ) – (可选)返回的最大表数。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

template_fields: collections.abc.序列[字符串] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
ui_color = '#5F86FF'[source]
dataset_id[source]
project_id = None[source]
max_results = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableOperator(*, table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 BigQuery 中的项目更新一个表。

使用 fields 指定要更新表的哪些字段。如果某个字段列在 fields 中,并且在表中为 None,则该字段将被删除。

另请参阅

有关如何使用此算子的更多信息,请参阅指南:更新表

参数:
  • dataset_id (字符串 | ) – 数据集ID。如果 table_reference 中包含 datasetId,则无需提供。

  • table_id (字符串 | ) – 表ID。如果 table_reference 中包含 tableId,则无需提供。

  • table_resource (dict[str, Any]) – 将随请求体提供的数据集资源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource

  • fields (list[str] | None) – 需要更改的 table 的字段,以表属性命名 (例如 “friendly_name”)。

  • project_id (str) – 我们要创建表的项目名称。如果在 table_reference 中包含 projectId,则无需提供。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
dataset_id = None[source]
table_id = None[source]
project_id = None[source]
fields = None[source]
gcp_conn_id = 'google_cloud_default'[source]
table_resource[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(_)[source]

实现 _on_complete,因为我们将使用 update 方法返回的表资源。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateDatasetOperator(*, dataset_resource, fields=None, dataset_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 BigQuery 中的项目更新一个数据集。

使用 fields 指定要更新的数据集字段。如果字段列在 fields 中且在数据集中为 None,则会将其删除。如果没有提供 fields,则将使用提供的 dataset_resource 的所有字段。

另请参阅

有关如何使用此 operator 的更多信息,请参阅指南:更新数据集

参数:
  • dataset_id (字符串 | ) – 数据集ID。如果 dataset_reference 中包含 datasetId,则无需提供。

  • dataset_resource (dict[str, Any]) – 将随请求体提供的数据集资源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • fields (list[str] | None) – 要更改的数据集属性 (例如 “friendly_name”)。

  • project_id (字符串) – 我们要创建数据集的项目名称。如果 dataset_reference 中包含 projectId,则无需提供。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#5F86FF'[source]
dataset_id = None[source]
project_id = None[source]
fields = None[source]
gcp_conn_id = 'google_cloud_default'[source]
dataset_resource[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator(*, deletion_dataset_table, gcp_conn_id='google_cloud_default', ignore_if_missing=False, location=None, impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

删除一个 BigQuery 表。

另请参阅

有关如何使用此 operator 的更多信息,请参阅指南:删除表

参数:
  • deletion_dataset_table (str) – 指示要删除的表,格式为点分隔的 (<project>.|<project>:)<dataset>.<table>。(模板化)

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • ignore_if_missing (bool) – 如果为 True,则即使请求的表不存在也返回成功。

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

template_fields: collections.abc.Sequence[str] = ('deletion_dataset_table', 'impersonation_chain')[source]
ui_color = '#81A0FF'[source]
deletion_dataset_table[source]
gcp_conn_id = 'google_cloud_default'[source]
ignore_if_missing = False[source]
location = None[source]
impersonation_chain = None[source]
hook: airflow.providers.google.cloud.hooks.bigquery.BigQueryHook | None = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(_)[source]

实现 _on_complete,因为我们需要从 hook 获取默认 project_id。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpsertTableOperator(*, dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', location=None, impersonation_chain=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

对 BigQuery 表进行 upsert 操作。

另请参阅

有关如何使用此 operator 的更多信息,请参阅指南:Upsert 表

参数:
  • dataset_id (str) – 指示要更新的数据集,格式为点分隔的 (<project>.|<project>:)<dataset>。(模板化)

  • table_resource (dict) – 一个表资源。参见 https://cloud.google.com/bigquery/docs/reference/v2/tables#resource

  • project_id (str) – 我们要更新数据集的项目名称。如果在 dataset_reference 中包含 projectId,则无需提供。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_resource', 'impersonation_chain', 'project_id')[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
dataset_id[source]
table_resource[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
location = None[source]
impersonation_chain = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(_)[source]

实现 _on_complete,因为我们将使用 upsert 方法返回的表资源。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableSchemaOperator(*, schema_fields_updates, dataset_id, table_id, include_policy_tags=False, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, location=None, **kwargs)[source]

Bases: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

更新 BigQuery 表模式。

根据提供的 schema_fields_updates 参数的内容更新表 schema 中的字段。提供的 schema 不需要完整,如果字段已存在于 schema 中,您只需提供要更新的项目的键和值,只需确保设置了“name”键。

另请参阅

有关如何使用此 operator 的更多信息,请参阅指南:更新表 schema

参数:
  • schema_fields_updates (list[dict[str, Any]]) –

    部分模式资源。请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema

    schema_fields_updates = [
        {"name": "emp_name", "description": "Some New Description"},
        {
            "name": "salary",
            "policyTags": {"names": ["some_new_policy_tag"]},
        },
        {
            "name": "departments",
            "fields": [
                {"name": "name", "description": "Some New Description"},
                {"name": "type", "description": "Some New Description"},
            ],
        },
    ]
    

  • include_policy_tags (bool) – (可选) 如果设置为 True,策略标签将包含在更新请求中,这需要特殊权限,即使标签未更改(默认为 False)。请参阅 https://cloud.google.com/bigquery/docs/column-level-security#roles

  • dataset_id (str) – 指示要更新的数据集,格式为点分隔的 (<project>.|<project>:)<dataset>。(模板化)

  • table_id (str) – 所请求表的表 ID。(支持模板)

  • project_id (str) – 我们要更新数据集的项目名称。如果在 dataset_reference 中包含 projectId,则无需提供。

  • gcp_conn_id (str) – (可选)用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

template_fields: collections.abc.Sequence[str] = ('schema_fields_updates', 'dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color = '#81A0FF'[source]
schema_fields_updates[source]
include_policy_tags = False[source]
table_id[source]
dataset_id[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
impersonation_chain = None[source]
location = None[source]
execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(_)[source]

实现 _on_complete,因为我们将使用 update 方法返回的表资源。

class airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator(configuration, project_id=PROVIDE_PROJECT_ID, location=None, job_id=None, force_rerun=True, reattach_states=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, cancel_on_kill=True, result_retry=DEFAULT_RETRY, result_timeout=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator, airflow.providers.google.cloud.openlineage.mixins._BigQueryInsertJobOperatorOpenLineageMixin

执行一个 BigQuery 作业。

等待作业完成并返回作业 ID。此 Operator 按以下方式工作:

  • 如果 force_rerun 为 True,则使用作业配置或 uuid 计算作业的唯一哈希值。

  • 创建格式如下的 job_id

    [提供的_job_id | airflow_{dag_id}_{task_id}_{exec_date}]_{唯一性后缀}

  • 使用 job_id 提交 BigQuery 作业。

  • 如果具有给定 ID 的作业已存在,则尝试重新连接到该作业,前提是该作业尚未完成且其

    状态在 reattach_states 中。如果作业已完成,则 Operator 将引发 AirflowException

使用 force_rerun 将每次提交一个新作业,而不重新连接到已存在的作业。

作业定义请参阅此处:

另请参阅

有关如何使用此 Operator 的更多信息,请参阅指南:执行 BigQuery 作业

参数:
  • configuration (dict[str, Any]) – 配置参数直接映射到作业对象中的 BigQuery 配置字段。有关更多详细信息,请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration

  • job_id (str | None) – 作业的 ID。除非 force_rerun 为 True,否则将以作业配置的哈希值作为后缀。ID 只能包含字母(a-z、A-Z)、数字(0-9)、下划线 (_) 或破折号 (-)。最大长度为 1,024 个字符。如果未提供,将生成 uuid。

  • force_rerun (bool) – 如果为 True,则 Operator 将使用 uuid 的哈希值作为作业 ID 后缀。

  • reattach_states (set[str] | None) – 在这些状态下,我们应该重新连接到 BigQuery 作业。应该是非最终状态。

  • project_id (str) – 作业运行所在的 Google Cloud 项目

  • location (str | None) – 作业运行的位置

  • gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence`[`str`]` | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者一个账户链,需要通过该链获取列表中最后一个账户的 `access_token`,该账户将在请求中被模拟。如果设置为字符串,该账户必须授予起始账户 Service Account Token Creator IAM 角色。如果设置为序列,则列表中的身份必须将其 Service Account Token Creator IAM 角色授予直接前一个身份,列表中的第一个账户将此角色授予起始账户(模板化)。

  • cancel_on_kill (bool) – 在调用 on_kill 时,指示是否取消 hook 作业的标志。

  • result_retry (google.api_core.retry.Retry) – 如何重试检索行的 result 调用。

  • result_timeout (float | None) – 在使用 result_retry 之前等待 result 方法的秒数。

  • deferrable (bool) – 在 deferrable 模式下运行 Operator

  • poll_interval (float) – (仅在 deferrable 模式下) 检查作业状态的轮询周期,单位为秒。默认为 4 秒。

template_fields: collections.abc.Sequence[str] = ('configuration', 'job_id', 'impersonation_chain', 'project_id')[source]
template_ext: collections.abc.Sequence[str] = ('.json', '.sql')[source]
template_fields_renderers[source]
ui_color = '#A1BBFF'[source]
configuration[source]
location = None[source]
job_id = None[source]
project_id = None[source]
gcp_conn_id = 'google_cloud_default'[source]
force_rerun = True[source]
reattach_states: set[str][source]
impersonation_chain = None[source]
cancel_on_kill = True[source]
result_retry[source]
result_timeout = None[source]
hook: airflow.providers.google.cloud.hooks.bigquery.BigQueryHook | None = None[source]
deferrable = True[source]
poll_interval = 4.0[source]
property sql: str | None[source]
prepare_template()[source]

在模板字段被其内容替换后执行。

如果您的对象需要在模板渲染之前更改文件内容,则应覆盖此方法。

execute(context)[source]

创建操作符时派生。

Context 与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

在触发器触发时作为回调函数执行。

此方法立即返回。它依赖于触发器抛出异常,否则假定执行成功。

on_kill()[source]

覆盖此方法可在任务实例被 kill 时清理子进程。

在 Operator 中使用 threading、subprocess 或 multiprocessing 模块时,需要进行清理,否则会留下僵尸进程。

此条目是否有帮助?