Amazon EMR on Amazon EKS¶
Amazon EMR on EKS 为 Amazon EMR 提供了一种部署选项,允许您在 Amazon EKS 上运行开源大数据框架。
先决条件任务¶
要使用这些 Operator,您需要做几件事
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 安装
设置连接.
Operator¶
创建 Amazon EMR EKS 虚拟集群¶
EmrEksCreateClusterOperator
将创建 Amazon EMR on EKS 虚拟集群。下面的示例 DAG 展示了如何创建 EMR on EKS 虚拟集群。
要在 Amazon EKS 上创建 Amazon EMR 集群,您需要指定一个虚拟集群名称、您希望使用的 eks 集群以及一个 eks 命名空间。
有关更多详细信息,请参阅 EMR on EKS 开发指南。
tests/system/amazon/aws/example_emr_eks.py
create_emr_eks_cluster = EmrEksCreateClusterOperator(
task_id="create_emr_eks_cluster",
virtual_cluster_name=virtual_cluster_name,
eks_cluster_name=eks_cluster_name,
eks_namespace=eks_namespace,
)
向 Amazon EMR 虚拟集群提交作业¶
注意
本示例假设您已经配置了 EMR on EKS 虚拟集群。有关更多信息,请参阅 EMR on EKS 入门指南。
EmrContainerOperator
将向 Amazon EMR on Amazon EKS 虚拟集群提交新作业。下面的示例作业计算数学常数 Pi
。在生产作业中,您通常会引用 Amazon Simple Storage Service (S3) 上的 Spark 脚本。
要为 Amazon EMR on Amazon EKS 创建作业,您需要指定您的虚拟集群 ID、您要使用的 Amazon EMR 版本、您的 IAM 执行角色以及 Spark 提交参数。
您还可以选择提供配置覆盖,例如 Spark、Hive 或 Log4j 属性,以及将 Spark 日志发送到 Amazon S3 或 Amazon Cloudwatch 的监控配置。
在此示例中,我们展示了如何添加 applicationConfiguration
以使用 AWS Glue Data Catalog,以及 monitoringConfiguration
以将日志发送到 Amazon CloudWatch 中的 /aws/emr-eks-spark
日志组。有关作业配置的更多详细信息,请参阅 EMR on EKS 指南。
tests/system/amazon/aws/example_emr_eks.py
job_driver_arg = {
"sparkSubmitJobDriver": {
"entryPoint": f"s3://{s3_bucket_name}/{S3_FILE_NAME}",
"sparkSubmitParameters": "--conf spark.executors.instances=2 --conf spark.executors.memory=2G "
"--conf spark.executor.cores=2 --conf spark.driver.cores=1",
}
}
configuration_overrides_arg = {
"monitoringConfiguration": {
"cloudWatchMonitoringConfiguration": {
"logGroupName": "/emr-eks-jobs",
"logStreamNamePrefix": "airflow",
}
},
}
我们将 virtual_cluster_id
和 execution_role_arn
值作为 operator 参数传递,但您可以将它们存储在连接中或在 DAG 中提供。您的 AWS 区域应在 aws_default
连接中定义为 {"region_name": "us-east-1"}
,或者是一个通过 aws_conn_id
参数传递给 operator 的自定义连接名称。operator 返回作业运行的作业 ID。
tests/system/amazon/aws/example_emr_eks.py
job_starter = EmrContainerOperator(
task_id="start_job",
virtual_cluster_id=str(create_emr_eks_cluster.output),
execution_role_arn=job_role_arn,
release_label="emr-7.0.0-latest",
job_driver=job_driver_arg,
configuration_overrides=configuration_overrides_arg,
name="pi.py",
)
Sensor¶
等待 Amazon EMR 虚拟集群作业¶
要等待 Amazon EMR 虚拟集群作业的状态达到终端状态,您可以使用 EmrContainerSensor
tests/system/amazon/aws/example_emr_eks.py
job_waiter = EmrContainerSensor(
task_id="job_waiter",
virtual_cluster_id=str(create_emr_eks_cluster.output),
job_id=str(job_starter.output),
)