Google Cloud BigQuery 传输到 Postgres 运算符

Google Cloud BigQuery 是 Google Cloud 的无服务器数据仓库产品。PostgreSQL 是一个开源的关系数据库管理系统。此运算符可用于将数据从 BigQuery 表复制到 PostgreSQL。

先决条件任务

要使用这些运算符,您必须执行以下几项操作

运算符

使用 BigQueryToPostgresOperator 运算符将数据从 BigQuery 表复制到 Postgres 表。

使用 Jinja 模板target_table_nameimpersonation_chaindataset_idtable_id 动态定义值。

您可以使用参数 selected_fields 来限制要复制的字段(默认为所有字段),以及使用参数 replace 来覆盖目标表而不是追加到它。如果使用 replace 参数,则由于底层 INSERT 命令中 PostgreSQL 的 ON CONFLICT 子句的约束,需要同时指定 selected_fieldsreplace_index 参数。

有关更多信息,请参阅上面的链接。

传输数据

以下运算符将数据从 BigQuery 表复制到 PostgreSQL。

tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py[源代码]

bigquery_to_postgres = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=False,
)

运算符还可以使用来自 BigQuery 表的匹配数据替换 PostgreSQL 表中的数据。

tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py[源代码]

bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres_upsert",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=True,
    selected_fields=["emp_name", "salary"],
    replace_index=["emp_name", "salary"],
)

参考

有关更多信息,请查看

此条目是否有帮助?