airflow.providers.amazon.aws.operators.emr¶
类¶
将步骤添加到现有 EMR job_flow 的操作符。 |
|
启动 EMR notebook 执行的操作符。 |
|
停止正在运行的 EMR notebook 执行的操作符。 |
|
创建 EKS 上 EMR 虚拟集群的操作符。 |
|
将作业提交到 EKS 上 EMR 虚拟集群的操作符。 |
|
创建一个 EMR JobFlow,从 EMR 连接读取配置。 |
|
修改现有 EMR 集群的操作符。 |
|
终止 EMR JobFlows 的操作符。 |
|
创建无服务器 EMR Application 的操作符。 |
|
启动无服务器 EMR job 的操作符。 |
|
停止无服务器 EMR application 的操作符。 |
|
删除无服务器 EMR application 的操作符。 |
模块内容¶
- class airflow.providers.amazon.aws.operators.emr.EmrAddStepsOperator(*, job_flow_id=None, job_flow_name=None, cluster_states=None, aws_conn_id='aws_default', steps=None, wait_for_completion=False, waiter_delay=30, waiter_max_attempts=60, execution_role_arn=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
将步骤添加到现有 EMR job_flow 的操作符。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南: 将步骤添加到 EMR job flow
- 参数:
job_flow_id (str | None) – 要添加步骤的 JobFlow 的 ID。(已模板化)
job_flow_name (str | None) – 要添加步骤的 JobFlow 名称。用作传递 job_flow_id 的替代方法。将在参数 cluster_states 中指定的状态之一中搜索具有匹配名称的 JobFlow ID。必须仅存在一个这样的集群,否则将失败。(已模板化)
cluster_states (list[str] | None) – 通过 job_flow_name 搜索 JobFlow ID 时可接受的集群状态。(已模板化)
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果为 None 或空,则使用 boto3 默认行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用 boto3 默认配置(并且必须在每个工作节点上维护)。
steps (list[dict] | str | None) – boto3 风格的步骤或对要添加到 jobflow 的步骤文件(必须是“.json”)的引用。(已模板化)
wait_for_completion (bool) – 如果为 True,操作符将等待所有步骤完成。
execution_role_arn (str | None) – 集群上步骤的运行时角色的 ARN。
do_xcom_push – 如果为 True,job_flow_id 将通过键 job_flow_id 推送到 XCom。
wait_for_completion – 是否等待作业运行完成。(默认值:True)
deferrable (bool) – 如果为 True,操作符将异步等待作业完成。这意味着等待完成。此模式需要安装 aiobotocore 模块。(默认值:False)
- template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'job_flow_name', 'cluster_states', 'steps', 'execution_role_arn')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json',)[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrStartNotebookExecutionOperator(editor_id, relative_path, cluster_id, service_role, notebook_execution_name=None, notebook_params=None, notebook_instance_security_group_id=None, master_instance_security_group_id=None, tags=None, wait_for_completion=False, aws_conn_id='aws_default', waiter_max_attempts=None, waiter_delay=None, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
启动 EMR notebook 执行的操作符。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南: 启动 EMR notebook 执行
- 参数:
editor_id (str) – 用于 notebook 执行的 EMR notebook 的唯一标识符。
relative_path (str) – 本次执行的 notebook 文件的路径和文件名,相对于为 EMR notebook 指定的路径。
cluster_id (str) – notebook 所连接的 EMR 集群的唯一标识符。
service_role (str) – 用于 notebook 执行的 Amazon EMR 服务角色(EMR 角色)的 IAM 角色名称或 ARN。
notebook_execution_name (str | None) – notebook 执行的可选名称。
notebook_params (str | None) – 在运行时以 JSON 格式传递给 EMR notebook 执行的输入参数。
notebook_instance_security_group_id (str | None) – 与本次 notebook 执行的 EMR notebook 关联的 Amazon EC2 安全组的唯一标识符。
master_instance_security_group_id (str | None) – 与本次 notebook 执行的 EMR 集群主实例关联的 EC2 安全组的可选唯一 ID。
tags (list | None) – 与 notebook 执行关联的可选键值对列表。
waiter_max_attempts (int | None) – 失败前的最大尝试次数。
waiter_delay (int | None) – 轮询 notebook 状态之间的秒数。
- template_fields: collections.abc.Sequence[str] = ('editor_id', 'cluster_id', 'relative_path', 'service_role', 'notebook_execution_name',...[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrStopNotebookExecutionOperator(notebook_execution_id, wait_for_completion=False, aws_conn_id='aws_default', waiter_max_attempts=None, waiter_delay=None, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
停止正在运行的 EMR notebook 执行的操作符。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南: 停止 EMR notebook 执行
- 参数:
notebook_execution_id (str) – notebook 执行的唯一标识符。
wait_for_completion (bool) – 如果为 True,操作符将等待 notebook 进入 STOPPED 或 FINISHED 状态。默认为 False。
aws_conn_id (str | None) – 要使用的 AWS 连接。如果为 None 或空,则使用 boto3 默认行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用 boto3 默认配置(并且必须在每个工作节点上维护)。
waiter_max_attempts (int | None) – 失败前的最大尝试次数。
waiter_delay (int | None) – 轮询 notebook 状态之间的秒数。
- template_fields: collections.abc.Sequence[str] = ('notebook_execution_id', 'waiter_delay', 'waiter_max_attempts')[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrEksCreateClusterOperator(*, virtual_cluster_name, eks_cluster_name, eks_namespace, virtual_cluster_id='', aws_conn_id='aws_default', tags=None, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
创建 EKS 上 EMR 虚拟集群的操作符。
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:创建 Amazon EMR EKS 虚拟集群
- 参数:
- template_fields: collections.abc.Sequence[str] = ('virtual_cluster_name', 'eks_cluster_name', 'eks_namespace')[source]¶
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrContainerHook[source]¶
创建并返回 EmrContainerHook。
- class airflow.providers.amazon.aws.operators.emr.EmrContainerOperator(*, name, virtual_cluster_id, execution_role_arn, release_label, job_driver, configuration_overrides=None, client_request_token=None, aws_conn_id='aws_default', wait_for_completion=True, poll_interval=30, tags=None, max_polling_attempts=None, job_retry_max_attempts=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
将作业提交到 EKS 上 EMR 虚拟集群的操作符。
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:向 Amazon EMR 虚拟集群提交作业
- 参数:
name (str) – 作业运行的名称。
virtual_cluster_id (str) – EMR on EKS 虚拟集群 ID。
execution_role_arn (str) – 与作业运行关联的 IAM 角色 ARN。
release_label (str) – 用于作业运行的 Amazon EMR 发行版版本。
job_driver (dict) – 作业配置详细信息,例如 Spark 作业参数。
configuration_overrides (dict | None) – 作业运行的配置覆盖,特别是应用程序配置或监控配置。
client_request_token (str | None) – 作业运行请求的客户端幂等性令牌。如果想指定唯一的 ID 以防止同时启动两个作业,请使用此参数。如果未提供令牌,则会为您生成一个 UUIDv4 令牌。
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。
wait_for_completion (bool) – 是否在 Operator 中等待作业完成。
poll_interval (int) – 两次连续调用以检查 EMR 上的查询状态之间等待的时间(以秒为单位)。
max_polling_attempts (int | None) – 等待作业运行完成的最大尝试次数。默认为 None,这将一直轮询直到作业状态不是 pending、submitted 或 running。
job_retry_max_attempts (int | None) – EMR 作业失败时最大重试次数。默认为 None,表示禁用重试。
tags (dict | None) – 分配给作业运行的标签。默认为 None
deferrable (bool) – 以可延迟模式运行 Operator。
- template_fields: collections.abc.Sequence[str] = ('name', 'virtual_cluster_id', 'execution_role_arn', 'release_label', 'job_driver',...[source]¶
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrContainerHook[source]¶
创建并返回 EmrContainerHook。
- class airflow.providers.amazon.aws.operators.emr.EmrCreateJobFlowOperator(*, aws_conn_id='aws_default', emr_conn_id='emr_default', job_flow_overrides=None, region_name=None, wait_for_completion=None, wait_policy=None, waiter_max_attempts=None, waiter_delay=None, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
创建一个 EMR JobFlow,从 EMR 连接读取配置。
可以传递一个 JobFlow 覆盖字典,该字典会覆盖连接中的配置。
另请参阅
有关如何使用此 Operator 的更多信息,请参阅指南:创建 EMR 作业流
- 参数:
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果此参数为 None 或为空,则使用默认的 boto3 行为。如果以分布式方式运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上维护此配置)
emr_conn_id (str | None) – Amazon Elastic MapReduce 连接。用于接收初始 Amazon EMR 集群配置:
boto3.client('emr').run_job_flow
请求体。如果此参数为 None 或为空,或者连接不存在,则使用空的初始配置。job_flow_overrides (str | dict[str, Any] | None) – boto3 风格的参数或对参数文件的引用(必须是 '.json'),用于覆盖特定的
emr_conn_id
额外参数。(templated)region_name (str | None) – 传递给 EmrHook 的区域名称。
wait_for_completion (bool | None) – 已弃用 - 请改用 wait_policy。任务创建后立即完成 (False) 还是等待作业流完成 (True)(默认值:None)
wait_policy (airflow.providers.amazon.aws.utils.waiter.WaitPolicy | None) – 任务创建后是否立即完成 (None) 或: - 等待作业流完成 (WaitPolicy.WAIT_FOR_COMPLETION) - 等待作业流完成和集群终止 (WaitPolicy.WAIT_FOR_STEPS_COMPLETION)(默认值:None)
waiter_max_attempts (int | None) – 失败前的最大尝试次数。
waiter_delay (int | None) – 轮询 notebook 状态之间的秒数。
deferrable (bool) – 如果为 True,则 Operator 将异步等待爬取完成。这意味着将等待完成。此模式需要安装 aiobotocore 模块。(默认值:False)
- template_fields: collections.abc.Sequence[str] = ('job_flow_overrides', 'waiter_delay', 'waiter_max_attempts')[source]¶
- template_ext: collections.abc.Sequence[str] = ('.json',)[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrModifyClusterOperator(*, cluster_id, step_concurrency_level, aws_conn_id='aws_default', **kwargs)[source]¶
基类:
airflow.models.BaseOperator
修改现有 EMR 集群的操作符。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南: 修改 Amazon EMR 容器
- 参数:
- template_fields: collections.abc.Sequence[str] = ('cluster_id', 'step_concurrency_level')[source]¶
- template_ext: collections.abc.Sequence[str] = ()[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrTerminateJobFlowOperator(*, job_flow_id, aws_conn_id='aws_default', waiter_delay=60, waiter_max_attempts=20, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
终止 EMR JobFlows 的操作符。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南: 终止 EMR 作业流
- 参数:
job_flow_id (str) – 要终止的 JobFlow ID。(templated)
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果为 None 或空,则使用 boto3 默认行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用 boto3 默认配置(并且必须在每个工作节点上维护)。
waiter_delay (int) – 两次连续检查 JobFlow 状态之间等待的时间(秒)
waiter_max_attempts (int) – 轮询 JobFlow 状态的最大尝试次数。
deferrable (bool) – 如果为 True,则 Operator 将异步等待爬取完成。这意味着将等待完成。此模式需要安装 aiobotocore 模块。(默认值:False)
- template_fields: collections.abc.Sequence[str] = ('job_flow_id',)[source]¶
- template_ext: collections.abc.Sequence[str] = ()[source]¶
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessCreateApplicationOperator(release_label, job_type, client_request_token='', config=None, wait_for_completion=True, aws_conn_id='aws_default', waiter_max_attempts=NOTSET, waiter_delay=NOTSET, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
创建无服务器 EMR Application 的操作符。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南: 创建 EMR Serverless 应用程序
- 参数:
release_label (str) – 与应用程序关联的 EMR 发行版本。
job_type (str) – 要启动的应用程序类型,例如 Spark 或 Hive。
wait_for_completion (bool) – 如果为 true,则在返回之前等待应用程序启动。默认为 True。如果设置为 False,则 `waiter_max_attempts` 和 `waiter_delay` 仅在等待应用程序处于 `CREATED` 状态时应用。
client_request_token (str) – 要创建的应用程序的客户端幂等令牌。其值对于每个请求必须是唯一的。
config (dict | None) – boto API create_application 调用可选的任意参数字典。
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果为 None 或空,则使用 boto3 默认行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用 boto3 默认配置(并且必须在每个工作节点上维护)。
waiter_delay (int | airflow.utils.types.ArgNotSet) – 轮询应用程序状态之间的秒数。
deferrable (bool) – 如果为 True,操作符将异步等待应用程序创建完成。这意味着需要等待完成。此模式需要安装 aiobotocore 模块。(默认值:False,但可以通过在配置文件中将 default_deferrable 设置为 True 来覆盖)
- Waiter_max_attempts:
等待器应轮询应用程序检查状态的次数。如果未设置,等待器将使用其默认值。
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]¶
创建并返回 EmrServerlessHook。
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessStartJobOperator(application_id, execution_role_arn, job_driver, configuration_overrides=None, client_request_token='', config=None, wait_for_completion=True, aws_conn_id='aws_default', name=None, waiter_max_attempts=NOTSET, waiter_delay=NOTSET, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), enable_application_ui_links=False, **kwargs)[source]¶
基类:
airflow.models.BaseOperator
启动无服务器 EMR job 的操作符。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南: 启动 EMR Serverless 作业
- 参数:
application_id (str) – 要启动的 EMR Serverless 应用程序的 ID。
execution_role_arn (str) – 执行操作的角色 ARN。
job_driver (dict) – 作业运行在其上的驱动程序。
configuration_overrides (dict | None) – 用于覆盖现有配置的配置规范。
client_request_token (str) – 要创建的应用程序的客户端幂等令牌。其值对于每个请求必须是唯一的。
config (dict | None) – boto API start_job_run 调用可选的任意参数字典。
wait_for_completion (bool) – 如果为 True,则在返回之前等待作业启动。默认为 True。如果设置为 False,则 `waiter_countdown` 和 `waiter_check_interval_seconds` 仅在等待应用程序处于 `STARTED` 状态时应用。
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果为 None 或空,则使用 boto3 默认行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用 boto3 默认配置(并且必须在每个工作节点上维护)。
name (str | None) – EMR Serverless 作业的名称。如果未提供,将分配一个默认名称。
waiter_delay (int | airflow.utils.types.ArgNotSet) – 轮询作业运行状态之间的秒数。
deferrable (bool) – 如果为 True,操作符将异步等待爬虫完成。这意味着需要等待完成。此模式需要安装 aiobotocore 模块。(默认值:False,但可以通过在配置文件中将 default_deferrable 设置为 True 来覆盖)
enable_application_ui_links (bool) – 如果为 True,操作符将生成指向 EMR Serverless 应用程序 UI 的一次性链接。生成的链接将允许任何有权访问 DAG 的用户查看 Spark 或 Tez UI 或 Spark 标准输出日志。默认为 False。
- Waiter_max_attempts:
等待器应轮询应用程序检查状态的次数。如果未设置,等待器将使用其默认值。
- template_fields: collections.abc.Sequence[str] = ('application_id', 'config', 'execution_role_arn', 'job_driver', 'configuration_overrides',...[source]¶
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]¶
创建并返回 EmrServerlessHook。
- execute(context, event=None)[source]¶
创建操作符时派生。
上下文与渲染 Jinja 模板时使用的字典相同。
有关更多上下文,请参阅 get_template_context。
- is_monitoring_in_job_override(config_key, job_override)[source]¶
检查是否为作业启用了监控。
注意:这与应用程序默认设置不兼容:https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/default-configs.html
这用于确定应显示哪些额外链接。
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessStopApplicationOperator(application_id, wait_for_completion=True, aws_conn_id='aws_default', waiter_max_attempts=NOTSET, waiter_delay=NOTSET, force_stop=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
基类:
airflow.models.BaseOperator
停止无服务器 EMR application 的操作符。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南:打开应用程序 UI
- 参数:
application_id (str) – 要停止的 EMR Serverless 应用程序的 ID。
wait_for_completion (bool) – 如果为 true,则在返回之前等待应用程序停止。默认为 True
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果为 None 或空,则使用 boto3 默认行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用 boto3 默认配置(并且必须在每个工作节点上维护)。
force_stop (bool) – 如果设置为 True,则该应用程序中所有未处于终止状态的作业都将被取消。否则,尝试停止具有正在运行作业的应用程序将返回错误。如果您希望等待作业正常完成,请使用
airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor
waiter_delay (int | airflow.utils.types.ArgNotSet) – 轮询应用程序状态之间的秒数。默认为 60 秒。
deferrable (bool) – 如果为 True,操作符将异步等待应用程序停止。这意味着需要等待完成。此模式需要安装 aiobotocore 模块。(默认值:False,但可以通过在配置文件中将 default_deferrable 设置为 True 来覆盖)
- Waiter_max_attempts:
等待器应轮询应用程序检查状态的次数。默认为 25。
- template_fields: collections.abc.Sequence[str] = ('application_id',)[source]¶
- property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]¶
创建并返回 EmrServerlessHook。
- class airflow.providers.amazon.aws.operators.emr.EmrServerlessDeleteApplicationOperator(application_id, wait_for_completion=True, aws_conn_id='aws_default', waiter_max_attempts=NOTSET, waiter_delay=NOTSET, force_stop=False, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]¶
继承自:
EmrServerlessStopApplicationOperator
删除无服务器 EMR application 的操作符。
另请参阅
有关如何使用此操作符的更多信息,请参阅指南:删除 EMR Serverless 应用程序
- 参数:
application_id (str) – 要删除的 EMR Serverless 应用程序的 ID。
wait_for_completion (bool) – 如果为 true,则在返回之前等待应用程序被删除。默认为 True。请注意,此操作符将始终先等待应用程序停止。
aws_conn_id (str | None) – 用于 AWS 凭证的 Airflow 连接。如果为 None 或空,则使用 boto3 默认行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或空,则将使用 boto3 默认配置(并且必须在每个工作节点上维护)。
waiter_delay (int | airflow.utils.types.ArgNotSet) – 轮询应用程序状态之间的秒数。默认为 60 秒。
deferrable (bool) – 如果为 True,操作符将异步等待应用程序被删除。这意味着需要等待完成。此模式需要安装 aiobotocore 模块。(默认值:False,但可以通过在配置文件中将 default_deferrable 设置为 True 来覆盖)
force_stop (bool) – 如果设置为 True,则该应用程序中所有未处于终止状态的作业都将被取消。否则,尝试删除具有正在运行作业的应用程序将返回错误。如果您希望等待作业正常完成,请使用
airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor
- Waiter_max_attempts:
等待器应轮询应用程序检查状态的次数。默认为 25。
- template_fields: collections.abc.Sequence[str] = ('application_id',)[source]¶