Google Cloud BigQuery 数据传输服务操作符¶
BigQuery 数据传输服务 可以在计划好的、托管的基础上,自动将 SaaS 应用程序的数据移动到 Google BigQuery。您的分析团队可以为数据仓库奠定基础,而无需编写任何代码。BigQuery 数据传输服务最初支持 Google 应用程序来源,如 Google Ads、Campaign Manager、Google Ad Manager 和 YouTube。通过 BigQuery 数据传输服务,用户还可以访问数据连接器,使您可以轻松地将 Teradata 和 Amazon S3 中的数据传输到 BigQuery。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
使用 Cloud Console 选择或创建一个 Cloud Platform 项目。
按照 Google Cloud 文档 中的说明,为您的项目启用结算。
按照 Cloud Console 文档 中的说明,启用 API。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅 安装。
创建传输配置¶
要创建 DTS 传输配置,您可以使用 BigQueryCreateDataTransferOperator
。
对于 Airflow,客户需要创建一个禁用自动调度的传输配置,然后使用专门的 Airflow 操作符触发传输运行,该操作符将调用 StartManualTransferRuns API,例如 BigQueryDataTransferServiceStartTransferRunsOperator
。BigQueryCreateDataTransferOperator
检查传递的配置中是否存在自动调度选项。如果存在,则不执行任何操作,否则将其值设置为 True
。
tests/system/google/cloud/bigquery/example_bigquery_dts.py
# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
TRANSFER_CONFIG = {
"destination_dataset_id": DATASET_NAME,
"display_name": "test data transfer",
"data_source_id": "google_cloud_storage",
"schedule_options": {"disable_auto_scheduling": True},
"params": {
"field_delimiter": ",",
"max_bad_records": "0",
"skip_leading_rows": "1",
"data_path_template": BUCKET_URI,
"destination_table_name_template": DTS_BQ_TABLE,
"file_format": "CSV",
},
}
您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。操作符的基本用法
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
transfer_config=TRANSFER_CONFIG,
project_id=PROJECT_ID,
task_id="gcp_bigquery_create_transfer",
)
transfer_config_id = cast(str, XComArg(gcp_bigquery_create_transfer, key="transfer_config_id"))
您可以使用 Jinja 模板 和 transfer_config
、project_id
、authorization_code
、gcp_conn_id
、impersonation_chain
参数,这允许您动态地确定值。结果将保存到 XCom 中,这允许其他操作符使用它。此外,新配置的 ID 可在 XCom 中通过 transfer_config_id
键访问。
删除传输配置¶
要删除 DTS 传输配置,您可以使用 BigQueryDeleteDataTransferConfigOperator
。
操作符的基本用法
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)
您可以使用 Jinja 模板 和 transfer_config
、project_id
、authorization_code
、gcp_conn_id
、impersonation_chain
参数,这允许您动态地确定值。
手动启动传输运行¶
启动手动传输运行,以便现在执行,schedule_time 等于当前时间。BigQueryDataTransferServiceStartTransferRunsOperator
。
操作符的基本用法
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
task_id="gcp_bigquery_start_transfer",
project_id=PROJECT_ID,
transfer_config_id=transfer_config_id,
requested_run_time={"seconds": int(time.time() + 60)},
)
您可以使用 Jinja 模板 和 transfer_config_id
、project_id
、requested_time_range
、requested_run_time
、gcp_conn_id
、impersonation_chain
参数,这允许您动态地确定值。
要检查操作是否成功,您可以使用 BigQueryDataTransferServiceTransferRunSensor
。
tests/system/google/cloud/bigquery/example_bigquery_dts.py
gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
task_id="gcp_run_sensor",
transfer_config_id=transfer_config_id,
run_id=cast(str, XComArg(gcp_bigquery_start_transfer, key="run_id")),
expected_statuses={"SUCCEEDED"},
)
您可以使用 Jinja 模板 和 run_id
、transfer_config_id
、expected_statuses
、project_id
、impersonation_chain
参数,这允许您动态地确定值。