airflow.providers.amazon.aws.sensors.emr

EmrBaseSensor

包含 EMR 通用传感器行为。

EmrServerlessJobSensor

轮询作业运行的状态,直到其达到终止状态;如果作业运行失败则标记为失败。

EmrServerlessApplicationSensor

轮询应用程序的状态,直到其达到终止状态;如果应用程序失败则标记为失败。

EmrContainerSensor

轮询作业运行的状态,直到其达到终止状态;如果作业运行失败则标记为失败。

EmrNotebookExecutionSensor

轮询 EMR Notebook,直到其达到任何目标状态;失败时引发 AirflowException。

EmrJobFlowSensor

轮询 EMR JobFlow 集群,直到其达到任何目标状态;失败时引发 AirflowException。

EmrStepSensor

轮询步骤的状态,直到其达到任何目标状态;失败时引发 AirflowException。

模块内容

class airflow.providers.amazon.aws.sensors.emr.EmrBaseSensor(*, aws_conn_id='aws_default', **kwargs)[source]

基类: airflow.sensors.base.BaseSensorOperator

包含 EMR 通用传感器行为。

子类应实现以下方法
  • get_emr_response()

  • state_from_response()

  • failure_message_from_response()

子类应设置 target_statesfailed_states 字段。

参数::

aws_conn_id (str | None) – 用于 AWS 凭据的 Airflow 连接。如果此参数为 None 或为空,则使用默认的 boto3 行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上进行维护)。

ui_color = '#66c3ff'[source]
aws_conn_id = 'aws_default'[source]
target_states: collections.abc.Iterable[str] = [][source]
failed_states: collections.abc.Iterable[str] = [][source]
property hook: airflow.providers.amazon.aws.hooks.emr.EmrHook[source]
poke(context)[source]

派生此类时重写。

abstract get_emr_response(context)[source]

使用 boto3 进行 API 调用并获取响应。

返回::

响应

返回类型::

dict[str, Any]

static state_from_response(response)[source]
抽象方法::

从 boto3 响应中获取状态。

参数::

response (dict[str, Any]) – 来自 AWS API 的响应

返回::

状态

返回类型::

str

static failure_message_from_response(response)[source]
抽象方法::

从 boto3 响应中获取状态。

参数::

response (dict[str, Any]) – 来自 AWS API 的响应

返回::

失败消息

返回类型::

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor(*, application_id, job_run_id, target_states=frozenset(EmrServerlessHook.JOB_SUCCESS_STATES), aws_conn_id='aws_default', **kwargs)[source]

基类: airflow.sensors.base.BaseSensorOperator

轮询作业运行的状态,直到其达到终止状态;如果作业运行失败则标记为失败。

另请参阅

有关如何使用此传感器的更多信息,请查看指南: 等待 EMR Serverless 作业状态

参数::
  • application_id (str) – 要检查状态的 application_id

  • job_run_id (str) – 要检查状态的 job_run_id

  • target_states (set | frozenset) – 一组需要等待的状态,默认为 ‘SUCCESS’

  • aws_conn_id (str | None) – 要使用的 aws 连接,默认为 ‘aws_default’。如果此参数为 None 或为空,则使用默认的 boto3 行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上进行维护)。

template_fields: collections.abc.Sequence[str] = ('application_id', 'job_run_id')[source]
aws_conn_id = 'aws_default'[source]
target_states[source]
application_id[source]
job_run_id[source]
poke(context)[source]

派生此类时重写。

property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]

创建并返回一个 EmrServerlessHook。

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数::

response (dict[str, Any]) – 来自 AWS API 的响应

返回::

失败消息

返回类型::

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrServerlessApplicationSensor(*, application_id, target_states=frozenset(EmrServerlessHook.APPLICATION_SUCCESS_STATES), aws_conn_id='aws_default', **kwargs)[source]

基类: airflow.sensors.base.BaseSensorOperator

轮询应用程序的状态,直到其达到终止状态;如果应用程序失败则标记为失败。

另请参阅

有关如何使用此传感器的更多信息,请查看指南: 等待 EMR Serverless 应用程序状态

