Amazon Bedrock¶
Amazon Bedrock 是一项完全托管的服务,通过单一 API 提供来自领先人工智能公司(如 AI21 Labs
、Anthropic
、Cohere
、Meta
、Mistral AI
、Stability AI
和 Amazon
)的多种高性能基础模型 (FM),以及构建具有安全性、隐私性和负责任的人工智能的生成式 AI 应用程序所需的一系列广泛功能。
先决条件任务¶
要使用这些 Operators,您必须完成以下几项工作
使用 AWS Console 或 AWS CLI 创建必要的资源。
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参阅 Airflow® 安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,不查找连接。否则使用存储在连接中的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 region_name。否则使用指定的值而非连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书包的文件名。如果您想使用与 botocore 使用的 CA 证书包不同的证书包,可以指定此参数。
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 verify。否则使用指定的值而非连接值。默认值:None
- botocore_config
提供的字典用于构建 botocore.config.Config。此配置可用于配置 避免节流异常、超时等。
示例,有关参数的更多详细信息,请参阅 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则使用 AWS 连接额外参数 中的 config_kwargs。否则使用指定的值而非连接值。默认值:None
注意
指定一个空字典,
{}
,将覆盖 botocore.config.Config 的连接配置
Operators¶
调用现有 Amazon Bedrock 模型¶
要调用现有 Amazon Bedrock 模型,您可以使用 BedrockInvokeModelOperator
。
请注意,每个模型系列都有不同的输入和输出格式。下方包含一些示例,但有关不同格式的详细信息,请参阅 基础模型的推理参数
例如,要调用 Meta Llama 模型,您可以使用
tests/system/amazon/aws/example_bedrock.py
invoke_llama_model = BedrockInvokeModelOperator(
task_id="invoke_llama",
model_id=LLAMA_SHORT_MODEL_ID,
input_data={"prompt": PROMPT},
)
要调用 Amazon Titan 模型,您可以使用
tests/system/amazon/aws/example_bedrock.py
invoke_titan_model = BedrockInvokeModelOperator(
task_id="invoke_titan",
model_id=TITAN_SHORT_MODEL_ID,
input_data={"inputText": PROMPT},
)
要使用 Completions API 调用 Claude V2 模型,您可以使用
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
invoke_claude_completions = BedrockInvokeModelOperator(
task_id="invoke_claude_completions",
model_id=CLAUDE_MODEL_ID,
input_data={"max_tokens_to_sample": 4000, "prompt": f"\n\nHuman: {PROMPT}\n\nAssistant:"},
)
要使用 Messages API 调用 Claude V3 Sonnet 模型,您可以使用
tests/system/amazon/aws/example_bedrock_batch_inference.py
invoke_claude_messages = BedrockInvokeModelOperator(
task_id="invoke_claude_messages",
model_id=CLAUDE_MODEL_ID,
input_data={
"anthropic_version": "bedrock-2023-05-31",
"max_tokens": 1000,
"messages": [{"role": "user", "content": PROMPT_TEMPLATE.format(n=42)}],
},
)
自定义现有 Amazon Bedrock 模型¶
要创建微调作业以自定义基础模型,您可以使用 BedrockCustomizeModelOperator
。
模型自定义作业是异步的,完成时间取决于基础模型以及训练/验证数据的大小。要监控作业状态,您可以使用“model_customization_job_complete”Waiters、BedrockCustomizeModelCompletedSensor
Sensors 或 BedrockCustomizeModelCompletedTrigger
Trigger。
tests/system/amazon/aws/example_bedrock.py
customize_model = BedrockCustomizeModelOperator(
task_id="customize_model",
job_name=custom_model_job_name,
custom_model_name=custom_model_name,
role_arn=test_context[ROLE_ARN_KEY],
base_model_id=f"{model_arn_prefix}{TITAN_SHORT_MODEL_ID}",
hyperparameters=HYPERPARAMETERS,
training_data_uri=training_data_uri,
output_data_uri=f"s3://{bucket_name}/myOutputData",
)
为现有 Amazon Bedrock 模型配置吞吐量¶
要为基础模型或微调模型创建具有专用容量的预置吞吐量,您可以使用 BedrockCreateProvisionedModelThroughputOperator
。
预置吞吐量作业是异步的。要监控作业状态,您可以使用“provisioned_model_throughput_complete”Waiters、BedrockProvisionModelThroughputCompletedSensor
Sensors 或 BedrockProvisionModelThroughputCompletedSensorTrigger
Trigger。
tests/system/amazon/aws/example_bedrock.py
provision_throughput = BedrockCreateProvisionedModelThroughputOperator(
task_id="provision_throughput",
model_units=1,
provisioned_model_name=provisioned_model_name,
model_id=f"{model_arn_prefix}{TITAN_MODEL_ID}",
)
创建 Amazon Bedrock 知识库¶
要创建 Amazon Bedrock 知识库,您可以使用 BedrockCreateKnowledgeBaseOperator
。
有关哪些模型支持将数据嵌入向量存储的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
create_knowledge_base = BedrockCreateKnowledgeBaseOperator(
task_id="create_knowledge_base",
name=knowledge_base_name,
embedding_model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{TITAN_MODEL_ID}",
role_arn=test_context[ROLE_ARN_KEY],
storage_config={
"type": "OPENSEARCH_SERVERLESS",
"opensearchServerlessConfiguration": {
"collectionArn": get_collection_arn(collection),
"vectorIndexName": index_name,
"fieldMapping": {
"vectorField": "vector",
"textField": "text",
"metadataField": "text-metadata",
},
},
},
)
删除 Amazon Bedrock 知识库¶
删除知识库是一个简单的 boto API 调用,可以在 TaskFlow 任务中完成,示例如下。
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_knowledge_base(knowledge_base_id: str):
"""
Delete the Amazon Bedrock knowledge base created earlier.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:BedrockDeleteKnowledgeBase`
:param knowledge_base_id: The unique identifier of the knowledge base to delete.
"""
log.info("Deleting Knowledge Base %s.", knowledge_base_id)
bedrock_agent_client.conn.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
创建 Amazon Bedrock 数据源¶
要创建 Amazon Bedrock 数据源,您可以使用 BedrockCreateDataSourceOperator
。
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
create_data_source = BedrockCreateDataSourceOperator(
task_id="create_data_source",
knowledge_base_id=create_knowledge_base.output,
name=data_source_name,
bucket_name=bucket_name,
)
删除 Amazon Bedrock 数据源¶
删除数据源是一个简单的 boto API 调用,可以在 TaskFlow 任务中完成,示例如下。
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_data_source(knowledge_base_id: str, data_source_id: str):
"""
Delete the Amazon Bedrock data source created earlier.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto_operator:BedrockDeleteDataSource`
:param knowledge_base_id: The unique identifier of the knowledge base which the data source is attached to.
:param data_source_id: The unique identifier of the data source to delete.
"""
log.info("Deleting data source %s from Knowledge Base %s.", data_source_id, knowledge_base_id)
bedrock_agent_client.conn.delete_data_source(
dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id
)
将数据摄取到 Amazon Bedrock 数据源¶
要将数据从 Amazon S3 存储桶添加到 Amazon Bedrock 数据源,您可以使用 BedrockIngestDataOperator
。
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
ingest_data = BedrockIngestDataOperator(
task_id="ingest_data",
knowledge_base_id=create_knowledge_base.output,
data_source_id=create_data_source.output,
)
Amazon Bedrock 检索¶
要查询知识库,您可以使用 BedrockRetrieveOperator
。
响应将仅包含与查询相关的来源引用。如果您想将结果通过 LLM 以生成文本响应,请参阅 BedrockRaGOperator
有关哪些模型支持从知识库检索信息的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
retrieve = BedrockRetrieveOperator(
task_id="retrieve",
knowledge_base_id=create_knowledge_base.output,
retrieval_query="Who was the CEO of Amazon in 1997?",
)
Amazon Bedrock 检索并生成 (RaG)¶
要查询知识库或外部来源并根据检索到的结果生成文本响应,您可以使用 BedrockRaGOperator
。
响应将包含与查询相关的来源引用以及生成的文本回复。有关哪些模型支持从知识库检索信息的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
注意:“外部来源”支持已在 boto 1.34.90 中添加
使用 Amazon Bedrock 知识库的示例
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
knowledge_base_rag = BedrockRaGOperator(
task_id="knowledge_base_rag",
input="Who was the CEO of Amazon on 2022?",
source_type="KNOWLEDGE_BASE",
model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{CLAUDE_MODEL_ID}",
knowledge_base_id=create_knowledge_base.output,
)
使用 Amazon S3 存储桶中的 PDF 文件的示例
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
external_sources_rag = BedrockRaGOperator(
task_id="external_sources_rag",
input="Who was the CEO of Amazon in 2022?",
source_type="EXTERNAL_SOURCES",
model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
sources=[
{
"sourceType": "S3",
"s3Location": {"uri": f"s3://{bucket_name}/AMZN-2022-Shareholder-Letter.pdf"},
}
],
)
创建 Amazon Bedrock 批量推理作业¶
要创建批量推理作业以对多个提示调用模型,您可以使用 BedrockBatchInferenceOperator
。
输入必须采用 jsonl 格式并上传到 Amazon S3 存储桶。有关详细信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference.html。
注意:作业会添加到队列中并按顺序处理。考虑到潜在的等待时间,以及可选超时参数以小时为单位的事实,在这种情况下,建议使用可延迟模式而非“wait_for_completion”。
使用 Amazon Bedrock 批量推理作业的示例
tests/system/amazon/aws/example_bedrock_batch_inference.py
batch_infer = BedrockBatchInferenceOperator(
task_id="batch_infer",
job_name=job_name,
role_arn=test_context[ROLE_ARN_KEY],
model_id=CLAUDE_MODEL_ID,
input_uri=input_uri,
output_uri=output_uri,
)
Sensors¶
等待 Amazon Bedrock 自定义模型作业¶
要等待 Amazon Bedrock 自定义模型作业的状态直到其达到终端状态,您可以使用 BedrockCustomizeModelCompletedSensor
tests/system/amazon/aws/example_bedrock.py
await_custom_model_job = BedrockCustomizeModelCompletedSensor(
task_id="await_custom_model_job",
job_name=custom_model_job_name,
)
等待 Amazon Bedrock 预置模型吞吐量作业¶
要等待 Amazon Bedrock 预置模型吞吐量作业的状态直到其达到终端状态,您可以使用 BedrockProvisionModelThroughputCompletedSensor
tests/system/amazon/aws/example_bedrock.py
await_provision_throughput = BedrockProvisionModelThroughputCompletedSensor(
task_id="await_provision_throughput",
model_id=provision_throughput.output,
)
等待 Amazon Bedrock 知识库¶
要等待 Amazon Bedrock 知识库的状态直到其达到终端状态,您可以使用 BedrockKnowledgeBaseActiveSensor
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
await_knowledge_base = BedrockKnowledgeBaseActiveSensor(
task_id="await_knowledge_base", knowledge_base_id=create_knowledge_base.output
)
等待 Amazon Bedrock 摄取作业完成¶
要等待 Amazon Bedrock 数据摄取作业的状态直到其达到终端状态,您可以使用 BedrockIngestionJobSensor
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
await_ingest = BedrockIngestionJobSensor(
task_id="await_ingest",
knowledge_base_id=create_knowledge_base.output,
data_source_id=create_data_source.output,
ingestion_job_id=ingest_data.output,
)
等待 Amazon Bedrock 批量推理作业¶
要等待 Amazon Bedrock 批量推理作业的状态直到其达到“Scheduled”或“Completed”状态,您可以使用 BedrockBatchInferenceScheduledSensor
Bedrock 会将批量推理作业添加到队列中,并且可能需要一些时间才能完成。如果您想等待作业完成,请将 success_state 设置为 TargetState.COMPLETED;如果您只想等待服务确认作业已在队列中,请使用 TargetState.SCHEDULED。
tests/system/amazon/aws/example_bedrock_batch_inference.py
await_job_scheduled = BedrockBatchInferenceSensor(
task_id="await_job_scheduled",
job_arn=batch_infer.output,
success_state=BedrockBatchInferenceSensor.SuccessState.SCHEDULED,
)