airflow.providers.amazon.aws.transfers.s3_to_dynamodb

AttributeDefinition

属性定义类型。

KeySchema

键模式类型。

S3ToDynamoDBOperator

将数据从 S3 加载到 DynamoDB。

模块内容

class airflow.providers.amazon.aws.transfers.s3_to_dynamodb.AttributeDefinition[source]

基类:TypedDict

属性定义类型。

AttributeName: str[source]
AttributeType: Literal['S', 'N', 'B'][source]
class airflow.providers.amazon.aws.transfers.s3_to_dynamodb.KeySchema[source]

基类:TypedDict

键模式类型。

AttributeName: str[source]
KeyType: Literal['HASH', 'RANGE'][source]
class airflow.providers.amazon.aws.transfers.s3_to_dynamodb.S3ToDynamoDBOperator(*, s3_bucket, s3_key, dynamodb_table_name, dynamodb_key_schema, dynamodb_attributes=None, dynamodb_tmp_table_prefix='tmp', delete_on_error=False, use_existing_table=False, input_format='DYNAMODB_JSON', billing_mode='PAY_PER_REQUEST', import_table_kwargs=None, import_table_creation_kwargs=None, wait_for_completion=True, check_interval=30, max_attempts=240, aws_conn_id='aws_default', **kwargs)[source]

基类:airflow.models.BaseOperator

将数据从 S3 加载到 DynamoDB。

存储在 S3 中的数据可以上传到新的或现有的 DynamoDB。支持的文件格式包括 CSV、DynamoDB JSON 和 Amazon ION。

参数:
  • s3_bucket (str) – 要导入的 S3 存储桶

  • s3_key (str) – 从 S3 导入单个或多个对象的键前缀

  • dynamodb_table_name (str) – 要创建的表的名称

  • dynamodb_key_schema (list[KeySchema]) – 主键和排序键。每个元素代表一个主键属性。AttributeName 是属性的名称。KeyType 是属性的角色。有效值包括 HASH 或 RANGE

  • dynamodb_attributes (list[AttributeDefinition] | None) – 表的属性名称。AttributeName 是属性的名称,AttributeType 是属性的数据类型。AttributeType 的有效值包括 S - 属性类型为 String N - 属性类型为 Number B - 属性类型为 Binary

  • dynamodb_tmp_table_prefix (str) – 临时 DynamoDB 表的前缀

  • delete_on_error (bool) – 如果设置此项,则在新 DynamoDB 表导入出错时将其删除

  • use_existing_table (bool) – 是否导入到现有的非新建 DynamoDB 表。如果设置为 true,数据将首先加载到一个临时 DynamoDB 表中(使用 AWS ImportTable 服务),然后分块检索到内存中并加载到目标表中。如果设置为 false,将创建一个新的 DynamoDB 表,并由 AWS ImportTable 服务批量加载 S3 数据。

  • input_format (Literal['CSV', 'DYNAMODB_JSON', 'ION']) – 导入数据的格式。InputFormat 的有效值包括 CSV、DYNAMODB_JSON 或 ION

  • billing_mode (Literal['PROVISIONED', 'PAY_PER_REQUEST']) – 表的计费模式。有效值包括 PROVISIONED 或 PAY_PER_REQUEST

  • on_demand_throughput – 最大读写单元数量的额外选项

  • import_table_kwargs (dict[str, Any] | None) – 要传递的任何其他可选导入表参数,例如 ClientToken、InputCompressionType 或 InputFormatOptions。请参阅:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/import_table.html

  • import_table_creation_kwargs (dict[str, Any] | None) – 要传递的任何其他可选导入表创建参数,例如 ProvisionedThroughput、SSESpecification 或 GlobalSecondaryIndexes。请参阅:https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb/client/import_table.html

  • wait_for_completion (bool) – 是否等待集群停止

  • check_interval (int) – 状态检查之间等待的秒数

  • max_attempts (int) – 检查作业完成的最大尝试次数

  • aws_conn_id (str | None) – AWS 连接详情的引用

template_fields: collections.abc.Sequence[str] = ('s3_bucket', 's3_key', 'dynamodb_table_name', 'dynamodb_key_schema', 'dynamodb_attributes',...[source]
ui_color = '#e2e8f0'[source]
s3_bucket[source]
s3_key[source]
dynamodb_table_name[source]
dynamodb_attributes = None[source]
dynamodb_tmp_table_prefix = 'tmp'[source]
delete_on_error = False[source]
use_existing_table = False[source]
dynamodb_key_schema[source]
input_format = 'DYNAMODB_JSON'[source]
billing_mode = 'PAY_PER_REQUEST'[source]
import_table_kwargs = None[source]
import_table_creation_kwargs = None[source]
wait_for_completion = True[source]
check_interval = 30[source]
max_attempts = 240[source]
aws_conn_id = 'aws_default'[source]
property tmp_table_name[source]

临时表名。

execute(context)[source]

从 Airflow 执行 S3 到 DynamoDB 作业。

参数:

context (airflow.utils.context.Context) – 任务实例的当前上下文

返回值:

Amazon 资源编号 (ARN)

返回类型:

str

此条目是否有帮助?