Amazon关系型数据库服务 (RDS)¶
Amazon关系型数据库服务 (Amazon RDS) 是一种 Web 服务,可让您更轻松地在云中设置、操作和扩展关系型数据库。它为行业标准关系型数据库提供经济高效、可调整大小的容量,并管理常见的数据库管理任务。
先决任务¶
要使用这些操作符,您必须执行以下几步
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为而不进行连接查找。否则使用存储在连接中的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 region_name。否则使用指定的值而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果要使用与 botocore 所用不同的 CA 证书包,可以指定此参数。
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 verify。否则使用指定的值而不是连接值。默认值:None
- botocore_config
提供的字典用于构造一个 botocore.config.Config。此配置可用于配置 避免限制异常、超时等。
示例,有关参数的更多详情,请参阅 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则使用指定的值而不是连接值。默认值:None
注意
指定一个空字典
{}
将覆盖 botocore.config.Config 的连接配置
操作符¶
创建数据库快照¶
要创建 Amazon RDS 数据库实例或集群的快照,可以使用 RDSCreateDBSnapshotOperator
。源数据库实例必须处于 available
或 storage-optimization
状态。
tests/system/amazon/aws/example_rds_snapshot.py
create_snapshot = RdsCreateDbSnapshotOperator(
task_id="create_snapshot",
db_type="instance",
db_identifier=rds_instance_name,
db_snapshot_identifier=rds_snapshot_name,
)
复制数据库快照¶
要复制 Amazon RDS 数据库实例或集群的快照,可以使用 RDSCopyDBSnapshotOperator
。源数据库快照必须处于 available
状态。
tests/system/amazon/aws/example_rds_snapshot.py
copy_snapshot = RdsCopyDbSnapshotOperator(
task_id="copy_snapshot",
db_type="instance",
source_db_snapshot_identifier=rds_snapshot_name,
target_db_snapshot_identifier=rds_snapshot_copy_name,
)
删除数据库快照¶
要删除 Amazon RDS 数据库实例或集群的快照,可以使用 RDSDeleteDBSnapshotOperator
。数据库快照必须处于 available
状态才能被删除。
tests/system/amazon/aws/example_rds_snapshot.py
delete_snapshot = RdsDeleteDbSnapshotOperator(
task_id="delete_snapshot",
db_type="instance",
db_snapshot_identifier=rds_snapshot_name,
)
将 Amazon RDS 快照导出到 Amazon S3¶
要将 Amazon RDS 快照导出到 Amazon S3,可以使用 RDSStartExportTaskOperator
。提供的 IAM 角色必须具有访问 S3 存储桶的权限。
tests/system/amazon/aws/example_rds_export.py
start_export = RdsStartExportTaskOperator(
task_id="start_export",
export_task_identifier=rds_export_task_id,
source_arn=snapshot_arn,
s3_bucket_name=bucket_name,
s3_prefix="rds-test",
iam_role_arn=test_context[ROLE_ARN_KEY],
kms_key_id=test_context[KMS_KEY_ID_KEY],
)
取消 Amazon RDS 导出任务¶
要取消到 S3 的 Amazon RDS 导出任务,可以使用 RDSCancelExportTaskOperator
。已写入 S3 存储桶的任何数据都不会被删除。
tests/system/amazon/aws/example_rds_export.py
cancel_export = RdsCancelExportTaskOperator(
task_id="cancel_export",
export_task_identifier=rds_export_task_id,
)
订阅 Amazon RDS 事件通知¶
要创建 Amazon RDS 事件订阅,可以使用 RDSCreateEventSubscriptionOperator
。此操作需要 Amazon SNS 主题的 Amazon Resource Name (ARN)。Amazon RDS 事件通知仅适用于未加密的 SNS 主题。如果指定加密的 SNS 主题,则不会为该主题发送事件通知。
tests/system/amazon/aws/example_rds_event.py
create_subscription = RdsCreateEventSubscriptionOperator(
task_id="create_subscription",
subscription_name=rds_subscription_name,
sns_topic_arn=sns_topic,
source_type="db-instance",
source_ids=[rds_instance_name],
event_categories=["availability"],
)
取消订阅 Amazon RDS 事件通知¶
要删除 Amazon RDS 事件订阅,可以使用 RDSDeleteEventSubscriptionOperator
。
tests/system/amazon/aws/example_rds_event.py
delete_subscription = RdsDeleteEventSubscriptionOperator(
task_id="delete_subscription",
subscription_name=rds_subscription_name,
)
创建数据库实例¶
要创建 AWS 数据库实例,可以使用 RdsCreateDbInstanceOperator
。您还可以通过将 deferrable
参数设置为 True
来以可延迟模式运行此操作符。
tests/system/amazon/aws/example_rds_instance.py
create_db_instance = RdsCreateDbInstanceOperator(
task_id="create_db_instance",
db_instance_identifier=rds_db_identifier,
db_instance_class="db.t4g.micro",
engine="postgres",
rds_kwargs={
"MasterUsername": RDS_USERNAME,
"MasterUserPassword": RDS_PASSWORD,
"AllocatedStorage": 20,
"PubliclyAccessible": False,
},
)
删除数据库实例¶
要删除 AWS 数据库实例,可以使用 RDSDeleteDbInstanceOperator
。您还可以通过将 deferrable
参数设置为 True
来以可延迟模式运行此操作符。
tests/system/amazon/aws/example_rds_instance.py
delete_db_instance = RdsDeleteDbInstanceOperator(
task_id="delete_db_instance",
db_instance_identifier=rds_db_identifier,
rds_kwargs={
"SkipFinalSnapshot": True,
},
)
启动数据库实例或集群¶
要启动 Amazon RDS 数据库实例或集群,可以使用 RdsStartDbOperator
。
tests/system/amazon/aws/example_rds_instance.py
start_db_instance = RdsStartDbOperator(
task_id="start_db_instance",
db_identifier=rds_db_identifier,
)
停止数据库实例或集群¶
要停止 Amazon RDS 数据库实例或集群,可以使用 RdsStopDbOperator
。
tests/system/amazon/aws/example_rds_instance.py
stop_db_instance = RdsStopDbOperator(
task_id="stop_db_instance",
db_identifier=rds_db_identifier,
)
传感器¶
等待 Amazon RDS 实例或集群状态¶
要等待 Amazon RDS 实例或集群达到特定状态,可以使用 RdsDbSensor
。默认情况下,传感器等待数据库实例达到 available
状态。
tests/system/amazon/aws/example_rds_instance.py
await_db_instance = RdsDbSensor(
task_id="await_db_instance",
db_identifier=rds_db_identifier,
)
等待 Amazon RDS 快照状态¶
要等待具有特定状态的 Amazon RDS 快照,可以使用 RdsSnapshotExistenceSensor
。默认情况下,传感器等待状态为 available
的快照存在。
tests/system/amazon/aws/example_rds_snapshot.py
snapshot_sensor = RdsSnapshotExistenceSensor(
task_id="snapshot_sensor",
db_type="instance",
db_snapshot_identifier=rds_snapshot_name,
target_statuses=["available"],
)
等待 Amazon RDS 导出任务状态¶
要等待具有特定状态的 Amazon RDS 快照导出任务,可以使用 RdsExportTaskExistenceSensor
。默认情况下,传感器等待状态为 available
的快照存在。
tests/system/amazon/aws/example_rds_export.py
export_sensor = RdsExportTaskExistenceSensor(
task_id="export_sensor",
export_task_identifier=rds_export_task_id,
target_statuses=["canceled"],
)