Amazon Managed Workflows for Apache Airflow (MWAA)¶
Amazon Managed Workflows for Apache Airflow (MWAA) 是一项适用于 Apache Airflow 的托管服务,允许您使用当前熟悉的原有 Apache Airflow 平台来协调工作流。您无需承担管理底层基础设施的运营负担,即可获得更高的可扩展性、可用性和安全性。
注意:与 Airflow 的内置操作器不同,这些操作器旨在与托管在 AWS MWAA 上的外部 Airflow 环境进行交互。
先决条件任务¶
要使用这些操作器,您必须执行以下操作:
使用 AWS Console 或 AWS CLI 创建必要的资源。
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 安装
设置连接.
通用参数¶
- aws_conn_id
参考 Amazon Web Services Connection ID。如果此参数设置为
None
,则使用默认的 boto3 行为,不进行连接查找。否则使用 Connection 中存储的凭证。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则将使用 AWS Connection Extra Parameter 中的 region_name。否则使用指定的值代替连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书捆绑文件的文件名。如果您想使用与 botocore 使用的 CA 证书捆绑文件不同的文件,可以指定此参数。
如果此参数设置为
None
或省略,则将使用 AWS Connection Extra Parameter 中的 verify。否则使用指定的值代替连接值。默认值:None
- botocore_config
提供的字典用于构建 botocore.config.Config。此配置可用于配置 避免限制异常、超时等。
示例,有关参数的更多详细信息,请参阅 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则将使用 AWS Connection Extra Parameter 中的 config_kwargs。否则使用指定的值代替连接值。默认值:None
注意
指定一个空字典
{}
将覆盖 botocore.config.Config 的连接配置
操作器¶
在 Amazon MWAA 环境中触发 DAG 运行¶
要在 Amazon MWAA 环境中触发 DAG 运行,您可以使用 MwaaTriggerDagRunOperator
在以下示例中,任务 trigger_dag_run
在环境 MyAirflowEnvironment
中触发 DAG hello_world
的 DAG 运行,并等待运行完成。
tests/system/amazon/aws/example_mwaa.py
trigger_dag_run = MwaaTriggerDagRunOperator(
task_id="trigger_dag_run",
env_name=env_name,
trigger_dag_id=trigger_dag_id,
wait_for_completion=True,
)
传感器¶
等待 AWS MWAA DAG 运行的状态¶
要等待在 Amazon MWAA 上运行的 DAG 运行达到给定状态之一,您可以使用 MwaaDagRunSensor
在以下示例中,任务 wait_for_dag_run
等待上述任务中创建的 DAG 运行完成。
tests/system/amazon/aws/example_mwaa.py
wait_for_dag_run = MwaaDagRunSensor(
task_id="wait_for_dag_run",
external_env_name=env_name,
external_dag_id=trigger_dag_id,
external_dag_run_id="{{ task_instance.xcom_pull(task_ids='trigger_dag_run')['RestApiResponse']['dag_run_id'] }}",
poke_interval=5,
)