airflow.providers.google.cloud.operators.bigquery

此模块包含 Google BigQuery 操作符。

模块内容

BigQueryUIColors

BigQuery 操作符的十六进制颜色。

IfExistAction

如果资源存在时要执行的操作。

BigQueryCheckOperator

对 BigQuery 执行检查。

BigQueryValueCheckOperator

使用 sql 代码执行简单的值检查。

BigQueryIntervalCheckOperator

检查以 SQL 表达式形式给出的指标值是否在旧值的容差范围内。

BigQueryColumnCheckOperator

子类化 SQLColumnCheckOperator,以便为 OpenLineage 提供要解析的作业 ID。

BigQueryTableCheckOperator

子类化 SQLTableCheckOperator,以便为 OpenLineage 提供要解析的作业 ID。

BigQueryGetDataOperator

获取数据并返回,可以从 BigQuery 表中获取,也可以从查询作业的结果中获取。

BigQueryCreateEmptyTableOperator

在指定的 BigQuery 数据集中创建一个新表,可以选择带有 schema。

BigQueryCreateExternalTableOperator

使用 Google Cloud Storage 中的数据创建新的外部表。

BigQueryDeleteDatasetOperator

从 BigQuery 中删除您项目中的现有数据集。

BigQueryCreateEmptyDatasetOperator

在 BigQuery 中为您的项目创建一个新的数据集。

BigQueryGetDatasetOperator

获取由 ID 指定的数据集。

BigQueryGetDatasetTablesOperator

检索指定数据集中表的列表。

BigQueryUpdateTableOperator

更新 BigQuery 中您的项目表。

BigQueryUpdateDatasetOperator

更新 BigQuery 中您的项目数据集。

BigQueryDeleteTableOperator

删除 BigQuery 表。

BigQueryUpsertTableOperator

插入或更新到 BigQuery 表。

BigQueryUpdateTableSchemaOperator

更新 BigQuery 表 Schema。

BigQueryInsertJobOperator

执行 BigQuery 作业。

属性

BIGQUERY_JOB_DETAILS_LINK_FMT

LABEL_REGEX

airflow.providers.google.cloud.operators.bigquery.LABEL_REGEX[source]
class airflow.providers.google.cloud.operators.bigquery.BigQueryUIColors[source]

基类: enum.Enum

BigQuery 操作符的十六进制颜色。

CHECK = '#C0D7FF'[source]
QUERY = '#A1BBFF'[source]
TABLE = '#81A0FF'[source]
DATASET = '#5F86FF'[source]
class airflow.providers.google.cloud.operators.bigquery.IfExistAction[source]

基类: enum.Enum

如果资源存在时要执行的操作。

IGNORE = 'ignore'[source]
LOG = 'log'[source]
FAIL = 'fail'[source]
SKIP = 'skip'[source]
class airflow.providers.google.cloud.operators.bigquery.BigQueryCheckOperator(*, sql, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, query_params=None, **kwargs)[source]

基类: _BigQueryDbHookMixin, airflow.providers.common.sql.operators.sql.SQLCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin

对 BigQuery 执行检查。

此操作符期望一个返回单行的 SQL 查询。该行上的每个值都使用 Python bool 转换进行评估。如果任何值为假,则检查会出错。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:检查查询结果是否包含数据

请注意,Python 布尔类型转换将以下内容评估为False

  • False

  • 0

  • 空字符串 ("")

  • 空列表 ([])

  • 空字典或集合 ({})

给定一个类似 SELECT COUNT(*) FROM foo 的查询,只有当计数等于零时才会失败。您可以编写更复杂的查询,例如,检查表是否具有与上游源表相同的行数,或者今天的分区计数是否大于昨天的分区计数,或者一组指标是否小于 7 天平均值的三个标准差。

此操作符可用作管道中的数据质量检查。根据您在 DAG 中的放置位置,您可以选择停止关键路径,防止发布可疑数据,或者在侧面接收电子邮件警报,而不会停止 DAG 的进程。

