Alibaba Cloud AnalyticDB Spark Operators¶
Overview¶
Airflow's integration with Alibaba Cloud AnalyticDB Spark provides several operators for developing Spark batch and SQL applications.
Develop Spark batch applications¶
Purpose¶
This example DAG uses AnalyticDBSparkBatchOperator to submit a Spark Pi and a Spark logistic regression application.
Defining tasks¶
In the following code we submit the Spark Pi and Spark logistic regression applications.
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.alibaba.cloud.operators.analyticdb_spark import (
    AnalyticDBSparkBatchOperator,
)

DAG_ID = "analyticdb_spark_batch"  # example DAG id; choose your own

with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    schedule=None,
    default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    max_active_runs=1,
    catchup=False,
) as dag:
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
        cluster_id="<your cluster id>",
        rg_name="<your resource group name>",
    )

    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
        cluster_id="<your cluster id>",
        rg_name="<your resource group name>",
    )

    # Replace the above cluster_id and rg_name with your own values.
    spark_pi >> spark_lr
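The overview also mentions SQL applications. As a minimal sketch, the same provider package exposes AnalyticDBSparkSQLOperator for submitting a Spark SQL application; the DAG id and the `SHOW DATABASES;` statement below are illustrative placeholders, and the parameter names mirror the batch operator above (verify them against the provider reference for your installed version).

```python
from datetime import datetime

from airflow.models.dag import DAG
from airflow.providers.alibaba.cloud.operators.analyticdb_spark import (
    AnalyticDBSparkSQLOperator,
)

with DAG(
    dag_id="analyticdb_spark_sql_example",  # illustrative DAG id
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # Submit a Spark SQL application to your cluster and resource group.
    spark_sql = AnalyticDBSparkSQLOperator(
        task_id="spark_sql_task",
        sql="SHOW DATABASES;",  # placeholder statement
        cluster_id="<your cluster id>",
        rg_name="<your resource group name>",
    )
```

As with the batch example, replace cluster_id and rg_name with your own values before running the DAG.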