AzureBlobStorageToTeradataOperator

AzureBlobStorageToTeradataOperator defines tasks that transfer CSV, JSON, and Parquet data from Azure Blob Storage to a Teradata table. The operator uses the Teradata READ_NOS feature to read the data directly from object storage, and combines READ_NOS with CREATE TABLE AS to create a permanent table in the database using the following SQL statement.

CREATE MULTISET TABLE multiset_table_name AS (
  SELECT *
  FROM (
    LOCATION='YOUR-OBJECT-STORE-URI'
    AUTHORIZATION=authorization_object
  ) AS d
) WITH DATA;
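
The statement above can be rendered from a table name, an object store location, and an optional authorization object. The following is a minimal sketch of that templating, not the operator's actual implementation; the helper name build_transfer_sql is hypothetical, and it assumes the AUTHORIZATION line is simply left out when no authorization object is given.

```python
from typing import Optional


def build_transfer_sql(table: str, location: str, authorization: Optional[str] = None) -> str:
    """Render the CREATE TABLE AS statement shown above (hypothetical helper).

    Values are interpolated directly, so pass only trusted identifiers and paths.
    """
    lines = [
        f"CREATE MULTISET TABLE {table} AS (",
        "  SELECT *",
        "  FROM (",
        f"    LOCATION='{location}'",
    ]
    if authorization:
        # Assumption: the AUTHORIZATION line is omitted when no object is given.
        lines.append(f"    AUTHORIZATION={authorization}")
    lines += ["  ) AS d", ") WITH DATA;"]
    return "\n".join(lines)


if __name__ == "__main__":
    print(
        build_transfer_sql(
            "example_blob_teradata_csv",
            "/az/airflowteradata.blob.core.windows.net/csvdata/",
            "azure_authorization",
        )
    )
```

Running the script prints the same statement with the placeholders filled in, which can help when checking what SQL a given set of parameters corresponds to.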

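The operator supports loading data from both public and private object stores. For a private object store, access can be granted either through a Teradata Authorization database object or through the object store login and object store key defined in an Azure Blob Storage connection in Airflow. Transferring data from a public object store requires no authorization or access credentials.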

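  • The Teradata Authorization database object access type is used with the teradata_authorization_name parameter of AzureBlobStorageToTeradataOperator

  • The object store Access Key ID and Access Key Secret access type is used with the azure_conn_id parameter of AzureBlobStorageToTeradataOperator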

https://docs.teradata.com/r/Enterprise_IntelliFlex_VMware/Teradata-VantageTM-Native-Object-Store-Getting-Started-Guide-17.20/Setting-Up-Access/Setting-Access-Privileges

Note

If both access types are defined, the Teradata Authorization database object takes precedence.
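
The precedence rule can be sketched as a small selection function. This is an illustration under stated assumptions rather than the operator's code: the function name resolve_credentials_clause is hypothetical, the ACCESS_ID and ACCESS_KEY clause spelling is assumed, and checking public_bucket first is an assumption based on public stores needing no credentials.

```python
from typing import Optional


def resolve_credentials_clause(
    public_bucket: bool = False,
    teradata_authorization_name: Optional[str] = None,
    access_id: Optional[str] = None,
    access_key: Optional[str] = None,
) -> str:
    """Pick the credentials clause for the object store query (hypothetical helper)."""
    if public_bucket:
        # Public object stores need no authorization or access credentials.
        return ""
    if teradata_authorization_name:
        # The Teradata Authorization object wins when both access types are defined.
        return f"AUTHORIZATION={teradata_authorization_name}"
    if access_id and access_key:
        # Fall back to the login and key taken from the Azure connection.
        return f"ACCESS_ID='{access_id}' ACCESS_KEY='{access_key}'"
    raise ValueError("private object store requires an authorization object or access keys")


if __name__ == "__main__":
    print(resolve_credentials_clause(teradata_authorization_name="azure_authorization",
                                     access_id="login", access_key="secret"))
```

With both an authorization name and keys supplied, the sketch returns only the AUTHORIZATION clause, mirroring the note above.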

Transferring data from public Azure Blob Storage to Teradata

The following example uses AzureBlobStorageToTeradataOperator to transfer CSV data from public Azure Blob Storage to a Teradata table.

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
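
The blob_source_key values in these examples appear to follow the layout /az/<account>.blob.core.windows.net/<container>/<path>. The sketch below splits a key into those parts so a malformed value can be caught before a task runs. The layout is inferred from the examples on this page rather than from a formal specification, and parse_blob_source_key and BlobLocation are hypothetical names.

```python
from typing import NamedTuple

AZURE_BLOB_SUFFIX = ".blob.core.windows.net"


class BlobLocation(NamedTuple):
    account: str
    container: str
    prefix: str


def parse_blob_source_key(key: str) -> BlobLocation:
    """Split '/az/<account>.blob.core.windows.net/<container>/<path>' into parts."""
    # Expected pieces: '', 'az', host, container, and an optional remaining path.
    parts = key.split("/", 4)
    if len(parts) < 4 or parts[0] != "" or parts[1] != "az":
        raise ValueError(f"expected a key starting with '/az/': {key!r}")
    host, container = parts[2], parts[3]
    if not host.endswith(AZURE_BLOB_SUFFIX) or host == AZURE_BLOB_SUFFIX:
        raise ValueError(f"unexpected storage host: {host!r}")
    if not container:
        raise ValueError("container name is missing")
    prefix = parts[4] if len(parts) > 4 else ""
    return BlobLocation(host[: -len(AZURE_BLOB_SUFFIX)], container, prefix)


if __name__ == "__main__":
    print(parse_blob_source_key(
        "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/"
    ))
```

A check like this is optional; the operator itself takes the key as a plain string.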

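Transferring data from private Azure Blob Storage to Teradata with an Azure connection

The following example uses AzureBlobStorageToTeradataOperator to transfer CSV data from private Azure Blob Storage to Teradata, using Azure credentials defined as an Azure Blob Storage connection in Airflow.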

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_key_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )

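Transferring data from private Azure Blob Storage to Teradata with a Teradata Authorization object

A Teradata Authorization database object controls who can access an external object store. The Authorization database object must already exist in the Teradata database before it can be used to transfer data from Azure Blob Storage to Teradata. See Authentication for External Object Stores in Teradata.

The following example uses AzureBlobStorageToTeradataOperator to transfer CSV data from private Azure Blob Storage to Teradata, using an Authorization database object defined in Teradata.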

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_auth_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        teradata_authorization_name="azure_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )

Transferring data in CSV format from Azure Blob Storage to Teradata

The following example uses AzureBlobStorageToTeradataOperator to transfer CSV data from Azure Blob Storage to a Teradata table.

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )

Transferring data in JSON format from Azure Blob Storage to Teradata

The following example uses AzureBlobStorageToTeradataOperator to transfer JSON data from Azure Blob Storage to a Teradata table.

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )

Transferring data in PARQUET format from Azure Blob Storage to Teradata

The following example uses AzureBlobStorageToTeradataOperator to transfer PARQUET data from Azure Blob Storage to a Teradata table.

tests/system/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )

The complete AzureBlobStorageToTeradataOperator DAG

Putting everything together, the DAG looks like this:

tests/system/teradata/example_azure_blob_to_teradata_transfer.py


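import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")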
DAG_ID = "example_azure_blob_to_teradata_transfer_operator"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        public_bucket=True,
        teradata_table="example_blob_teradata_csv",
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    transfer_key_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_key_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_key_data_table_csv = TeradataOperator(
        task_id="read_key_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_key_table_csv = TeradataOperator(
        task_id="drop_key_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    create_azure_authorization = TeradataOperator(
        task_id="create_azure_authorization",
        conn_id=CONN_ID,
        sql="CREATE AUTHORIZATION azure_authorization USER '{{ var.value.get('AZURE_BLOB_ACCOUNTNAME') }}' PASSWORD '{{ var.value.get('AZURE_BLOB_ACCOUNT_SECRET_KEY') }}' ",
    )
    transfer_auth_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_auth_data_blob_to_teradata_csv",
        blob_source_key="/az/airflowteradata.blob.core.windows.net/csvdata/",
        teradata_table="example_blob_teradata_csv",
        teradata_authorization_name="azure_authorization",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_auth_data_table_csv = TeradataOperator(
        task_id="read_auth_data_table_csv",
        conn_id=CONN_ID,
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_auth_table_csv = TeradataOperator(
        task_id="drop_auth_table_csv",
        conn_id=CONN_ID,
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    drop_auth = TeradataOperator(
        task_id="drop_auth",
        conn_id=CONN_ID,
        sql="DROP AUTHORIZATION azure_authorization;",
    )
    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT count(1) from example_blob_teradata_json;",
    )
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_blob_teradata_json;",
    )
    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        public_bucket=True,
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT count(1) from example_blob_teradata_parquet;",
    )
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_blob_teradata_parquet;",
    )

    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
        >> transfer_key_data_csv
        >> read_key_data_table_csv
        >> drop_key_table_csv
        >> create_azure_authorization
        >> transfer_auth_data_csv
        >> read_auth_data_table_csv
        >> drop_auth_table_csv
        >> drop_auth
    )
