Skip to content

Commit 5d3a82c

Browse files
committed
feat: rewrite keepalive_ready feature based on reader_state table
1 parent 91935c4 commit 5d3a82c

File tree

2 files changed

+30
-39
lines changed

2 files changed

+30
-39
lines changed

Diff for: lib/resty/http.lua

+29-38
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ function _M.new(_)
135135
return nil, err
136136
end
137137
return setmetatable({
138-
sock = sock, keepalive_supported = true, keepalive_ready = false
138+
sock = sock, keepalive_supported = true, reader_state = { keepalive_ready = false, mark_keepalive_ready_on_body_read = true }
139139
}, mt)
140140
end
141141

@@ -197,6 +197,8 @@ function _M.tcp_only_connect(self, ...)
197197
self.port = nil
198198
end
199199

200+
-- Immediately after connection - keepalive should be possible
201+
self.reader_state.keepalive_ready = true
200202
self.keepalive_supported = true
201203
self.ssl = false
202204

@@ -211,7 +213,7 @@ function _M.set_keepalive(self, ...)
211213
end
212214

213215
if self.keepalive_supported == true then
214-
if not self.keepalive_ready then
216+
if not self.reader_state.keepalive_ready then
215217
return nil, "response not fully read"
216218
end
217219

@@ -435,18 +437,7 @@ end
435437
_M.transfer_encoding_is_chunked = transfer_encoding_is_chunked
436438

437439

