Google Cloud Dataflow 操作符¶
Dataflow 是一项托管服务,用于执行各种数据处理模式。这些流水线使用 Apache Beam 编程模型创建,该模型支持批处理和流处理。
先决条件任务¶
要使用这些操作符,您必须执行一些操作:
使用Cloud 控制台选择或创建一个 Cloud Platform 项目。
为您的项目启用计费,如Google Cloud 文档所述。
启用 API,如Cloud 控制台文档所述。
通过 pip 安装 API 库。
pip install 'apache-airflow[google]'有关详细信息,请参阅安装。
运行数据流水线的方式¶
根据您的环境和源文件,有几种运行 Dataflow 流水线的方式:
非模板化流水线: 如果开发者有 Java 的
*.jar
文件或 Python 的*.py
文件,他们可以在 Airflow Worker 上将其作为本地进程运行。这也意味着必须在 Worker 上安装必要的系统依赖项。对于 Java,Worker 必须安装 JRE 运行时。对于 Python,则必须安装 Python 解释器。运行时版本必须与流水线版本兼容。这是启动流水线最快的方式,但由于其系统依赖性问题频繁,可能会导致问题。有关详细信息,请参阅:Java SDK 流水线, Python SDK 流水线。开发者还可以通过以 JSON 格式传递流水线结构来创建流水线,然后运行它来创建作业。有关详细信息,请参阅:JSON 格式的流水线 和 JSON 格式的流水线。模板化流水线: 程序员可以通过准备一个模板来使流水线独立于环境,然后该模板将在 Google 管理的机器上运行。这样,环境的变化就不会影响您的流水线。模板有两种类型:
一个好的做法是使用非模板化流水线测试您的流水线,然后在生产环境中使用模板运行流水线。
有关流水线类型之间差异的详细信息,请参阅 Google Cloud 文档中的Dataflow 模板。
启动非模板化流水线¶
JSON 格式的流水线¶
可以通过以 JSON 格式传递流水线结构来创建新的流水线。请参阅 DataflowCreatePipelineOperator
。这将创建一个新的流水线,该流水线将在 Dataflow Pipelines UI 上可见。
以下是运行 DataflowCreatePipelineOperator 创建 Dataflow 流水线的示例:
tests/system/google/cloud/dataflow/example_dataflow_pipeline.py
create_pipeline = DataflowCreatePipelineOperator(
task_id="create_pipeline",
project_id=GCP_PROJECT_ID,
location=GCP_LOCATION,
body={
"name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/pipelines/{PIPELINE_NAME}",
"type": PIPELINE_TYPE,
"workload": {
"dataflowFlexTemplateRequest": {
"launchParameter": {
"containerSpecGcsPath": GCS_PATH,
"jobName": PIPELINE_JOB_NAME,
"environment": {"tempLocation": TEMP_LOCATION},
"parameters": {
"inputFile": INPUT_FILE,
"output": OUTPUT,
},
},
"projectId": GCP_PROJECT_ID,
"location": GCP_LOCATION,
}
},
},
)
要运行新创建的流水线,您可以使用 DataflowRunPipelineOperator
tests/system/google/cloud/dataflow/example_dataflow_pipeline.py
run_pipeline = DataflowRunPipelineOperator(
task_id="run_pipeline",
pipeline_name=PIPELINE_NAME,
project_id=GCP_PROJECT_ID,
)
调用后,DataflowRunPipelineOperator 将返回通过运行给定流水线创建的 Google Cloud Dataflow Job。
有关 API 使用的更多信息,请参阅 Google Cloud 文档中的数据流水线 API REST 资源。
要使用源文件(Java 中的 JAR 或 Python 文件)创建新的流水线,请使用 create job 操作符。源文件可以位于 GCS 或本地文件系统上。BeamRunJavaPipelineOperator
或 BeamRunPythonPipelineOperator
Java SDK 流水线¶
对于 Java 流水线,必须为 BeamRunJavaPipelineOperator
指定 jar
参数,因为它包含要在 Dataflow 上执行的流水线。该 JAR 文件可以位于 Airflow 可以下载的 GCS 上,也可以位于本地文件系统上(提供其绝对路径)。
以下是使用存储在 GCS 上的 jar 创建和运行 Java 流水线的示例
tests/system/google/cloud/dataflow/example_dataflow_native_java.py
start_java_job_dataflow = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_java_job_dataflow",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"job_name": f"java-pipeline-job-{ENV_ID}",
"check_if_running": CheckJobRunning.IgnoreJob,
"location": LOCATION,
"poll_sleep": 10,
"append_job_name": False,
},
)
以下是使用存储在 GCS 上的 jar 在可延迟模式下创建和运行 Java 流水线的示例
tests/system/google/cloud/dataflow/example_dataflow_native_java.py
start_java_job_dataflow_deferrable = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_java_job_dataflow_deferrable",
jar=GCS_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"job_name": f"deferrable-java-pipeline-job-{ENV_ID}",
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,
"append_job_name": False,
},
deferrable=True,
)
以下是使用存储在本地文件系统上的 jar 创建和运行 Java 流水线的示例
tests/system/google/cloud/dataflow/example_dataflow_native_java.py
start_java_job_direct = BeamRunJavaPipelineOperator(
task_id="start_java_job_direct",
jar=LOCAL_JAR,
pipeline_options={
"output": GCS_OUTPUT,
},
job_class="org.apache.beam.examples.WordCount",
dataflow_config={
"check_if_running": CheckJobRunning.WaitForRun,
"location": LOCATION,
"poll_sleep": 10,
},
)
以下是使用存储在 GCS 上的 jar 创建和运行 Java 流式处理流水线的示例
tests/system/google/cloud/dataflow/example_dataflow_java_streaming.py
start_java_streaming_job_dataflow = BeamRunJavaPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_java_streaming_dataflow_job",
jar=LOCAL_JAR,
pipeline_options={
"tempLocation": GCS_TMP,
"input_topic": INPUT_TOPIC,
"output_topic": OUTPUT_TOPIC,
"streaming": True,
},
dataflow_config={
"job_name": f"java-streaming-job-{ENV_ID}",
"location": LOCATION,
},
)
Python SDK 流水线¶
必须为 BeamRunPythonPipelineOperator
指定 py_file
参数,因为它包含要在 Dataflow 上执行的流水线。Python 文件可以位于 Airflow 可以下载的 GCS 上,也可以位于本地文件系统上(提供其绝对路径)。
py_interpreter
参数指定执行流水线时使用的 Python 版本,默认值为 python3
。如果您的 Airflow 实例运行在 Python 2 上 - 指定 python2
并确保您的 py_file
是 Python 2。为了获得最佳结果,请使用 Python 3。
如果指定了 py_requirements
参数,将创建一个带有指定需求的临时 Python 虚拟环境,并在其中运行流水线。
py_system_site_packages
参数指定是否所有来自 Airflow 实例的 Python 包都可以在虚拟环境中访问(如果指定了 py_requirements
参数),建议避免使用,除非 Dataflow 作业需要。
tests/system/google/cloud/dataflow/example_dataflow_native_python.py
start_python_job_dataflow = BeamRunPythonPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_python_job_dataflow",
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"output": GCS_OUTPUT,
},
py_requirements=["apache-beam[gcp]==2.59.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={"location": LOCATION, "job_name": "start_python_job"},
)
执行模型¶
Dataflow 有多种执行流水线的方式。可以通过以下模式进行:异步批处理(触发即忘)、阻塞批处理(等待完成)或流式处理(无限期运行)。在 Airflow 中,最佳实践是使用异步批处理流水线或流式处理,并使用传感器来监听预期的作业状态。
默认情况下,BeamRunJavaPipelineOperator
, BeamRunPythonPipelineOperator
, DataflowTemplatedJobStartOperator
和 DataflowStartFlexTemplateOperator
的参数 wait_until_finished
被设置为 None
,这会导致取决于流水线类型的不同行为:
对于流式处理流水线,等待作业启动,
对于批处理流水线,等待作业完成。
如果 wait_until_finished
设置为 True
,操作符将始终等待流水线执行结束。如果设置为 False
,则仅提交作业。
请参阅:为在 Cloud Dataflow 服务上执行配置 PipelineOptions
异步执行¶
Dataflow 批处理作业默认是异步的;但是,这取决于应用程序代码(包含在 JAR 或 Python 文件中)及其编写方式。为了使 Dataflow 作业异步执行,请确保不等待流水线对象(即在应用程序代码中不对 PipelineResult
调用 waitUntilFinish
或 wait_until_finish
)。
tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py
start_python_job_async = BeamRunPythonPipelineOperator(
task_id="start_python_job_async",
runner=BeamRunnerType.DataflowRunner,
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"output": GCS_OUTPUT,
},
py_requirements=["apache-beam[gcp]==2.59.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={
"job_name": "start_python_job_async",
"location": LOCATION,
"wait_until_finished": False,
},
)
阻塞执行¶
为了使 Dataflow 作业执行并等待完成,请确保在应用程序代码中等待流水线对象。这可以通过对 Java SDK 调用 pipeline.run()
返回的 PipelineResult
上的 waitUntilFinish
来实现,或者对 Python SDK 调用 pipeline.run()
返回的 PipelineResult
上的 wait_until_finish
来实现。
应避免阻塞作业,因为在 Airflow 上运行时会有一个后台进程。此进程会持续运行以等待 Dataflow 作业完成,从而增加 Airflow 的资源消耗。
流式执行¶
要执行流式 Dataflow 作业,请确保设置了流式选项(对于 Python),或者在流水线中从一个无边界数据源(如 Pub/Sub)读取数据(对于 Java)。
tests/system/google/cloud/dataflow/example_dataflow_streaming_python.py
start_streaming_python_job = BeamRunPythonPipelineOperator(
runner=BeamRunnerType.DataflowRunner,
task_id="start_streaming_python_job",
py_file=GCS_PYTHON_SCRIPT,
py_options=[],
pipeline_options={
"temp_location": GCS_TMP,
"input_topic": "projects/pubsub-public-data/topics/taxirides-realtime",
"output_topic": f"projects/{PROJECT_ID}/topics/{TOPIC_ID}",
"streaming": True,
},
py_requirements=["apache-beam[gcp]==2.59.0"],
py_interpreter="python3",
py_system_site_packages=False,
dataflow_config={"location": LOCATION, "job_name": "start_python_job_streaming"},
)
将参数 drain_pipeline
设置为 True
允许在终止任务实例期间通过排空(draining)而不是取消(canceling)来停止流式作业。
请参阅停止正在运行的流水线。
模板化作业¶
模板提供了将流水线暂存在 Cloud Storage 并从那里运行的能力。这在开发工作流程中提供了灵活性,因为它将流水线的开发与暂存和执行步骤分离开来。Dataflow 有两种类型的模板:经典模板和 Flex 模板。有关更多信息,请参阅Dataflow 模板的官方文档。
以下是使用 DataflowTemplatedJobStartOperator
运行经典模板的 Dataflow 作业示例
tests/system/google/cloud/dataflow/example_dataflow_template.py
start_template_job = DataflowTemplatedJobStartOperator(
task_id="start_template_job",
project_id=PROJECT_ID,
template="gs://dataflow-templates/latest/Word_Count",
parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
location=LOCATION,
wait_until_finished=True,
)
同样,此操作也可以在可延迟模式下使用操作符。
tests/system/google/cloud/dataflow/example_dataflow_template.py
start_template_job_deferrable = DataflowTemplatedJobStartOperator(
task_id="start_template_job_deferrable",
project_id=PROJECT_ID,
template="gs://dataflow-templates/latest/Word_Count",
parameters={"inputFile": f"gs://{BUCKET_NAME}/{CSV_FILE_NAME}", "output": GCS_OUTPUT},
location=LOCATION,
deferrable=True,
)
请参阅可与此操作符一起使用的 Google 提供的模板列表。
以下是使用 DataflowStartFlexTemplateOperator
运行 Flex 模板的 Dataflow 作业示例
tests/system/google/cloud/dataflow/example_dataflow_template.py
start_flex_template_job = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_job",
project_id=PROJECT_ID,
body=BODY,
location=LOCATION,
append_job_name=False,
wait_until_finished=True,
)
同样,此操作也可以在可延迟模式下使用操作符。
tests/system/google/cloud/dataflow/example_dataflow_template.py
start_flex_template_job_deferrable = DataflowStartFlexTemplateOperator(
task_id="start_flex_template_job_deferrable",
project_id=PROJECT_ID,
body=BODY,
location=LOCATION,
append_job_name=False,
deferrable=True,
)
Dataflow YAML¶
Beam YAML 是一种无代码 SDK,用于使用 YAML 文件配置 Apache Beam 流水线。您可以使用 Beam YAML 编写和运行 Beam 流水线,而无需编写任何代码。此 API 可用于定义流式和批处理流水线。
以下是使用 DataflowStartYamlJobOperator
运行 Dataflow YAML 作业的示例
tests/system/google/cloud/dataflow/example_dataflow_yaml.py
start_dataflow_yaml_job = DataflowStartYamlJobOperator(
task_id="start_dataflow_yaml_job",
job_name=DATAFLOW_YAML_JOB_NAME,
yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
append_job_name=True,
deferrable=False,
region=REGION,
project_id=PROJECT_ID,
jinja_variables=BQ_VARIABLES,
)
可以通过传递参数 deferrable=True
在可延迟模式下运行此操作符。
tests/system/google/cloud/dataflow/example_dataflow_yaml.py
start_dataflow_yaml_job_def = DataflowStartYamlJobOperator(
task_id="start_dataflow_yaml_job_def",
job_name=DATAFLOW_YAML_JOB_NAME,
yaml_pipeline_file=DATAFLOW_YAML_PIPELINE_FILE_URL,
append_job_name=True,
deferrable=True,
region=REGION,
project_id=PROJECT_ID,
jinja_variables=BQ_VARIABLES_DEF,
expected_terminal_state=DataflowJobStatus.JOB_STATE_DONE,
)
警告
此操作符要求在 Airflow Worker 上安装 gcloud
命令 (Google Cloud SDK) <https://cloud.google.com/sdk/docs/install>__
请参阅Dataflow YAML 参考。
停止流水线¶
要停止一个或多个 Dataflow 流水线,您可以使用 DataflowStopJobOperator
。流式处理流水线默认被排空(drained),将 drain_pipeline
设置为 False
将改为取消它们。提供 job_id
以停止特定作业,或提供 job_name_prefix
以停止所有具有给定名称前缀的作业。
tests/system/google/cloud/dataflow/example_dataflow_native_python.py
stop_dataflow_job = DataflowStopJobOperator(
task_id="stop_dataflow_job",
location=LOCATION,
job_name_prefix="start-python-pipeline",
)
请参阅:停止正在运行的流水线。
删除流水线¶
要删除 Dataflow 流水线,您可以使用 DataflowDeletePipelineOperator
。以下是使用此操作符的示例:
tests/system/google/cloud/dataflow/example_dataflow_pipeline.py
delete_pipeline = DataflowDeletePipelineOperator(
task_id="delete_pipeline",
pipeline_name=PIPELINE_NAME,
project_id=GCP_PROJECT_ID,
trigger_rule=TriggerRule.ALL_DONE,
)
更新流水线¶
流式处理流水线一旦创建并运行,其配置就不可更改,因为它是不可变的。要进行任何修改,您需要更新流水线的定义(例如,更新您的代码或模板),然后提交一个新的作业。实质上,您将创建一个带有所需更新的新流水线实例。
对于批处理流水线,如果一个作业当前正在运行并且您想更新其配置,您必须取消该作业。这是因为 Dataflow 作业一旦开始,就变得不可变。尽管批处理流水线旨在处理有限量的数据并最终自行完成,但您无法更新正在进行的作业。如果在作业运行时需要更改任何参数或流水线逻辑,您将不得不取消当前的运行,然后启动一个带有更新配置的新作业。
如果批处理流水线已经成功完成,则没有正在运行的作业可供更新;新配置仅适用于下一次作业提交。
传感器¶
当作业以异步方式触发时,可以使用传感器来检查特定的作业属性。
DataflowJobStatusSensor (Dataflow 作业状态传感器)
.
tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py
wait_for_python_job_async_done = DataflowJobStatusSensor(
task_id="wait_for_python_job_async_done",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
location=LOCATION,
)
可以通过传递参数 deferrable=True
在可延迟模式下运行此操作符。
tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py
wait_for_beam_python_pipeline_job_status_def = DataflowJobStatusSensor(
task_id="wait_for_beam_python_pipeline_job_status_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
expected_statuses=DataflowJobStatus.JOB_STATE_DONE,
location=LOCATION,
deferrable=True,
)
DataflowJobMetricsSensor (Dataflow 作业指标传感器)
.
tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
"""Check is metric greater than equals to given value."""
def callback(metrics: list[dict]) -> bool:
dag.log.info("Looking for '%s' >= %d", metric_name, value)
for metric in metrics:
context = metric.get("name", {}).get("context", {})
original_name = context.get("original_name", "")
tentative = context.get("tentative", "")
if original_name == "Service-cpu_num_seconds" and not tentative:
return metric["scalar"] >= value
raise AirflowException(f"Metric '{metric_name}' not found in metrics")
return callback
wait_for_python_job_async_metric = DataflowJobMetricsSensor(
task_id="wait_for_python_job_async_metric",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
)
可以通过传递参数 deferrable=True
在可延迟模式下运行此操作符。
tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py
def check_metric_scalar_gte(metric_name: str, value: int) -> Callable:
"""Check is metric greater than equals to given value."""
def callback(metrics: list[dict]) -> bool:
dag.log.info("Looking for '%s' >= %d", metric_name, value)
for metric in metrics:
context = metric.get("name", {}).get("context", {})
original_name = context.get("original_name", "")
tentative = context.get("tentative", "")
if original_name == "Service-cpu_num_seconds" and not tentative:
return metric["scalar"] >= value
raise AirflowException(f"Metric '{metric_name}' not found in metrics")
return callback
wait_for_beam_python_pipeline_job_metric_def = DataflowJobMetricsSensor(
task_id="wait_for_beam_python_pipeline_job_metric_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_metric_scalar_gte(metric_name="Service-cpu_num_seconds", value=100),
fail_on_terminal_state=False,
deferrable=True,
)
DataflowJobMessagesSensor (Dataflow 作业消息传感器)
.
tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py
def check_message(messages: list[dict]) -> bool:
"""Check message"""
for message in messages:
if "Adding workflow start and stop steps." in message.get("messageText", ""):
return True
return False
wait_for_python_job_async_message = DataflowJobMessagesSensor(
task_id="wait_for_python_job_async_message",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_message,
fail_on_terminal_state=False,
)
可以通过传递参数 deferrable=True
在可延迟模式下运行此操作符。
tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py
def check_job_message(messages: list[dict]) -> bool:
"""Check job message."""
for message in messages:
if "Adding workflow start and stop steps." in message.get("messageText", ""):
return True
return False
wait_for_beam_python_pipeline_job_message_def = DataflowJobMessagesSensor(
task_id="wait_for_beam_python_pipeline_job_message_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_job_message,
fail_on_terminal_state=False,
deferrable=True,
)
DataflowJobAutoScalingEventsSensor (Dataflow 作业自动扩缩事件传感器)
.
tests/system/google/cloud/dataflow/example_dataflow_native_python_async.py
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
"""Check autoscaling event"""
for autoscaling_event in autoscaling_events:
if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
return True
return False
wait_for_python_job_async_autoscaling_event = DataflowJobAutoScalingEventsSensor(
task_id="wait_for_python_job_async_autoscaling_event",
job_id="{{task_instance.xcom_pull('start_python_job_async')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
)
可以通过传递参数 deferrable=True
在可延迟模式下运行此操作符。
tests/system/google/cloud/dataflow/example_dataflow_sensors_deferrable.py
def check_autoscaling_event(autoscaling_events: list[dict]) -> bool:
"""Check autoscaling event."""
for autoscaling_event in autoscaling_events:
if "Worker pool started." in autoscaling_event.get("description", {}).get("messageText", ""):
return True
return False
wait_for_beam_python_pipeline_job_autoscaling_event_def = DataflowJobAutoScalingEventsSensor(
task_id="wait_for_beam_python_pipeline_job_autoscaling_event_def",
job_id="{{task_instance.xcom_pull('start_beam_python_pipeline')['dataflow_job_id']}}",
location=LOCATION,
callback=check_autoscaling_event,
fail_on_terminal_state=False,
deferrable=True,
)
参考¶
有关更多信息,请查看: