PySpark 装饰器¶
如果可用,在 @task.pyspark
装饰器中包装的 Python 可调用对象会注入 SparkSession 和 SparkContext 对象。
参数¶
以下参数可以传递给装饰器
- conn_id:str
用于连接到 Spark 集群的连接 ID。如果未指定,则 Spark 主节点设置为
local[*]
。- config_kwargs:dict
用于初始化 SparkConf 对象的 kwargs。 这将覆盖连接中设置的 Spark 配置选项。
示例¶
以下示例演示如何使用 @task.pyspark
装饰器。 请注意,spark
和 sc
对象会被注入到函数中。
@task.pyspark(conn_id="spark-local")
def spark_task(spark: SparkSession, sc: SparkContext) -> pd.DataFrame:
df = spark.createDataFrame(
[
(1, "John Doe", 21),
(2, "Jane Doe", 22),
(3, "Joe Bloggs", 23),
],
["id", "name", "age"],
)
df.show()
return df.toPandas()
Spark Connect¶
在 Apache Spark 3.4 中,Spark Connect 引入了一种解耦的客户端-服务器架构,允许使用 DataFrame API 远程连接到 Spark 集群。在 Airflow 中,使用 Spark Connect 是使用 PySpark 装饰器的首选方式,因为它不需要在与 Airflow 相同的主机上运行 Spark 驱动程序。要使用 Spark Connect,请在主机 URL 前面加上 sc://
。 例如,sc://spark-cluster:15002
。
身份验证¶
Spark Connect 没有内置的身份验证。 然而,gRPC HTTP/2 接口允许使用身份验证通过身份验证代理与 Spark Connect 服务器通信。 要使用身份验证,请确保创建 Spark Connect
连接并设置正确的凭据。