diff --git a/CHANGELOG.md b/CHANGELOG.md index bb390701..74f33b98 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 11.15.0 + - Added the ability to negatively acknowledge the batch under processing if the plugin is blocked in a retry-error-loop and a shutdown is requested. [#1119](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1119) + ## 11.14.1 - [DOC] Fixed incorrect pull request link on the CHANGELOG `11.14.0` entry [#1122](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1122) diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 53552e5b..2ee9af4e 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -431,7 +431,12 @@ def map_events(events) def wait_for_successful_connection after_successful_connection_done = @after_successful_connection_done return unless after_successful_connection_done - stoppable_sleep 1 until after_successful_connection_done.true? + stoppable_sleep 1 until (after_successful_connection_done.true? || pipeline_shutdown_requested?) + + if pipeline_shutdown_requested? + logger.info "Aborting the batch due to shutdown request while waiting for connections to become live" + abort_batch_if_available! + end status = @after_successful_connection_thread && @after_successful_connection_thread.value if status.is_a?(Exception) # check if thread 'halted' with an error diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index edb334e6..3cb32c0f 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -159,6 +159,8 @@ def successful_connection? def after_successful_connection(&block) Thread.new do sleep_interval = @retry_initial_interval + # in case of a pipeline's shutdown_requested?, the method #close shutdown also this thread + # so no need to explicitly handle it here and return an AbortedBatchException. until successful_connection? || @stopping.true? @logger.debug("Waiting for connectivity to Elasticsearch cluster, retrying in #{sleep_interval}s") sleep_interval = sleep_for_interval(sleep_interval) @@ -191,8 +193,14 @@ def retrying_submit(actions) @logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request", count: submit_actions.size) end rescue => e - @logger.error("Encountered an unexpected error submitting a bulk request, will retry", - message: e.message, exception: e.class, backtrace: e.backtrace) + if abort_batch_present? && e.instance_of?(org.logstash.execution.AbortedBatchException) + # if Logstash support abort of a batch and the batch is aborting, + # bubble up the exception so that the pipeline can handle it + raise e + else + @logger.error("Encountered an unexpected error submitting a bulk request, will retry", + message: e.message, exception: e.class, backtrace: e.backtrace) + end end # Everything was a success! @@ -331,6 +339,11 @@ def safe_bulk(actions) sleep_interval = sleep_for_interval(sleep_interval) @bulk_request_metrics.increment(:failures) + if pipeline_shutdown_requested? + # when any connection is available and a shutdown is requested + # the batch can be aborted, eventually for future retry. + abort_batch_if_available! + end retry unless @stopping.true? rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e @bulk_request_metrics.increment(:failures) @@ -349,6 +362,11 @@ def safe_bulk(actions) end sleep_interval = sleep_for_interval(sleep_interval) + if pipeline_shutdown_requested? + # In case ES side changes access credentials and a pipeline reload is triggered + # this error becomes a retry on restart + abort_batch_if_available! + end retry rescue => e # Stuff that should never happen - print out full connection issues @logger.error( @@ -363,6 +381,19 @@ def safe_bulk(actions) end end + def pipeline_shutdown_requested? + return super if defined?(super) # since LS 8.1.0 + execution_context&.pipeline&.shutdown_requested + end + + def abort_batch_if_available! + raise org.logstash.execution.AbortedBatchException.new if abort_batch_present? + end + + def abort_batch_present? + ::Gem::Version.create(LOGSTASH_VERSION) >= ::Gem::Version.create('8.8.0') + end + def dlq_enabled? # TODO there should be a better way to query if DLQ is enabled # See more in: https://github.com/elastic/logstash/issues/8064 diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 1104aeaa..831493be 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '11.14.1' + s.version = '11.15.0' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index d9f4c4e3..c56bade0 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -50,6 +50,104 @@ subject.close end + context "check aborting of a batch" do + context "on an unreachable ES instance" do + let(:events) { [ ::LogStash::Event.new("foo" => "bar1"), ::LogStash::Event.new("foo" => "bar2") ] } + + let(:shutdown_value) { true } + + let(:logger) { double("logger") } + + before(:each) do + allow(subject).to receive(:logger).and_return(logger) + allow(logger).to receive(:info) + + allow(subject).to receive(:pipeline_shutdown_requested?) do + shutdown_value + end + end + + it "the #multi_receive abort while waiting on unreachable and a shutdown is requested" do + expect { subject.multi_receive(events) }.to raise_error(org.logstash.execution.AbortedBatchException) + expect(logger).to have_received(:info).with(/Aborting the batch due to shutdown request while waiting for connections to become live/i) + end + end + + context "when a connected ES becomes unreachable" do +# let(:error) do +# ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( +# 429, double("url").as_null_object, request_body, double("response body") +# ) +# end + + shared_examples 'raise an abort error' do + let(:options) { + { + "index" => "my-index", + "hosts" => ["localhost","localhost:9202"], + "path" => "some-path", + "manage_template" => false + } + } + + let(:manticore_urls) { subject.client.pool.urls } + let(:manticore_url) { manticore_urls.first } + + let(:stub_http_client_pool!) do + [:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method| + allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method) + end + end + + let(:event) { ::LogStash::Event.new("foo" => "bar") } + + let(:logger) { double("logger").as_null_object } + let(:response) { { :errors => [], :items => [] } } + + let(:request_body) { double(:request_body, :bytesize => 1023) } + + before(:each) do + bulk_param = [["index", anything, event.to_hash]] + + allow(subject).to receive(:logger).and_return(logger) + + # fail consistently for ever + allow(subject.client).to receive(:bulk).with(bulk_param).and_raise(error) + end + + it "should exit the retry with an abort exception if shutdown is requested" do + # execute in another thread because it blocks in a retry loop until the shutdown is triggered + th = Thread.new do + subject.multi_receive([event]) + rescue org.logstash.execution.AbortedBatchException => e + # return exception's class so that it can be verified when retrieving the thread's value + e.class + end + + # trigger the shutdown signal + allow(subject).to receive(:pipeline_shutdown_requested?) { true } + + expect(th.value).to eq(org.logstash.execution.AbortedBatchException) + end + end + + context "with 429 error" do + let(:error) do + ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( + 429, double("url").as_null_object, request_body, double("response body") + ) + end + + it_behaves_like 'raise an abort error' + end + + context "with 'no connections' error" do + let(:error) { ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError.new } + + it_behaves_like 'raise an abort error' + end + end + end if LOGSTASH_VERSION >= '8.8' context "with an active instance" do let(:options) {