阿里云 AnalyticDB Spark 操作符
开发 Spark 批处理应用程序
目的
此示例 DAG 使用 AnalyticDBSparkBatchOperator 提交 Spark Pi 和 Spark 逻辑回归应用程序。
定义任务
在以下代码中,我们提交 Spark Pi 和 Spark 逻辑回归应用程序。
# Example DAG: submit a Spark Pi and a Spark logistic-regression batch
# application to AnalyticDB Spark, one after the other.
# NOTE(review): the snippet had lost all indentation during extraction and
# was not valid Python; conventional 4-space indentation is restored here.
with DAG(
    dag_id=DAG_ID,
    start_date=datetime(2021, 1, 1),
    # Shared operator defaults: both batch operators below target the same
    # AnalyticDB cluster, resource group and region.
    default_args={"cluster_id": "your cluster", "rg_name": "your resource group", "region": "your region"},
    max_active_runs=1,
    catchup=False,  # do not back-fill runs for past schedule intervals
) as dag:
    # Submit the Spark Pi example from the local jar.
    spark_pi = AnalyticDBSparkBatchOperator(
        task_id="task1",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkPi",
    )

    # Submit the Spark logistic-regression example from the same jar.
    spark_lr = AnalyticDBSparkBatchOperator(
        task_id="task2",
        file="local:///tmp/spark-examples.jar",
        class_name="org.apache.spark.examples.SparkLR",
    )

    # Run sequentially: Pi first, then logistic regression.
    spark_pi >> spark_lr
# Wire every task in the DAG into the system-test watcher so that a failure
# anywhere (including teardown tasks with non-default trigger rules) marks
# the whole test run as failed.
# NOTE(review): in upstream Airflow system-test examples this import and
# hook-up usually sit inside the ``with DAG(...)`` block; indentation may
# have been lost during extraction — confirm against the original file.
from tests.system.utils.watcher import watcher
# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()