airflow.providers.amazon.aws.operators.emr
¶
模块内容¶
类¶
一个将步骤添加到现有 EMR job_flow 的操作符。 |
|
一个启动 EMR 笔记本执行的操作符。 |
|
一个停止正在运行的 EMR 笔记本执行的操作符。 |
|
一个在 EKS 虚拟集群上创建 EMR 的操作符。 |
|
一个将作业提交到 EKS 虚拟集群上的 EMR 的操作符。 |
|
创建一个 EMR JobFlow,从 EMR 连接读取配置。 |
|
一个修改现有 EMR 集群的操作符。 |
|
终止 EMR JobFlow 的操作符。 |
|
创建 Serverless EMR 应用程序的操作符。 |
|
启动 EMR Serverless 作业的操作符。 |
|
停止 EMR Serverless 应用程序的操作符。 |
|
删除 EMR Serverless 应用程序的操作符。 |
- class airflow.providers.amazon.aws.operators.emr.EmrAddStepsOperator(*, job_flow_id=None, job_flow_name=None, cluster_states=None, aws_conn_id='aws_default', steps=None, wait_for_completion=False, waiter_delay=30, waiter_max_attempts=60, execution_role_arn=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[源代码]¶
基类:
airflow.models.BaseOperator
一个将步骤添加到现有 EMR job_flow 的操作符。
另请参阅
有关如何使用此操作符的更多信息,请查看以下指南:向 EMR 作业流添加步骤
- 参数
job_flow_id (str | None) – 要向其添加步骤的 JobFlow 的 ID。(已模板化)
job_flow_name (str | None) – 要向其添加步骤的 JobFlow 的名称。用作传递 job_flow_id 的替代方法。将在参数 cluster_states 中的状态之一中搜索具有匹配名称的 JobFlow ID。应该只存在一个这样的集群,否则将失败。(已模板化)
cluster_states (list[str] | None) – 按 job_flow_name 搜索 JobFlow ID 时可接受的集群状态。(已模板化)
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此值为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 并且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。
steps (list[dict] | str | None) – 要添加到作业流的 boto3 样式步骤或对步骤文件的引用(必须是“.json”)。(已模板化)
wait_for_completion (bool) – 如果为 True,则操作符将等待所有步骤完成。
execution_role_arn (str | None) – 集群上步骤的运行时角色的 ARN。
do_xcom_push – 如果为 True,则 job_flow_id 将以键 job_flow_id 推送到 XCom。
wait_for_completion – 是否等待作业运行完成。(默认值:True)
deferrable (bool) – 如果为 True,则操作符将异步等待作业完成。这意味着等待完成。此模式需要安装 aiobotocore 模块。(默认值:False)
- template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'job_flow_name', 'cluster_states', 'steps', 'execution_role_arn')[源代码]¶
- template_ext: collections.abc.Sequence[str] = ('.json',)[源代码]¶
- class airflow.providers.amazon.aws.operators.emr.EmrStartNotebookExecutionOperator(editor_id, relative_path, cluster_id, service_role, notebook_execution_name=None, notebook_params=None, notebook_instance_security_group_id=None, master_instance_security_group_id=None, tags=None, wait_for_completion=False, aws_conn_id='aws_default', waiter_max_attempts=None, waiter_delay=None, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
一个启动 EMR 笔记本执行的操作符。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:启动 EMR 笔记本执行
- 参数
editor_id (str) – 用于笔记本执行的 EMR 笔记本的唯一标识符。
relative_path (str) – 此执行的笔记本文件的路径和文件名,相对于为 EMR 笔记本指定的路径。
cluster_id (str) – 笔记本所连接的 EMR 集群的唯一标识符。
service_role (str) – 用作 Amazon EMR(EMR 角色)服务角色的 IAM 角色的名称或 ARN,用于笔记本执行。
notebook_execution_name (str | None) – 笔记本执行的可选名称。
notebook_params (str | None) – 以 JSON 格式传递给运行时执行的 EMR 笔记本的输入参数。
notebook_instance_security_group_id (str | None) – 要与此笔记本执行的 EMR 笔记本关联的 Amazon EC2 安全组的唯一标识符。
master_instance_security_group_id (str | None) – 要与此笔记本执行的 EMR 集群的主实例关联的可选 EC2 安全组的唯一 ID。
tags (list | None) – 要与笔记本执行关联的可选键值对列表。
waiter_max_attempts (int | None) – 失败前的最大尝试次数。
waiter_delay (int | None) – 轮询笔记本状态之间的秒数。
- template_fields: collections.abc.Sequence[str] = ('editor_id', 'cluster_id', 'relative_path', 'service_role', 'notebook_execution_name',...[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrStopNotebookExecutionOperator(notebook_execution_id, wait_for_completion=False, aws_conn_id='aws_default', waiter_max_attempts=None, waiter_delay=None, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
一个停止正在运行的 EMR 笔记本执行的操作符。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:停止 EMR 笔记本执行
- 参数
notebook_execution_id (str) – 笔记本执行的唯一标识符。
wait_for_completion (bool) – 如果为 True,则操作符将等待笔记本处于 STOPPED 或 FINISHED 状态。默认为 False。
aws_conn_id (str | None) – 要使用的 aws 连接。如果此项为 None 或为空,则使用默认的 boto3 行为。如果在分布式环境中运行 Airflow 并且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。
waiter_max_attempts (int | None) – 失败前的最大尝试次数。
waiter_delay (int | None) – 轮询笔记本状态之间的秒数。
- template_fields: collections.abc.Sequence[str] = ('notebook_execution_id', 'waiter_delay', 'waiter_max_attempts')[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrEksCreateClusterOperator(*, virtual_cluster_name, eks_cluster_name, eks_namespace, virtual_cluster_id='', aws_conn_id='aws_default', tags=None, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
一个在 EKS 虚拟集群上创建 EMR 的操作符。
另请参阅
有关如何使用此运算符的更多信息,请参阅以下指南:创建 Amazon EMR EKS 虚拟集群
- 参数
- template_fields: collections.abc.Sequence[str] = ('virtual_cluster_name', 'eks_cluster_name', 'eks_namespace')[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrContainerOperator(*, name, virtual_cluster_id, execution_role_arn, release_label, job_driver, configuration_overrides=None, client_request_token=None, aws_conn_id='aws_default', wait_for_completion=True, poll_interval=30, tags=None, max_polling_attempts=None, job_retry_max_attempts=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
一个将作业提交到 EKS 虚拟集群上的 EMR 的操作符。
另请参阅
有关如何使用此运算符的更多信息,请参阅以下指南:向 Amazon EMR 虚拟集群提交作业
- 参数
name ( str ) – 作业运行的名称。
virtual_cluster_id ( str ) – EMR on EKS 虚拟集群 ID
execution_role_arn ( str ) – 与作业运行关联的 IAM 角色 ARN。
release_label ( str ) – 用于作业运行的 Amazon EMR 发布版本。
job_driver ( dict ) – 作业配置详细信息,例如 Spark 作业参数。
configuration_overrides ( dict | None ) – 作业运行的配置覆盖,特别是应用程序配置或监控配置。
client_request_token ( str | None ) – 作业运行请求的客户端幂等令牌。如果您想指定唯一的 ID 来防止启动两个作业,请使用此令牌。如果未提供令牌,将为您生成 UUIDv4 令牌。
aws_conn_id ( str | None ) – 用于 AWS 凭证的 Airflow 连接。
wait_for_completion ( bool ) – 是否在运算符中等待作业完成。
poll_interval ( int ) – 两次连续调用之间等待的时间(以秒为单位),以检查 EMR 上的查询状态
max_polling_attempts ( int | None ) – 等待作业运行完成的最大次数。默认为 None,这将轮询,直到作业*不*处于挂起、已提交或运行状态。
job_retry_max_attempts ( int | None ) – EMR 作业失败时重试的最大次数。默认为 None,这将禁用重试。
tags ( dict | None ) – 分配给作业运行的标签。默认为 None
deferrable ( bool ) – 在可延期模式下运行运算符。
- template_fields: collections.abc.Sequence[str] = ('name', 'virtual_cluster_id', 'execution_role_arn', 'release_label', 'job_driver',...[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrCreateJobFlowOperator(*, aws_conn_id='aws_default', emr_conn_id='emr_default', job_flow_overrides=None, region_name=None, wait_for_completion=None, wait_policy=None, waiter_max_attempts=None, waiter_delay=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
创建一个 EMR JobFlow,从 EMR 连接读取配置。
可以传递一个 JobFlow 覆盖字典,以覆盖来自连接的配置。
另请参阅
有关如何使用此运算符的更多信息,请查看指南: 创建 EMR 作业流
- 参数
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此项为 None 或为空,则使用默认的 boto3 行为。如果在分布式环境中运行 Airflow,并且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)
emr_conn_id (str | None) – Amazon Elastic MapReduce 连接。用于接收初始 Amazon EMR 集群配置:
boto3.client('emr').run_job_flow
请求正文。如果此项为 None 或为空,或者连接不存在,则使用空的初始配置。job_flow_overrides (str | dict[str, Any] | None) – boto3 样式的参数或对参数文件(必须是“.json”)的引用,以覆盖特定的
emr_conn_id
额外参数。(已模板化)region_name (str | None) – 传递给 EmrHook 的区域名称
wait_for_completion (bool | None) – 已弃用 - 请改用 wait_policy。是否在创建后立即完成任务(False)或等待作业流完成(True)(默认值:None)
wait_policy (airflow.providers.amazon.aws.utils.waiter.WaitPolicy | None) – 是否在创建后立即完成任务(None)或:- 等待作业流完成(WaitPolicy.WAIT_FOR_COMPLETION)- 等待作业流完成且集群终止(WaitPolicy.WAIT_FOR_STEPS_COMPLETION)(默认值:None)
waiter_max_attempts (int | None) – 失败前的最大尝试次数。
waiter_delay (int | None) – 轮询笔记本状态之间的秒数。
deferrable (bool) – 如果为 True,则运算符将异步等待爬网完成。这意味着等待完成。此模式需要安装 aiobotocore 模块。(默认值:False)
- template_fields: collections.abc.Sequence[str] = ('job_flow_overrides', 'waiter_delay', 'waiter_max_attempts')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json',)[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrModifyClusterOperator(*, cluster_id, step_concurrency_level, aws_conn_id='aws_default', **kwargs)[source]¶
基类:
airflow.models.BaseOperator
一个修改现有 EMR 集群的操作符。
另请参阅
有关如何使用此运算符的更多信息,请查看指南: 修改 Amazon EMR 容器
- 参数
cluster_id (str) – 集群标识符
step_concurrency_level (int) – 集群的并发级别
aws_conn_id (str | None) – 要使用的 aws 连接
aws_conn_id – 要使用的 aws 连接
do_xcom_push – 如果为 True,则将 cluster_id 推送到 XCom,键为 cluster_id。
- template_fields: collections.abc.Sequence[str] = ('cluster_id', 'step_concurrency_level')[source]¶
- template_ext: collections.abc.Sequence[str] = ()[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrTerminateJobFlowOperator(*, job_flow_id, aws_conn_id='aws_default', waiter_delay=60, waiter_max_attempts=20, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
终止 EMR JobFlow 的操作符。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:终止 EMR 作业流
- 参数
job_flow_id (str) – 要终止的 JobFlow 的 ID。(已模板化)
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此值为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 并且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。
waiter_delay (int) – 检查 JobFlow 状态的两个连续调用之间等待的时间(以秒为单位)
waiter_max_attempts (int) – 轮询 JobFlow 状态的最大次数。
deferrable (bool) – 如果为 True,则运算符将异步等待爬网完成。这意味着等待完成。此模式需要安装 aiobotocore 模块。(默认值:False)
- template_fields: collections.abc.Sequence[str] = ('job_flow_id',)[source]¶
- template_ext: collections.abc.Sequence[str] = ()[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessCreateApplicationOperator(release_label, job_type, client_request_token='', config=None, wait_for_completion=True, aws_conn_id='aws_default', waiter_max_attempts=NOTSET, waiter_delay=NOTSET, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
创建 Serverless EMR 应用程序的操作符。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:创建 EMR Serverless 应用程序
- 参数
release_label (str) – 与应用程序关联的 EMR 发布版本。
job_type (str) – 要启动的应用程序类型,例如 Spark 或 Hive。
wait_for_completion (bool) – 如果为 true,则在返回之前等待应用程序启动。默认为 True。如果设置为 False,则
waiter_max_attempts
和waiter_delay
仅在等待应用程序处于CREATED
状态时应用。client_request_token (str) – 要创建的应用程序的客户端幂等性令牌。它的值对于每个请求都必须是唯一的。
config (dict | None) – 用于 boto API create_application 调用的任意参数的可选字典。
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此值为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 并且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。
waiter_delay (int | airflow.utils.types.ArgNotSet) – 轮询应用程序状态的间隔秒数。
deferrable (bool) – 如果为 True,操作符将异步等待应用程序被创建。这意味着等待完成。此模式需要安装 aiobotocore 模块。(默认值:False,但可以通过在配置文件中将 default_deferrable 设置为 True 来覆盖)
- Waiter_max_attempts
等待程序应轮询应用程序以检查状态的次数。如果未设置,等待程序将使用其默认值。
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessStartJobOperator(application_id, execution_role_arn, job_driver, configuration_overrides=None, client_request_token='', config=None, wait_for_completion=True, aws_conn_id='aws_default', name=None, waiter_max_attempts=NOTSET, waiter_delay=NOTSET, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), enable_application_ui_links=False, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
启动 EMR Serverless 作业的操作符。
另请参阅
有关如何使用此操作符的更多信息,请查看指南: 启动 EMR Serverless 作业
- 参数
application_id (str) – 要启动的 EMR Serverless 应用程序的 ID。
execution_role_arn (str) – 执行操作的角色 ARN。
job_driver (dict) – 作业运行的驱动程序。
configuration_overrides (dict | None) – 用于覆盖现有配置的配置规范。
client_request_token (str) – 要创建的应用程序的客户端幂等性令牌。它的值对于每个请求都必须是唯一的。
config (dict | None) – 可选字典,用于 boto API start_job_run 调用的任意参数。
wait_for_completion (bool) – 如果为 true,则在返回之前等待作业启动。默认为 True。如果设置为 False,则
waiter_countdown
和waiter_check_interval_seconds
仅在等待应用程序处于STARTED
状态时应用。aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此值为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 并且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。
name (str | None) – EMR Serverless 作业的名称。如果未提供,则将分配默认名称。
waiter_delay (int | airflow.utils.types.ArgNotSet) – 轮询作业运行状态的间隔秒数。
deferrable (bool) – 如果为 True,则操作符将异步等待爬网完成。这意味着等待完成。此模式需要安装 aiobotocore 模块。(默认值:False,但可以通过在配置文件中将 default_deferrable 设置为 True 来覆盖)
enable_application_ui_links (bool) – 如果为 True,则操作符将生成 EMR Serverless 应用程序 UI 的一次性链接。生成的链接将允许任何有权访问 DAG 的用户查看 Spark 或 Tez UI 或 Spark stdout 日志。默认为 False。
- Waiter_max_attempts
等待程序应轮询应用程序以检查状态的次数。如果未设置,等待程序将使用其默认值。
- template_fields: collections.abc.Sequence[str] = ('application_id', 'config', 'execution_role_arn', 'job_driver', 'configuration_overrides',...[source]¶
- execute(context, event=None)[source]¶
在创建操作符时派生。
Context 与渲染 jinja 模板时使用的字典相同。
有关更多上下文,请参阅 get_template_context。
- is_monitoring_in_job_override(config_key, job_override)[source]¶
检查是否为作业启用了监控。
注意:这与应用程序默认值不兼容: https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/default-configs.html
这用于确定应显示哪些额外链接。
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessStopApplicationOperator(application_id, wait_for_completion=True, aws_conn_id='aws_default', waiter_max_attempts=NOTSET, waiter_delay=NOTSET, force_stop=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
停止 EMR Serverless 应用程序的操作符。
另请参阅
有关如何使用此操作符的更多信息,请查看指南:打开应用程序用户界面
- 参数
application_id (str) – 要停止的 EMR Serverless 应用程序的 ID。
wait_for_completion (bool) – 如果为 true,则在返回之前等待应用程序停止。默认为 True。
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此值为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 并且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。
force_stop (bool) – 如果设置为 True,则该应用程序的任何未处于终端状态的作业都将被取消。否则,尝试停止有正在运行的作业的应用程序将返回错误。如果希望等待作业正常完成,请使用
airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor
。waiter_delay (int | airflow.utils.types.ArgNotSet) – 轮询应用程序状态之间的秒数。默认为 60 秒。
deferrable (bool) – 如果为 True,则操作符将异步等待应用程序停止。这意味着等待完成。此模式需要安装 aiobotocore 模块。(默认:False,但可以在配置文件中将 default_deferrable 设置为 True 来覆盖)。
- Waiter_max_attempts
等待程序应轮询应用程序以检查状态的次数。默认为 25。
- template_fields: collections.abc.Sequence[str] = ('application_id',)[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessDeleteApplicationOperator(application_id, wait_for_completion=True, aws_conn_id='aws_default', waiter_max_attempts=NOTSET, waiter_delay=NOTSET, force_stop=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
EmrServerlessStopApplicationOperator
删除 EMR Serverless 应用程序的操作符。
另请参阅
有关如何使用此操作符的更多信息,请查看指南: 删除 EMR Serverless 应用程序
- 参数
application_id (str) – 要删除的 EMR Serverless 应用程序的 ID。
wait_for_completion (bool) – 如果为 true,则在返回之前等待应用程序被删除。默认为 True。请注意,此操作符始终会先等待应用程序停止。
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此值为 None 或空,则使用默认的 boto3 行为。如果在分布式方式下运行 Airflow 并且 aws_conn_id 为 None 或空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护)。
waiter_delay (int | airflow.utils.types.ArgNotSet) – 轮询应用程序状态之间的秒数。默认为 60 秒。
deferrable (bool) – 如果为 True,则操作符将异步等待应用程序被删除。这意味着等待完成。此模式需要安装 aiobotocore 模块。(默认:False,但可以在配置文件中将 default_deferrable 设置为 True 来覆盖)。
force_stop (bool) – 如果设置为 True,则该应用程序的任何未处于终端状态的作业都将被取消。否则,尝试删除有正在运行的作业的应用程序将返回错误。如果希望等待作业正常完成,请使用
airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor
。
- Waiter_max_attempts
等待程序应轮询应用程序以检查状态的次数。默认为 25。
- template_fields: collections.abc.Sequence[str] = ('application_id',)[source]¶