Google Cloud BigQuery 数据传输服务操作符

BigQuery 数据传输服务 可以在计划好的、托管的基础上,自动将 SaaS 应用程序的数据移动到 Google BigQuery。您的分析团队可以为数据仓库奠定基础,而无需编写任何代码。BigQuery 数据传输服务最初支持 Google 应用程序来源,如 Google Ads、Campaign Manager、Google Ad Manager 和 YouTube。通过 BigQuery 数据传输服务,用户还可以访问数据连接器,使您可以轻松地将 Teradata 和 Amazon S3 中的数据传输到 BigQuery。

先决条件任务

要使用这些操作符,您必须执行以下几项操作

创建传输配置

要创建 DTS 传输配置,您可以使用 BigQueryCreateDataTransferOperator

对于 Airflow,客户需要创建一个禁用自动调度的传输配置,然后使用专门的 Airflow 操作符触发传输运行,该操作符将调用 StartManualTransferRuns API,例如 BigQueryDataTransferServiceStartTransferRunsOperatorBigQueryCreateDataTransferOperator 检查传递的配置中是否存在自动调度选项。如果存在,则不执行任何操作,否则将其值设置为 True

tests/system/google/cloud/bigquery/example_bigquery_dts.py


# In the case of Airflow, the customer needs to create a transfer
# config with the automatic scheduling disabled and then trigger
# a transfer run using a specialized Airflow operator
TRANSFER_CONFIG = {
    "destination_dataset_id": DATASET_NAME,
    "display_name": "test data transfer",
    "data_source_id": "google_cloud_storage",
    "schedule_options": {"disable_auto_scheduling": True},
    "params": {
        "field_delimiter": ",",
        "max_bad_records": "0",
        "skip_leading_rows": "1",
        "data_path_template": BUCKET_URI,
        "destination_table_name_template": DTS_BQ_TABLE,
        "file_format": "CSV",
    },
}

您可以使用或不使用项目 ID 创建操作符。如果缺少项目 ID,它将从使用的 Google Cloud 连接中检索。操作符的基本用法

tests/system/google/cloud/bigquery/example_bigquery_dts.py

gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
    transfer_config=TRANSFER_CONFIG,
    project_id=PROJECT_ID,
    task_id="gcp_bigquery_create_transfer",
)

transfer_config_id = cast(str, XComArg(gcp_bigquery_create_transfer, key="transfer_config_id"))

您可以使用 Jinja 模板transfer_configproject_idauthorization_codegcp_conn_idimpersonation_chain 参数,这允许您动态地确定值。结果将保存到 XCom 中,这允许其他操作符使用它。此外,新配置的 ID 可在 XCom 中通过 transfer_config_id 键访问。

删除传输配置

要删除 DTS 传输配置,您可以使用 BigQueryDeleteDataTransferConfigOperator

操作符的基本用法

tests/system/google/cloud/bigquery/example_bigquery_dts.py

gcp_bigquery_delete_transfer = BigQueryDeleteDataTransferConfigOperator(
    transfer_config_id=transfer_config_id, task_id="gcp_bigquery_delete_transfer"
)

您可以使用 Jinja 模板transfer_configproject_idauthorization_codegcp_conn_idimpersonation_chain 参数,这允许您动态地确定值。

手动启动传输运行

启动手动传输运行,以便现在执行,schedule_time 等于当前时间。BigQueryDataTransferServiceStartTransferRunsOperator

操作符的基本用法

tests/system/google/cloud/bigquery/example_bigquery_dts.py

gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
    task_id="gcp_bigquery_start_transfer",
    project_id=PROJECT_ID,
    transfer_config_id=transfer_config_id,
    requested_run_time={"seconds": int(time.time() + 60)},
)

您可以使用 Jinja 模板transfer_config_idproject_idrequested_time_rangerequested_run_timegcp_conn_idimpersonation_chain 参数,这允许您动态地确定值。

要检查操作是否成功,您可以使用 BigQueryDataTransferServiceTransferRunSensor

tests/system/google/cloud/bigquery/example_bigquery_dts.py

gcp_run_sensor = BigQueryDataTransferServiceTransferRunSensor(
    task_id="gcp_run_sensor",
    transfer_config_id=transfer_config_id,
    run_id=cast(str, XComArg(gcp_bigquery_start_transfer, key="run_id")),
    expected_statuses={"SUCCEEDED"},
)

您可以使用 Jinja 模板run_idtransfer_config_idexpected_statusesproject_idimpersonation_chain 参数,这允许您动态地确定值。

参考

有关更多信息,请查看

此条目是否有帮助?