HTTP Operator¶
以下代码示例使用 http_default
连接,这意味着请求将发送到 httpbin 站点以执行基本的 HTTP 操作。
HttpSensor¶
使用 HttpSensor
探查(poke)直到 response_check
可调用对象评估为 true
。
这里我们探查直到 httpbin 返回的响应文本包含 httpbin
。
tests/system/http/example_http.py
task_http_sensor_check = HttpSensor(
task_id="http_sensor_check",
http_conn_id="http_default",
endpoint="",
request_params={},
response_check=lambda response: "httpbin" in response.text,
poke_interval=5,
dag=dag,
)
该 Sensor 也可以在 deferrable 模式下使用
tests/system/http/example_http.py
task_http_sensor_check_async = HttpSensor(
task_id="http_sensor_check_async",
http_conn_id="http_default",
endpoint="",
deferrable=True,
request_params={},
poke_interval=5,
dag=dag,
)
HttpOperator¶
使用 HttpOperator
调用 HTTP 请求并获取响应文本。
警告
通过 HttpOperator 配置 https
不直观
由于历史原因,通过 HTTP Operator 配置 HTTPS
连接性,嗯,既困难又不直观。Operator 默认为 http
协议,你可以通过 scheme
连接属性更改 Operator 使用的 scheme。然而,此字段最初是为数据库类型的 URI 添加到连接中的,其中数据库 scheme 传统上设置为 URI path
的第一个组件。因此,如果你想通过 URI 配置 https
连接,你需要将 https
scheme 传递给 HttpOperator。尽管看起来很蠢,你的连接 URI 将是这样:http://your_host:443/https
。然后,如果你想在 HttpOperator 中使用不同的 URL path,你应该在运行任务时将你的 path 作为 endpoint
参数传递。例如,要向 https://your_host:443/my_endpoint
运行查询,你需要将 endpoint 参数设置为 my_endpoint
。或者,如果你愿意,你也可以对包含 https://
前缀的主机进行百分比编码,只要它包含 ://
(百分比编码后为 %3a%2f%2f
),path 的第一个组件就不会被用作 scheme。你的 URI 定义可能看起来像这样:http://https%3a%2f%2fyour_host:443/
。然而,在这种情况下,path
将完全不被使用 - 如果你想使用特定的 path 发起请求,你仍然需要在任务中使用 endpoint
参数。尽管这不直观,但从历史上看,Operator/Hook 的工作方式就是这样,并且在不破坏向后兼容性的前提下很难更改,因为有其他基于 HttpOperator
构建的 Operator 依赖于此功能,并且已经有很多用户在使用它了。
在第一个示例中,我们使用 json 数据调用 POST
请求,并在收到相同的 json 数据时成功,否则任务将失败。
tests/system/http/example_http.py
task_post_op = HttpOperator(
task_id="post_op",
endpoint="post",
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
response_check=lambda response: response.json()["json"]["priority"] == 5,
dag=dag,
)
这里我们调用一个 GET
请求并向其传递参数。无论响应文本是什么,任务都将成功。
tests/system/http/example_http.py
task_get_op = HttpOperator(
task_id="get_op",
method="GET",
endpoint="get",
data={"param1": "value1", "param2": "value2"},
headers={},
dag=dag,
)
HttpOperator 默认将响应体作为文本返回。如果你想在将其传递给下游的下一个任务之前修改响应,请使用 response_filter
。这在你遇到以下情况时非常有用:
你正在使用的 API 返回大型 JSON 负载,而你只对其中一部分数据感兴趣
API 返回 xml 或 csv 格式的数据,而你想将其转换为 JSON
你对响应的头部而不是响应体感兴趣
下面是一个从 REST API 检索数据并仅返回嵌套属性而不是完整响应体的示例。
tests/system/http/example_http.py
task_get_op_response_filter = HttpOperator(
task_id="get_op_response_filter",
method="GET",
endpoint="get",
response_filter=lambda response: response.json()["nested"]["property"],
dag=dag,
)
在第三个示例中,我们执行 PUT
操作,根据提供给请求的数据放置/设置数据。
tests/system/http/example_http.py
task_put_op = HttpOperator(
task_id="put_op",
method="PUT",
endpoint="put",
data=json.dumps({"priority": 5}),
headers={"Content-Type": "application/json"},
dag=dag,
)
在此示例中,我们对 delete
endpoint 调用 DELETE
操作。这次我们向请求传递表单数据。
tests/system/http/example_http.py
task_del_op = HttpOperator(
task_id="del_op",
method="DELETE",
endpoint="delete",
data="some=data",
headers={"Content-Type": "application/x-www-form-urlencoded"},
dag=dag,
)
这里我们将表单数据传递给 POST
操作,这等同于通常的表单提交。
tests/system/http/example_http.py
task_post_op_formenc = HttpOperator(
task_id="post_op_formenc",
endpoint="post",
data="name=Joe",
headers={"Content-Type": "application/x-www-form-urlencoded"},
dag=dag,
)
HttpOperator
还允许重复调用 API endpoint,通常用于循环处理其分页。所有 API 响应都由 Operator 存储在内存中并以一个单一结果返回。因此,与非分页调用相比,它可能消耗更多的内存和 CPU。
默认情况下,HttpOperator 的结果将成为一个 Response.text 列表(而不是一个单一的 Response.text 对象)。
示例 - 假设你的 API 返回一个包含 cursor 的 JSON body:你可以编写一个 pagination_function
,它将接收你的请求的原始 request.Response
对象,并基于此 cursor 生成新的请求参数(作为 dict
)。HttpOperator 将重复调用 API 直到该函数停止返回任何内容。
tests/system/http/example_http.py
def get_next_page_cursor(response) -> dict | None:
"""
Take the raw `request.Response` object, and check for a cursor.
If a cursor exists, this function creates and return parameters to call
the next page of result.
"""
next_cursor = response.json().get("cursor")
if next_cursor:
return dict(data={"cursor": next_cursor})
return None
task_get_paginated = HttpOperator(
task_id="get_paginated",
method="GET",
endpoint="get",
data={"cursor": ""},
pagination_function=get_next_page_cursor,
dag=dag,
)