Amazon Bedrock

Amazon Bedrock 是一项完全托管的服务,通过单一 API 提供来自领先人工智能公司(如 AI21 LabsAnthropicCohereMetaMistral AIStability AIAmazon)的多种高性能基础模型 (FM),以及构建具有安全性、隐私性和负责任的人工智能的生成式 AI 应用程序所需的一系列广泛功能。

先决条件任务

要使用这些 Operators,您必须完成以下几项工作

通用参数

aws_conn_id

引用 Amazon Web Services 连接 ID。如果此参数设置为 None,则使用默认的 boto3 行为,不查找连接。否则使用存储在连接中的凭据。默认值: aws_default

region_name

AWS 区域名称。如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 region_name。否则使用指定的值而非连接值。默认值: None

verify

是否验证 SSL 证书。

  • False - 不验证 SSL 证书。

  • path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果您想使用与 botocore 使用的 CA 证书包不同的证书包,可以指定此参数。

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 verify。否则使用指定的值而非连接值。默认值: None

botocore_config

提供的字典用于构建 botocore.config.Config。此配置可用于配置 避免节流异常、超时等。

示例,有关参数的更多详细信息,请参阅 botocore.config.Config
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

如果此参数设置为 None 或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则使用指定的值而非连接值。默认值: None

注意

指定一个空字典,{},将覆盖 botocore.config.Config 的连接配置

Operators

调用现有 Amazon Bedrock 模型

要调用现有 Amazon Bedrock 模型,您可以使用 BedrockInvokeModelOperator

请注意,每个模型系列都有不同的输入和输出格式。下方包含一些示例,但有关不同格式的详细信息,请参阅 基础模型的推理参数

例如,要调用 Meta Llama 模型,您可以使用

tests/system/amazon/aws/example_bedrock.py

invoke_llama_model = BedrockInvokeModelOperator(
    task_id="invoke_llama",
    model_id=LLAMA_SHORT_MODEL_ID,
    input_data={"prompt": PROMPT},
)

要调用 Amazon Titan 模型,您可以使用

tests/system/amazon/aws/example_bedrock.py

invoke_titan_model = BedrockInvokeModelOperator(
    task_id="invoke_titan",
    model_id=TITAN_SHORT_MODEL_ID,
    input_data={"inputText": PROMPT},
)

要使用 Completions API 调用 Claude V2 模型,您可以使用

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

invoke_claude_completions = BedrockInvokeModelOperator(
    task_id="invoke_claude_completions",
    model_id=CLAUDE_MODEL_ID,
    input_data={"max_tokens_to_sample": 4000, "prompt": f"\n\nHuman: {PROMPT}\n\nAssistant:"},
)

要使用 Messages API 调用 Claude V3 Sonnet 模型,您可以使用

tests/system/amazon/aws/example_bedrock_batch_inference.py

invoke_claude_messages = BedrockInvokeModelOperator(
    task_id="invoke_claude_messages",
    model_id=CLAUDE_MODEL_ID,
    input_data={
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens": 1000,
        "messages": [{"role": "user", "content": PROMPT_TEMPLATE.format(n=42)}],
    },
)

自定义现有 Amazon Bedrock 模型

要创建微调作业以自定义基础模型,您可以使用 BedrockCustomizeModelOperator

模型自定义作业是异步的,完成时间取决于基础模型以及训练/验证数据的大小。要监控作业状态,您可以使用“model_customization_job_complete”Waiters、BedrockCustomizeModelCompletedSensor Sensors 或 BedrockCustomizeModelCompletedTrigger Trigger。

tests/system/amazon/aws/example_bedrock.py

customize_model = BedrockCustomizeModelOperator(
    task_id="customize_model",
    job_name=custom_model_job_name,
    custom_model_name=custom_model_name,
    role_arn=test_context[ROLE_ARN_KEY],
    base_model_id=f"{model_arn_prefix}{TITAN_SHORT_MODEL_ID}",
    hyperparameters=HYPERPARAMETERS,
    training_data_uri=training_data_uri,
    output_data_uri=f"s3://{bucket_name}/myOutputData",
)

为现有 Amazon Bedrock 模型配置吞吐量

要为基础模型或微调模型创建具有专用容量的预置吞吐量,您可以使用 BedrockCreateProvisionedModelThroughputOperator

预置吞吐量作业是异步的。要监控作业状态,您可以使用“provisioned_model_throughput_complete”Waiters、BedrockProvisionModelThroughputCompletedSensor Sensors 或 BedrockProvisionModelThroughputCompletedSensorTrigger Trigger。

tests/system/amazon/aws/example_bedrock.py

provision_throughput = BedrockCreateProvisionedModelThroughputOperator(
    task_id="provision_throughput",
    model_units=1,
    provisioned_model_name=provisioned_model_name,
    model_id=f"{model_arn_prefix}{TITAN_MODEL_ID}",
)

创建 Amazon Bedrock 知识库

要创建 Amazon Bedrock 知识库,您可以使用 BedrockCreateKnowledgeBaseOperator

有关哪些模型支持将数据嵌入向量存储的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

create_knowledge_base = BedrockCreateKnowledgeBaseOperator(
    task_id="create_knowledge_base",
    name=knowledge_base_name,
    embedding_model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{TITAN_MODEL_ID}",
    role_arn=test_context[ROLE_ARN_KEY],
    storage_config={
        "type": "OPENSEARCH_SERVERLESS",
        "opensearchServerlessConfiguration": {
            "collectionArn": get_collection_arn(collection),
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata",
            },
        },
    },
)

删除 Amazon Bedrock 知识库

删除知识库是一个简单的 boto API 调用,可以在 TaskFlow 任务中完成,示例如下。

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_knowledge_base(knowledge_base_id: str):
    """
    Delete the Amazon Bedrock knowledge base created earlier.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:BedrockDeleteKnowledgeBase`

    :param knowledge_base_id: The unique identifier of the knowledge base to delete.
    """
    log.info("Deleting Knowledge Base %s.", knowledge_base_id)
    bedrock_agent_client.conn.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)


创建 Amazon Bedrock 数据源

要创建 Amazon Bedrock 数据源,您可以使用 BedrockCreateDataSourceOperator

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

create_data_source = BedrockCreateDataSourceOperator(
    task_id="create_data_source",
    knowledge_base_id=create_knowledge_base.output,
    name=data_source_name,
    bucket_name=bucket_name,
)

删除 Amazon Bedrock 数据源

删除数据源是一个简单的 boto API 调用,可以在 TaskFlow 任务中完成,示例如下。

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_data_source(knowledge_base_id: str, data_source_id: str):
    """
    Delete the Amazon Bedrock data source created earlier.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto_operator:BedrockDeleteDataSource`

    :param knowledge_base_id: The unique identifier of the knowledge base which the data source is attached to.
    :param data_source_id: The unique identifier of the data source to delete.
    """
    log.info("Deleting data source %s from Knowledge Base %s.", data_source_id, knowledge_base_id)
    bedrock_agent_client.conn.delete_data_source(
        dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id
    )


将数据摄取到 Amazon Bedrock 数据源

要将数据从 Amazon S3 存储桶添加到 Amazon Bedrock 数据源,您可以使用 BedrockIngestDataOperator

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

ingest_data = BedrockIngestDataOperator(
    task_id="ingest_data",
    knowledge_base_id=create_knowledge_base.output,
    data_source_id=create_data_source.output,
)

Amazon Bedrock 检索

要查询知识库,您可以使用 BedrockRetrieveOperator

响应将仅包含与查询相关的来源引用。如果您想将结果通过 LLM 以生成文本响应,请参阅 BedrockRaGOperator

有关哪些模型支持从知识库检索信息的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

retrieve = BedrockRetrieveOperator(
    task_id="retrieve",
    knowledge_base_id=create_knowledge_base.output,
    retrieval_query="Who was the CEO of Amazon in 1997?",
)

Amazon Bedrock 检索并生成 (RaG)

要查询知识库或外部来源并根据检索到的结果生成文本响应,您可以使用 BedrockRaGOperator

响应将包含与查询相关的来源引用以及生成的文本回复。有关哪些模型支持从知识库检索信息的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html

注意:“外部来源”支持已在 boto 1.34.90 中添加

