S3ToTeradataOperator

The purpose of S3ToTeradataOperator is to define tasks that transfer CSV, JSON, and Parquet format data from AWS Simple Storage Service (S3) to Teradata tables. This operator uses Teradata's READ_NOS feature to move the data. READ_NOS is a table operator in Teradata Vantage that allows users to list and read external files at a specified location. For more details, see the READ_NOS documentation.

Use the S3ToTeradataOperator to transfer data from S3 to Teradata. The operator reads the data directly from object storage and creates a permanent table in the database by combining READ_NOS with a CREATE TABLE AS statement.
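Conceptually, the operator issues a statement of roughly the following shape. This is only a sketch; the exact SQL generated by the provider may differ, and the table and location names are taken from the examples below.

```sql
-- A sketch of the CREATE TABLE AS ... READ_NOS statement the operator builds.
-- The LOCATION value and table name are illustrative, not the provider's exact output.
CREATE MULTISET TABLE example_s3_teradata_csv AS (
    SELECT * FROM READ_NOS (
        USING
            LOCATION ('/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/')
    ) AS d
) WITH DATA;
```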

Note

The current version of S3ToTeradataOperator does not support accessing AWS S3 with Security Token Service (STS) temporary credentials; it supports only long-term credentials.
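With long-term credentials, access can be granted either through an Airflow AWS connection (`aws_conn_id`) or through a Teradata AUTHORIZATION object referenced via `teradata_authorization_name`, as in the complete DAG below. A sketch of the latter, assuming a long-term access key id and secret access key:

```sql
-- A sketch: a Teradata AUTHORIZATION object holding long-term AWS credentials.
-- The key values are hypothetical placeholders.
CREATE AUTHORIZATION aws_authorization
    USER 'AKIAXXXXXXXXXXXXXXXX'          -- AWS access key id (placeholder)
    PASSWORD 'your-secret-access-key';   -- AWS secret access key (placeholder)
```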

Transferring CSV data from S3 to Teradata

An example of using the S3ToTeradataOperator to transfer CSV data from S3 to a Teradata table:

tests/system/teradata/example_s3_to_teradata_transfer.py

    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_csv",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )

Transferring JSON data from S3 to Teradata

An example of using the S3ToTeradataOperator to transfer JSON data from S3 to a Teradata table:

tests/system/teradata/example_s3_to_teradata_transfer.py

    transfer_data_json = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_json",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_json",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )

Transferring Parquet data from S3 to Teradata

An example of using the S3ToTeradataOperator to transfer Parquet data from S3 to a Teradata table:

tests/system/teradata/example_s3_to_teradata_transfer.py

    transfer_data_parquet = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_parquet",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_parquet",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )

The complete S3ToTeradataOperator DAG

Putting everything together, our DAG should look like this:

tests/system/teradata/example_s3_to_teradata_transfer.py

import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.s3_to_teradata import S3ToTeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_s3_to_teradata_transfer_operator"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
    transfer_data_csv = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_csv",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/CSVDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_csv",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_public_s3_to_teradata_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT * from example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv]
    transfer_key_data_csv = S3ToTeradataOperator(
        task_id="transfer_key_data_s3_to_teradata_key_csv",
        s3_source_key="/s3/airflowteradatatest.s3.ap-southeast-2.amazonaws.com/",
        teradata_table="example_s3_teradata_csv",
        aws_conn_id="aws_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_access_s3_to_teradata_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    read_key_data_table_csv = TeradataOperator(
        task_id="read_key_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT * from example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    drop_key_table_csv = TeradataOperator(
        task_id="drop_key_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_create_authorization]
    create_aws_authorization = TeradataOperator(
        task_id="create_aws_authorization",
        conn_id=CONN_ID,
        sql="CREATE AUTHORIZATION aws_authorization USER '{{ var.value.get('AWS_ACCESS_KEY_ID') }}' PASSWORD '{{ var.value.get('AWS_SECRET_ACCESS_KEY') }}' ",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_create_authorization]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv]
    transfer_auth_data_csv = S3ToTeradataOperator(
        task_id="transfer_auth_data_s3_to_teradata_auth_csv",
        s3_source_key="/s3/teradata-download.s3.us-east-1.amazonaws.com/DevTools/csv/",
        teradata_table="example_s3_teradata_csv",
        teradata_authorization_name="aws_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_authorization_s3_to_teradata_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    read_auth_data_table_csv = TeradataOperator(
        task_id="read_auth_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT * from example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    drop_auth_table_csv = TeradataOperator(
        task_id="drop_auth_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_s3_teradata_csv;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_csv]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_authorization]
    drop_auth = TeradataOperator(
        task_id="drop_auth",
        conn_id=CONN_ID,
        sql="DROP AUTHORIZATION aws_authorization;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_authorization]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
    transfer_data_json = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_json",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/JSONDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_json",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_json]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT * from example_s3_teradata_json;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_json]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_s3_teradata_json;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table_json]
    # [START s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]
    transfer_data_parquet = S3ToTeradataOperator(
        task_id="transfer_data_s3_to_teradata_parquet",
        s3_source_key="/s3/td-usgs-public.s3.amazonaws.com/PARQUETDATA/09394500/2018/06/",
        public_bucket=True,
        teradata_table="example_s3_teradata_parquet",
        aws_conn_id="aws_default",
        trigger_rule="all_done",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_transfer_data_s3_to_teradata_parquet]
    # [START s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT * from example_s3_teradata_parquet;",
    )
    # [END s3_to_teradata_transfer_operator_howto_guide_read_data_table_parquet]
    # [START s3_to_teradata_transfer_operator_howto_guide_drop_table]
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_s3_teradata_parquet;",
    )

    # [END s3_to_teradata_transfer_operator_howto_guide_drop_table]
    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
        >> transfer_key_data_csv
        >> read_key_data_table_csv
        >> drop_key_table_csv
        >> create_aws_authorization
        >> transfer_auth_data_csv
        >> read_auth_data_table_csv
        >> drop_auth_table_csv
        >> drop_auth
    )
