airflow.providers.apache.beam.hooks.beam

本模块包含一个 Apache Beam Hook。

BeamRunnerType

列出 runner 类型辅助类。

BeamHook

Apache Beam 的 Hook。

BeamAsyncHook

Apache Beam 的异步 Hook。

函数

beam_options_to_args(options)

从参数字典返回格式化的 pipeline 选项。

process_fd(proc, fd, log[, process_line_callback, ...])

将输出打印到日志。

run_beam_command(cmd, log[, process_line_callback, ...])

在子进程中运行 pipeline 命令。

模块内容

class airflow.providers.apache.beam.hooks.beam.BeamRunnerType[source]

列出 runner 类型辅助类。

有关 runner 的更多信息,请参阅:https://beam.apache.ac.cn/documentation/

DataflowRunner = 'DataflowRunner'[source]
DirectRunner = 'DirectRunner'[source]
SparkRunner = 'SparkRunner'[source]
FlinkRunner = 'FlinkRunner'[source]
SamzaRunner = 'SamzaRunner'[source]
NemoRunner = 'NemoRunner'[source]
JetRunner = 'JetRunner'[source]
Twister2Runner = 'Twister2Runner'[source]
airflow.providers.apache.beam.hooks.beam.beam_options_to_args(options)[source]

从参数字典返回格式化的 pipeline 选项。

此方法的逻辑应与 Apache Beam 兼容:https://github.com/apache/beam/blob/77f57d1fc498592089e32701b45505bbdccccd47/sdks/python/ apache_beam/options/pipeline_options.py#L260-L268

警告:如果需要修改,请检查最新的 main 分支实现!

参数:

options (dict) – 包含选项的字典

返回值:

参数列表

返回类型:

list[str]

airflow.providers.apache.beam.hooks.beam.process_fd(proc, fd, log, process_line_callback=None, is_dataflow_job_id_exist_callback=None)[source]

将输出打印到日志。

参数:
  • proc – 子进程。

  • fd – 文件描述符。

  • process_line_callback (Callable[[str], None] | None) – 可选回调函数,可用于处理 stdout 和 stderr 以检测 job id。

  • log (logging.Logger) – logger。

airflow.providers.apache.beam.hooks.beam.run_beam_command(cmd, log, process_line_callback=None, working_directory=None, is_dataflow_job_id_exist_callback=None)[source]

在子进程中运行 pipeline 命令。

参数:
  • cmd (list[str]) – 在子进程中运行的命令部分

  • process_line_callback (Callable[[str], None] | None) – 可选回调函数,可用于处理 stdout 和 stderr 以检测 job id

  • working_directory (str | None) – 工作目录

  • log (logging.Logger) – logger。

class airflow.providers.apache.beam.hooks.beam.BeamHook(runner)[source]

基类:airflow.hooks.base.BaseHook

Apache Beam 的 Hook。

Hook 中所有使用 project_id 的方法必须使用关键字参数而非位置参数调用。

参数:

runner (str) – Runner 类型

runner[source]
start_python_pipeline(variables, py_file, py_options, py_interpreter='python3', py_requirements=None, py_system_site_packages=False, process_line_callback=None, is_dataflow_job_id_exist_callback=None)[source]

启动 Apache Beam Python pipeline。

参数:
  • variables (dict) – 传递给 pipeline 的变量。

  • py_file (str) – 要执行的 Python 文件路径。

  • py_options (list[str]) – 附加选项。

  • py_interpreter (str) – Apache Beam pipeline 的 Python 版本。如果为 None,则默认为 python3。要跟踪 Beam 支持的 Python 版本和相关问题,请检查:https://issues.apache.org/jira/browse/BEAM-1251

  • py_requirements (list[str] | None) –

    要安装的附加 Python 包。如果为此参数传递了值,则会创建一个新的虚拟环境并安装附加包。

    如果您的系统上未安装 apache-beam 包,或者您想使用其他版本,也可以安装该包。

  • py_system_site_packages (bool) –

    是否在您的 virtualenv 中包含 system_site_packages。有关更多信息,请参阅 virtualenv 文档。

    仅当 py_requirements 参数不为 None 时,此选项才相关。

  • process_line_callback (Callable[[str], None] | None) – (可选)回调函数,可用于处理 stdout 和 stderr 文件描述符的每一行。