参数::
  • application_id (str) – 要检查状态的 application_id

  • target_states (set | frozenset) – 一组需要等待的状态,默认为 {‘CREATED’, ‘STARTED’}

  • aws_conn_id (str | None) – 要使用的 aws 连接,默认为 ‘aws_default’。如果此参数为 None 或为空,则使用默认的 boto3 行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上进行维护)。

template_fields: collections.abc.Sequence[str] = ('application_id',)[source]
aws_conn_id = 'aws_default'[source]
target_states[source]
application_id[source]
poke(context)[source]

派生此类时重写。

property hook: airflow.providers.amazon.aws.hooks.emr.EmrServerlessHook[source]

创建并返回一个 EmrServerlessHook。

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数::

response (dict[str, Any]) – 来自 AWS API 的响应

返回::

失败消息

返回类型::

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrContainerSensor(*, virtual_cluster_id, job_id, max_retries=None, aws_conn_id='aws_default', poll_interval=10, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基类: airflow.sensors.base.BaseSensorOperator

轮询作业运行的状态,直到其达到终止状态;如果作业运行失败则标记为失败。

另请参阅

有关如何使用此传感器的更多信息,请查看指南: 等待 Amazon EMR 虚拟集群作业

参数::
  • job_id (str) – 要检查状态的 job_id

  • max_retries (int | None) – 轮询查询状态直到返回当前状态的次数,默认为 None

  • aws_conn_id (str | None) – 要使用的 aws 连接,默认为 ‘aws_default’。如果此参数为 None 或为空,则使用默认的 boto3 行为。如果在分布式模式下运行 Airflow 且 aws_conn_id 为 None 或为空,则将使用默认的 boto3 配置(并且必须在每个工作节点上进行维护)。

  • poll_interval (int) – 两次连续调用之间等待检查 athena 查询状态的秒数,默认为 10

  • deferrable (bool) – 在可延迟模式下运行传感器。

INTERMEDIATE_STATES = ('PENDING', 'SUBMITTED', 'RUNNING')[source]
FAILURE_STATES = ('FAILED', 'CANCELLED', 'CANCEL_PENDING')[source]
SUCCESS_STATES = ('COMPLETED',)[source]
template_fields: collections.abc.Sequence[str] = ('virtual_cluster_id', 'job_id')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
ui_color = '#66c3ff'[source]
aws_conn_id = 'aws_default'[source]
virtual_cluster_id[source]
job_id[source]
poll_interval = 10[source]
max_retries = None[source]
deferrable = True[source]
property hook: airflow.providers.amazon.aws.hooks.emr.EmrContainerHook[source]
poke(context)[source]

派生此类时重写。

execute(context)[source]

创建算子时派生。

执行任务的主要方法。Context 与渲染 jinja 模板时使用的字典相同。

有关更多 Context,请参阅 get_template_context。

execute_complete(context, event=None)[source]
class airflow.providers.amazon.aws.sensors.emr.EmrNotebookExecutionSensor(notebook_execution_id, target_states=None, failed_states=None, **kwargs)[source]

基础类: EmrBaseSensor

轮询 EMR Notebook,直到其达到任何目标状态;失败时引发 AirflowException。

另请参阅

有关如何使用此 Sensor 的更多信息,请参阅指南: 等待 EMR Notebook 执行状态

参数::

notebook_execution_id (str) – 要检查(poked)的 Notebook 执行的唯一 ID。

目标状态:

Sensor 将等待执行达到的状态。默认目标状态为 FINISHED

失败状态:

如果执行达到任何失败状态,则 Sensor 将失败。默认失败状态为 FAILED

template_fields: collections.abc.Sequence[str] = ('notebook_execution_id',)[source]
FAILURE_STATES[source]
COMPLETED_STATES[source]
notebook_execution_id[source]
target_states[source]
failed_states[source]
get_emr_response(context)[source]

使用 boto3 进行 API 调用并获取响应。

返回::

响应

返回类型::

dict[str, Any]

static state_from_response(response)[source]

使用 boto3 调用 API 并获取集群级别的详细信息。

返回::

响应

返回类型::

str

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数::

response (dict[str, Any]) – 来自 AWS API 的响应

返回::

失败消息

返回类型::

str | None

class airflow.providers.amazon.aws.sensors.emr.EmrJobFlowSensor(*, job_flow_id, target_states=None, failed_states=None, max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基础类: EmrBaseSensor

轮询 EMR JobFlow 集群,直到其达到任何目标状态;失败时引发 AirflowException。

在默认目标状态下,Sensor 会等待集群终止。当 target_states 设置为 ['RUNNING', 'WAITING'] 时,Sensor 会等待作业流准备就绪(在 'STARTING' 和 'BOOTSTRAPPING' 状态之后)。

另请参阅

有关如何使用此 Sensor 的更多信息,请参阅指南: 等待 Amazon EMR 作业流状态

参数::
  • job_flow_id (str) – 要检查其状态的作业流 ID

  • target_states (collections.abc.Iterable[str] | None) – 目标状态,Sensor 会等待作业流达到这些状态中的任何一个。在可推迟模式下,它将运行直到达到终端状态。

  • failed_states (collections.abc.Iterable[str] | None) – 失败状态,当作业流达到这些状态中的任何一个时,Sensor 将失败。

  • max_attempts (int) – 失败前的最大尝试次数

  • deferrable (bool) – 在可延迟模式下运行传感器。

template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'target_states', 'failed_states')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
job_flow_id[source]
target_states = ['TERMINATED'][source]
failed_states = ['TERMINATED_WITH_ERRORS'][source]
max_attempts = 60[source]
deferrable = True[source]
get_emr_response(context)[source]

使用 boto3 调用 API 并获取集群级别的详细信息。

返回::

响应

返回类型::

dict[str, Any]

static state_from_response(response)[source]

从响应字典中获取状态。

参数::

response (dict[str, Any]) – 来自 AWS API 的响应

返回::

集群当前状态

返回类型::

str

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数::

response (dict[str, Any]) – 来自 AWS API 的响应

返回::

失败消息

返回类型::

str | None

execute(context)[source]

创建算子时派生。

执行任务的主要方法。Context 与渲染 jinja 模板时使用的字典相同。

有关更多 Context,请参阅 get_template_context。

execute_complete(context, event=None)[source]
class airflow.providers.amazon.aws.sensors.emr.EmrStepSensor(*, job_flow_id, step_id, target_states=None, failed_states=None, max_attempts=60, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), **kwargs)[source]

