airflow.providers.amazon.aws.operators.athena

AthenaOperator

一个将 Trino/Presto 查询提交到 Amazon Athena 的算子。

模块内容

class airflow.providers.amazon.aws.operators.athena.AthenaOperator(*, query, database, output_location=None, client_request_token=None, workgroup='primary', query_execution_context=None, result_configuration=None, sleep_time=30, max_polling_attempts=None, log_query=True, deferrable=conf.getboolean('operators', 'default_deferrable', fallback=False), catalog='AwsDataCatalog', **kwargs)[source]

Bases: airflow.providers.amazon.aws.operators.base_aws.AwsBaseOperator[airflow.providers.amazon.aws.hooks.athena.AthenaHook]

一个将 Trino/Presto 查询提交到 Amazon Athena 的算子。

注意

如果任务在运行过程中被终止,它将取消已启动的 Athena 查询,除非处于可延迟(deferrable)模式。

另请参阅

有关如何使用此算子的更多信息,请参阅指南: Run a query in Amazon Athena

参数:
  • query (str) – 在 Amazon Athena 上执行的 Trino/Presto 查询。(支持模板)

  • database (str) – 要选择的数据库。(支持模板)

  • catalog (str) – 要选择的目录。(支持模板)

  • output_location (str | None) – 用于存放查询结果的 S3 路径。(支持模板)要运行查询,必须通过以下方式之一指定查询结果位置:可以为单个查询使用此设置(客户端),也可以在工作组中通过 WorkGroupConfiguration 设置。如果两者都未设置,Athena 将报错,提示未提供输出位置

  • client_request_token (str | None) – 用户创建的唯一令牌,用于避免同一查询被多次执行

  • workgroup (str) – 执行查询的 Athena 工作组。(支持模板)

  • query_execution_context (dict[str, str] | None) – 查询需要运行的上下文

  • result_configuration (dict[str, Any] | None) – 包含存储结果路径及加密配置的字典

  • sleep_time (int) – 两次连续检查 Athena 查询状态之间的等待时间(秒)

  • max_polling_attempts (int | None) – 在函数退出前轮询查询状态的次数。若要限制任务执行时间,请使用 execution_timeout。

  • log_query (bool) – 执行时是否记录 Athena 查询及其他执行参数。默认 True

  • aws_conn_id – Airflow 用于 AWS 凭证的连接 ID。如果该值为 None 或为空,则使用默认的 boto3 行为。若在分布式方式运行 Airflow 且 aws_conn_idNone 或空,则会使用默认的 boto3 配置(并且需在每个工作节点上维护该配置)。

  • region_name – AWS 区域名称(region_name)。如果未指定,则使用默认的 boto3 行为。

  • verify – 是否验证 SSL 证书。参见: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html

  • botocore_config – 用于 botocore 客户端的配置字典(键值对)。参见: https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html

aws_hook_class[source]
ui_color = '#44b5e2'[source]
template_fields: collections.abc.Sequence[str][source]
template_ext: collections.abc.Sequence[str] = ('.sql',)[source]
template_fields_renderers[source]
query[source]
database[source]
output_location = None[source]
client_request_token = None[source]
workgroup = 'primary'[source]
query_execution_context[source]
result_configuration[source]
sleep_time = 30[source]
max_polling_attempts = 999999[source]
query_execution_id: str | None = None[source]
log_query: bool = True[source]
deferrable[source]
catalog: str = 'AwsDataCatalog'[source]
execute(context)[source]

在 Amazon Athena 上运行 Trino/Presto 查询。

execute_complete(context, event=None)[source]
on_kill()[source]

取消已提交的 Amazon Athena 查询。

get_openlineage_facets_on_complete(_)[source]

通过解析 SQL 查询并使用 Athena API 丰富信息,获取 OpenLineage 数据。

除了 CTAS 查询外,查询和计算结果也会存储在 S3 位置。因此会在该位置附加额外的输出。我们不使用保存结果的完整路径(用户前缀 + 某个 UUID),而是仅使用用户提供的路径创建数据集。这样可以更方便地在不同过程之间匹配该数据集。

get_openlineage_dataset(database, table)[source]

此条目是否有帮助?