Amazon EMR Serverless Operators

Amazon EMR Serverless 是 Amazon EMR 中的一种无服务器选项,它使数据分析师和工程师可以轻松运行开源大数据分析框架,而无需配置、管理和扩展集群或服务器。您可以获得 Amazon EMR 的所有特性和优势,而无需专家来规划和管理集群。

先决条件任务

要使用这些 Operator,您必须执行以下几项操作

Operators

创建 EMR Serverless 应用

您可以使用 EmrServerlessCreateApplicationOperator 创建新的 EMR Serverless 应用。通过将 deferrable=True 作为参数传递,此 Operator 可以在可延迟模式下运行。这需要安装 aiobotocore 模块。

tests/system/amazon/aws/example_emr_serverless.py

emr_serverless_app = EmrServerlessCreateApplicationOperator(
    task_id="create_emr_serverless_task",
    release_label="emr-6.6.0",
    job_type="SPARK",
    config={"name": "new_application"},
)

启动 EMR Serverless 作业

您可以使用 EmrServerlessStartJobOperator 启动 EMR Serverless 作业。通过将 deferrable=True 作为参数传递,此 Operator 可以在可延迟模式下运行。这需要安装 aiobotocore 模块。

tests/system/amazon/aws/example_emr_serverless.py

start_job = EmrServerlessStartJobOperator(
    task_id="start_emr_serverless_job",
    application_id=emr_serverless_app_id,
    execution_role_arn=role_arn,
    job_driver=SPARK_JOB_DRIVER,
    configuration_overrides=SPARK_CONFIGURATION_OVERRIDES,
)

打开应用 UI

此 Operator 还可以通过传递 enable_application_ui_links=True 参数来配置生成应用 UI 和 Spark stdout 日志的一次性链接。作业开始运行后,这些链接将在相关任务的“详细信息”部分中提供。如果 enable_application_ui_links=False,则链接将显示但置灰不可用。

您需要确保拥有以下 IAM 权限才能生成仪表板链接。

"emr-serverless:GetDashboardForJobRun"

如果为 EMR Serverless 启用了 Amazon S3 或 Amazon CloudWatch 日志,则任务日志和任务详细信息中也将提供指向相应控制台的链接。

停止 EMR Serverless 应用

您可以使用 EmrServerlessStopApplicationOperator 停止 EMR Serverless 应用。通过将 deferrable=True 作为参数传递,此 Operator 可以在可延迟模式下运行。这需要安装 aiobotocore 模块。

tests/system/amazon/aws/example_emr_serverless.py

stop_app = EmrServerlessStopApplicationOperator(
    task_id="stop_application",
    application_id=emr_serverless_app_id,
    force_stop=True,
)

删除 EMR Serverless 应用

您可以使用 EmrServerlessDeleteApplicationOperator 删除 EMR Serverless 应用。通过将 deferrable=True 作为参数传递,此 Operator 可以在可延迟模式下运行。这需要安装 aiobotocore 模块。

tests/system/amazon/aws/example_emr_serverless.py

delete_app = EmrServerlessDeleteApplicationOperator(
    task_id="delete_application",
    application_id=emr_serverless_app_id,
)

Sensors

等待 EMR Serverless 作业状态

要监控 EMR Serverless 作业的状态,您可以使用 EmrServerlessJobSensor

tests/system/amazon/aws/example_emr_serverless.py

wait_for_job = EmrServerlessJobSensor(
    task_id="wait_for_job",
    application_id=emr_serverless_app_id,
    job_run_id=start_job.output,
    # the default is to wait for job completion, here we just wait for the job to be running.
    target_states={*EmrServerlessHook.JOB_SUCCESS_STATES, "RUNNING"},
)

等待 EMR Serverless 应用状态

要监控 EMR Serverless 应用的状态,您可以使用 EmrServerlessApplicationSensor

tests/system/amazon/aws/example_emr_serverless.py

wait_for_app_creation = EmrServerlessApplicationSensor(
    task_id="wait_for_app_creation",
    application_id=emr_serverless_app_id,
)

参考

此条目是否有帮助?