airflow.providers.apache.spark.operators.spark_jdbc

模块内容

SparkJDBCOperator

扩展 SparkSubmitOperator,以使用 Apache Spark 执行与基于 JDBC 的数据库之间的数据传输。

class airflow.providers.apache.spark.operators.spark_jdbc.SparkJDBCOperator(*, spark_app_name='airflow-spark-jdbc', spark_conn_id='spark-default', spark_conf=None, spark_py_files=None, spark_files=None, spark_jars=None, cmd_type='spark_to_jdbc', jdbc_table=None, jdbc_conn_id='jdbc-default', jdbc_driver=None, metastore_table=None, jdbc_truncate=False, save_mode=None, save_format=None, batch_size=None, fetch_size=None, num_partitions=None, partition_column=None, lower_bound=None, upper_bound=None, create_table_column_types=None, **kwargs)[源代码]

基类:airflow.providers.apache.spark.operators.spark_submit.SparkSubmitOperator

扩展 SparkSubmitOperator,以使用 Apache Spark 执行与基于 JDBC 的数据库之间的数据传输。

与 SparkSubmitOperator 一样,它假设 “spark-submit” 二进制文件在 PATH 中可用。

另请参阅

有关如何使用此操作符的更多信息,请查看指南:SparkJDBCOperator

参数
  • spark_app_name (str) – 作业名称(默认为 airflow-spark-jdbc)

  • spark_conn_id (str) – 在 Airflow 管理中配置的 spark 连接 ID

  • spark_conf (dict[str, Any] | None) – 任何其他的 Spark 配置属性

  • spark_py_files (str | None) – 使用的附加 python 文件(.zip、.egg 或 .py)

  • spark_files (str | None) – 要上传到运行作业的容器的其他文件

  • spark_jars (str | None) – 要上传并添加到驱动程序和执行程序类路径的额外 jar 包

  • cmd_type (str) – 数据应如何流动。2 个可能的值:spark_to_jdbc:spark 将数据从 metastore 写入到 jdbc;jdbc_to_spark:spark 将数据从 jdbc 写入到 metastore

  • jdbc_table (str | None) – JDBC 表的名称

  • jdbc_conn_id (str) – 用于连接到 JDBC 数据库的连接 ID

  • jdbc_driver (str | None) – 用于 JDBC 连接的 JDBC 驱动程序的名称。此驱动程序(通常是一个 jar 包)应在 'jars' 参数中传递

  • metastore_table (str | None) – metastore 表的名称

  • jdbc_truncate (bool) – (仅限 spark_to_jdbc) Spark 是否应截断或删除并重新创建 JDBC 表。只有当 'save_mode' 设置为 Overwrite 时,此选项才会生效。此外,如果模式不同,Spark 无法截断,将会删除并重新创建

  • save_mode (str | None) – 要使用的 Spark 保存模式(例如,overwrite、append 等)

  • save_format (str | None) – (仅限 jdbc_to_spark) 要使用的 Spark 保存格式(例如,parquet)

  • batch_size (int | None) – (仅限 spark_to_jdbc) 每次往返 JDBC 数据库时要插入的批次大小。默认为 1000

  • fetch_size (int | None) – (仅限 jdbc_to_spark) 每次往返 JDBC 数据库时要获取的批次大小。默认值取决于 JDBC 驱动程序

  • num_partitions (int | None) – Spark 可以同时使用的最大分区数,适用于 spark_to_jdbc 和 jdbc_to_spark 操作。这也会限制可以打开的 JDBC 连接数

  • partition_column (str | None) – (仅限 jdbc_to_spark) 用于对 metastore 表进行分区的数字列。如果指定,还必须指定:num_partitions、lower_bound、upper_bound

  • lower_bound (str | None) – (仅限 jdbc_to_spark) 要提取的数字分区列范围的下限。如果指定,还必须指定:num_partitions、partition_column、upper_bound

  • upper_bound (str | None) – (仅限 jdbc_to_spark) 要提取的数字分区列范围的上限。如果指定,还必须指定:num_partitions、partition_column、lower_bound

  • create_table_column_types (str | None) – (仅限 spark_to_jdbc) 创建表时要使用的数据库列数据类型,而不是默认值。数据类型信息应以与 CREATE TABLE 列语法相同的格式指定(例如:“name CHAR(64), comments VARCHAR(1024)”)。指定的类型应为有效的 spark sql 数据类型。

  • kwargs (Any) – 传递给 SparkSubmitOperator 的 kwargs。

execute(context)[源代码]

调用 SparkSubmitHook 以运行提供的 spark 作业。

on_kill()[源代码]

覆盖此方法,以在任务实例被终止时清理子进程。

操作符中对 threading、subprocess 或 multiprocessing 模块的任何使用都需要清理,否则会留下幽灵进程。

此条目是否对您有帮助?