438-
local function _reader_keepalive_ready_mark(http_client)
439-
return function()
440-
http_client.keepalive_ready = true
441-
end
442-
end
443-
444-
local function _reader_keepalive_ready_no_op()
445-
return function() end
446-
end
447-
448-
449-
local function _chunked_body_reader(keepalive_ready_callback, sock, default_chunk_size)
440+
local function _chunked_body_reader(reader_state, sock, default_chunk_size)
450441
return co_wrap(function(max_chunk_size)
451442
local remaining = 0
452443
local length
@@ -505,12 +496,14 @@ local function _chunked_body_reader(keepalive_ready_callback, sock, default_chun
505496

506497
until length == 0
507498

508-
keepalive_ready_callback()
499+
if reader_state.mark_keepalive_ready_on_body_read then
500+
reader_state.keepalive_ready = true
501+
end
509502
end)
510503
end
511504

512505

513-
local function _body_reader(keepalive_ready_callback, sock, content_length, default_chunk_size)
506+
local function _body_reader(reader_state, sock, content_length, default_chunk_size)
514507
return co_wrap(function(max_chunk_size)
515508
max_chunk_size = max_chunk_size or default_chunk_size
516509

@@ -540,8 +533,9 @@ local function _body_reader(keepalive_ready_callback, sock, content_length, defa
540533
elseif not max_chunk_size then
541534
-- We have a length and potentially keep-alive, but want everything.
542535
co_yield(sock:receive(content_length))
543-
keepalive_ready_callback()
544-
536+
if reader_state.mark_keepalive_ready_on_body_read then
537+
reader_state.keepalive_ready = true
538+
end
545539
else
546540
-- We have a length and potentially a keep-alive, and wish to stream
547541
-- the response.
@@ -569,7 +563,9 @@ local function _body_reader(keepalive_ready_callback, sock, content_length, defa
569563
end
570564

571565
until length == 0
572-
keepalive_ready_callback()
566+
if reader_state.mark_keepalive_ready_on_body_read then
567+
reader_state.keepalive_ready = true
568+
end
573569
end
574570
end)
575571
end
@@ -608,10 +604,11 @@ local function _read_body(res)
608604
end
609605

610606

611-
local function _trailer_reader(keepalive_ready_callback, sock)
607+
local function _trailer_reader(reader_state, sock)
612608
return co_wrap(function()
613609
co_yield(_receive_headers(sock))
614-
keepalive_ready_callback()
610+
-- We can always pool after reading trailers
611+
reader_state.keepalive_ready = true
615612
end)
616613
end
617614

@@ -677,7 +674,7 @@ function _M.send_request(self, params)
677674
setmetatable(params, { __index = DEFAULT_PARAMS })
678675

679676
-- Sending a new request makes keepalive disabled until its response is fully read
680-
self.keepalive_ready = false
677+
self.reader_state.keepalive_ready = false
681678

682679
local sock = self.sock
683680
local body = params.body
@@ -830,23 +827,16 @@ function _M.read_response(self, params)
830827
local trailer_reader
831828
local has_body = false
832829
local has_trailer = false
833-
local body_reader_keepalive_ready_callback
834830

835-
if res_headers["Trailer"] then
836-
has_trailer = true
837-
-- If there are trailers - fully reading response body doesn't mean socket is ready to be pooled
838-
body_reader_keepalive_ready_callback = _reader_keepalive_ready_no_op()
839-
else
840-
-- If there are no trailers - fully reading response body means socket is ready to be pooled
841-
body_reader_keepalive_ready_callback = _reader_keepalive_ready_mark(self)
842-
end
831+
has_trailer = (res_headers["Trailer"] ~= nil)
832+
self.reader_state.mark_keepalive_ready_on_body_read = not has_trailer
843833

844834
-- Receive the body_reader
845835
if _should_receive_body(params.method, status) then
846836
has_body = true
847837

848838
if version == 1.1 and transfer_encoding_is_chunked(res_headers) then
849-
body_reader, err = _chunked_body_reader(body_reader_keepalive_ready_callback, sock)
839+
body_reader, err = _chunked_body_reader(self.reader_state, sock)
850840
else
851841
local length
852842
ok, length = pcall(tonumber, res_headers["Content-Length"])
@@ -855,17 +845,17 @@ function _M.read_response(self, params)
855845
length = nil
856846
end
857847

858-
body_reader, err = _body_reader(body_reader_keepalive_ready_callback, sock, length)
848+
body_reader, err = _body_reader(self.reader_state, sock, length)
859849
end
860850
else
861851
if not has_trailer then
862852
-- If there's no body and no trailer - it's ready for keep-alive
863-
self.keepalive_ready = true
853+
self.reader_state.keepalive_ready = true
864854
end
865855
end
866856

867857
if has_trailer then
868-
trailer_reader, err = _trailer_reader(_reader_keepalive_ready_mark(self), sock)
858+
trailer_reader, err = _trailer_reader(self.reader_state, sock)
869859
end
870860

871861
if err then
@@ -1024,14 +1014,15 @@ function _M.get_client_body_reader(_, chunksize, sock)
10241014
end
10251015
end
10261016

1027-
local reader_keep_alive_ready_callback = _reader_keepalive_ready_no_op()
1017+
-- Reading the request body has nothing to do with pooling the upstream server socket
1018+
local request_body_reader_state = { mark_keepalive_ready_on_body_read = false }
10281019
local headers = ngx_req_get_headers()
10291020
local length = headers.content_length
10301021
if length then
1031-
return _body_reader(reader_keep_alive_ready_callback, sock, tonumber(length), chunksize)
1022+
return _body_reader(request_body_reader_state, sock, tonumber(length), chunksize)
10321023
elseif transfer_encoding_is_chunked(headers) then
10331024
-- Not yet supported by ngx_lua but should just work...
1034-
return _chunked_body_reader(reader_keep_alive_ready_callback, sock, chunksize)
1025+
return _chunked_body_reader(request_body_reader_state, sock, chunksize)
10351026
else
10361027
return nil
10371028
end

Diff for: lib/resty/http_connect.lua

+1-1
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ local function connect(self, options)
262262
self.host = request_host
263263
self.port = request_port
264264
-- Immediately after connection - keepalive should be possible
265-
self.keepalive_ready = true
265+
self.reader_state.keepalive_ready = true
266266
self.keepalive_supported = true
267267
self.ssl = ssl
268268
-- set only for http, https has already been handled

0 commit comments

Comments
 (0)