Skip to content

Commit 2da0562

Browse files
committed
fix: auto-release connections to pool when body reading completes
Connections are now automatically released to the pool when body reading completes, rather than requiring explicit release calls. This is handled in the connection process itself (hackney_conn) when transitioning from receiving state to connected state. Changes: - hackney_conn: Add auto_release_to_pool in connected(enter, receiving, ...) - hackney_conn: Add should_auto_release/1 and auto_release_to_pool/1 helpers - hackney.erl: Remove safe_release_to_pool (no longer needed) - hackney.erl: For HEAD requests, call body() to trigger auto-release - Test: Add hackney_pool_integration_tests for pool release behavior This fixes connection leaks where manual body reading would leave connections checked out but not returned to the pool.
1 parent 0de0f92 commit 2da0562

File tree

3 files changed

+240
-26
lines changed

3 files changed

+240
-26
lines changed

src/hackney.erl

Lines changed: 9 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -550,6 +550,7 @@ send_request(ConnPid, {Method, Path, Headers, Body}) when is_pid(ConnPid) ->
550550
%%====================================================================
551551

552552
%% @doc Get the full response body.
553+
%% After reading the body, the connection is automatically released back to the pool.
553554
-spec body(conn()) -> {ok, binary()} | {error, term()}.
554555
body(ConnPid) when is_pid(ConnPid) ->
555556
hackney_conn:body(ConnPid).
@@ -559,6 +560,8 @@ body(ConnPid, Timeout) when is_pid(ConnPid) ->
559560
hackney_conn:body(ConnPid, Timeout).
560561

561562
%% @doc Stream the response body in chunks.
563+
%% Returns {ok, Data} for each chunk, done when complete, or {error, Reason}.
564+
%% When done is returned, the connection is automatically released back to the pool.
562565
-spec stream_body(conn()) -> {ok, binary()} | done | {error, term()}.
563566
stream_body(ConnPid) when is_pid(ConnPid) ->
564567
hackney_conn:stream_body(ConnPid).
@@ -568,9 +571,9 @@ stream_body(ConnPid) when is_pid(ConnPid) ->
568571
%% can't guarantee the socket state after skipping.
569572
-spec skip_body(conn()) -> ok | {error, term()}.
570573
skip_body(ConnPid) when is_pid(ConnPid) ->
571-
case body(ConnPid) of
574+
case hackney_conn:body(ConnPid) of
572575
{ok, _} ->
573-
%% Stop the connection process so pool gets DOWN message and decrements in_use
576+
%% Body was read (connection auto-released to pool), now stop it
574577
hackney_conn:stop(ConnPid),
575578
ok;
576579
{error, Reason} ->
@@ -936,15 +939,14 @@ sync_request_with_redirect_body(ConnPid, Method, Path, HeadersList, FinalBody,
936939
{ok, Status, RespHeaders} ->
937940
case Method of
938941
<<"HEAD">> ->
939-
%% HEAD responses have no body - release connection to pool
940-
safe_release_to_pool(ConnPid),
942+
%% HEAD responses have no body - call body() to trigger auto-release
943+
%% (body returns immediately for HEAD with empty response)
944+
_ = hackney_conn:body(ConnPid),
941945
{ok, Status, RespHeaders};
942946
_ when WithBody ->
943947
case hackney_conn:body(ConnPid) of
944948
{ok, RespBody} ->
945-
%% Body read - release connection to pool
946-
%% (connection might already be closed if server closed without Content-Length)
947-
safe_release_to_pool(ConnPid),
949+
%% Body read - connection auto-released to pool
948950
{ok, Status, RespHeaders, RespBody};
949951
{error, Reason} ->
950952
{error, Reason}
@@ -1581,22 +1583,6 @@ proxy_type_for_scheme(_) -> http.
15811583
hackney:request(Method, URL, Headers, Body, Options)).
15821584
-include("hackney_methods.hrl").
15831585

