Skip to content

Check pipeline shutdown status while waiting for valid connection or while issuing a bulk to ES #1119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.15.0
- Added the ability to negatively acknowledge the batch under processing if the plugin is blocked in a retry-error-loop and a shutdown is requested. [#1119](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1119)

## 11.14.1
- [DOC] Fixed incorrect pull request link on the CHANGELOG `11.14.0` entry [#1122](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1122)

Expand Down
7 changes: 6 additions & 1 deletion lib/logstash/outputs/elasticsearch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,12 @@ def map_events(events)
def wait_for_successful_connection
after_successful_connection_done = @after_successful_connection_done
return unless after_successful_connection_done
stoppable_sleep 1 until after_successful_connection_done.true?
stoppable_sleep 1 until (after_successful_connection_done.true? || pipeline_shutdown_requested?)

if pipeline_shutdown_requested?
logger.info "Aborting the batch due to shutdown request while waiting for connections to become live"
abort_batch_if_available!
end

status = @after_successful_connection_thread && @after_successful_connection_thread.value
if status.is_a?(Exception) # check if thread 'halted' with an error
Expand Down
35 changes: 33 additions & 2 deletions lib/logstash/plugin_mixins/elasticsearch/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ def successful_connection?
def after_successful_connection(&block)
Thread.new do
sleep_interval = @retry_initial_interval
# in case of a pipeline's shutdown_requested?, the method #close shutdown also this thread
# so no need to explicitly handle it here and return an AbortedBatchException.
until successful_connection? || @stopping.true?
@logger.debug("Waiting for connectivity to Elasticsearch cluster, retrying in #{sleep_interval}s")
sleep_interval = sleep_for_interval(sleep_interval)
Expand Down Expand Up @@ -191,8 +193,14 @@ def retrying_submit(actions)
@logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request", count: submit_actions.size)
end
rescue => e
@logger.error("Encountered an unexpected error submitting a bulk request, will retry",
message: e.message, exception: e.class, backtrace: e.backtrace)
if abort_batch_present? && e.instance_of?(org.logstash.execution.AbortedBatchException)
# if Logstash support abort of a batch and the batch is aborting,
# bubble up the exception so that the pipeline can handle it
raise e
else
@logger.error("Encountered an unexpected error submitting a bulk request, will retry",
message: e.message, exception: e.class, backtrace: e.backtrace)
end
end

# Everything was a success!
Expand Down Expand Up @@ -331,6 +339,11 @@ def safe_bulk(actions)

sleep_interval = sleep_for_interval(sleep_interval)
@bulk_request_metrics.increment(:failures)
if pipeline_shutdown_requested?
# when any connection is available and a shutdown is requested
# the batch can be aborted, eventually for future retry.
abort_batch_if_available!
end
retry unless @stopping.true?
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
@bulk_request_metrics.increment(:failures)
Expand All @@ -349,6 +362,11 @@ def safe_bulk(actions)
end

sleep_interval = sleep_for_interval(sleep_interval)
if pipeline_shutdown_requested?
# In case ES side changes access credentials and a pipeline reload is triggered
# this error becomes a retry on restart
abort_batch_if_available!
end
retry
rescue => e # Stuff that should never happen - print out full connection issues
@logger.error(
Expand All @@ -363,6 +381,19 @@ def safe_bulk(actions)
end
end

def pipeline_shutdown_requested?
return super if defined?(super) # since LS 8.1.0
execution_context&.pipeline&.shutdown_requested
end

def abort_batch_if_available!
raise org.logstash.execution.AbortedBatchException.new if abort_batch_present?
end

def abort_batch_present?
::Gem::Version.create(LOGSTASH_VERSION) >= ::Gem::Version.create('8.8.0')
end

def dlq_enabled?
# TODO there should be a better way to query if DLQ is enabled
# See more in: https://github.com/elastic/logstash/issues/8064
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '11.14.1'
s.version = '11.15.0'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
98 changes: 98 additions & 0 deletions spec/unit/outputs/elasticsearch_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,104 @@
subject.close
end

context "check aborting of a batch" do
context "on an unreachable ES instance" do
let(:events) { [ ::LogStash::Event.new("foo" => "bar1"), ::LogStash::Event.new("foo" => "bar2") ] }

let(:shutdown_value) { true }

let(:logger) { double("logger") }

before(:each) do
allow(subject).to receive(:logger).and_return(logger)
allow(logger).to receive(:info)

allow(subject).to receive(:pipeline_shutdown_requested?) do
shutdown_value
end
end

it "the #multi_receive abort while waiting on unreachable and a shutdown is requested" do
expect { subject.multi_receive(events) }.to raise_error(org.logstash.execution.AbortedBatchException)
expect(logger).to have_received(:info).with(/Aborting the batch due to shutdown request while waiting for connections to become live/i)
end
end

context "when a connected ES becomes unreachable" do
# let(:error) do
# ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
# 429, double("url").as_null_object, request_body, double("response body")
# )
# end

shared_examples 'raise an abort error' do
let(:options) {
{
"index" => "my-index",
"hosts" => ["localhost","localhost:9202"],
"path" => "some-path",
"manage_template" => false
}
}

let(:manticore_urls) { subject.client.pool.urls }
let(:manticore_url) { manticore_urls.first }

let(:stub_http_client_pool!) do
[:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method|
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method)
end
end

let(:event) { ::LogStash::Event.new("foo" => "bar") }

let(:logger) { double("logger").as_null_object }
let(:response) { { :errors => [], :items => [] } }

let(:request_body) { double(:request_body, :bytesize => 1023) }

before(:each) do
bulk_param = [["index", anything, event.to_hash]]

allow(subject).to receive(:logger).and_return(logger)

# fail consistently for ever
allow(subject.client).to receive(:bulk).with(bulk_param).and_raise(error)
end

it "should exit the retry with an abort exception if shutdown is requested" do
# execute in another thread because it blocks in a retry loop until the shutdown is triggered
th = Thread.new do
subject.multi_receive([event])
rescue org.logstash.execution.AbortedBatchException => e
# return exception's class so that it can be verified when retrieving the thread's value
e.class
end

# trigger the shutdown signal
allow(subject).to receive(:pipeline_shutdown_requested?) { true }

expect(th.value).to eq(org.logstash.execution.AbortedBatchException)
end
end

context "with 429 error" do
let(:error) do
::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
429, double("url").as_null_object, request_body, double("response body")
)
end

it_behaves_like 'raise an abort error'
end

context "with 'no connections' error" do
let(:error) { ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError.new }

it_behaves_like 'raise an abort error'
end
end
end if LOGSTASH_VERSION >= '8.8'

context "with an active instance" do
let(:options) {
Expand Down