参数
  • sql (str) – 要执行的 SQL。

  • gcp_conn_id (str) – Google Cloud 的连接 ID。

  • use_legacy_sql (bool) – 是否使用旧版 SQL (true) 或标准 SQL (false)。

  • location (str | None) – 作业的地理位置。详情请见:https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的访问令牌的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予紧随其后的身份,列表中第一个帐户将此角色授予发起帐户。(已模板化)

  • labels (dict | None) – 一个包含表标签的字典,传递给 BigQuery。

  • encryption_configuration (dict | None) –

    (可选) 自定义加密配置 (例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • deferrable (bool) – 在可延迟模式下运行操作符。

  • poll_interval (float) – (仅限可延迟模式) 检查作业状态的轮询间隔(以秒为单位)。

  • query_params (list | None) – 一个包含查询参数类型和值的字典列表,传递给 BigQuery。字典的结构应类似于 Google BigQuery Jobs API 中的 “queryParameters”:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs。例如,[{‘name’: ‘corpus’, ‘parameterType’: {‘type’: ‘STRING’}, ‘parameterValue’: {‘value’: ‘romeoandjuliet’}}]。(已模板化)

template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'impersonation_chain', 'labels', 'query_params')[source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[source]
ui_color[source]
conn_id_field = 'gcp_conn_id'[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

充当触发器触发时的回调。

这将立即返回。它依赖于触发器来抛出异常,否则它会假定执行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryValueCheckOperator(*, sql, pass_value, tolerance=None, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[source]

基类: _BigQueryDbHookMixin, airflow.providers.common.sql.operators.sql.SQLValueCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin

使用 sql 代码执行简单的值检查。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南:将查询结果与传递值进行比较

参数
  • sql (str) – 要执行的 SQL。

  • use_legacy_sql (bool) – 是否使用旧版 SQL (true) 或标准 SQL (false)。

  • encryption_configuration (dict | None) –

    (可选) 自定义加密配置 (例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 作业的地理位置。详情请见:https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的访问令牌的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的身份必须将“服务帐户令牌创建者”IAM 角色授予紧随其后的身份,列表中第一个帐户将此角色授予发起帐户。(已模板化)

  • labels (dict | None) – 一个包含表标签的字典,传递给 BigQuery。

  • deferrable (bool) – 在可延迟模式下运行操作符。

  • poll_interval (float) – (仅限可延迟模式) 检查作业状态的轮询间隔(以秒为单位)。

template_fields: collections.abc.Sequence[str] = ('sql', 'gcp_conn_id', 'pass_value', 'impersonation_chain', 'labels')[source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[源代码]
ui_color[源代码]
conn_id_field = 'gcp_conn_id'[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[源代码]

充当触发器触发时的回调。

这将立即返回。它依赖于触发器来抛出异常,否则它会假定执行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryIntervalCheckOperator(*, table, metrics_thresholds, date_filter_column='ds', days_back=-7, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, encryption_configuration=None, impersonation_chain=None, labels=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, project_id=PROVIDE_PROJECT_ID, **kwargs)[源代码]

基类: _BigQueryDbHookMixin, airflow.providers.common.sql.operators.sql.SQLIntervalCheckOperator, _BigQueryOperatorsEncryptionConfigurationMixin

检查以 SQL 表达式形式给出的指标值是否在旧值的容差范围内。

此方法构造如下查询

SELECT {metrics_threshold_dict_key} FROM {table}
WHERE {date_filter_column}=<date>

另请参阅

有关如何使用此运算符的更多信息,请查看以下指南: 比较一段时间内的指标

参数
  • table (str) – 表名

  • days_back (SupportsAbs[int]) – ds 和我们要检查的 ds 之间的天数。默认为 7 天

  • metrics_thresholds (dict) – 一个由指标索引的比率字典,例如 'COUNT(*)': 1.5 将要求当前日与前几天之间有 50% 或更小的差异。

  • use_legacy_sql (bool) – 是否使用旧版 SQL (true) 或标准 SQL (false)。

  • encryption_configuration (dict | None) –

    (可选) 自定义加密配置 (例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 作业的地理位置。详情请见:https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

  • labels (dict | None) – 一个包含表标签的字典,传递给 BigQuery

  • deferrable (bool) – 以可延迟模式运行运算符

  • poll_interval (float) – (仅限可延迟模式)检查作业状态的轮询间隔(以秒为单位)。默认为 4 秒。

  • project_id (str) – 一个表示 BigQuery projectId 的字符串

template_fields: collections.abc.Sequence[str] = ('table', 'gcp_conn_id', 'sql1', 'sql2', 'impersonation_chain', 'labels')[源代码]
ui_color[源代码]
conn_id_field = 'gcp_conn_id'[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[源代码]

充当触发器触发时的回调。

这将立即返回。它依赖于触发器来抛出异常,否则它会假定执行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryColumnCheckOperator(*, table, column_mapping, partition_clause=None, database=None, accept_none=True, encryption_configuration=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, **kwargs)[源代码]

基类:_BigQueryDbHookMixinairflow.providers.common.sql.operators.sql.SQLColumnCheckOperator_BigQueryOperatorsEncryptionConfigurationMixin

子类化 SQLColumnCheckOperator,以便为 OpenLineage 提供要解析的作业 ID。

请参阅基类的文档字符串以了解用法。

另请参阅

有关如何使用此运算符的更多信息,请查看指南:使用预定义的测试检查列

参数
  • table (str) – 表名

  • column_mapping (dict) – 一个将列与其检查相关联的字典

  • partition_clause (str | None) – 一个添加到 WHERE 子句以对数据进行分区的 SQL 语句字符串

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • encryption_configuration (dict | None) –

    (可选) 自定义加密配置 (例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • use_legacy_sql (bool) – 是否使用旧版 SQL (true) 或标准 SQL (false)。

  • location (str | None) – 作业的地理位置。详情请见:https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

  • labels (dict | None) – 一个包含表标签的字典,传递给 BigQuery

template_fields: collections.abc.Sequence[str][源代码]
conn_id_field = 'gcp_conn_id'[源代码]
execute(context=None)[源代码]

对给定的列执行检查。

class airflow.providers.google.cloud.operators.bigquery.BigQueryTableCheckOperator(*, table, checks, partition_clause=None, gcp_conn_id='google_cloud_default', use_legacy_sql=True, location=None, impersonation_chain=None, labels=None, encryption_configuration=None, **kwargs)[源代码]

基类:_BigQueryDbHookMixinairflow.providers.common.sql.operators.sql.SQLTableCheckOperator_BigQueryOperatorsEncryptionConfigurationMixin

子类化 SQLTableCheckOperator,以便为 OpenLineage 提供要解析的作业 ID。

请参阅基类以了解用法。

另请参阅

有关如何使用此运算符的更多信息,请查看指南:检查表级别的数据质量

参数
  • table (str) – 表名

  • checks (dict) – 一个包含检查名称和布尔 SQL 语句的字典

  • partition_clause (str | None) – 一个添加到 WHERE 子句以对数据进行分区的 SQL 语句字符串

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • use_legacy_sql (bool) – 是否使用旧版 SQL (true) 或标准 SQL (false)。

  • location (str | None) – 作业的地理位置。详情请见:https://cloud.google.com/bigquery/docs/locations#specifying_your_location

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

  • labels (dict | None) – 一个包含表标签的字典,传递给 BigQuery

  • encryption_configuration (dict | None) –

    (可选) 自定义加密配置 (例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

template_fields: collections.abc.Sequence[str][源代码]
conn_id_field = 'gcp_conn_id'[源代码]
execute(context=None)[源代码]

对表执行给定的检查。

class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDataOperator(*, dataset_id=None, table_id=None, table_project_id=None, job_id=None, job_project_id=None, project_id=PROVIDE_PROJECT_ID, max_results=100, selected_fields=None, gcp_conn_id='google_cloud_default', location=None, encryption_configuration=None, impersonation_chain=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, as_dict=False, use_legacy_sql=True, **kwargs)[源代码]

基类:airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator_BigQueryOperatorsEncryptionConfigurationMixin

获取数据并返回,可以从 BigQuery 表中获取,也可以从查询作业的结果中获取。

可以按特定列缩小数据范围,或者检索整个数据。它会根据 “as_dict” 的值,以以下两种格式之一返回:1. False(默认)- 一个 Python 列表的列表,其中嵌套列表的数量等于提取的行数。每个嵌套列表表示一行,其中的元素对应于该特定行的列值。

示例结果[['Tony', 10], ['Mike', 20]]

2. True - 一个 Python 字典的列表,其中每个字典表示一行。在每个字典中,键是列名,值是这些列的对应值。

示例结果[{'name': 'Tony', 'age': 10}, {'name': 'Mike', 'age': 20}]

另请参阅

有关如何使用此运算符的更多信息,请查看指南:从表中提取数据

注意

如果您将字段传递给 selected_fields,其顺序与 BQ 表/作业中现有列的顺序不同,则数据仍将按照 BQ 表的顺序排列。例如,如果 BQ 表有 3 列,分别为 [A,B,C],并且您在 selected_fields 中传递 “B,A”,则数据仍将采用 'A,B' 的形式。

注意

当使用非可延迟模式下的作业 ID 时,该作业应处于 DONE 状态。

示例 - 使用表从 BigQuery 中检索数据:

get_data = BigQueryGetDataOperator(
    task_id="get_data_from_bq",
    dataset_id="test_dataset",
    table_id="Transaction_partitions",
    table_project_id="internal-gcp-project",
    max_results=100,
    selected_fields="DATE",
    gcp_conn_id="airflow-conn-id",
)

示例 - 使用作业 ID 从 BigQuery 中检索数据:

get_data = BigQueryGetDataOperator(
    job_id="airflow_8999918812727394_86a1cecc69c5e3028d28247affd7563",
    job_project_id="internal-gcp-project",
    max_results=100,
    selected_fields="DATE",
    gcp_conn_id="airflow-conn-id",
)
参数
  • dataset_id (str | None) – 所请求表的 dataset ID。(已模板化)

  • table_id (str | None) – 所请求表的 table ID。与 job_id 互斥。(已模板化)

  • table_project_id (str | None) – (可选) 请求表的项目 ID。如果为 None,则将从 hook 的项目 ID 派生。(模板化)

  • job_id (str | None) – 用于检索查询结果的作业 ID。与 table_id 互斥。(模板化)

  • job_project_id (str | None) – (可选) 作业正在运行的 Google Cloud 项目。如果为 None,则将从 hook 的项目 ID 派生。(模板化)

  • project_id (str) – (已弃用) (可选) 将从中返回数据的项目名称。如果为 None,则将从 hook 的项目 ID 派生。(模板化)

  • max_results (int) – 要从表中获取的最大记录(行)数。(模板化)

  • selected_fields (str | None) – 要返回的字段列表(以逗号分隔)。如果未指定,则返回所有字段。

  • encryption_configuration (dict | None) –

    (可选) 自定义加密配置 (例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

  • deferrable (bool) – 以可延迟模式运行运算符

  • poll_interval (float) – (仅限可延迟模式)检查作业状态的轮询间隔(以秒为单位)。默认为 4 秒。

  • as_dict (bool) – 如果为 True,则将结果作为字典列表返回,否则作为列表的列表返回(默认值:False)。

  • use_legacy_sql (bool) – 是否使用旧版 SQL (true) 或标准 SQL (false)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_project_id', 'job_id', 'job_project_id', 'project_id',...[source]
ui_color[source]
generate_query(hook)[source]

如果为给定的数据集和表 ID,则生成 SELECT 查询。

execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[source]

充当触发器触发时的回调。

这将立即返回。它依赖于触发器来抛出异常,否则它会假定执行成功。

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator(*, dataset_id, table_id, table_resource=None, project_id=PROVIDE_PROJECT_ID, schema_fields=None, gcs_schema_object=None, time_partitioning=None, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', labels=None, view=None, materialized_view=None, encryption_configuration=None, location=None, cluster_fields=None, impersonation_chain=None, if_exists='log', bigquery_conn_id=None, exists_ok=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在指定的 BigQuery 数据集中创建一个新表,可以选择带有 schema。

BigQuery 表的架构可以使用两种方式指定。您可以直接传入架构字段,也可以将运算符指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是包含架构字段的 JSON 文件。您也可以创建一个没有架构的表。

另请参阅

有关如何使用此运算符的更多信息,请查看指南:创建原生表

参数
  • project_id (str) – 要在其中创建表的项目。(模板化)

  • dataset_id (str) – 要在其中创建表的数据集。(模板化)

  • table_id (str) – 要创建的表的名称。(模板化)

  • table_resource (dict[str, Any] | None) – 文档中描述的表资源:https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供了此参数,则忽略所有其他参数。(模板化)

  • schema_fields (list | None) –

    如果设置,则架构字段列表定义如下: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    示例:

    schema_fields = [
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ]
    

  • gcs_schema_object (str | None) – 包含架构的 JSON 文件的完整路径(模板化)。例如:gs://test-bucket/dir1/dir2/employee_schema.json

  • time_partitioning (dict | None) –

    根据 API 规范配置可选的时间分区字段,例如按字段、类型和过期时间分区。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Bigquery 服务交互的连接 ID。

  • google_cloud_storage_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Google Cloud Storage 服务交互的连接 ID。

  • labels (dict | None) – 一个包含表标签的字典,传递给 BigQuery

示例 (使用 GCS 中的架构 JSON):

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id="BigQueryCreateEmptyTableOperator_task",
    dataset_id="ODS",
    table_id="Employees",
    project_id="internal-gcp-project",
    gcs_schema_object="gs://schema-bucket/employee_schema.json",
    gcp_conn_id="airflow-conn-id",
    google_cloud_storage_conn_id="airflow-conn-id",
)

相应的架构文件 (employee_schema.json)

[
    {"mode": "NULLABLE", "name": "emp_name", "type": "STRING"},
    {"mode": "REQUIRED", "name": "salary", "type": "INTEGER"},
]

示例 (使用 DAG 中的架构):

CreateTable = BigQueryCreateEmptyTableOperator(
    task_id="BigQueryCreateEmptyTableOperator_task",
    dataset_id="ODS",
    table_id="Employees",
    project_id="internal-gcp-project",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
    gcp_conn_id="airflow-conn-id-account",
    google_cloud_storage_conn_id="airflow-conn-id",
)
参数
  • view (dict | None) –

    (可选) 包含视图定义的字典。如果设置,则将创建一个视图而不是表

  • materialized_view (dict | None) – (可选) 物化视图定义。

  • encryption_configuration (dict | None) –

    (可选) 自定义加密配置 (例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • location (str | None) – 用于操作的位置。

  • cluster_fields (list[str] | None) –

    (可选) 用于聚类的字段。BigQuery 支持对分区表和非分区表进行聚类。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

  • if_exists (str) – 如果表存在,Airflow 应该做什么。如果设置为 log,则 TI 将传递给成功并记录错误消息。设置为 ignore 以忽略错误,设置为 fail 以使 TI 失败,设置为 skip 以跳过它。

  • exists_ok (bool | None) – 已弃用 - 请改用 if_exists="ignore"

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'table_resource', 'project_id', 'gcs_schema_object', 'labels',...[source]
template_fields_renderers[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(task_instance)[source]
class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateExternalTableOperator(*, bucket=None, source_objects=None, destination_project_dataset_table=None, table_resource=None, schema_fields=None, schema_object=None, gcs_schema_bucket=None, source_format=None, autodetect=False, compression=None, skip_leading_rows=None, field_delimiter=None, max_bad_records=0, quote_character=None, allow_quoted_newlines=False, allow_jagged_rows=False, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id='google_cloud_default', src_fmt_configs=None, labels=None, encryption_configuration=None, location=None, impersonation_chain=None, bigquery_conn_id=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

使用 Google Cloud Storage 中的数据创建新的外部表。

BigQuery 表的 schema 可以通过以下两种方式之一指定。您可以直接传入 schema 字段,也可以将运算符指向 Google Cloud Storage 对象名称。Google Cloud Storage 中的对象必须是包含 schema 字段的 JSON 文件。

另请参阅

有关如何使用此运算符的更多信息,请查看指南: 创建外部表

参数
  • bucket (str | None) – 指向外部表的存储桶。(已模板化)

  • source_objects (list[str] | None) – 指向表的 Google Cloud Storage URI 列表。如果 source_format 为 'DATASTORE_BACKUP',则列表必须仅包含一个 URI。

  • destination_project_dataset_table (str | None) – 要将数据加载到的点式 (<project>.)<dataset>.<table> BigQuery 表(已模板化)。如果未包含 <project>,则项目将是连接 json 中定义的项目。

  • schema_fields (list | None) –

    如果设置,则架构字段列表定义如下: https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema

    示例:

    schema_fields = [
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ]
    

    当 source_format 为 'DATASTORE_BACKUP' 时,不应设置此项。

  • table_resource (dict[str, Any] | None) – 文档中描述的表资源: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table 如果提供了,则忽略所有其他参数。将解析来自对象的外部 schema。

  • schema_object (str | None) – 如果设置,则为指向包含表 schema 的 .json 文件的 GCS 对象路径。(已模板化)

  • gcs_schema_bucket (str | None) – 存储 schema JSON 的 GCS 存储桶名称(已模板化)。默认值为 self.bucket。

  • source_format (str | None) – 数据的的文件格式。

  • autodetect (bool) – 尝试自动检测 schema 和格式选项。如果明确指定,则将遵循 schema_fields 和 schema_object 选项。 https://cloud.google.com/bigquery/docs/schema-detect#schema_auto-detection_for_external_data_sources

  • compression (str | None) – (可选) 数据源的压缩类型。可能的值包括 GZIP 和 NONE。默认值为 NONE。此设置对于 Google Cloud Bigtable、Google Cloud Datastore 备份和 Avro 格式将被忽略。

  • skip_leading_rows (int | None) – 从 CSV 加载时要跳过的行数。

  • field_delimiter (str | None) – 用于 CSV 的分隔符。

  • max_bad_records (int) – BigQuery 在运行作业时可以忽略的最大错误记录数。

  • quote_character (str | None) – 用于引用 CSV 文件中数据部分的值。

  • allow_quoted_newlines (bool) – 是否允许带引号的换行符 (true) 或不允许 (false)。

  • allow_jagged_rows (bool) – 是否接受缺少尾部可选列的行。缺失的值被视为 null。如果为 false,则缺少尾部列的记录将被视为错误记录,如果错误记录过多,则作业结果中会返回无效错误。仅适用于 CSV,对于其他格式将被忽略。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 并与 Bigquery 服务交互的连接 ID。

  • google_cloud_storage_conn_id (str) – (可选) 用于连接 Google Cloud 并与 Google Cloud Storage 服务交互的连接 ID。

  • src_fmt_configs (dict | None) – 配置特定于源格式的可选字段

  • labels (dict | None) – 一个包含表标签的字典,传递给 BigQuery

  • encryption_configuration (dict | None) –

    (可选) 自定义加密配置 (例如,Cloud KMS 密钥)。

    encryption_configuration = {
        "kmsKeyName": "projects/PROJECT/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY",
    }
    

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'gcs_schema_bucket',...[source]
template_fields_renderers[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

get_openlineage_facets_on_complete(task_instance)[source]
class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, delete_contents=False, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

从 BigQuery 中删除您项目中的现有数据集。

https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets/delete

另请参阅

有关如何使用此运算符的更多信息,请查看指南: 删除数据集

参数
  • project_id (str) – 数据集的项目 ID。

  • dataset_id (str) – 要删除的数据集。

  • delete_contents (bool) – (可选) 是否强制删除即使数据集不为空。如果设置为 True,将删除数据集中的所有表(如果有)。如果设置为 False 且数据集不为空,则会引发 HttpError 400:“{dataset_id} 仍在被使用”。默认值为 False。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

示例:

delete_temp_data = BigQueryDeleteDatasetOperator(
    dataset_id="temp-dataset",
    project_id="temp-project",
    delete_contents=True,  # Force the deletion of the dataset as well as its tables (if any).
    gcp_conn_id="_my_gcp_conn_",
    task_id="Deletetemp",
    dag=dag,
)
template_fields: collections.abc.Sequence[str] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator(*, dataset_id=None, project_id=PROVIDE_PROJECT_ID, dataset_reference=None, location=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, if_exists='log', exists_ok=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

在 BigQuery 中为您的项目创建一个新的数据集。

https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

另请参阅

有关如何使用此运算符的更多信息,请查看指南:创建数据集

参数
  • project_id (str) – 我们要在其中创建数据集的项目的名称。

  • dataset_id (str | None) – 数据集的 ID。如果 dataset_reference 中存在 datasetId,则不需要提供。

  • location (str | None) – 数据集应驻留的地理位置。

  • dataset_reference (dict | None) – 可以与请求正文一起提供的数据集引用。更多信息:https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

  • if_exists (str) –

    如果数据集存在,Airflow 应该做什么。如果设置为 log,则 TI 将传递给成功并记录错误消息。设置为 ignore 可忽略错误,设置为 fail 可使 TI 失败,设置为 skip 可跳过它。示例

    create_new_dataset = BigQueryCreateEmptyDatasetOperator(
        dataset_id='new-dataset',
        project_id='my-project',
        dataset_reference={"friendlyName": "New Dataset"}
        gcp_conn_id='_my_gcp_conn_',
        task_id='newDatasetCreator',
        dag=dag)
    

  • exists_ok (bool | None) – 已弃用 - 请改用 if_exists="ignore"

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'project_id', 'dataset_reference', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

获取由 ID 指定的数据集。

另请参阅

有关如何使用此运算符的更多信息,请查看指南: 获取数据集详细信息

参数
  • dataset_id (str) – 数据集的 ID。 如果 dataset_reference 中存在 datasetId,则不需要提供。

  • project_id (str) – 我们要在其中创建数据集的项目的名称。 如果 dataset_reference 中存在 projectId,则不需要提供。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryGetDatasetTablesOperator(*, dataset_id, project_id=PROVIDE_PROJECT_ID, max_results=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

检索指定数据集中表的列表。

另请参阅

有关如何使用此运算符的更多信息,请查看指南: 列出数据集中的表

参数
  • dataset_id (str) – 请求的数据集的 dataset ID。

  • project_id (str) – (可选)请求的数据集的项目。 如果为 None,将使用 self.project_id。

  • max_results (int | None) – (可选)要返回的最大表数。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableOperator(*, table_resource, fields=None, dataset_id=None, table_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

更新 BigQuery 中您的项目表。

使用 fields 指定要更新的表的哪些字段。 如果某个字段在 fields 中列出,并且在表中为 None,则该字段将被删除。

另请参阅

有关如何使用此运算符的更多信息,请查看指南: 更新表

参数
  • dataset_id (str | None) – 数据集的 ID。 如果 table_reference 中存在 datasetId,则不需要提供。

  • table_id (str | None) – 表的 ID。 如果 table_reference 中存在 tableId,则不需要提供。

  • table_resource (dict[str, Any]) – 将在请求正文中提供的数据集资源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource

  • fields (list[str] | None) – 要更改的 table 的字段,拼写为表属性(例如 “friendly_name”)。

  • project_id (str) – 我们要在其中创建表的项目的名称。如果 table_reference 中存在 projectId,则无需提供。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateDatasetOperator(*, dataset_resource, fields=None, dataset_id=None, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

更新 BigQuery 中您的项目数据集。

使用 fields 来指定要更新的数据集的哪些字段。如果某个字段在 fields 中列出,并且在数据集中为 None,则会被删除。如果没有提供 fields,则会使用提供的 dataset_resource 的所有字段。

另请参阅

有关如何使用此运算符的更多信息,请查看指南:更新数据集

参数
  • dataset_id (str | None) – 数据集的 ID。如果 dataset_reference 中存在 datasetId,则不需要提供。

  • dataset_resource (dict[str, Any]) – 将在请求正文中提供的数据集资源。 https://cloud.google.com/bigquery/docs/reference/rest/v2/datasets#resource

  • fields (list[str] | None) – 要更改的数据集的属性(例如 “friendly_name”)。

  • project_id (str) – 我们要在其中创建数据集的项目的名称。 如果 dataset_reference 中存在 projectId,则不需要提供。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'project_id', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryDeleteTableOperator(*, deletion_dataset_table, gcp_conn_id='google_cloud_default', ignore_if_missing=False, location=None, impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

删除 BigQuery 表。

另请参阅

有关如何使用此运算符的更多信息,请查看指南: 删除表

参数
  • deletion_dataset_table (str) – 一个点分隔的 (<项目>.|<项目>:)<数据集>.<表>,指示要删除的表。(已模板化)

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • ignore_if_missing (bool) – 如果为 True,则即使请求的表不存在,也返回成功。

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('deletion_dataset_table', 'impersonation_chain')[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpsertTableOperator(*, dataset_id, table_resource, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', location=None, impersonation_chain=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

插入或更新到 BigQuery 表。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:更新表

参数
  • dataset_id (str) – 一个点分隔的 `<project>.|<project>:)<dataset>,指示将更新哪个数据集。(可使用模板)

  • table_resource (dict) – 表资源。请参阅 https://cloud.google.com/bigquery/docs/reference/v2/tables#resource

  • project_id (str) – 我们要在其中更新数据集的项目的名称。如果 dataset_reference 中有 projectId,则不需要提供。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('dataset_id', 'table_resource', 'impersonation_chain', 'project_id')[source]
template_fields_renderers[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryUpdateTableSchemaOperator(*, schema_fields_updates, dataset_id, table_id, include_policy_tags=False, project_id=PROVIDE_PROJECT_ID, gcp_conn_id='google_cloud_default', impersonation_chain=None, location=None, **kwargs)[source]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator

更新 BigQuery 表 Schema。

根据提供的 schema_fields_updates 参数的内容更新表架构中的字段。提供的架构不需要是完整的,如果该字段已经存在于架构中,您只需要为要修补的项目提供键和值,只需确保设置了“name”键即可。

另请参阅

有关如何使用此运算符的更多信息,请参阅指南:更新表架构

参数
  • schema_fields_updates (list[dict[str, Any]]) –

    部分架构资源。请参阅 https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema

    schema_fields_updates = [
        {"name": "emp_name", "description": "Some New Description"},
        {
            "name": "salary",
            "policyTags": {"names": ["some_new_policy_tag"]},
        },
        {
            "name": "departments",
            "fields": [
                {"name": "name", "description": "Some New Description"},
                {"name": "type", "description": "Some New Description"},
            ],
        },
    ]
    

  • include_policy_tags (bool) – (可选)如果设置为 True,则策略标记将包含在更新请求中,即使未更改也需要特殊权限(默认为 False),请参阅 https://cloud.google.com/bigquery/docs/column-level-security#roles

  • dataset_id (str) – 一个点分隔的 `<project>.|<project>:)<dataset>,指示将更新哪个数据集。(可使用模板)

  • table_id (str) – 请求的表的表 ID。(可使用模板)

  • project_id (str) – 我们要在其中更新数据集的项目的名称。如果 dataset_reference 中有 projectId,则不需要提供。

  • gcp_conn_id (str) – (可选) 用于连接到 Google Cloud 的连接 ID。

  • location (str | None) – 用于操作的位置。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

template_fields: collections.abc.Sequence[str] = ('schema_fields_updates', 'dataset_id', 'table_id', 'project_id', 'impersonation_chain')[source]
template_fields_renderers[source]
ui_color[source]
execute(context)[source]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

class airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator(configuration, project_id=PROVIDE_PROJECT_ID, location=None, job_id=None, force_rerun=True, reattach_states=None, gcp_conn_id='google_cloud_default', impersonation_chain=None, cancel_on_kill=True, result_retry=DEFAULT_RETRY, result_timeout=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), poll_interval=4.0, **kwargs)[源代码]

基类: airflow.providers.google.cloud.operators.cloud_base.GoogleCloudBaseOperator, airflow.providers.google.cloud.openlineage.mixins._BigQueryOpenLineageMixin

执行 BigQuery 作业。

等待作业完成并返回作业 ID。此运算符的工作方式如下:

  • 它使用作业的配置或 UUID 计算作业的唯一哈希值,如果 force_rerun 为 True。

  • 创建 job_id 的形式为

    [provided_job_id | airflow_{dag_id}_{task_id}_{exec_date}]_{uniqueness_suffix}

  • 使用 job_id 提交 BigQuery 作业。

  • 如果具有给定 ID 的作业已存在,则它会尝试重新附加到该作业(如果该作业尚未完成),并且其

    状态在 reattach_states 中。如果作业已完成,则运算符将引发 AirflowException

使用 force_rerun 每次都会提交新作业,而不会附加到已存在的作业。

有关作业定义,请参见此处

另请参阅

有关如何使用此运算符的更多信息,请查看以下指南: 执行 BigQuery 作业

参数
  • configuration (dict[str, Any]) – configuration 参数直接映射到作业对象中的 BigQuery 的配置字段。有关更多详细信息,请参见 https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfiguration

  • job_id (str | None) – 作业的 ID。它将附加作业配置的哈希值,除非 force_rerun 为 True。ID 必须仅包含字母 (a-z, A-Z)、数字 (0-9)、下划线 (_) 或破折号 (-)。最大长度为 1,024 个字符。如果未提供,则将生成 UUID。

  • force_rerun (bool) – 如果为 True,则运算符将使用 UUID 的哈希值作为作业 ID 后缀。

  • reattach_states (set[str] | None) – 一组 BigQuery 作业状态,如果遇到这些状态,我们应该重新附加到该作业。应为非最终状态。

  • project_id (str) – 运行作业的 Google Cloud 项目。

  • location (str | None) – 作业运行的位置。

  • gcp_conn_id (str) – 用于连接到 Google Cloud 的连接 ID。

  • impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务帐户,用于使用短期凭据进行模拟,或用于获取列表中最后一个帐户的 access_token 所需的链式帐户列表,该帐户将在请求中被模拟。如果设置为字符串,则该帐户必须授予发起帐户“服务帐户令牌创建者”IAM 角色。如果设置为序列,则列表中的标识必须将“服务帐户令牌创建者”IAM 角色授予紧接其前面的标识,列表中的第一个帐户将此角色授予发起帐户(已模板化)。

  • cancel_on_kill (bool) – 标志,指示当调用 on_kill 时是否取消 hook 的作业。

  • result_retry (google.api_core.retry.Retry) – 如何重试检索行的 result 调用。

  • result_timeout (float | None) – 在使用 result_retry 之前,等待 result 方法的秒数。

  • deferrable (bool) – 以可延迟模式运行运算符

  • poll_interval (float) – (仅限可延迟模式)检查作业状态的轮询间隔(以秒为单位)。默认为 4 秒。

template_fields: collections.abc.Sequence[str] = ('configuration', 'job_id', 'impersonation_chain', 'project_id')[源代码]
template_ext: collections.abc.Sequence[str] = ('.json', '.sql')[源代码]
template_fields_renderers[源代码]
ui_color[源代码]
sql()[源代码]
prepare_template()[源代码]

在模板字段被其内容替换后执行。

如果您需要在渲染模板之前让对象更改文件的内容,则应重写此方法来执行此操作。

execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

execute_complete(context, event)[源代码]

充当触发器触发时的回调。

这将立即返回。它依赖于触发器来抛出异常,否则它会假定执行成功。

on_kill()[源代码]

当任务实例被终止时,覆盖此方法以清理子进程。

在操作符中使用线程、子进程或多进程模块时,需要进行清理,否则会留下僵尸进程。

此条目是否有帮助?