Google Cloud BigQuery 到 Postgres 的传输 Operator¶
Google Cloud BigQuery 是 Google Cloud 的无服务器数据仓库服务。PostgreSQL 是一种开源关系型数据库管理系统。此 operator 可用于将数据从 BigQuery 表复制到 PostgreSQL。
先决条件任务¶
要使用这些 operators,您必须执行以下操作:
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
为您的项目启用计费,如 Google Cloud 文档 中所述。
启用 API,如 Cloud Console 文档 中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
Operator¶
使用 BigQueryToPostgresOperator
operator 执行将数据从 BigQuery 表复制到 Postgres 表的操作。
使用 Jinja 模板 配合 target_table_name
, impersonation_chain
, dataset_id
, table_id
动态定义值。
您可以使用参数 selected_fields
来限制要复制的字段(默认为所有字段),以及参数 replace
来覆盖目标表而不是追加数据。如果使用参数 replace
,则需要同时指定 selected_fields
和 replace_index
参数,这是由于 PostgreSQL 底层 INSERT 命令中的 ON CONFLICT 子句的限制所致。
更多信息请参考以上链接。
传输数据¶
以下 Operator 将数据从 BigQuery 表复制到 PostgreSQL。
tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py
bigquery_to_postgres = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=False,
)
此 Operator 也可以使用来自 BigQuery 表的匹配数据替换 PostgreSQL 表中的数据。
tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py
bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres_upsert",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=True,
selected_fields=["emp_name", "salary"],
replace_index=["emp_name", "salary"],
)
参考资料¶
有关进一步信息,请查阅: