TeradataToTeradataOperator¶
TeradataToTeradataOperator 用于定义在两个 Teradata 实例之间传输数据的任务。
使用 TeradataToTeradataOperator 可以在两个 Teradata 实例之间传输数据。
在两个 Teradata 实例之间传输数据¶
要在两个 Teradata 实例之间传输数据，请使用 TeradataToTeradataOperator。
TeradataToTeradataOperator 的示例用法如下：
# Task that copies the rows returned by a query on the source Teradata
# connection into a table on the destination Teradata connection.
transfer_data = TeradataToTeradataOperator(
    task_id="transfer_data",
    # Source side: connection id and the query whose result set is transferred.
    source_teradata_conn_id="teradata_default",
    sql="select * from my_users_src",
    sql_params={},  # the example query takes no parameters
    # Destination side: connection id and target table.
    dest_teradata_conn_id="teradata_default",
    destination_table="my_users_dest",
    # NOTE(review): presumably the number of rows handled per batch -- confirm
    # against the operator's API reference.
    rows_chunk=2,
)
完整的 TeradataToTeradata 传输操作符 DAG¶
将所有内容整合在一起后，我们的 DAG 应如下所示：
# Example DAG that runs a sequence of SQL statements through TeradataOperator:
# create tables, populate and query them, drop them, then create and drop a
# database together with a table inside it.
#
# NOTE(review): the surrounding page introduces this as the complete
# TeradataToTeradata transfer DAG, but no TeradataToTeradataOperator task is
# defined here -- confirm that the intended example is being shown.
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    # Passed to every task below, so each operator uses the same connection.
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    # Operators instantiated inside the ``with DAG(...)`` block are attached
    # to the DAG automatically, so none of them passes ``dag=`` explicitly.

    # Inline SQL: create the Country table.
    create_table = TeradataOperator(
        task_id="create_table",
        sql=r"""
CREATE TABLE Country (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
    )

    # SQL given as a file name rather than an inline statement.
    # NOTE(review): presumably this script creates the Users table used by
    # the later tasks -- verify against create_table.sql.
    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
    )

    # Several INSERT statements submitted as one SQL string.
    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
INSERT INTO Users (username, description)
VALUES ( 'Danny', 'Musician');
INSERT INTO Users (username, description)
VALUES ( 'Simone', 'Chef');
INSERT INTO Users (username, description)
VALUES ( 'Lily', 'Florist');
INSERT INTO Users (username, description)
VALUES ( 'Tim', 'Pet shop owner');
""",
    )

    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )

    # The column name and value are filled in from ``params`` through the
    # ``{{ params.* }}`` template placeholders in the SQL text.
    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )

    # Clean up the tables used above.
    drop_country_table = TeradataOperator(
        task_id="drop_country_table",
        sql=r"DROP TABLE Country;",
    )
    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
    )

    # Create a database, then create and drop a table with ``schema`` set to
    # that database's name, and finally drop the database itself.
    create_schema = TeradataOperator(
        task_id="create_schema",
        sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
    )
    create_table_with_schema = TeradataOperator(
        task_id="create_table_with_schema",
        sql=r"""
CREATE TABLE schema_table (
country_id INTEGER,
name CHAR(25),
continent CHAR(25)
);
""",
        schema="airflow_temp",
    )
    drop_schema_table = TeradataOperator(
        task_id="drop_schema_table",
        sql=r"DROP TABLE schema_table;",
        schema="airflow_temp",
    )
    drop_schema = TeradataOperator(
        task_id="drop_schema",
        sql=r"DROP DATABASE airflow_temp;",
    )

    # Run the tasks strictly one after another, in this order.
    (
        create_table
        >> create_table_from_external_file
        >> populate_table
        >> get_all_countries
        >> get_countries_from_continent
        >> drop_country_table
        >> drop_users_table
        >> create_schema
        >> create_table_with_schema
        >> drop_schema_table
        >> drop_schema
    )