airflow.providers.amazon.aws.transfers.dynamodb_to_s3

JSONEncoder

自定义 JSON 编码器实现。

DynamoDBToS3Operator

将记录从 DynamoDB 表复制到 S3。

模块内容

class airflow.providers.amazon.aws.transfers.dynamodb_to_s3.JSONEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

继承自: json.JSONEncoder

自定义 JSON 编码器实现。

default(obj)[source]

将 Decimal 对象转换为 JSON 可序列化格式。

class airflow.providers.amazon.aws.transfers.dynamodb_to_s3.DynamoDBToS3Operator(*, dynamodb_table_name, s3_bucket_name, file_size=1000, dynamodb_scan_kwargs=None, s3_key_prefix='', process_func=_convert_item_to_json_bytes, point_in_time_export=False, export_time=None, export_format='DYNAMODB_JSON', export_table_to_point_in_time_kwargs=None, check_interval=30, max_attempts=60, **kwargs)[source]

继承自: airflow.providers.amazon.aws.transfers.base.AwsToAwsBaseOperator

将记录从 DynamoDB 表复制到 S3。

它扫描一个 DynamoDB 表并将接收到的记录写入本地文件系统上的文件。当文件大小超过用户指定的文件大小限制时,它将文件刷新到 S3。

用户还可以使用 dynamodb_scan_kwargs 指定过滤条件,以仅复制满足条件的记录。

另请参阅

有关如何使用此操作符的更多信息,请参阅指南:Amazon DynamoDB 到 Amazon S3 传输操作符

参数:
  • dynamodb_table_name (str) – 要从中复制数据的 Dynamodb 表

  • s3_bucket_name (str) – 要复制数据到的 S3 存储桶

  • file_size (int) – 如果文件大小 >= file_size,则将文件刷新到 S3

  • dynamodb_scan_kwargs (dict[str, Any] | None) – 传递给 <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/dynamodb.html#DynamoDB.Table.scan> 的 kwargs

  • s3_key_prefix (str) – S3 对象键的前缀

  • process_func (Callable[[dict[str, Any]], bytes]) – 如何将 DynamoDB 条目转换为字节。默认情况下,我们转储 JSON

  • point_in_time_export (bool) – 布尔值,指示操作符使用 ‘scan’ 还是 ‘point in time export’(时间点导出)

  • export_time (datetime.datetime | None) – 过去的时间点,用于导出表数据,以 Unix 纪元开始的秒数计算。表导出将是该时间点表状态的快照。

  • export_format (str) – 导出数据的格式。ExportFormat 的有效值为 DYNAMODB_JSON 或 ION。

  • export_table_to_point_in_time_kwargs (dict | None) – 传递给 boto3 export_table_to_point_in_time 函数调用的额外参数。例如,ExportTypeIncrementalExportSpecification

  • check_interval (int) – 两次尝试之间等待的时间间隔(秒)。仅在提供了 export_time 时有效。

  • max_attempts (int) – 最大尝试次数。仅在提供了 export_time 时有效。

template_fields: collections.abc.Sequence[str] = ('source_aws_conn_id', 'dest_aws_conn_id', 'dynamodb_table_name', 's3_bucket_name', 'file_size',...[source]
template_fields_renderers[source]
file_size = 1000[source]
process_func[source]
dynamodb_table_name[source]
dynamodb_scan_kwargs = None[source]
s3_bucket_name[source]
s3_key_prefix = ''[source]
point_in_time_export = False[source]
export_time = None[source]
export_format = 'DYNAMODB_JSON'[source]
export_table_to_point_in_time_kwargs[source]
check_interval = 30[source]
max_attempts = 60[source]
属性 hook[source]

创建 DynamoDBHook。

execute(context)[source]

在创建操作符时派生。

Context 是渲染 jinja 模板时使用的相同字典。

参考 get_template_context 获取更多上下文信息。

此条目有帮助吗?