AWS Database Migration Service (DMS)

AWS Database Migration Service (AWS DMS) 是一种网络服务,您可以使用它将位于本地、Amazon Relational Database Service(Amazon RDS)数据库实例或 Amazon Elastic Compute Cloud(Amazon EC2)实例上的数据库迁移到 AWS 服务上的数据库。这些服务可以包括 Amazon RDS 上的数据库或 Amazon EC2 实例上的数据库。您还可以将数据库从 AWS 服务迁移到本地数据库。您可以在使用相同数据库引擎的源和目标端点之间进行迁移,例如从 Oracle 数据库迁移到 Oracle 数据库。您也可以在使用不同数据库引擎的源和目标端点之间进行迁移,例如从 Oracle 数据库迁移到 PostgreSQL 数据库。

前置任务

要使用这些操作符,您需要完成以下几项工作

通用参数

aws_conn_id

引用到 Amazon Web Services 连接 ID。如果此参数设置为 None,则使用默认的 boto3 行为且不进行连接查找。否则使用存储在连接中的凭证。默认值:aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则使用 region_name(来自 AWS 连接额外参数)。否则使用指定的值而不是连接中的值。默认值:None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书捆绑文件名。如果您想使用与 botocore 不同的 CA 证书捆绑,可指定此参数。

如果此参数设置为 None 或省略,则使用 verify(来自 AWS 连接额外参数)。否则使用指定的值而不是连接中的值。默认值:None

botocore_config

提供的字典将用于构建 botocore.config.Config。此配置可用于配置避免限流异常、超时等。

示例,关于参数的更多细节请查看 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则使用 config_kwargs(来自 AWS 连接额外参数)。否则使用指定的值而不是连接中的值。默认值:None

注意

指定一个空字典 {} 将覆盖 botocore.config.Config 的连接配置。

操作符

Create a replication task

要创建复制任务,您可以使用 DmsCreateTaskOperator

tests/system/amazon/aws/example_dms.py[source]

create_task = DmsCreateTaskOperator(
    task_id="create_task",
    replication_task_id=dms_replication_task_id,
    source_endpoint_arn=create_assets["source_endpoint_arn"],
    target_endpoint_arn=create_assets["target_endpoint_arn"],
    replication_instance_arn=create_assets["replication_instance_arn"],
    table_mappings=table_mappings,
)

Start a replication task

要启动复制任务,您可以使用 DmsStartTaskOperator

tests/system/amazon/aws/example_dms.py[source]

start_task = DmsStartTaskOperator(
    task_id="start_task",
    replication_task_arn=task_arn,
)

Get details of replication tasks

要检索一系列复制任务的详细信息,您可以使用 DmsDescribeTasksOperator

tests/system/amazon/aws/example_dms.py[source]

describe_tasks = DmsDescribeTasksOperator(
    task_id="describe_tasks",
    describe_tasks_kwargs={
        "Filters": [
            {
                "Name": "replication-instance-arn",
                "Values": [create_assets["replication_instance_arn"]],
            }
        ]
    },
    do_xcom_push=False,
)

Stop a replication task

要停止复制任务,您可以使用 DmsStopTaskOperator

tests/system/amazon/aws/example_dms.py[source]

stop_task = DmsStopTaskOperator(
    task_id="stop_task",
    replication_task_arn=task_arn,
)

Delete a replication task

要删除复制任务,您可以使用 DmsDeleteTaskOperator

tests/system/amazon/aws/example_dms.py[source]

delete_task = DmsDeleteTaskOperator(
    task_id="delete_task",
    replication_task_arn=task_arn,
)

Create a serverless replication config

要创建无服务器复制配置,请使用 DmsCreateReplicationConfigOperator

tests/system/amazon/aws/example_dms_serverless.py[source]

create_replication_config = DmsCreateReplicationConfigOperator(
    task_id="create_replication_config",
    replication_config_id=replication_id,
    source_endpoint_arn=create_assets["source_endpoint_arn"],
    target_endpoint_arn=create_assets["target_endpoint_arn"],
    compute_config={
        "MaxCapacityUnits": 4,
        "MinCapacityUnits": 1,
        "MultiAZ": False,
        "ReplicationSubnetGroupId": "default",
    },
    replication_type="full-load",
    table_mappings=json.dumps(table_mappings),
)

Describe a serverless replication config

要描述无服务器复制配置,请使用 DmsDescribeReplicationConfigsOperator

tests/system/amazon/aws/example_dms_serverless.py[source]

describe_replication_configs = DmsDescribeReplicationConfigsOperator(
    task_id="describe_replication_configs",
)

Start a serverless replication

要启动无服务器复制,请使用 DmsStartReplicationOperator

tests/system/amazon/aws/example_dms_serverless.py[source]

replicate = DmsStartReplicationOperator(
    task_id="replicate",
    replication_config_arn="{{ task_instance.xcom_pull(task_ids='create_replication_config', key='return_value') }}",
    replication_start_type="start-replication",
    wait_for_completion=True,
    waiter_delay=60,
    waiter_max_attempts=200,
)

Stop a serverless replication

要停止无服务器复制,请使用 DmsStopReplicationOperator

tests/system/amazon/aws/example_dms_serverless.py[source]

stop_replication = DmsStopReplicationOperator(
    task_id="stop_replication",
    replication_config_arn="{{ task_instance.xcom_pull(task_ids='create_replication_config', key='return_value') }}",
    wait_for_completion=True,
    waiter_delay=120,
    waiter_max_attempts=200,
)

Get the status of a serverless replication

要获取无服务器复制的状态,请使用 DmsDescribeReplicationsOperator

tests/system/amazon/aws/example_dms_serverless.py[source]

describe_replications = DmsDescribeReplicationsOperator(
    task_id="describe_replications",
)

Delete a serverless replication configuration

要删除无服务器复制配置,请使用 DmsDeleteReplicationConfigOperator

tests/system/amazon/aws/example_dms_serverless.py[source]

delete_replication_config = DmsDeleteReplicationConfigOperator(
    task_id="delete_replication_config",
    waiter_max_attempts=200,
    replication_config_arn="{{ task_instance.xcom_pull(task_ids='create_replication_config', key='return_value') }}",
)

传感器

Wait for a replication task to complete

要检查复制任务的状态直至完成,您可以使用 DmsTaskCompletedSensor

tests/system/amazon/aws/example_dms.py[source]

await_task_stop = DmsTaskCompletedSensor(
    task_id="await_task_stop",
    replication_task_arn=task_arn,
)

参考

此条目是否有帮助?