airflow.providers.apache.spark.hooks.spark_jdbc

SparkJDBCHook

扩展 SparkSubmitHook,用于使用 Apache Spark 在基于 JDBC 的数据库之间传输数据。

模块内容

class airflow.providers.apache.spark.hooks.spark_jdbc.SparkJDBCHook(spark_app_name='airflow-spark-jdbc', spark_conn_id=default_conn_name, spark_conf=None, spark_py_files=None, spark_files=None, spark_jars=None, num_executors=None, executor_cores=None, executor_memory=None, driver_memory=None, verbose=False, principal=None, keytab=None, cmd_type='spark_to_jdbc', jdbc_table=None, jdbc_conn_id='jdbc-default', jdbc_driver=None, metastore_table=None, jdbc_truncate=False, save_mode=None, save_format=None, batch_size=None, fetch_size=None, num_partitions=None, partition_column=None, lower_bound=None, upper_bound=None, create_table_column_types=None, *args, use_krb5ccache=False, **kwargs)[]

基类: airflow.providers.apache.spark.hooks.spark_submit.SparkSubmitHook

扩展 SparkSubmitHook,用于使用 Apache Spark 在基于 JDBC 的数据库之间传输数据。

参数:
  • spark_app_name (str) – 作业名称 (默认为 airflow-spark-jdbc)

  • spark_conn_id (str) – 在 Airflow 管理界面中配置的 Spark 连接 ID

  • spark_conf (dict[str, Any] | None) – 任何额外的 Spark 配置属性

  • spark_py_files (str | None) – 使用的额外 Python 文件(.zip, .egg, 或 .py)

  • spark_files (str | None) – 要上传到运行作业的容器的额外文件

  • spark_jars (str | None) – 要上传并添加到驱动程序和执行程序 classpath 的额外 jar 包

  • num_executors (int | None) – 要运行的执行程序数量。应设置此参数以管理与 JDBC 数据库建立的连接数

  • executor_cores (int | None) – 每个执行程序的核数

  • executor_memory (str | None) – 每个执行程序的内存 (例如 1000M, 2G)

  • driver_memory (str | None) – 分配给驱动程序的内存 (例如 1000M, 2G)

  • verbose (bool) – 是否将 verbose 标志传递给 spark-submit 以进行调试

  • keytab (str | None) – 包含 keytab 的文件的完整路径

  • principal (str | None) – 用于 keytab 的 kerberos principal 名称

  • cmd_type (str) – 数据流向。2个可能的值: spark_to_jdbc: Spark 将数据从 metastore 写入 jdbc jdbc_to_spark: Spark 将数据从 jdbc 写入 metastore

  • jdbc_table (str | None) – JDBC 表的名称

  • jdbc_conn_id (str) – 用于连接 JDBC 数据库的连接 ID

  • jdbc_driver (str | None) – 用于 JDBC 连接的 JDBC 驱动程序名称。该驱动程序(通常是一个 jar 包)应通过 ‘jars’ 参数传入

  • metastore_table (str | None) – metastore 表的名称,

  • jdbc_truncate (bool) – (仅适用于 spark_to_jdbc) Spark 是否应截断或删除并重新创建 JDBC 表。仅当 ‘save_mode’ 设置为 Overwrite 时生效。此外,如果 schema 不同,Spark 无法截断,将删除并重新创建

  • save_mode (str | None) – 要使用的 Spark save-mode (例如 overwrite, append 等)

  • save_format (str | None) – (仅适用于 jdbc_to_spark) 要使用的 Spark save-format (例如 parquet)

  • batch_size (int | None) – (仅适用于 spark_to_jdbc) 每次往返 JDBC 数据库插入的批次大小。默认为 1000

  • fetch_size (int | None) – (仅适用于 jdbc_to_spark) 每次从 JDBC 数据库获取的批次大小。默认为 JDBC 驱动程序决定

  • num_partitions (int | None) – Spark 可以同时使用的最大分区数,适用于 spark_to_jdbc 和 jdbc_to_spark 操作。这也会限制可以打开的 JDBC 连接数

  • partition_column (str | None) – (仅适用于 jdbc_to_spark) 用于按其对 metastore 表进行分区的数字列。如果指定,还必须指定:num_partitions, lower_bound, upper_bound

  • lower_bound (str | None) – (仅适用于 jdbc_to_spark) 要获取的数字分区列范围的下界。如果指定,还必须指定:num_partitions, partition_column, upper_bound

  • upper_bound (str | None) – (仅适用于 jdbc_to_spark) 要获取的数字分区列范围的上界。如果指定,还必须指定:num_partitions, partition_column, lower_bound

  • create_table_column_types (str | None) – (仅适用于 spark_to_jdbc) 创建表时,要使用的数据库列数据类型,而非默认类型。数据类型信息应采用与 CREATE TABLE 列语法相同的格式指定 (例如: “name CHAR(64), comments VARCHAR(1024)”)。指定的类型应是有效的 spark sql 数据类型。

  • use_krb5ccache (bool) – 如果为 True,则配置 Spark 使用票据缓存而不是依赖 keytab 进行 Kerberos 登录

conn_name_attr = 'spark_conn_id'[]
default_conn_name = 'spark_default'[]
conn_type = 'spark_jdbc'[]
hook_name = 'Spark JDBC'[]
submit_jdbc_job()[]

提交 Spark JDBC 作业。

get_conn()[]

返回 hook 的连接。

此条目有帮助吗?