基础类: EmrBaseSensor

轮询步骤的状态,直到其达到任何目标状态;失败时引发 AirflowException。

在默认目标状态下,Sensor 会等待步骤完成。

另请参阅

有关如何使用此 Sensor 的更多信息,请参阅指南: 等待 Amazon EMR 步骤状态

参数::
  • job_flow_id (str) – 包含要检查状态的步骤的作业流 ID

  • step_id (str) – 要检查其状态的步骤 ID

  • target_states (collections.abc.Iterable[str] | None) – 目标状态,Sensor 会等待步骤达到这些状态中的任何一个。在可推迟 Sensor 的情况下,它将等待直到达到终端状态。

  • failed_states (collections.abc.Iterable[str] | None) – 失败状态,当步骤达到这些状态中的任何一个时,Sensor 将失败。

  • max_attempts (int) – 失败前的最大尝试次数

  • deferrable (bool) – 在可延迟模式下运行传感器。

template_fields: collections.abc.Sequence[str] = ('job_flow_id', 'step_id', 'target_states', 'failed_states')[source]
template_ext: collections.abc.Sequence[str] = ()[source]
job_flow_id[source]
step_id[source]
target_states = ['COMPLETED'][source]
failed_states = ['CANCELLED', 'FAILED', 'INTERRUPTED'][source]
max_attempts = 60[source]
deferrable = True[source]
get_emr_response(context)[source]

使用 boto3 调用 API 并获取有关集群步骤的详细信息。

返回::

响应

返回类型::

dict[str, Any]

static state_from_response(response)[source]

从响应字典中获取状态。

参数::

response (dict[str, Any]) – 来自 AWS API 的响应

返回::

集群步骤的执行状态

返回类型::

str

static failure_message_from_response(response)[source]

从响应字典中获取失败消息。

参数::

response (dict[str, Any]) – 来自 AWS API 的响应

返回::

失败消息

返回类型::

str | None

execute(context)[source]

创建算子时派生。

执行任务的主要方法。Context 与渲染 jinja 模板时使用的字典相同。

有关更多 Context,请参阅 get_template_context。

execute_complete(context, event=None)[source]

本条目有帮助吗?