Amazon Bedrock¶
Amazon Bedrock 是一项完全托管的服务,通过单个 API 提供来自领先 AI 公司(如 AI21 Labs、Anthropic、Cohere、Meta、Mistral AI、Stability AI 和 Amazon)的一系列高性能基础模型 (FM),以及构建具有安全性、隐私性和负责任 AI 的生成式 AI 应用程序所需的各种功能。
先决条件任务¶
要使用这些操作符,您必须执行以下几项操作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息请参见 Airflow® 的安装
设置连接.
通用参数¶
- aws_conn_id
引用 Amazon Web Services 连接 ID。如果此参数设置为
None
,则使用默认的 boto3 行为,而不进行连接查找。否则,使用连接中存储的凭据。默认值:aws_default
- region_name
AWS 区域名称。如果此参数设置为
None
或省略,则使用来自 AWS 连接额外参数 的 region_name。否则,使用指定的值,而不是连接值。默认值:None
- verify
是否验证 SSL 证书。
False
- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书捆绑包的文件名。如果要使用与 botocore 使用的不同的 CA 证书捆绑包,可以指定此参数。
如果此参数设置为
None
或省略,则使用来自 AWS 连接额外参数 的 verify。否则,使用指定的值,而不是连接值。默认值:None
- botocore_config
提供的字典用于构造一个 botocore.config.Config。此配置可用于配置避免节流异常、超时等。
{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None
或省略,则使用来自 AWS 连接额外参数 的 config_kwargs。否则,使用指定的值,而不是连接值。默认值:None
注意
指定一个空字典,
{}
,将覆盖 botocore.config.Config 的连接配置
操作符¶
调用现有的 Amazon Bedrock 模型¶
要调用现有的 Amazon Bedrock 模型,您可以使用 BedrockInvokeModelOperator
。
请注意,每个模型系列都有不同的输入和输出格式。下面包含一些示例,但有关不同格式的详细信息,请参阅 基础模型的推理参数
例如,要调用 Meta Llama 模型,您可以使用
tests/system/amazon/aws/example_bedrock.py
invoke_llama_model = BedrockInvokeModelOperator(
task_id="invoke_llama",
model_id=LLAMA_SHORT_MODEL_ID,
input_data={"prompt": PROMPT},
)
要调用 Amazon Titan 模型,您可以使用
tests/system/amazon/aws/example_bedrock.py
invoke_titan_model = BedrockInvokeModelOperator(
task_id="invoke_titan",
model_id=TITAN_SHORT_MODEL_ID,
input_data={"inputText": PROMPT},
)
要使用 Completions API 调用 Claude V2 模型,您可以使用
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
invoke_claude_completions = BedrockInvokeModelOperator(
task_id="invoke_claude_completions",
model_id=CLAUDE_MODEL_ID,
input_data={"max_tokens_to_sample": 4000, "prompt": f"\n\nHuman: {PROMPT}\n\nAssistant:"},
)
自定义现有的 Amazon Bedrock 模型¶
要创建微调作业来自定义基础模型,您可以使用 BedrockCustomizeModelOperator
。
模型自定义作业是异步的,完成时间取决于基础模型和训练/验证数据的大小。要监控作业的状态,您可以使用“model_customization_job_complete”等待器、BedrockCustomizeModelCompletedSensor
传感器或 BedrockCustomizeModelCompletedTrigger
触发器。
tests/system/amazon/aws/example_bedrock.py
customize_model = BedrockCustomizeModelOperator(
task_id="customize_model",
job_name=custom_model_job_name,
custom_model_name=custom_model_name,
role_arn=test_context[ROLE_ARN_KEY],
base_model_id=f"{model_arn_prefix}{TITAN_SHORT_MODEL_ID}",
hyperparameters=HYPERPARAMETERS,
training_data_uri=training_data_uri,
output_data_uri=f"s3://{bucket_name}/myOutputData",
)
为现有的 Amazon Bedrock 模型配置吞吐量¶
要为基础模型或微调模型创建具有专用容量的预置吞吐量,您可以使用 BedrockCreateProvisionedModelThroughputOperator
。
预置吞吐量作业是异步的。要监控作业的状态,您可以使用“provisioned_model_throughput_complete”等待器、BedrockProvisionModelThroughputCompletedSensor
传感器或 BedrockProvisionModelThroughputCompletedSensorTrigger
触发器。
tests/system/amazon/aws/example_bedrock.py
provision_throughput = BedrockCreateProvisionedModelThroughputOperator(
task_id="provision_throughput",
model_units=1,
provisioned_model_name=provisioned_model_name,
model_id=f"{model_arn_prefix}{TITAN_MODEL_ID}",
)
创建 Amazon Bedrock 知识库¶
要创建 Amazon Bedrock 知识库,您可以使用 BedrockCreateKnowledgeBaseOperator
。
有关哪些模型支持将数据嵌入到向量存储中的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
create_knowledge_base = BedrockCreateKnowledgeBaseOperator(
task_id="create_knowledge_base",
name=knowledge_base_name,
embedding_model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{TITAN_MODEL_ID}",
role_arn=test_context[ROLE_ARN_KEY],
storage_config={
"type": "OPENSEARCH_SERVERLESS",
"opensearchServerlessConfiguration": {
"collectionArn": get_collection_arn(collection),
"vectorIndexName": index_name,
"fieldMapping": {
"vectorField": "vector",
"textField": "text",
"metadataField": "text-metadata",
},
},
},
)
删除 Amazon Bedrock 知识库¶
删除知识库是一个简单的 boto API 调用,可以在 TaskFlow 任务中完成,如下例所示。
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_knowledge_base(knowledge_base_id: str):
"""
Delete the Amazon Bedrock knowledge base created earlier.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:BedrockDeleteKnowledgeBase`
:param knowledge_base_id: The unique identifier of the knowledge base to delete.
"""
log.info("Deleting Knowledge Base %s.", knowledge_base_id)
bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
创建 Amazon Bedrock 数据源¶
要创建 Amazon Bedrock 数据源,您可以使用 BedrockCreateDataSourceOperator
。
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
create_data_source = BedrockCreateDataSourceOperator(
task_id="create_data_source",
knowledge_base_id=create_knowledge_base.output,
name=data_source_name,
bucket_name=bucket_name,
)
删除 Amazon Bedrock 数据源¶
删除数据源是一个简单的 boto API 调用,可以在 TaskFlow 任务中完成,如下例所示。
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_data_source(knowledge_base_id: str, data_source_id: str):
"""
Delete the Amazon Bedrock data source created earlier.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto_operator:BedrockDeleteDataSource`
:param knowledge_base_id: The unique identifier of the knowledge base which the data source is attached to.
:param data_source_id: The unique identifier of the data source to delete.
"""
log.info("Deleting data source %s from Knowledge Base %s.", data_source_id, knowledge_base_id)
bedrock_agent_client.delete_data_source(dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id)
将数据提取到 Amazon Bedrock 数据源¶
要将 Amazon S3 存储桶中的数据添加到 Amazon Bedrock 数据源,您可以使用 BedrockIngestDataOperator
。
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
ingest_data = BedrockIngestDataOperator(
task_id="ingest_data",
knowledge_base_id=create_knowledge_base.output,
data_source_id=create_data_source.output,
)
Amazon Bedrock 检索¶
要查询知识库,您可以使用 BedrockRetrieveOperator
。
响应将仅包含与查询相关的源的引用。如果您想通过 LLM 传递结果以生成文本响应,请参阅 BedrockRaGOperator
有关哪些模型支持从知识库检索信息的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
retrieve = BedrockRetrieveOperator(
task_id="retrieve",
knowledge_base_id=create_knowledge_base.output,
retrieval_query="Who was the CEO of Amazon in 1997?",
)
Amazon Bedrock 检索和生成 (RaG)¶
要查询知识库或外部来源,并基于检索到的结果生成文本响应,您可以使用 BedrockRaGOperator
。
响应将包含与查询相关的来源的引文以及生成的文本回复。有关哪些模型支持从知识库检索信息的更多信息,请参阅 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
注意:boto 1.34.90 中添加了对“外部来源”的支持
使用 Amazon Bedrock 知识库的示例
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
knowledge_base_rag = BedrockRaGOperator(
task_id="knowledge_base_rag",
input="Who was the CEO of Amazon on 2022?",
source_type="KNOWLEDGE_BASE",
model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{CLAUDE_MODEL_ID}",
knowledge_base_id=create_knowledge_base.output,
)
在 Amazon S3 存储桶中使用 PDF 文件的示例
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
external_sources_rag = BedrockRaGOperator(
task_id="external_sources_rag",
input="Who was the CEO of Amazon in 2022?",
source_type="EXTERNAL_SOURCES",
model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
sources=[
{
"sourceType": "S3",
"s3Location": {"uri": f"s3://{bucket_name}/AMZN-2022-Shareholder-Letter.pdf"},
}
],
)
传感器¶
等待 Amazon Bedrock 自定义模型作业¶
要等待 Amazon Bedrock 自定义模型作业的状态,直到它达到最终状态,您可以使用 BedrockCustomizeModelCompletedSensor
tests/system/amazon/aws/example_bedrock.py
await_custom_model_job = BedrockCustomizeModelCompletedSensor(
task_id="await_custom_model_job",
job_name=custom_model_job_name,
)
等待 Amazon Bedrock 预置模型吞吐量作业¶
要等待 Amazon Bedrock 预置模型吞吐量作业的状态,直到它达到最终状态,您可以使用 BedrockProvisionModelThroughputCompletedSensor
tests/system/amazon/aws/example_bedrock.py
await_provision_throughput = BedrockProvisionModelThroughputCompletedSensor(
task_id="await_provision_throughput",
model_id=provision_throughput.output,
)
等待 Amazon Bedrock 知识库¶
要等待 Amazon Bedrock 知识库的状态,直到它达到最终状态,您可以使用 BedrockKnowledgeBaseActiveSensor
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
await_knowledge_base = BedrockKnowledgeBaseActiveSensor(
task_id="await_knowledge_base", knowledge_base_id=create_knowledge_base.output
)
等待 Amazon Bedrock 摄取作业完成¶
要等待 Amazon Bedrock 数据摄取作业的状态,直到它达到最终状态,您可以使用 BedrockIngestionJobSensor
tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py
await_ingest = BedrockIngestionJobSensor(
task_id="await_ingest",
knowledge_base_id=create_knowledge_base.output,
data_source_id=create_data_source.output,
ingestion_job_id=ingest_data.output,
)