Amazon Managed Workflows for Apache Airflow (MWAA)

Amazon Managed Workflows for Apache Airflow (MWAA) 是一项适用于 Apache Airflow 的托管服务,允许您使用当前熟悉的原有 Apache Airflow 平台来协调工作流。您无需承担管理底层基础设施的运营负担,即可获得更高的可扩展性、可用性和安全性。

注意:与 Airflow 的内置操作器不同,这些操作器旨在与托管在 AWS MWAA 上的外部 Airflow 环境进行交互。

先决条件任务

要使用这些操作器,您必须执行以下操作:

通用参数

aws_conn_id

参考 Amazon Web Services Connection ID。如果此参数设置为 None,则使用默认的 boto3 行为,不进行连接查找。否则使用 Connection 中存储的凭证。默认值:aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则将使用 AWS Connection Extra Parameter 中的 region_name。否则使用指定的值代替连接值。默认值:None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书捆绑文件的文件名。如果您想使用与 botocore 使用的 CA 证书捆绑文件不同的文件,可以指定此参数。

如果此参数设置为 None 或省略,则将使用 AWS Connection Extra Parameter 中的 verify。否则使用指定的值代替连接值。默认值:None

botocore_config

提供的字典用于构建 botocore.config.Config。此配置可用于配置 避免限制异常、超时等。

示例,有关参数的更多详细信息,请参阅 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则将使用 AWS Connection Extra Parameter 中的 config_kwargs。否则使用指定的值代替连接值。默认值:None

注意

指定一个空字典 {} 将覆盖 botocore.config.Config 的连接配置

操作器

在 Amazon MWAA 环境中触发 DAG 运行

要在 Amazon MWAA 环境中触发 DAG 运行,您可以使用 MwaaTriggerDagRunOperator

在以下示例中,任务 trigger_dag_run 在环境 MyAirflowEnvironment 中触发 DAG hello_world 的 DAG 运行,并等待运行完成。

tests/system/amazon/aws/example_mwaa.py

trigger_dag_run = MwaaTriggerDagRunOperator(
    task_id="trigger_dag_run",
    env_name=env_name,
    trigger_dag_id=trigger_dag_id,
    wait_for_completion=True,
)

传感器

等待 AWS MWAA DAG 运行的状态

要等待在 Amazon MWAA 上运行的 DAG 运行达到给定状态之一,您可以使用 MwaaDagRunSensor

在以下示例中,任务 wait_for_dag_run 等待上述任务中创建的 DAG 运行完成。

tests/system/amazon/aws/example_mwaa.py

wait_for_dag_run = MwaaDagRunSensor(
    task_id="wait_for_dag_run",
    external_env_name=env_name,
    external_dag_id=trigger_dag_id,
    external_dag_run_id="{{ task_instance.xcom_pull(task_ids='trigger_dag_run')['RestApiResponse']['dag_run_id'] }}",
    poke_interval=5,
)

参考

此条目是否有帮助?