1584-
1585-
%% @private Safe release to pool - handles the case where connection
1586-
%% might have already exited (e.g., server closed without Content-Length).
1587-
safe_release_to_pool(ConnPid) when is_pid(ConnPid) ->
1588-
case is_process_alive(ConnPid) of
1589-
true ->
1590-
try
1591-
hackney_conn:release_to_pool(ConnPid)
1592-
catch
1593-
exit:{normal, _} -> ok;
1594-
exit:{noproc, _} -> ok
1595-
end;
1596-
false ->
1597-
ok
1598-
end.
1599-
16001586
%% @doc Parse a proxy URL and extract host, port, and optional credentials.
16011587
%% Supports URLs like:
16021588
%% - "http://proxy.example.com:8080"

src/hackney_conn.erl

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -637,7 +637,8 @@ connecting(EventType, Event, Data) ->
637637
%% State: connected - Ready for requests
638638
%%====================================================================
639639

640-
connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, idle_timeout = Timeout} = Data) ->
640+
connected(enter, OldState, #conn_data{transport = Transport, socket = Socket,
641+
idle_timeout = Timeout, pool_pid = PoolPid} = Data) ->
641642
%% Set socket to active mode to receive server close notifications (tcp_closed/ssl_closed)
642643
%% This fixes issue #544: stale connections not being detected when server closes idle connections
643644
%% Only enable active mode when returning from a completed request cycle (receiving, streaming states)
@@ -647,9 +648,20 @@ connected(enter, OldState, #conn_data{transport = Transport, socket = Socket, id
647648
{_, false} -> ok;
648649
{_, true} -> Transport:setopts(Socket, [{active, once}])
649650
end,
651+
%% Auto-release to pool when body reading is complete
652+
%% This happens when transitioning from receiving state (body fully read)
653+
Data2 = case {PoolPid, should_auto_release(OldState)} of
654+
{undefined, _} ->
655+
Data;
656+
{_, false} ->
657+
Data;
658+
{_, true} ->
659+
%% Transfer ownership back to pool and notify it
660+
auto_release_to_pool(Data)
661+
end,
650662
case Timeout of
651-
infinity -> keep_state_and_data;
652-
_ -> {keep_state, Data, [{state_timeout, Timeout, idle_timeout}]}
663+
infinity -> {keep_state, Data2};
664+
_ -> {keep_state, Data2, [{state_timeout, Timeout, idle_timeout}]}
653665
end;
654666

655667
connected({call, From}, release_to_pool, #conn_data{pool_pid = PoolPid, owner_mon = OldMon,
@@ -1930,6 +1942,23 @@ should_enable_active_mode(streaming_once) -> true;
19301942
should_enable_active_mode(closed) -> true; %% Reconnection from closed state
19311943
should_enable_active_mode(_) -> false.
19321944

1945+
%% @private Determine if we should auto-release to pool when entering connected state
1946+
%% We release when body reading is complete (coming from receiving state)
1947+
%% but NOT from streaming states (user is still actively streaming)
1948+
should_auto_release(receiving) -> true;
1949+
should_auto_release(_) -> false.
1950+
1951+
%% @private Auto-release connection back to pool
1952+
%% Transfers ownership to pool and notifies it asynchronously
1953+
auto_release_to_pool(#conn_data{pool_pid = PoolPid, owner_mon = OldMon} = Data) ->
1954+
%% Transfer ownership to pool
1955+
demonitor(OldMon, [flush]),
1956+
NewMon = monitor(process, PoolPid),
1957+
Data2 = Data#conn_data{owner = PoolPid, owner_mon = NewMon},
1958+
%% Notify pool asynchronously (avoid blocking the state machine)
1959+
notify_pool_available(Data2),
1960+
Data2.
1961+
19331962
%% @private Check if socket is healthy (not closed by peer)
19341963
%% Note: With active mode enabled on connected sockets, we rely on tcp_closed/ssl_closed
19351964
%% messages to detect server closes. This function now uses peername to verify the
Lines changed: 199 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,199 @@
1+
%% @doc Integration tests for pool connection release behavior
2+
-module(hackney_pool_integration_tests).
3+
-include_lib("eunit/include/eunit.hrl").
4+
5+
-define(PORT, 9877).
6+
-define(POOL, test_pool_integration).
7+
8+
%% Setup/teardown for integration tests
9+
setup() ->
10+
{ok, _} = application:ensure_all_started(hackney),
11+
{ok, _} = application:ensure_all_started(cowboy),
12+
Dispatch = cowboy_router:compile([{'_', [{"/[...]", test_http_resource, []}]}]),
13+
{ok, _} = cowboy:start_clear(test_pool_int_http, [{port, ?PORT}], #{
14+
env => #{dispatch => Dispatch}
15+
}),
16+
%% Create a test pool with small limits for easy testing
17+
hackney_pool:start_pool(?POOL, [{max_connections, 10}]),
18+
ok.
19+
20+
cleanup(_) ->
21+
hackney_pool:stop_pool(?POOL),
22+
cowboy:stop_listener(test_pool_int_http),
23+
ok.
24+
25+
url(Path) ->
26+
<<"http://localhost:", (integer_to_binary(?PORT))/binary, Path/binary>>.
27+
28+
%% =============================================================================
29+
%% Pool Integration Tests
30+
%% =============================================================================
31+
32+
pool_integration_test_() ->
33+
{setup,
34+
fun setup/0,
35+
fun cleanup/1,
36+
[
37+
{"connection released after with_body request", fun test_with_body_release/0},
38+
{"connection released after manual body read", fun test_manual_body_release/0},
39+
{"connection released after close", fun test_close_release/0},
40+
{"multiple requests reuse connections", fun test_connection_reuse_integration/0},
41+
{"concurrent requests respect pool limits", fun test_concurrent_requests/0},
42+
{"connection released on error response", fun test_error_response_release/0},
43+
{"pool stats accurate during requests", fun test_pool_stats_accuracy/0}
44+
]}.
45+
46+
%% Test that connection is released after with_body request
47+
test_with_body_release() ->
48+
%% Get initial stats
49+
InitStats = hackney_pool:get_stats(?POOL),
50+
InitFree = proplists:get_value(free_count, InitStats),
51+
InitInUse = proplists:get_value(in_use_count, InitStats),
52+
53+
%% Make request with with_body
54+
{ok, 200, _Headers, _Body} = hackney:request(get, url(<<"/get">>), [], <<>>,
55+
[{pool, ?POOL}, {with_body, true}]),
56+
57+
%% Allow time for async checkin
58+
timer:sleep(50),
59+
60+
%% Check stats after - should have one more free connection
61+
AfterStats = hackney_pool:get_stats(?POOL),
62+
AfterFree = proplists:get_value(free_count, AfterStats),
63+
AfterInUse = proplists:get_value(in_use_count, AfterStats),
64+
65+
%% Connection should be returned to pool (free increased or same)
66+
?assert(AfterFree >= InitFree orelse AfterInUse =< InitInUse),
67+
%% No connections should be in use
68+
?assertEqual(0, AfterInUse).
69+
70+
%% Test that connection is released after manual body read
71+
test_manual_body_release() ->
72+
%% Make request without with_body
73+
{ok, 200, _Headers, Ref} = hackney:request(get, url(<<"/get">>), [], <<>>,
74+
[{pool, ?POOL}]),
75+
76+
%% Check stats during - connection should be in use
77+
DuringStats = hackney_pool:get_stats(?POOL),
78+
DuringInUse = proplists:get_value(in_use_count, DuringStats),
79+
?assert(DuringInUse >= 1),
80+
81+
%% Read body
82+
{ok, _Body} = hackney:body(Ref),
83+
84+
%% Allow time for async checkin
85+
timer:sleep(50),
86+
87+
%% Check stats after - no connections in use
88+
AfterStats = hackney_pool:get_stats(?POOL),
89+
AfterInUse = proplists:get_value(in_use_count, AfterStats),
90+
?assertEqual(0, AfterInUse).
91+
92+
%% Test that connection is released after explicit close
93+
test_close_release() ->
94+
%% Make request without reading body
95+
{ok, 200, _Headers, Ref} = hackney:request(get, url(<<"/get">>), [], <<>>,
96+
[{pool, ?POOL}]),
97+
98+
%% Check stats during
99+
DuringStats = hackney_pool:get_stats(?POOL),
100+
DuringInUse = proplists:get_value(in_use_count, DuringStats),
101+
?assert(DuringInUse >= 1),
102+
103+
%% Close without reading body
104+
ok = hackney:close(Ref),
105+
106+
%% Allow time for process cleanup
107+
timer:sleep(50),
108+
109+
%% Check stats after - no connections in use
110+
AfterStats = hackney_pool:get_stats(?POOL),
111+
AfterInUse = proplists:get_value(in_use_count, AfterStats),
112+
?assertEqual(0, AfterInUse).
113+
114+
%% Test that connections are reused
115+
test_connection_reuse_integration() ->
116+
%% Make first request
117+
{ok, 200, _, _} = hackney:request(get, url(<<"/get">>), [], <<>>,
118+
[{pool, ?POOL}, {with_body, true}]),
119+
timer:sleep(50),
120+
121+
Stats1 = hackney_pool:get_stats(?POOL),
122+
Free1 = proplists:get_value(free_count, Stats1),
123+
124+
%% Make second request - should reuse connection
125+
{ok, 200, _, _} = hackney:request(get, url(<<"/get">>), [], <<>>,
126+
[{pool, ?POOL}, {with_body, true}]),
127+
timer:sleep(50),
128+
129+
Stats2 = hackney_pool:get_stats(?POOL),
130+
Free2 = proplists:get_value(free_count, Stats2),
131+
132+
%% Should have same number of free connections (connection reused)
133+
?assertEqual(Free1, Free2).
134+
135+
%% Test concurrent requests respect pool limits
136+
test_concurrent_requests() ->
137+
%% Start 5 concurrent requests
138+
Self = self(),
139+
NumRequests = 5,
140+
141+
Pids = [spawn(fun() ->
142+
Result = hackney:request(get, url(<<"/get">>), [], <<>>,
143+
[{pool, ?POOL}, {with_body, true}]),
144+
Self ! {done, self(), Result}
145+
end) || _ <- lists:seq(1, NumRequests)],
146+
147+
%% Collect results
148+
Results = [receive {done, Pid, R} -> R end || Pid <- Pids],
149+
150+
%% All should succeed
151+
lists:foreach(fun(R) ->
152+
?assertMatch({ok, 200, _, _}, R)
153+
end, Results),
154+
155+
%% Allow time for cleanup
156+
timer:sleep(100),
157+
158+
%% No connections in use after all complete
159+
Stats = hackney_pool:get_stats(?POOL),
160+
InUse = proplists:get_value(in_use_count, Stats),
161+
?assertEqual(0, InUse).
162+
163+
%% Test connection released after error response (4xx/5xx)
164+
test_error_response_release() ->
165+
%% Make request to 404 endpoint
166+
{ok, 404, _Headers, Body} = hackney:request(get, url(<<"/not-found">>), [], <<>>,
167+
[{pool, ?POOL}, {with_body, true}]),
168+
?assertMatch(<<"{\"error\":", _/binary>>, Body),
169+
170+
timer:sleep(50),
171+
172+
%% Connection should still be returned to pool
173+
Stats = hackney_pool:get_stats(?POOL),
174+
InUse = proplists:get_value(in_use_count, Stats),
175+
?assertEqual(0, InUse).
176+
177+
%% Test pool stats are accurate during request lifecycle
178+
test_pool_stats_accuracy() ->
179+
%% Initial state
180+
Stats0 = hackney_pool:get_stats(?POOL),
181+
InUse0 = proplists:get_value(in_use_count, Stats0),
182+
183+
%% Start request without with_body
184+
{ok, 200, _Headers, Ref} = hackney:request(get, url(<<"/get">>), [], <<>>,
185+
[{pool, ?POOL}]),
186+
187+
%% During request - connection in use
188+
Stats1 = hackney_pool:get_stats(?POOL),
189+
InUse1 = proplists:get_value(in_use_count, Stats1),
190+
?assertEqual(InUse0 + 1, InUse1),
191+
192+
%% Read body
193+
{ok, _Body} = hackney:body(Ref),
194+
timer:sleep(50),
195+
196+
%% After body read - connection returned
197+
Stats2 = hackney_pool:get_stats(?POOL),
198+
InUse2 = proplists:get_value(in_use_count, Stats2),
199+
?assertEqual(InUse0, InUse2).

0 commit comments

Comments
 (0)