Google Cloud BigQuery 到 Postgres 的传输 Operator

Google Cloud BigQuery 是 Google Cloud 的无服务器数据仓库服务。PostgreSQL 是一种开源关系型数据库管理系统。此 operator 可用于将数据从 BigQuery 表复制到 PostgreSQL。

先决条件任务

要使用这些 operators,您必须执行以下操作:

Operator

使用 BigQueryToPostgresOperator operator 执行将数据从 BigQuery 表复制到 Postgres 表的操作。

使用 Jinja 模板 配合 target_table_name, impersonation_chain, dataset_id, table_id 动态定义值。

您可以使用参数 selected_fields 来限制要复制的字段(默认为所有字段),以及参数 replace 来覆盖目标表而不是追加数据。如果使用参数 replace,则需要同时指定 selected_fieldsreplace_index 参数,这是由于 PostgreSQL 底层 INSERT 命令中的 ON CONFLICT 子句的限制所致。

更多信息请参考以上链接。

传输数据

以下 Operator 将数据从 BigQuery 表复制到 PostgreSQL。

tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py

bigquery_to_postgres = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=False,
)

此 Operator 也可以使用来自 BigQuery 表的匹配数据替换 PostgreSQL 表中的数据。

tests/system/google/cloud/bigquery/example_bigquery_to_postgres.py

bigquery_to_postgres_upsert = BigQueryToPostgresOperator(
    task_id="bigquery_to_postgres_upsert",
    postgres_conn_id=CONNECTION_ID,
    dataset_table=f"{BIGQUERY_DATASET_NAME}.{BIGQUERY_TABLE}",
    target_table_name=SQL_TABLE,
    batch_size=BATCH_SIZE,
    replace=True,
    selected_fields=["emp_name", "salary"],
    replace_index=["emp_name", "salary"],
)

参考资料

有关进一步信息,请查阅:

此条目有帮助吗?