Amazon Relational Database Service (RDS)¶
Amazon Relational Database Service (Amazon RDS) 是一种 Web 服务,可让您更轻松地在云中设置、操作和扩展关系数据库。它为行业标准关系数据库提供经济高效、可调整大小的容量,并管理常见的数据库管理任务。
先决条件任务¶
要使用这些操作符,您必须执行以下操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参见 Airflow® 的安装
设置连接.
操作符¶
创建数据库快照¶
要创建 Amazon RDS 数据库实例或集群的快照,您可以使用 RDSCreateDBSnapshotOperator
。源数据库实例必须处于 available
或 storage-optimization
状态。
tests/system/amazon/aws/example_rds_snapshot.py
create_snapshot = RdsCreateDbSnapshotOperator(
task_id="create_snapshot",
db_type="instance",
db_identifier=rds_instance_name,
db_snapshot_identifier=rds_snapshot_name,
)
复制数据库快照¶
要复制 Amazon RDS 数据库实例或集群的快照,您可以使用 RDSCopyDBSnapshotOperator
。源数据库快照必须处于 available
状态。
tests/system/amazon/aws/example_rds_snapshot.py
copy_snapshot = RdsCopyDbSnapshotOperator(
task_id="copy_snapshot",
db_type="instance",
source_db_snapshot_identifier=rds_snapshot_name,
target_db_snapshot_identifier=rds_snapshot_copy_name,
)
删除数据库快照¶
要删除 Amazon RDS 数据库实例或集群的快照,您可以使用 RDSDeleteDBSnapshotOperator
。数据库快照必须处于 available
状态才能被删除。
tests/system/amazon/aws/example_rds_snapshot.py
delete_snapshot = RdsDeleteDbSnapshotOperator(
task_id="delete_snapshot",
db_type="instance",
db_snapshot_identifier=rds_snapshot_name,
)
将 Amazon RDS 快照导出到 Amazon S3¶
要将 Amazon RDS 快照导出到 Amazon S3,您可以使用 RDSStartExportTaskOperator
。提供的 IAM 角色必须有权访问 S3 存储桶。
tests/system/amazon/aws/example_rds_export.py
start_export = RdsStartExportTaskOperator(
task_id="start_export",
export_task_identifier=rds_export_task_id,
source_arn=snapshot_arn,
s3_bucket_name=bucket_name,
s3_prefix="rds-test",
iam_role_arn=test_context[ROLE_ARN_KEY],
kms_key_id=test_context[KMS_KEY_ID_KEY],
)
取消 Amazon RDS 导出任务¶
要取消到 S3 的 Amazon RDS 导出任务,您可以使用 RDSCancelExportTaskOperator
。任何已写入 S3 存储桶的数据都不会被删除。
tests/system/amazon/aws/example_rds_export.py
cancel_export = RdsCancelExportTaskOperator(
task_id="cancel_export",
export_task_identifier=rds_export_task_id,
)
订阅 Amazon RDS 事件通知¶
要创建 Amazon RDS 事件订阅,您可以使用 RDSCreateEventSubscriptionOperator
。此操作需要 Amazon SNS 主题 Amazon 资源名称 (ARN)。Amazon RDS 事件通知仅适用于未加密的 SNS 主题。如果指定加密的 SNS 主题,则不会为该主题发送事件通知。
tests/system/amazon/aws/example_rds_event.py
create_subscription = RdsCreateEventSubscriptionOperator(
task_id="create_subscription",
subscription_name=rds_subscription_name,
sns_topic_arn=sns_topic,
source_type="db-instance",
source_ids=[rds_instance_name],
event_categories=["availability"],
)
取消订阅 Amazon RDS 事件通知¶
要删除 Amazon RDS 事件订阅,您可以使用 RDSDeleteEventSubscriptionOperator
。
tests/system/amazon/aws/example_rds_event.py
delete_subscription = RdsDeleteEventSubscriptionOperator(
task_id="delete_subscription",
subscription_name=rds_subscription_name,
)
创建数据库实例¶
要创建 AWS 数据库实例,您可以使用 RdsCreateDbInstanceOperator
。您还可以通过将 deferrable
参数设置为 True
,在可延迟模式下运行此操作符。
tests/system/amazon/aws/example_rds_instance.py
create_db_instance = RdsCreateDbInstanceOperator(
task_id="create_db_instance",
db_instance_identifier=rds_db_identifier,
db_instance_class="db.t4g.micro",
engine="postgres",
rds_kwargs={
"MasterUsername": RDS_USERNAME,
"MasterUserPassword": RDS_PASSWORD,
"AllocatedStorage": 20,
"PubliclyAccessible": False,
},
)
删除数据库实例¶
要删除 AWS 数据库实例,您可以使用 RDSDeleteDbInstanceOperator
。您还可以通过将 deferrable
参数设置为 True
,在可延迟模式下运行此操作符。
tests/system/amazon/aws/example_rds_instance.py
delete_db_instance = RdsDeleteDbInstanceOperator(
task_id="delete_db_instance",
db_instance_identifier=rds_db_identifier,
rds_kwargs={
"SkipFinalSnapshot": True,
},
)
启动数据库实例或集群¶
要启动 Amazon RDS 数据库实例或集群,您可以使用 RdsStartDbOperator
。
tests/system/amazon/aws/example_rds_instance.py
start_db_instance = RdsStartDbOperator(
task_id="start_db_instance",
db_identifier=rds_db_identifier,
)
停止数据库实例或集群¶
要停止 Amazon RDS 数据库实例或集群,您可以使用 RdsStopDbOperator
。
tests/system/amazon/aws/example_rds_instance.py
stop_db_instance = RdsStopDbOperator(
task_id="stop_db_instance",
db_identifier=rds_db_identifier,
)
传感器¶
等待 Amazon RDS 实例或集群状态¶
要等待 Amazon RDS 实例或集群达到特定状态,您可以使用 RdsDbSensor
。默认情况下,传感器等待数据库实例达到 available
状态。
tests/system/amazon/aws/example_rds_instance.py
await_db_instance = RdsDbSensor(
task_id="await_db_instance",
db_identifier=rds_db_identifier,
)
等待 Amazon RDS 快照状态¶
要等待具有特定状态的 Amazon RDS 快照,您可以使用 RdsSnapshotExistenceSensor
。默认情况下,传感器等待快照存在且状态为 available
。
tests/system/amazon/aws/example_rds_snapshot.py
snapshot_sensor = RdsSnapshotExistenceSensor(
task_id="snapshot_sensor",
db_type="instance",
db_snapshot_identifier=rds_snapshot_name,
target_statuses=["available"],
)
等待 Amazon RDS 导出任务状态¶
要等待具有特定状态的 Amazon RDS 快照导出任务,您可以使用 RdsExportTaskExistenceSensor
。默认情况下,传感器等待快照存在且状态为 available
。
tests/system/amazon/aws/example_rds_export.py
export_sensor = RdsExportTaskExistenceSensor(
task_id="export_sensor",
export_task_identifier=rds_export_task_id,
target_statuses=["canceled"],
)