Google Cloud BigQuery 数据迁移服务 Operator¶
借助 BigQuery 数据迁移服务,您可以按计划、托管的方式自动将数据从 SaaS 应用传输到 Google BigQuery。您的分析团队无需编写任何代码即可奠定数据仓库的基础。BigQuery 数据迁移服务最初支持 Google 应用来源,例如 Google Ads、Campaign Manager、Google Ad Manager 和 YouTube。通过 BigQuery 数据迁移服务,用户还可以访问数据连接器,方便地将数据从 Teradata 和 Amazon S3 传输到 BigQuery。
前置任务¶
要使用这些 Operator,您必须做几件事:
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
按照 Google Cloud 文档 中的说明,为您的项目启用结算功能。
按照 Cloud Console 文档 中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关 安装 的详细信息,请查阅相关文档。
创建传输配置¶
要创建 DTS 传输配置,您可以使用 BigQueryCreateDataTransferOperator
。
在 Airflow 中,客户需要创建一个禁用自动调度的传输配置,然后使用专门的 Airflow Operator 调用 StartManualTransferRuns API 来触发传输运行,例如 BigQueryDataTransferServiceStartTransferRunsOperator
。BigQueryCreateDataTransferOperator
会检查传入的配置中是否存在自动调度选项。如果存在,则不执行任何操作;否则,其值将设置为 True
。
tests/system/google/cloud/bigquery/example_bigquery_dts.py
# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
TRANSFER_CONFIG = {
"destination_dataset_id": DATASET_NAME,
"display_name": "test data transfer",
"data_source_id": "google_cloud_storage",
"schedule_options": {"disable_auto_scheduling": True},
"params": {
"field_delimiter": ",",
"max_bad_records": "0",
"skip_leading_rows": "1",
"data_path_template": BUCKET_URI,
"destination_table_name_template": DTS_BQ_TABLE,
"file_format": "CSV",
},
}
您可以创建带或不带项目 ID 的 Operator。如果项目 ID 缺失,将从使用的 Google Cloud 连接中检索。Operator 的基本用法如下:
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG,
project_id=PROJECT_ID,
task_id="gcp_bigquery_create_transfer",
)
transfer_config_id = cast("str", XComArg(gcp_bigquery_create_transfer, key="transfer_config_id"))
您可以使用 Jinja 模板 配合参数 transfer_config
、project_id
、authorization_code
、gcp_conn_id
、impersonation_chain
来动态确定值。结果会保存到 XCom,供其他 Operator 使用。此外,新配置的 ID 可通过 transfer_config_id
键在 XCom 中访问。
删除传输配置¶
要删除 DTS 传输配置,您可以使用 BigQueryDeleteDataTransferConfigOperator
。
Operator 的基本用法如下:
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)
您可以使用 Jinja 模板 配合参数 transfer_config
、project_id
、authorization_code
、gcp_conn_id
、impersonation_chain
来动态确定值。
手动启动传输运行¶
启动手动传输运行,其 schedule_time
等于当前时间,立即执行。使用 BigQueryDataTransferServiceStartTransferRunsOperator
。
Operator 的基本用法如下:
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
task_id="gcp_bigquery_start_transfer",
project_id=PROJECT_ID,
transfer_config_id=transfer_config_id,
requested_run_time={"seconds": int(time.time() + 60)},
)
您可以使用 Jinja 模板 配合参数 transfer_config_id
、project_id
、requested_time_range
、requested_run_time
、gcp_conn_id
、impersonation_chain
来动态确定值。
要检查操作是否成功,您可以使用 BigQueryDataTransferServiceTransferRunSensor
。
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
task_id="gcp_run_sensor",
transfer_config_id=transfer_config_id,
run_id=cast("str", XComArg(gcp_bigquery_start_transfer, key="run_id")),
expected_statuses={"SUCCEEDED"},
)
您可以使用 Jinja 模板 配合参数 run_id
、transfer_config_id
、expected_statuses
、project_id
、impersonation_chain
来动态确定值。
参考¶
欲了解更多信息,请参阅: