Skip to content

Commit 6dca44e

Browse files
kaisechengyaauie
andauthored
Fix retry indefinitely in termination process (#129)
The shutdown process is blocked when `url` points to an invalid host:port. This commit checks pipeline shutdown state to quit retry loop. This feature requires logstash-core that exposes `pipeline_shutdown_requested?` Co-authored-by: Ry Biesemeyer <[email protected]>
1 parent 6bc70f6 commit 6dca44e

File tree

4 files changed

+29
-2
lines changed

4 files changed

+29
-2
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
## 5.4.1
2-
- Docs: Add retry policy description [#130](https://github.com/logstash-plugins/logstash-output-http/pull/130)
2+
- Fix retry indefinitely in termination process. This feature requires Logstash 8.1 [#129](https://github.com/logstash-plugins/logstash-output-http/pull/129)
3+
- Docs: Add retry policy description [#130](https://github.com/logstash-plugins/logstash-output-http/pull/130)
34

45
## 5.4.0
56
- Introduce retryable unknown exceptions for "connection reset by peer" and "timeout" [#127](https://github.com/logstash-plugins/logstash-output-http/pull/127)

docs/index.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -445,4 +445,4 @@ See https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache
445445
[id="plugins-{type}s-{plugin}-common-options"]
446446
include::{include_path}/{type}.asciidoc[]
447447

448-
:default_codec!:
448+
:default_codec!:

lib/logstash/outputs/http.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base
2828
/Read Timed out/i
2929
]
3030

31+
class PluginInternalQueueLeftoverError < StandardError; end
3132

3233
# This output lets you send events to a
3334
# generic HTTP(S) endpoint
@@ -179,6 +180,9 @@ def send_events(events)
179180

180181
event, attempt = popped
181182

183+
raise PluginInternalQueueLeftoverError.new("Received pipeline shutdown request but http output has unfinished events. " \
184+
"If persistent queue is enabled, events will be retried.") if attempt > 2 && pipeline_shutdown_requested?
185+
182186
action, event, attempt = send_event(event, attempt)
183187
begin
184188
action = :failure if action == :retry && !@retry_failed
@@ -223,6 +227,11 @@ def send_events(events)
223227
raise e
224228
end
225229

230+
def pipeline_shutdown_requested?
231+
return super if defined?(super) # since LS 8.1.0
232+
nil
233+
end
234+
226235
def sleep_for_attempt(attempt)
227236
sleep_for = attempt**2
228237
sleep_for = sleep_for <= 60 ? sleep_for : 60

spec/outputs/http_spec.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,23 @@ def start_app_and_wait(app, opts = {})
501501
let(:base_config) { { "http_compression" => true } }
502502
end
503503
end
504+
505+
describe "retryable error in termination" do
506+
let(:url) { "http://localhost:#{port-1}/invalid" }
507+
let(:events) { [event] }
508+
let(:config) { {"url" => url, "http_method" => "get", "pool_max" => 1} }
509+
510+
subject { LogStash::Outputs::Http.new(config) }
511+
512+
before do
513+
subject.register
514+
allow(subject).to receive(:pipeline_shutdown_requested?).and_return(true)
515+
end
516+
517+
it "raise exception to exit indefinitely retry" do
518+
expect { subject.multi_receive(events) }.to raise_error(LogStash::Outputs::Http::PluginInternalQueueLeftoverError)
519+
end
520+
end
504521
end
505522

506523
describe LogStash::Outputs::Http do # different block as we're starting web server with TLS

0 commit comments

Comments
 (0)