airflow.providers.amazon.aws.transfers.s3_to_sql

模块内容

S3ToSqlOperator

将数据从 S3 加载到 SQL 数据库。

class airflow.providers.amazon.aws.transfers.s3_to_sql.S3ToSqlOperator(*, s3_key, s3_bucket, table, parser, column_list=None, commit_every=1000, schema=None, sql_conn_id='sql_default', sql_hook_params=None, aws_conn_id='aws_default', **kwargs)[源代码]

基类: airflow.models.BaseOperator

将数据从 S3 加载到 SQL 数据库。

你需要提供一个解析器函数,该函数将文件名作为输入并返回行的可迭代对象

另请参阅

有关如何使用此操作符的更多信息,请查看以下指南: Amazon S3 到 SQL 传输操作符

参数
  • schema (str | None) – SQL 数据库中特定模式的引用

  • table (str) – SQL 数据库中特定表的引用

  • s3_bucket (str) – 特定 S3 存储桶的引用

  • s3_key (str) – 特定 S3 密钥的引用

  • sql_conn_id (str) – 特定 SQL 数据库的引用。 必须为 DBApiHook 类型

  • sql_hook_params (dict | None) – 要传递到底层钩子的额外配置参数。 应与所需的钩子构造函数参数匹配。

  • aws_conn_id (str | None) – 特定 S3 / AWS 连接的引用

  • column_list (list[str] | None) – 要在插入 SQL 中使用的列名列表。

  • commit_every (int) – 在一次事务中插入的最大行数。设置为 0 以在一次事务中插入所有行。

  • parser (Callable[[str], collections.abc.Iterable[collections.abc.Iterable]]) –

    解析器函数,它接受一个文件路径作为输入并返回一个可迭代对象。例如,要使用逐行生成行的 CSV 解析器,请传递以下函数

    def parse_csv(filepath):
        import csv
    
        with open(filepath, newline="") as file:
            yield from csv.reader(file)
    

template_fields: collections.abc.Sequence[str] = ('s3_bucket', 's3_key', 'schema', 'table', 'column_list', 'sql_conn_id')[源代码]
template_ext: collections.abc.Sequence[str] = ()[源代码]
ui_color = '#f4a460'[源代码]
execute(context)[源代码]

在创建操作符时派生。

上下文与渲染 jinja 模板时使用的字典相同。

有关更多上下文,请参阅 get_template_context。

db_hook()[源代码]

此条目是否有帮助?