Google Dataprep Operator¶
Dataprep 是一款智能云数据服务,用于可视化探索、清理和准备数据以进行分析和机器学习。该服务可用于探索和转换来自不同和/或大型数据集的原始数据,将其转换为干净且结构化的数据,以便进一步分析和处理。Dataprep Job 是一个内部对象,编码了运行 Cloud Dataprep job group 的一部分所需的信息。有关该服务的更多信息,请访问 Google Dataprep API 文档
开始之前¶
在 Airflow 中使用 Dataprep 之前,您需要使用 TOKEN 对您的帐户进行身份验证。要连接 Dataprep 与 Airflow,您需要 Dataprep TOKEN。请按照 Dataprep 说明进行操作。
TOKEN 应以 JSON 格式添加到 Airflow 的 Connection 中。您可以查看 管理连接
DataprepRunJobGroupOperator 将运行指定的 job。Operator 需要一个 recipe ID。要确定 recipe ID,请使用 runJobGroup 的 API 文档。例如,如果 URL 是 /flows/10?recipe=7,则 recipe ID 为 7。无法通过此 Operator 创建 recipe。只能通过 此处 提供的 UI 创建。某些参数可以通过 DAG 的 body 请求覆盖。在示例 DAG 中显示了如何执行此操作。
请看以下示例:设置这些字段的值:.. code-block
Connection Id: "your_conn_id"
Extra: {"token": "TOKEN", "base_url": "https://api.clouddataprep.com"}
前提任务¶
要使用这些 Operator,您必须执行以下几项操作
使用 Cloud Console 选择或创建 Cloud Platform 项目。
为您的项目启用结算功能,如 Google Cloud 文档中所述。
启用 API,如 Cloud Console 文档中所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'安装有详细信息。
运行 Job Group¶
Operator 的任务是创建一个 job group,该 job group 以经过身份验证的用户身份启动指定的 job。这与在应用程序中单击“运行 Job”按钮执行的操作相同。
要获取 Cloud Dataprep job 中的 job 信息,请使用:DataprepRunJobGroupOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
run_job_group_task = DataprepRunJobGroupOperator(
task_id="run_job_group",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
body_request={
"wrangledDataset": {"id": DATASET_WRANGLED_ID},
"overrides": WRITE_SETTINGS,
},
)
获取 Job Group 的 Job¶
Operator 的任务是获取 Cloud Dataprep job 中的批处理 job 的信息。
要获取 Cloud Dataprep job 中的 job 信息,请使用:DataprepGetJobsForJobGroupOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
get_jobs_for_job_group_task = DataprepGetJobsForJobGroupOperator(
task_id="get_jobs_for_job_group",
dataprep_conn_id=CONNECTION_ID,
job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)
获取 Job Group¶
Operator 的任务是获取指定的 job group。Job group 是从 flow 中的特定节点执行的 job。
要获取 Cloud Dataprep job 中的 job 信息,请使用:DataprepGetJobGroupOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
get_job_group_task = DataprepGetJobGroupOperator(
task_id="get_job_group",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
embed="",
include_deleted=False,
)
复制 Flow¶
Operator 的任务是复制 flow。
要获取 Cloud Dataprep job 中的 job 信息,请使用:DataprepCopyFlowOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
copy_task = DataprepCopyFlowOperator(
task_id="copy_flow",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
flow_id=FLOW_ID,
name=f"copy_{DATASET_NAME}",
)
运行 Flow¶
Operator 的任务是运行 flow。Flow 是用于整理逻辑的容器,包含导入的数据集、recipe、输出对象和引用。
要获取 Cloud Dataprep job 中的 job 信息,请使用:DataprepRunFlowOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
run_flow_task = DataprepRunFlowOperator(
task_id="run_flow",
dataprep_conn_id=CONNECTION_ID,
project_id=GCP_PROJECT_ID,
flow_id=FLOW_COPY_ID,
body_request={},
)
删除 Flow¶
Operator 的任务是删除 flow。Flow 是用于整理逻辑的容器,包含导入的数据集、recipe、输出对象和引用。
要获取 Cloud Dataprep job 中的 job 信息,请使用:DataprepDeleteFlowOperator
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
delete_flow_task = DataprepDeleteFlowOperator(
task_id="delete_flow",
dataprep_conn_id=CONNECTION_ID,
flow_id="{{ task_instance.xcom_pull('copy_flow')['id'] }}",
)
检查 Job Group 是否完成¶
Sensor 的任务是告知系统何时已启动的 job group 已完成,无论成功与否。Job group 是从 flow 中的特定节点执行的 job。
要获取 Cloud Dataprep job 中的 job 信息,请使用:DataprepJobGroupIsFinishedSensor
示例用法
tests/system/google/cloud/dataprep/example_dataprep.py
check_flow_status_sensor = DataprepJobGroupIsFinishedSensor(
task_id="check_flow_status",
dataprep_conn_id=CONNECTION_ID,
job_group_id="{{ task_instance.xcom_pull('run_flow')['data'][0]['id'] }}",
)