Amazon Bedrock¶
Amazon Bedrock 是一项全托管服务,提供来自领先 AI 公司(如 AI21 Labs、Anthropic、Cohere、Meta、Mistral AI、Stability AI、以及 Amazon)的高性能基础模型(FMs)选择,通过单一 API,并提供构建具备安全性、隐私和负责任 AI 的生成式 AI 应用所需的丰富功能。
前置任务¶
要使用这些操作符,您需要完成以下几项工作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息可在 Airflow® 安装 中获取。
设置连接.
通用参数¶
- aws_conn_id
引用到 Amazon Web Services 连接 ID。如果此参数设置为
None,则使用默认的 boto3 行为且不进行连接查找。否则使用存储在连接中的凭证。默认值:aws_default- region_name
AWS 区域名称。如果此参数设置为
None或省略,则使用 region_name(来自 AWS 连接额外参数)。否则使用指定的值而不是连接中的值。默认值:None- verify
是否验证 SSL 证书。
False- 不验证 SSL 证书。path/to/cert/bundle.pem - 要使用的 CA 证书捆绑文件名。如果您想使用与 botocore 不同的 CA 证书捆绑,可指定此参数。
如果此参数设置为
None或省略,则使用 verify(来自 AWS 连接额外参数)。否则使用指定的值而不是连接中的值。默认值:None- botocore_config
提供的字典将用于构建 botocore.config.Config。此配置可用于配置避免限流异常、超时等。
示例,关于参数的更多细节请查看 botocore.config.Config¶{ "signature_version": "unsigned", "s3": { "us_east_1_regional_endpoint": True, }, "retries": { "mode": "standard", "max_attempts": 10, }, "connect_timeout": 300, "read_timeout": 300, "tcp_keepalive": True, }
如果此参数设置为
None或省略,则使用 config_kwargs(来自 AWS 连接额外参数)。否则使用指定的值而不是连接中的值。默认值:None注意
指定一个空字典
{}将覆盖 botocore.config.Config 的连接配置。
操作符¶
调用现有的 Amazon Bedrock 模型¶
要调用现有的 Amazon Bedrock 模型,您可以使用 BedrockInvokeModelOperator。
请注意,每个模型系列都有不同的输入和输出格式。以下提供了一些示例,详细的格式信息请参见 基础模型推理参数
例如,要调用 Meta Llama 模型,您可以使用
invoke_llama_model = BedrockInvokeModelOperator(
task_id="invoke_llama",
model_id=LLAMA_SHORT_MODEL_ID,
input_data={"prompt": PROMPT},
)
要调用 Amazon Titan 模型,您可以使用
invoke_titan_model = BedrockInvokeModelOperator(
task_id="invoke_titan",
model_id=TITAN_MODEL_ID,
input_data={"inputText": PROMPT},
)
要使用 Messages API 调用 Claude Sonnet 模型,您可以使用
invoke_claude_model = BedrockInvokeModelOperator(
task_id="invoke_claude_model",
model_id=CLAUDE_MODEL_ID,
input_data={
"max_tokens": 4000,
"anthropic_version": ANTHROPIC_VERSION,
"messages": [
{"role": "user", "content": f"\n\nHuman: {PROMPT}\n\nAssistant:"},
],
},
)
定制现有的 Amazon Bedrock 模型¶
要创建微调作业以定制基础模型,您可以使用 BedrockCustomizeModelOperator。
模型定制作业是异步的,完成时间取决于基础模型以及训练/验证数据的规模。要监控作业状态,您可以使用 “model_customization_job_complete” Waiter、BedrockCustomizeModelCompletedSensor 传感器,或 BedrockCustomizeModelCompletedTrigger 触发器。
customize_model = BedrockCustomizeModelOperator(
task_id="customize_model",
job_name=custom_model_job_name,
custom_model_name=custom_model_name,
role_arn=test_context[ROLE_ARN_KEY],
base_model_id=f"{model_arn_prefix}{TITAN_SHORT_MODEL_ID}",
hyperparameters=HYPERPARAMETERS,
training_data_uri=training_data_uri,
output_data_uri=f"s3://{bucket_name}/myOutputData",
)
为现有的 Amazon Bedrock 模型配置吞吐量¶
要为基础模型或微调模型创建具有专用容量的预置吞吐量,您可以使用 BedrockCreateProvisionedModelThroughputOperator。
预置吞吐量作业是异步的。要监控作业状态,您可以使用 “provisioned_model_throughput_complete” Waiter、BedrockProvisionModelThroughputCompletedSensor 传感器,或 BedrockProvisionModelThroughputCompletedSensorTrigger 触发器。
provision_throughput = BedrockCreateProvisionedModelThroughputOperator(
task_id="provision_throughput",
model_units=1,
provisioned_model_name=provisioned_model_name,
model_id=f"{model_arn_prefix}{TITAN_MODEL_ID}",
)
创建 Amazon Bedrock 知识库¶
要创建 Amazon Bedrock 知识库,您可以使用 BedrockCreateKnowledgeBaseOperator。
有关哪些模型支持将数据嵌入向量存储的更多信息,请参见 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
create_knowledge_base = BedrockCreateKnowledgeBaseOperator(
task_id="create_knowledge_base",
name=knowledge_base_name,
embedding_model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{TITAN_MODEL_ID}",
role_arn=test_context[ROLE_ARN_KEY],
storage_config={
"type": "OPENSEARCH_SERVERLESS",
"opensearchServerlessConfiguration": {
"collectionArn": get_collection_arn(collection),
"vectorIndexName": index_name,
"fieldMapping": {
"vectorField": "vector",
"textField": "text",
"metadataField": "text-metadata",
},
},
},
)
删除 Amazon Bedrock 知识库¶
删除知识库是一次简单的 boto API 调用,可在 TaskFlow 任务中像下面的示例一样完成。
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_knowledge_base(knowledge_base_id: str):
"""
Delete the Amazon Bedrock knowledge base created earlier.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:BedrockDeleteKnowledgeBase`
:param knowledge_base_id: The unique identifier of the knowledge base to delete.
"""
log.info("Deleting Knowledge Base %s.", knowledge_base_id)
bedrock_agent_client.conn.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
创建 Amazon Bedrock 数据源¶
要创建 Amazon Bedrock 数据源,您可以使用 BedrockCreateDataSourceOperator。
create_data_source = BedrockCreateDataSourceOperator(
task_id="create_data_source",
knowledge_base_id=create_knowledge_base.output,
name=data_source_name,
bucket_name=bucket_name,
)
删除 Amazon Bedrock 数据源¶
删除数据源是一次简单的 boto API 调用,可在 TaskFlow 任务中像下面的示例一样完成。
@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_data_source(knowledge_base_id: str, data_source_id: str):
"""
Delete the Amazon Bedrock data source created earlier.
.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto_operator:BedrockDeleteDataSource`
:param knowledge_base_id: The unique identifier of the knowledge base which the data source is attached to.
:param data_source_id: The unique identifier of the data source to delete.
"""
log.info("Deleting data source %s from Knowledge Base %s.", data_source_id, knowledge_base_id)
bedrock_agent_client.conn.delete_data_source(
dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id
)
将数据摄入到 Amazon Bedrock 数据源¶
要将 Amazon S3 存储桶中的数据添加到 Amazon Bedrock 数据源,您可以使用 BedrockIngestDataOperator。
ingest_data = BedrockIngestDataOperator(
task_id="ingest_data",
knowledge_base_id=create_knowledge_base.output,
data_source_id=create_data_source.output,
)
Amazon Bedrock 检索¶
要查询知识库,您可以使用 BedrockRetrieveOperator。
响应只会包含与查询相关的来源引用。如果您希望将结果传递给 LLM 以生成文本响应,请参见 BedrockRaGOperator。
有关哪些模型支持从知识库检索信息的更多信息,请参见 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
retrieve = BedrockRetrieveOperator(
task_id="retrieve",
knowledge_base_id=create_knowledge_base.output,
retrieval_query="Who was the CEO of Amazon in 1997?",
)
Amazon Bedrock 检索并生成 (RaG)¶
要查询知识库或外部来源并基于检索结果生成文本响应,您可以使用 BedrockRaGOperator。
响应将包含与查询相关的来源引用以及生成的文本回复。有关哪些模型支持从知识库检索信息的更多信息,请参见 https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html
注意:对“外部来源”的支持已在 boto 1.34.90 中添加
使用 Amazon Bedrock 知识库的示例
knowledge_base_rag = BedrockRaGOperator(
task_id="knowledge_base_rag",
input="Who was the CEO of Amazon on 2022?",
source_type="KNOWLEDGE_BASE",
model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{CLAUDE_MODEL_ID}",
knowledge_base_id=create_knowledge_base.output,
)
使用位于 Amazon S3 存储桶中的 PDF 文件的示例
external_sources_rag = BedrockRaGOperator(
task_id="external_sources_rag",
input="Who was the CEO of Amazon in 2022?",
source_type="EXTERNAL_SOURCES",
model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
sources=[
{
"sourceType": "S3",
"s3Location": {"uri": f"s3://{bucket_name}/AMZN-2022-Shareholder-Letter.pdf"},
}
],
)
创建 Amazon Bedrock 批量推理作业¶
要创建批量推理作业以在多个提示上调用模型,您可以使用 BedrockBatchInferenceOperator。
输入必须采用 jsonl 格式并上传到 Amazon S3 存储桶。详情请参见 https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference.html。
注意:作业会添加到队列中并按顺序处理。考虑到可能的等待时间,并且可选的 timeout 参数以小时计,在这种情况下推荐使用可延迟模式而非 “wait_for_completion”。
使用 Amazon Bedrock 批量推理作业的示例
batch_infer = BedrockBatchInferenceOperator(
task_id="batch_infer",
job_name=job_name,
role_arn=test_context[ROLE_ARN_KEY],
model_id=CLAUDE_MODEL_ID,
input_uri=input_uri,
output_uri=output_uri,
)
传感器¶
等待 Amazon Bedrock 定制模型作业¶
要等待 Amazon Bedrock 定制模型作业的状态直至达到终止状态,您可以使用 BedrockCustomizeModelCompletedSensor
await_custom_model_job = BedrockCustomizeModelCompletedSensor(
task_id="await_custom_model_job",
job_name=custom_model_job_name,
)
等待 Amazon Bedrock 预置模型吞吐量作业¶
要等待 Amazon Bedrock 预置模型吞吐量作业的状态直至达到终止状态,您可以使用 BedrockProvisionModelThroughputCompletedSensor
await_provision_throughput = BedrockProvisionModelThroughputCompletedSensor(
task_id="await_provision_throughput",
model_id=provision_throughput.output,
)
等待 Amazon Bedrock 知识库¶
要等待 Amazon Bedrock 知识库的状态直至达到终止状态,您可以使用 BedrockKnowledgeBaseActiveSensor
await_knowledge_base = BedrockKnowledgeBaseActiveSensor(
task_id="await_knowledge_base", knowledge_base_id=create_knowledge_base.output
)
等待 Amazon Bedrock 数据摄入作业完成¶
要等待 Amazon Bedrock 数据摄入作业的状态直至达到终止状态,您可以使用 BedrockIngestionJobSensor
await_ingest = BedrockIngestionJobSensor(
task_id="await_ingest",
knowledge_base_id=create_knowledge_base.output,
data_source_id=create_data_source.output,
ingestion_job_id=ingest_data.output,
)
等待 Amazon Bedrock 批量推理作业¶
要等待 Amazon Bedrock 批量推理作业的状态直至达到 “Scheduled” 或 “Completed” 状态,您可以使用 BedrockBatchInferenceScheduledSensor
Bedrock 将批量推理作业添加到队列,完成可能需要一些时间。如果您想等待作业完成,请将 success_state 设置为 TargetState.COMPLETED;如果您只想等待服务确认作业已排入队列,请使用 TargetState.SCHEDULED。
await_job_scheduled = BedrockBatchInferenceSensor(
task_id="await_job_scheduled",
job_arn=batch_infer.output,
success_state=BedrockBatchInferenceSensor.SuccessState.SCHEDULED,
)