airflow.providers.google.cloud.transfers.gcs_to_bigquery¶
此模块包含一个 Google Cloud Storage 到 BigQuery 的操作符。
属性¶
类¶
将文件从 Google Cloud Storage 加载到 BigQuery 中。 |
模块内容¶
- airflow.providers.google.cloud.transfers.gcs_to_bigquery.ALLOWED_FORMATS = ['CSV', 'NEWLINE_DELIMITED_JSON', 'AVRO', 'GOOGLE_SHEETS', 'DATASTORE_BACKUP', 'PARQUET', 'ORC'][源码]¶
- class airflow.providers.google.cloud.transfers.gcs_to_bigquery.GCSToBigQueryOperator(*, bucket, source_objects, destination_project_dataset_table, schema_fields=None, schema_object=None, schema_object_bucket=None, source_format='CSV', compression='NONE', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=None, write_disposition='WRITE_EMPTY', field_delimiter=',', max_bad_records=0, quote_character=None, ignore_unknown_values=False, allow_quoted_newlines=False, allow_jagged_rows=False, encoding='UTF-8', max_id_key=None, gcp_conn_id='google_cloud_default', schema_update_options=(), src_fmt_configs=None, external_table=False, time_partitioning=None, cluster_fields=None, autodetect=True, encryption_configuration=None, location=None, impersonation_chain=None, labels=None, description=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), result_retry=DEFAULT_RETRY, result_timeout=None, cancel_on_kill=True, job_id=None, force_rerun=True, reattach_states=None, project_id=PROVIDE_PROJECT_ID, force_delete=False, **kwargs)[源码]¶
Bases:
airflow.models.BaseOperator
将文件从 Google Cloud Storage 加载到 BigQuery 中。
用于 BigQuery 表的模式可以通过两种方式指定。你可以直接传入模式字段,或者指向 Google Cloud Storage 中的对象名称。Google Cloud Storage 中的对象必须是包含模式字段的 JSON 文件。
另请参阅
有关如何使用此操作符的更多信息,请查阅指南:操作符
- 参数:
bucket – 要从中加载的存储桶。(模板化)
source_objects – 要从中加载的 Google Cloud Storage URI 字符串或列表。(模板化) 如果 source_format 是 ‘DATASTORE_BACKUP’,列表必须只包含一个 URI。
destination_project_dataset_table – 用于加载数据的带有
(<project>.|<project>:)<dataset>.<table>
点号格式的 BigQuery 表。如果未包含<project>
,则项目将是连接 json 中定义的项目。(模板化)schema_fields – 如果设置,则为此处定义的模式字段列表:https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load 当 source_format 为 ‘DATASTORE_BACKUP’ 时不应设置此项。如果 ‘schema_object’ 为 null 且 autodetect 为 False,则必须定义此参数。
schema_object – 如果设置,则为指向包含表模式的 .json 文件的 GCS 对象路径。(模板化) 如果 ‘schema_fields’ 为 null 且 autodetect 为 False,则必须定义此参数。
schema_object_bucket – [可选] 如果设置,则为存储模式对象模板的 GCS 存储桶。(模板化) (默认值:
bucket
的值)source_format – 要导出的文件格式。
compression – [可选] 数据源的压缩类型。可能的值包括 GZIP 和 NONE。默认值为 NONE。对于 Google Cloud Bigtable、Google Cloud Datastore 备份和 Avro 格式,此设置将被忽略。
create_disposition – 如果表不存在时的创建处置。
skip_leading_rows – 加载数据时 BigQuery 将跳过的 CSV 文件顶部的行数。当 autodetect 开启时,行为如下:skip_leading_rows 未指定 - 自动检测尝试检测第一行中的标题。如果未检测到,则该行作为数据读取。否则,数据将从第二行开始读取。skip_leading_rows 为 0 - 指示自动检测没有标题,数据应从第一行开始读取。skip_leading_rows = N > 0 - 自动检测跳过 N-1 行,并尝试在第 N 行检测标题。如果未检测到标题,则跳过第 N 行。否则,第 N 行用于提取检测到的模式的列名。默认值设置为 None,以便 autodetect 选项可以检测模式字段。
write_disposition – 如果表已存在时的写入处置。
field_delimiter – 从 CSV 加载时使用的分隔符。
max_bad_records – BigQuery 在运行作业时可以忽略的最大错误记录数。
quote_character – 用于引用 CSV 文件中数据部分的字符。
ignore_unknown_values – [可选] 指示 BigQuery 是否应允许表中模式未表示的额外值。如果为 true,则忽略额外值。如果为 false,则带有额外列的记录将被视为错误记录,如果错误记录过多,则在作业结果中返回无效错误。
allow_quoted_newlines – 是否允许带引号的换行符 (true) 或不允许 (false)。
allow_jagged_rows – 接受缺少尾随可选列的行。缺失值将被视为 null。如果为 false,则缺少尾随列的记录将被视为错误记录,如果错误记录过多,则在作业结果中返回无效错误。仅适用于 CSV,对于其他格式将被忽略。
encoding – 数据的字符编码。参见:https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).csvOptions.encoding https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#externalDataConfiguration.csvOptions.encoding
max_id_key – 如果设置,则为要加载的 BigQuery 表中列的名称。加载完成后,此名称将用于从 BigQuery 中选择 MAX 值。结果将由 execute() 命令返回,并存储在 XCom 中供后续操作符使用。这对于增量加载很有帮助——在未来的执行中,你可以从最大 ID 开始。
schema_update_options – 允许将目标表的模式作为加载作业的副作用进行更新。
src_fmt_configs – 配置特定于源格式的可选字段
external_table – 标志,用于指定目标表是否应为 BigQuery 外部表。默认值为 False。
time_partitioning – 配置可选的时间分区字段,即根据 API 规范按字段、类型和过期时间进行分区。请注意,‘field’ 不能与 dataset.table$partition 同时使用。
cluster_fields – 请求将此加载的结果按一个或多个列排序存储。BigQuery 支持对分区表和非分区表进行聚类。给定的列顺序决定了排序顺序。不适用于外部表。
autodetect – [可选] 指示是否应自动推断 CSV 和 JSON 源的选项和模式。(默认值:
True
)。如果未定义 ‘schema_fields’ 和 ‘schema_object’,则必须将参数设置为 True。如果在 Airflow 外部创建表,建议将其设置为 True。如果 autodetect 为 None 且未提供模式(无论是通过 schema_fields 还是 schema_object),则假定表已存在。encryption_configuration –
[可选] 自定义加密配置(例如,Cloud KMS 密钥)。
encryption_configuration = { "kmsKeyName": "projects/testp/locations/us/keyRings/test-kr/cryptoKeys/test-key", }
location – [可选] 作业的地理位置。除美国和欧盟外,必填。详细信息请参见 https://cloud.google.com/bigquery/docs/locations#specifying_your_location
impersonation_chain (str | collections.abc.Sequence[str] | None) – 可选的服务账号,用于使用短期凭据进行模拟,或者获取列表中最后一个账号的 access_token 所需的账号链列表,该账号将在请求中被模拟。如果设置为字符串,则该账号必须授予发起账号 Service Account Token Creator IAM 角色。如果设置为序列,则列表中身份必须授予 Service Account Token Creator IAM 角色给直接前一个身份,列表中第一个账号将此角色授予发起账号(模板化)。
labels – [可选] BigQuery 表的标签。
description – [可选] BigQuery 表的描述。仅在新建目标表时使用。如果表已存在且提供了与当前描述不同的值,则作业将失败。
deferrable (bool) – 以可延迟模式运行操作符
force_delete (bool) – 如果目标表已存在,则强制删除它。
- template_fields: collections.abc.Sequence[str] = ('bucket', 'source_objects', 'schema_object', 'schema_object_bucket',...[源码]¶
- template_ext: collections.abc.Sequence[str] = ('.sql',)[源码]¶
- execute(context)[source]¶
在创建 Operator 时派生。
Context 是与渲染 Jinja 模板时使用的相同的字典。
请参阅 get_template_context 以获取更多上下文信息。
- execute_complete(context, event)[source]¶
立即返回,并依赖 trigger 抛出成功事件。trigger 的回调函数。
依赖 trigger 抛出异常,否则假定执行成功。