Amazon Athena SQL¶
Amazon Athena 是一项交互式查询服务,使您可以轻松使用标准 SQL 分析 Amazon Simple Storage Service (S3) 中的数据。Athena 为无服务器模式,您无需设置或管理基础设施,且仅为运行的查询付费。要开始使用,只需指向 S3 中的数据,定义模式,然后使用标准 SQL 开始查询。
前置任务¶
要使用这些操作符,您需要完成以下几项工作
通过 pip 安装 API 库。
pip install 'apache-airflow[amazon]'详细信息可在 Airflow® 安装 中获取。
设置连接.
操作符¶
执行 SQL 查询¶
The generic SQLExecuteQueryOperator can be used to execute SQL queries against Amazon Athena using a Athena connection.
若要在不将结果返回至 Airflow 的情况下对 Amazon Athena 执行单个 SQL 查询,请改用 AthenaOperator。
/opt/airflow/providers/common/sql/tests/system/common/sql/example_sql_execute_query.py
execute_query = SQLExecuteQueryOperator(
task_id="execute_query",
sql=f"SELECT 1; SELECT * FROM {AIRFLOW_DB_METADATA_TABLE} LIMIT 1;",
split_statements=True,
return_last=False,
)
此外,如果您需要使用 Amazon Athena 执行简单的数据质量测试,可以使用 SQLTableCheckOperator。
下面的示例演示如何实例化 SQLTableCheckOperator 任务。
/opt/airflow/providers/common/sql/tests/system/common/sql/example_sql_column_table_check.py
row_count_check = SQLTableCheckOperator(
task_id="row_count_check",
table=AIRFLOW_DB_METADATA_TABLE,
checks={
"row_count_check": {
"check_statement": "COUNT(*) = 1",
}
},
)