airflow.providers.http.hooks.http

HttpHook

与 HTTP 服务器交互。

HttpAsyncHook

异步与 HTTP 服务器交互。

模块内容

class airflow.providers.http.hooks.http.HttpHook(method='POST', http_conn_id=default_conn_name, auth_type=None, tcp_keep_alive=True, tcp_keep_alive_idle=120, tcp_keep_alive_count=20, tcp_keep_alive_interval=30, adapter=None)[source]

基类: airflow.hooks.base.BaseHook

与 HTTP 服务器交互。

参数:
  • method (str) – 要调用的 API 方法

  • http_conn_id (str) – HTTP 连接,包含基础 API URL(例如 https://www.google.com/)和可选的认证凭据。默认请求头也可以在 Extra 字段中以 JSON 格式指定。

  • auth_type (Any) – 服务的认证类型

  • adapter (requests.adapters.HTTPAdapter | None) – 要为会话挂载的 requests.adapters.HTTPAdapter 的可选实例。

  • tcp_keep_alive (bool) – 为连接启用 TCP Keep Alive。

  • tcp_keep_alive_idle (int) – TCP Keep Alive Idle 参数(对应于 socket.TCP_KEEPIDLE)。

  • tcp_keep_alive_count (int) – TCP Keep Alive count 参数(对应于 socket.TCP_KEEPCNT

  • tcp_keep_alive_interval (int) – TCP Keep Alive interval 参数(对应于 socket.TCP_KEEPINTVL

  • auth_args – 用于初始化 auth_type 的额外参数,如果与默认 HTTPBasicAuth 不同。

conn_name_attr = 'http_conn_id'[source]
default_conn_name = 'http_default'[source]
conn_type = 'http'[source]
hook_name = 'HTTP'[source]
default_host = ''[source]
default_headers: dict[str, str][source]
http_conn_id = 'http_default'[source]
method = ''[source]
base_url: str = ''[source]
adapter = None[source]
property auth_type[source]
get_conn(headers=None, extra_options=None)[source]

创建一个 Requests HTTP 会话。

参数:
  • headers (dict[Any, Any] | None) – 作为字典传递的附加请求头。

  • extra_options (dict[str, Any] | None) – 执行请求时使用的额外选项

返回值:

配置好的 requests.Session 对象。

返回类型:

requests.Session

run(endpoint=None, data=None, headers=None, extra_options=None, **request_kwargs)[source]

执行请求。

参数:
  • endpoint (str | None) – 要调用的端点,例如 resource/v1/query?

  • data (dict[str, Any] | str | None) – 要上传的载荷或请求参数

  • headers (dict[str, Any] | None) – 作为字典传递的附加请求头

  • extra_options (dict[str, Any] | None) – 执行请求时使用的额外选项,例如 {'check_response': False} 以避免在非 2XX 或 3XX 状态码时抛出异常

  • request_kwargs (Any) – 创建请求时要传递的附加 kwargs。例如,run(json=obj) 将作为 requests.Request(json=obj) 传递。

check_response(response)[source]

检查状态码并在失败时抛出异常。

参数:

response (requests.Response) – requests 响应对象。

抛出:

AirflowException – 如果响应包含的状态码不在 2xx 和 3xx 范围内。

run_and_check(session, prepped_request, extra_options)[source]

获取额外选项,实际运行请求并检查结果。

参数:
  • session (requests.Session) – 用于执行请求的会话

  • prepped_request (requests.PreparedRequest) – 在 run() 中生成的预备请求

  • extra_options (dict[Any, Any]) – 执行请求时使用的额外选项,例如 {'check_response': False} 以避免在非 2XX 或 3XX 状态码时抛出异常

run_with_advanced_retry(_retry_args, *args, **kwargs)[source]

带重试地运行 Hook。

这对于可能受到间歇性问题干扰且不应立即失败的连接器很有用。

参数:

_retry_args (dict[Any, Any]) – 定义重试行为的参数。请参阅 Tenacity 文档:https://github.com/jd/tenacity

hook = HttpHook(http_conn_id="my_conn", method="GET")
retry_args = dict(
    wait=tenacity.wait_exponential(),
    stop=tenacity.stop_after_attempt(10),
    retry=tenacity.retry_if_exception_type(Exception),
)
hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args)
url_from_endpoint(endpoint)[source]

将基础 URL 与端点组合。

test_connection()[source]

测试 HTTP 连接。

class airflow.providers.http.hooks.http.HttpAsyncHook(method='POST', http_conn_id=default_conn_name, auth_type=aiohttp.BasicAuth, retry_limit=3, retry_delay=1.0)[source]

基类: airflow.hooks.base.BaseHook

异步与 HTTP 服务器交互。

参数:
  • method (str) – 要调用的 API 方法

  • http_conn_id (str) – 包含基础 API URL(例如 https://www.google.com/)和可选认证凭据的 HTTP 连接 ID。默认请求头也可以在 Extra 字段中以 JSON 格式指定。

  • auth_type (Any) – 服务的认证类型

conn_name_attr = 'http_conn_id'[source]
default_conn_name = 'http_default'[source]
conn_type = 'http'[source]
hook_name = 'HTTP'[source]
http_conn_id = 'http_default'[source]
method = ''[source]
base_url: str = ''[source]
auth_type: Any[source]
retry_limit = 3[source]
retry_delay = 1.0[source]
async run(session, endpoint=None, data=None, json=None, headers=None, extra_options=None)[source]

执行异步 HTTP 请求调用。

参数:
  • endpoint (str | None) – 要调用的端点,例如 resource/v1/query?

  • data (dict[str, Any] | str | None) – 要上传的载荷或请求参数。

  • json (dict[str, Any] | str | None) – 作为 JSON 上传的载荷。

  • headers (dict[str, Any] | None) – 作为字典传递的附加请求头。

  • extra_options (dict[str, Any] | None) – 创建请求时要传递的附加 kwargs。例如,run(json=obj) 将作为 aiohttp.ClientSession().get(json=obj) 传递。

本条目有帮助吗?