diff --git a/CHANGELOG.md b/CHANGELOG.md index a2ea085..c51b566 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## 5.4.1 - - Docs: Add retry policy description [#130](https://github.com/logstash-plugins/logstash-output-http/pull/130) + - Fix retry indefinitely in termination process. This feature requires Logstash 8.1 [#129](https://github.com/logstash-plugins/logstash-output-http/pull/129) + - Docs: Add retry policy description [#130](https://github.com/logstash-plugins/logstash-output-http/pull/130) ## 5.4.0 - Introduce retryable unknown exceptions for "connection reset by peer" and "timeout" [#127](https://github.com/logstash-plugins/logstash-output-http/pull/127) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index ed33379..cf26708 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -445,4 +445,4 @@ See https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache [id="plugins-{type}s-{plugin}-common-options"] include::{include_path}/{type}.asciidoc[] -:default_codec!: +:default_codec!: \ No newline at end of file diff --git a/lib/logstash/outputs/http.rb b/lib/logstash/outputs/http.rb index 6b2beda..63ef37a 100644 --- a/lib/logstash/outputs/http.rb +++ b/lib/logstash/outputs/http.rb @@ -28,6 +28,7 @@ class LogStash::Outputs::Http < LogStash::Outputs::Base /Read Timed out/i ] + class PluginInternalQueueLeftoverError < StandardError; end # This output lets you send events to a # generic HTTP(S) endpoint @@ -179,6 +180,9 @@ def send_events(events) event, attempt = popped + raise PluginInternalQueueLeftoverError.new("Received pipeline shutdown request but http output has unfinished events. " \ + "If persistent queue is enabled, events will be retried.") if attempt > 2 && pipeline_shutdown_requested? + action, event, attempt = send_event(event, attempt) begin action = :failure if action == :retry && !@retry_failed @@ -223,6 +227,11 @@ def send_events(events) raise e end + def pipeline_shutdown_requested? + return super if defined?(super) # since LS 8.1.0 + nil + end + def sleep_for_attempt(attempt) sleep_for = attempt**2 sleep_for = sleep_for <= 60 ? sleep_for : 60 diff --git a/spec/outputs/http_spec.rb b/spec/outputs/http_spec.rb index fe0d396..e2e18d2 100644 --- a/spec/outputs/http_spec.rb +++ b/spec/outputs/http_spec.rb @@ -501,6 +501,23 @@ def start_app_and_wait(app, opts = {}) let(:base_config) { { "http_compression" => true } } end end + + describe "retryable error in termination" do + let(:url) { "http://localhost:#{port-1}/invalid" } + let(:events) { [event] } + let(:config) { {"url" => url, "http_method" => "get", "pool_max" => 1} } + + subject { LogStash::Outputs::Http.new(config) } + + before do + subject.register + allow(subject).to receive(:pipeline_shutdown_requested?).and_return(true) + end + + it "raise exception to exit indefinitely retry" do + expect { subject.multi_receive(events) }.to raise_error(LogStash::Outputs::Http::PluginInternalQueueLeftoverError) + end + end end describe LogStash::Outputs::Http do # different block as we're starting web server with TLS