Amazon EMR Serverless 操作符¶
Amazon EMR Serverless 是 Amazon EMR 中的一种无服务器选项,使数据分析师和工程师可以轻松运行开源大数据分析框架,而无需配置、管理和扩展集群或服务器。您可以在无需专家规划和管理集群的情况下获得 Amazon EMR 的所有功能和优势。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 的安装
设置连接.
操作符¶
创建 EMR Serverless 应用程序¶
您可以使用 EmrServerlessCreateApplicationOperator
创建新的 EMR Serverless 应用程序。可以通过传递 deferrable=True
作为参数,在可延迟模式下运行此操作符。这需要安装 aiobotocore 模块。
tests/system/amazon/aws/example_emr_serverless.py
emr_serverless_app = EmrServerlessCreateApplicationOperator(
task_id="create_emr_serverless_task",
release_label="emr-6.6.0",
job_type="SPARK",
config={"name": "new_application"},
)
启动 EMR Serverless 作业¶
您可以使用 EmrServerlessStartJobOperator
启动 EMR Serverless 作业。可以通过传递 deferrable=True
作为参数,在可延迟模式下运行此操作符。这需要安装 aiobotocore 模块。
tests/system/amazon/aws/example_emr_serverless.py
start_job = EmrServerlessStartJobOperator(
task_id="start_emr_serverless_job",
application_id=emr_serverless_app_id,
execution_role_arn=role_arn,
job_driver=SPARK_JOB_DRIVER,
configuration_overrides=SPARK_CONFIGURATION_OVERRIDES,
)
打开应用程序用户界面¶
还可以通过传递 enable_application_ui_links=True
作为参数,将操作符配置为生成指向应用程序用户界面和 Spark 标准输出日志的一次性链接。 作业开始运行后,这些链接将在相关任务的“详细信息”部分中提供。如果 enable_application_ui_links=False
,则链接将存在但会灰显。
您需要确保您具有以下 IAM 权限才能生成仪表板链接。
"emr-serverless:GetDashboardForJobRun"
如果为 EMR Serverless 启用了 Amazon S3 或 Amazon CloudWatch 日志,则指向相应控制台的链接也将在任务日志和任务详细信息中提供。
停止 EMR Serverless 应用程序¶
您可以使用 EmrServerlessStopApplicationOperator
停止 EMR Serverless 应用程序。可以通过传递 deferrable=True
作为参数,在可延迟模式下运行此操作符。这需要安装 aiobotocore 模块。
tests/system/amazon/aws/example_emr_serverless.py
stop_app = EmrServerlessStopApplicationOperator(
task_id="stop_application",
application_id=emr_serverless_app_id,
force_stop=True,
)
删除 EMR Serverless 应用程序¶
您可以使用 EmrServerlessDeleteApplicationOperator
删除 EMR Serverless 应用程序。可以通过传递 deferrable=True
作为参数,在可延迟模式下运行此操作符。这需要安装 aiobotocore 模块。
tests/system/amazon/aws/example_emr_serverless.py
delete_app = EmrServerlessDeleteApplicationOperator(
task_id="delete_application",
application_id=emr_serverless_app_id,
)
传感器¶
等待 EMR Serverless 作业状态¶
要监控 EMR Serverless 作业的状态,您可以使用 EmrServerlessJobSensor
。
tests/system/amazon/aws/example_emr_serverless.py
wait_for_job = EmrServerlessJobSensor(
task_id="wait_for_job",
application_id=emr_serverless_app_id,
job_run_id=start_job.output,
# the default is to wait for job completion, here we just wait for the job to be running.
target_states={*EmrServerlessHook.JOB_SUCCESS_STATES, "RUNNING"},
)
等待 EMR Serverless 应用程序状态¶
要监控 EMR Serverless 应用程序的状态,您可以使用 EmrServerlessApplicationSensor
。
tests/system/amazon/aws/example_emr_serverless.py
wait_for_app_creation = EmrServerlessApplicationSensor(
task_id="wait_for_app_creation",
application_id=emr_serverless_app_id,
)