使用 Amazon Bedrock 知识库的示例

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

knowledge_base_rag = BedrockRaGOperator(
    task_id="knowledge_base_rag",
    input="Who was the CEO of Amazon on 2022?",
    source_type="KNOWLEDGE_BASE",
    model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{CLAUDE_MODEL_ID}",
    knowledge_base_id=create_knowledge_base.output,
)

使用 Amazon S3 存储桶中的 PDF 文件的示例

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

external_sources_rag = BedrockRaGOperator(
    task_id="external_sources_rag",
    input="Who was the CEO of Amazon in 2022?",
    source_type="EXTERNAL_SOURCES",
    model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
    sources=[
        {
            "sourceType": "S3",
            "s3Location": {"uri": f"s3://{bucket_name}/AMZN-2022-Shareholder-Letter.pdf"},
        }
    ],
)

创建 Amazon Bedrock 批量推理作业

要创建批量推理作业以对多个提示调用模型,您可以使用 BedrockBatchInferenceOperator

输入必须采用 jsonl 格式并上传到 Amazon S3 存储桶。有关详细信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference.html

注意:作业会添加到队列中并按顺序处理。考虑到潜在的等待时间,以及可选超时参数以小时为单位的事实,在这种情况下,建议使用可延迟模式而非“wait_for_completion”。

使用 Amazon Bedrock 批量推理作业的示例

tests/system/amazon/aws/example_bedrock_batch_inference.py

batch_infer = BedrockBatchInferenceOperator(
    task_id="batch_infer",
    job_name=job_name,
    role_arn=test_context[ROLE_ARN_KEY],
    model_id=CLAUDE_MODEL_ID,
    input_uri=input_uri,
    output_uri=output_uri,
)

Sensors

等待 Amazon Bedrock 自定义模型作业

要等待 Amazon Bedrock 自定义模型作业的状态直到其达到终端状态,您可以使用 BedrockCustomizeModelCompletedSensor

tests/system/amazon/aws/example_bedrock.py

await_custom_model_job = BedrockCustomizeModelCompletedSensor(
    task_id="await_custom_model_job",
    job_name=custom_model_job_name,
)

等待 Amazon Bedrock 预置模型吞吐量作业

要等待 Amazon Bedrock 预置模型吞吐量作业的状态直到其达到终端状态,您可以使用 BedrockProvisionModelThroughputCompletedSensor

tests/system/amazon/aws/example_bedrock.py

await_provision_throughput = BedrockProvisionModelThroughputCompletedSensor(
    task_id="await_provision_throughput",
    model_id=provision_throughput.output,
)

等待 Amazon Bedrock 知识库

要等待 Amazon Bedrock 知识库的状态直到其达到终端状态,您可以使用 BedrockKnowledgeBaseActiveSensor

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

await_knowledge_base = BedrockKnowledgeBaseActiveSensor(
    task_id="await_knowledge_base", knowledge_base_id=create_knowledge_base.output
)

等待 Amazon Bedrock 摄取作业完成

要等待 Amazon Bedrock 数据摄取作业的状态直到其达到终端状态,您可以使用 BedrockIngestionJobSensor

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

await_ingest = BedrockIngestionJobSensor(
    task_id="await_ingest",
    knowledge_base_id=create_knowledge_base.output,
    data_source_id=create_data_source.output,
    ingestion_job_id=ingest_data.output,
)

等待 Amazon Bedrock 批量推理作业

要等待 Amazon Bedrock 批量推理作业的状态直到其达到“Scheduled”或“Completed”状态,您可以使用 BedrockBatchInferenceScheduledSensor

Bedrock 会将批量推理作业添加到队列中,并且可能需要一些时间才能完成。如果您想等待作业完成,请将 success_state 设置为 TargetState.COMPLETED;如果您只想等待服务确认作业已在队列中,请使用 TargetState.SCHEDULED。

tests/system/amazon/aws/example_bedrock_batch_inference.py

await_job_scheduled = BedrockBatchInferenceSensor(
    task_id="await_job_scheduled",
    job_arn=batch_infer.output,
    success_state=BedrockBatchInferenceSensor.SuccessState.SCHEDULED,
)

参考

此条目有用吗?