Apache Beam Operators

Apache Beam is an open source, unified model for defining both batch and streaming data-parallel processing pipelines. Using one of the open source Beam SDKs, you build a program that defines the pipeline. The pipeline is then executed by one of Beam's supported distributed processing back-ends, which include Apache Flink, Apache Spark, and Google Cloud Dataflow.

Note

This operator requires the gcloud command (Google Cloud SDK) <https://cloud.google.com/sdk/docs/install> to be installed on the Airflow worker when the Apache Beam pipeline runs on the Dataflow service.

Run Python Pipelines in Apache Beam

The py_file argument must be specified for BeamRunPythonPipelineOperator, as it contains the pipeline to be executed by Beam. The Python file can be available on GCS for Airflow to download, or on the local filesystem (provide the absolute path to it).

The py_interpreter argument specifies the Python version to be used when executing the pipeline; the default is python3. If your Airflow instance runs on Python 2, specify python2 and make sure your py_file is a Python 2 file. For best results, use Python 3.

If the py_requirements argument is specified, a temporary Python virtual environment with the specified requirements will be created, and the pipeline will run within it.

The py_system_site_packages argument specifies whether all the Python packages from your Airflow instance will be accessible within the virtual environment (if py_requirements is specified). Avoid it unless the Dataflow job requires it.
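
The snippets below are excerpts from Airflow's system test files, so they omit their imports. Roughly, they assume imports along the following lines; constants such as GCS_PYTHON, GCS_OUTPUT, and GCP_PROJECT_ID are environment-specific values defined in those test files:

# Beam pipeline operators, plus the Dataflow helpers used in the examples below.
from airflow.providers.apache.beam.operators.beam import (
    BeamRunGoPipelineOperator,
    BeamRunJavaPipelineOperator,
    BeamRunPythonPipelineOperator,
)
from airflow.providers.google.cloud.hooks.dataflow import DataflowJobStatus
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow.providers.google.cloud.sensors.dataflow import DataflowJobStatusSensor
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator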

Run Python Pipelines with DirectRunner

tests/system/apache/beam/example_python.py

start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_local_direct_runner",
    py_file="apache_beam.examples.wordcount",
    py_options=["-m"],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
)

tests/system/apache/beam/example_python.py

start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_direct_runner",
    py_file=GCS_PYTHON,
    py_options=[],
    pipeline_options={"output": GCS_OUTPUT},
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
)

You can run this operator in deferrable mode, i.e. asynchronously. This gives the worker a chance to free up resources when it knows it has to wait, handing the job of resuming the operator over to a trigger. As a result, while the operator is suspended (deferred) it does not occupy a worker slot, and your cluster wastes far fewer resources on idle operators or sensors.

tests/system/apache/beam/example_python_async.py

start_python_pipeline_local_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_local_direct_runner",
    py_file="apache_beam.examples.wordcount",
    py_options=["-m"],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    deferrable=True,
)

tests/system/apache/beam/example_python_async.py

start_python_pipeline_direct_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_direct_runner",
    py_file=GCS_PYTHON,
    py_options=[],
    pipeline_options={"output": GCS_OUTPUT},
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    deferrable=True,
)

Run Python Pipelines with DataflowRunner

tests/system/apache/beam/example_python.py

start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_dataflow_runner",
    runner="DataflowRunner",
    py_file=GCS_PYTHON,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
    ),
)

tests/system/apache/beam/example_python_dataflow.py

start_python_job_dataflow_runner_async = BeamRunPythonPipelineOperator(
    task_id="start_python_job_dataflow_runner_async",
    runner="DataflowRunner",
    py_file=GCS_PYTHON_DATAFLOW_ASYNC,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=GCP_PROJECT_ID,
        location="us-central1",
        wait_until_finished=False,
    ),
)

wait_for_python_job_dataflow_runner_async_done = DataflowJobStatusSensor(
    task_id="wait-for-python-job-async-done",
    job_id="{{task_instance.xcom_pull('start_python_job_dataflow_runner_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location="us-central1",
)

start_python_job_dataflow_runner_async >> wait_for_python_job_dataflow_runner_async_done
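
In this pattern, wait_until_finished=False makes the operator complete as soon as the job has been submitted to Dataflow; the Dataflow job ID it pushes to XCom is then picked up by the DataflowJobStatusSensor, which polls until the job reaches JOB_STATE_DONE.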

You can run this operator in deferrable mode, i.e. asynchronously. This gives the worker a chance to free up resources when it knows it has to wait, handing the job of resuming the operator over to a trigger. As a result, while the operator is suspended (deferred) it does not occupy a worker slot, and your cluster wastes far fewer resources on idle operators or sensors.

tests/system/apache/beam/example_python_async.py

start_python_pipeline_dataflow_runner = BeamRunPythonPipelineOperator(
    task_id="start_python_pipeline_dataflow_runner",
    runner="DataflowRunner",
    py_file=GCS_PYTHON,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    py_options=[],
    py_requirements=["apache-beam[gcp]==2.59.0"],
    py_interpreter="python3",
    py_system_site_packages=False,
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
    ),
    deferrable=True,
)


Run Java Pipelines in Apache Beam

For Java pipelines, the jar argument must be specified for BeamRunJavaPipelineOperator, as it contains the pipeline to be executed by Apache Beam. The JAR can be available on GCS for Airflow to download, or on the local filesystem (provide the absolute path to it).

Run Java Pipelines with DirectRunner

tests/system/apache/beam/example_beam.py

jar_to_local_direct_runner = GCSToLocalFilesystemOperator(
    task_id="jar_to_local_direct_runner",
    bucket=GCS_JAR_DIRECT_RUNNER_BUCKET_NAME,
    object_name=GCS_JAR_DIRECT_RUNNER_OBJECT_NAME,
    filename="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_direct_runner = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline_direct_runner",
    jar="/tmp/beam_wordcount_direct_runner_{{ ds_nodash }}.jar",
    pipeline_options={
        "output": "/tmp/start_java_pipeline_direct_runner",
        "inputFile": GCS_INPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
)

jar_to_local_direct_runner >> start_java_pipeline_direct_runner

Run Java Pipelines with DataflowRunner

tests/system/apache/beam/example_java_dataflow.py

jar_to_local_dataflow_runner = GCSToLocalFilesystemOperator(
    task_id="jar_to_local_dataflow_runner",
    bucket=GCS_JAR_DATAFLOW_RUNNER_BUCKET_NAME,
    object_name=GCS_JAR_DATAFLOW_RUNNER_OBJECT_NAME,
    filename="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
)

start_java_pipeline_dataflow = BeamRunJavaPipelineOperator(
    task_id="start_java_pipeline_dataflow",
    runner="DataflowRunner",
    jar="/tmp/beam_wordcount_dataflow_runner_{{ ds_nodash }}.jar",
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
    },
    job_class="org.apache.beam.examples.WordCount",
    dataflow_config={"job_name": "{{task.task_id}}", "location": "us-central1"},
)

jar_to_local_dataflow_runner >> start_java_pipeline_dataflow


Run Go Pipelines in Apache Beam

The go_file argument must be specified for BeamRunGoPipelineOperator, as it contains the pipeline to be executed by Beam. The Go file can be available on GCS for Airflow to download, or on the local filesystem (provide the absolute path to it). When running from the local filesystem, the equivalent command is go run <go_file>. If the file is pulled from a GCS bucket, the operator first initializes the module and installs dependencies with go mod init example.com/main and go mod tidy.

Run Go Pipelines with DirectRunner

tests/system/apache/beam/example_go.py

start_go_pipeline_local_direct_runner = BeamRunGoPipelineOperator(
    task_id="start_go_pipeline_local_direct_runner",
    go_file="files/apache_beam/examples/wordcount.go",
)

tests/system/apache/beam/example_go.py

start_go_pipeline_direct_runner = BeamRunGoPipelineOperator(
    task_id="start_go_pipeline_direct_runner",
    go_file=GCS_GO,
    pipeline_options={"output": GCS_OUTPUT},
)

Run Go Pipelines with DataflowRunner

tests/system/apache/beam/example_go.py

start_go_pipeline_dataflow_runner = BeamRunGoPipelineOperator(
    task_id="start_go_pipeline_dataflow_runner",
    runner="DataflowRunner",
    go_file=GCS_GO,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
        "WorkerHarnessContainerImage": "apache/beam_go_sdk:latest",
    },
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}", project_id=GCP_PROJECT_ID, location="us-central1"
    ),
)

tests/system/apache/beam/example_go_dataflow.py

start_go_job_dataflow_runner_async = BeamRunGoPipelineOperator(
    task_id="start_go_job_dataflow_runner_async",
    runner="DataflowRunner",
    go_file=GCS_GO_DATAFLOW_ASYNC,
    pipeline_options={
        "tempLocation": GCS_TMP,
        "stagingLocation": GCS_STAGING,
        "output": GCS_OUTPUT,
        "WorkerHarnessContainerImage": "apache/beam_go_sdk:latest",
    },
    dataflow_config=DataflowConfiguration(
        job_name="{{task.task_id}}",
        project_id=GCP_PROJECT_ID,
        location="us-central1",
        wait_until_finished=False,
    ),
)

wait_for_go_job_dataflow_runner_async_done = DataflowJobStatusSensor(
    task_id="wait-for-go-job-async-done",
    job_id="{{task_instance.xcom_pull('start_go_job_dataflow_runner_async')['dataflow_job_id']}}",
    expected_statuses={DataflowJobStatus.JOB_STATE_DONE},
    project_id=GCP_PROJECT_ID,
    location="us-central1",
)

start_go_job_dataflow_runner_async >> wait_for_go_job_dataflow_runner_async_done
