Google Cloud BigQuery 传输到 Postgres 运算符¶
Google Cloud BigQuery 是 Google Cloud 的无服务器数据仓库产品。PostgreSQL 是一个开源的关系数据库管理系统。此运算符可用于将数据从 BigQuery 表复制到 PostgreSQL。
先决条件任务¶
要使用这些运算符,您必须执行以下几项操作
使用 Cloud Console 选择或创建一个云平台项目。
按照 Google Cloud 文档 中的说明,为您的项目启用结算功能。
按照 Cloud Console 文档 中的说明启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'安装 中提供了详细信息。
运算符¶
使用 BigQueryToPostgresOperator
运算符将数据从 BigQuery 表复制到 Postgres 表。
使用 Jinja 模板 和 target_table_name
、impersonation_chain
、dataset_id
、table_id
动态定义值。
您可以使用参数 selected_fields
来限制要复制的字段(默认为所有字段),以及使用参数 replace
来覆盖目标表而不是追加到它。如果使用 replace
参数,则由于底层 INSERT 命令中 PostgreSQL 的 ON CONFLICT 子句的约束,需要同时指定 selected_fields
和 replace_index
参数。
有关更多信息,请参阅上面的链接。
传输数据¶
以下运算符将数据从 BigQuery 表复制到 PostgreSQL。
bigquery_to_postgres = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=False,
)
运算符还可以使用来自 BigQuery 表的匹配数据替换 PostgreSQL 表中的数据。
bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
task_id="bigquery_to_postgres_upsert",
postgres_conn_id=CONNECTION_ID,
dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
target_table_name=SQL_TABLE,
batch_size=BATCH_SIZE,
replace=True,
selected_fields=["emp_name", "salary"],
replace_index=["emp_name", "salary"],
)