start_java_pipeline(variables, jar, job_class=None, process_line_callback=None, is_dataflow_job_id_exist_callback=None)[source]

启动 Apache Beam Java pipeline。

参数:
  • variables (dict) – 传递给 job 的变量。

  • jar (str) – pipeline 的 jar 包名称

  • job_class (str | None) – pipeline 的 Java 类名称。

  • process_line_callback (Callable[[str], None] | None) – (可选)回调函数,可用于处理 stdout 和 stderr 文件描述符的每一行。

start_go_pipeline(variables, go_file, process_line_callback=None, should_init_module=False)[source]

使用源文件启动 Apache Beam Go pipeline。

参数:
  • variables (dict) – 传递给 job 的变量。

  • go_file (str) – 包含您的 beam pipeline 的 Go 文件路径。

  • process_line_callback (Callable[[str], None] | None) – (可选)回调函数,可用于处理 stdout 和 stderr 文件描述符的每一行。

  • should_init_module (bool) – 如果为 False(默认),将只执行 go run 命令。如果为 True,将使用 go mod initgo mod tidy 初始化模块和依赖项,这在使用 GCSHook 拉取源码时很有用。

返回值:

返回类型:

start_go_pipeline_with_binary(variables, launcher_binary, worker_binary, process_line_callback=None)[source]

使用可执行二进制文件启动 Apache Beam Go pipeline。

参数:
  • variables (dict) – 传递给 job 的变量。

  • launcher_binary (str) – 为启动平台编译的二进制文件路径。

  • worker_binary (str) – 为 worker 平台编译的二进制文件路径。

  • process_line_callback (Callable[[str], None] | None) – (可选)回调函数,可用于处理 stdout 和 stderr 文件描述符的每一行。

class airflow.providers.apache.beam.hooks.beam.BeamAsyncHook(runner)[source]

基类:BeamHook

Apache Beam 的异步 Hook。

参数:

runner (str) – Runner 类型。

runner[source]
async start_python_pipeline_async(variables, py_file, py_options=None, py_interpreter='python3', py_requirements=None, py_system_site_packages=False, process_line_callback=None)[source]

启动 Apache Beam Python pipeline。

参数:
  • variables (dict) – 传递给 pipeline 的变量。

  • py_file (str) – 要执行的 Python 文件路径。

  • py_options (list[str] | None) – 附加选项。

  • py_interpreter (str) – Apache Beam pipeline 的 Python 版本。如果为 None,则默认为 python3。要跟踪 Beam 支持的 Python 版本和相关问题,请检查:https://issues.apache.org/jira/browse/BEAM-1251

  • py_requirements (list[str] | None) – 要安装的附加 Python 包。如果为此参数传递了值,则会创建一个新的虚拟环境并安装附加包。如果您的系统上未安装 apache-beam 包,或者您想使用其他版本,也可以安装该包。

  • py_system_site_packages (bool) – 是否在您的 virtualenv 中包含 system_site_packages。有关更多信息,请参阅 virtualenv 文档。仅当 py_requirements 参数不为 None 时,此选项才相关。

  • process_line_callback (Callable[[str], None] | None) – 可选回调函数,可用于处理 stdout 和 stderr 以检测 job id

async start_java_pipeline_async(variables, jar, job_class=None, process_line_callback=None)[source]

启动 Apache Beam Java pipeline。

参数:
  • variables (dict) – 传递给 job 的变量。

  • jar (str) – pipeline 的 jar 包名称。

  • job_class (str | None) – pipeline 的 Java 类名称。

  • process_line_callback (Callable[[str], None] | None) – 可选回调函数,可用于处理 stdout 和 stderr 以检测 job id

返回值:

Beam 命令执行返回码。

async start_pipeline_async(variables, command_prefix, working_directory=None, process_line_callback=None)[source]
async run_beam_command_async(cmd, log, working_directory=None, process_line_callback=None)[source]

在子进程中运行 pipeline 命令。

参数:
  • cmd (list[str]) – 在子进程中运行的命令部分

  • working_directory (str | None) – 工作目录

  • log (logging.Logger) – logger

  • process_line_callback (Callable[[str], None] | None) – 可选回调函数,可用于处理 stdout 和 stderr 以检测 job id

async read_logs(stream_reader, process_line_callback=None)[source]

本条目是否有帮助?