Skip to content

Commit b7d899d

Browse files
authored
Check pipeline shutdown status while waiting for valid connection or while issuing a bulk to ES (#1119)
Updates the wait_for_successful_connection and safe_bulk methods to react to shutdown requests. When the plugin is in a retry loop, the Logstash version it is running on provides the ability to negatively ACK the batch under processing, and a pipeline shutdown has been requested, the plugin terminates by raising an AbortedBatchException, signalling that the batch under processing shouldn't be acknowledged.
1 parent a0b4832 commit b7d899d

File tree

5 files changed

+141
-4
lines changed

5 files changed

+141
-4
lines changed

Diff for: CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 11.15.0
2+
- Added the ability to negatively acknowledge the batch under processing if the plugin is blocked in a retry-error-loop and a shutdown is requested. [#1119](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1119)
3+
14
## 11.14.1
25
- [DOC] Fixed incorrect pull request link on the CHANGELOG `11.14.0` entry [#1122](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1122)
36

Diff for: lib/logstash/outputs/elasticsearch.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -431,7 +431,12 @@ def map_events(events)
431431
def wait_for_successful_connection
432432
after_successful_connection_done = @after_successful_connection_done
433433
return unless after_successful_connection_done
434-
stoppable_sleep 1 until after_successful_connection_done.true?
434+
stoppable_sleep 1 until (after_successful_connection_done.true? || pipeline_shutdown_requested?)
435+
436+
if pipeline_shutdown_requested?
437+
logger.info "Aborting the batch due to shutdown request while waiting for connections to become live"
438+
abort_batch_if_available!
439+
end
435440

436441
status = @after_successful_connection_thread && @after_successful_connection_thread.value
437442
if status.is_a?(Exception) # check if thread 'halted' with an error

Diff for: lib/logstash/plugin_mixins/elasticsearch/common.rb

+33-2
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ def successful_connection?
159159
def after_successful_connection(&block)
160160
Thread.new do
161161
sleep_interval = @retry_initial_interval
162+
# in case of a pipeline shutdown request, the #close method also shuts down this thread,
163+
# so there is no need to explicitly handle it here and return an AbortedBatchException.
162164
until successful_connection? || @stopping.true?
163165
@logger.debug("Waiting for connectivity to Elasticsearch cluster, retrying in #{sleep_interval}s")
164166
sleep_interval = sleep_for_interval(sleep_interval)
@@ -191,8 +193,14 @@ def retrying_submit(actions)
191193
@logger.info("Retrying individual bulk actions that failed or were rejected by the previous bulk request", count: submit_actions.size)
192194
end
193195
rescue => e
194-
@logger.error("Encountered an unexpected error submitting a bulk request, will retry",
195-
message: e.message, exception: e.class, backtrace: e.backtrace)
196+
if abort_batch_present? && e.instance_of?(org.logstash.execution.AbortedBatchException)
197+
# if Logstash supports aborting a batch and the batch is being aborted,
198+
# bubble up the exception so that the pipeline can handle it
199+
raise e
200+
else
201+
@logger.error("Encountered an unexpected error submitting a bulk request, will retry",
202+
message: e.message, exception: e.class, backtrace: e.backtrace)
203+
end
196204
end
197205

198206
# Everything was a success!
@@ -331,6 +339,11 @@ def safe_bulk(actions)
331339

332340
sleep_interval = sleep_for_interval(sleep_interval)
333341
@bulk_request_metrics.increment(:failures)
342+
if pipeline_shutdown_requested?
343+
# when any connection is available and a shutdown is requested
344+
# the batch can be aborted, eventually for future retry.
345+
abort_batch_if_available!
346+
end
334347
retry unless @stopping.true?
335348
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
336349
@bulk_request_metrics.increment(:failures)
@@ -349,6 +362,11 @@ def safe_bulk(actions)
349362
end
350363

351364
sleep_interval = sleep_for_interval(sleep_interval)
365+
if pipeline_shutdown_requested?
366+
# In case ES side changes access credentials and a pipeline reload is triggered
367+
# this error becomes a retry on restart
368+
abort_batch_if_available!
369+
end
352370
retry
353371
rescue => e # Stuff that should never happen - print out full connection issues
354372
@logger.error(
@@ -363,6 +381,19 @@ def safe_bulk(actions)
363381
end
364382
end
365383

384+
# Tells whether a shutdown of the pipeline running this plugin has been requested.
# Delegates to the inherited implementation when one exists; otherwise falls back to
# reading the flag from the execution context's pipeline (nil when either is missing).
def pipeline_shutdown_requested?
385+
return super if defined?(super) # since LS 8.1.0
386+
execution_context&.pipeline&.shutdown_requested
387+
end
388+
389+
# Raises org.logstash.execution.AbortedBatchException when abort_batch_present? is true;
# otherwise does nothing.
def abort_batch_if_available!
390+
raise org.logstash.execution.AbortedBatchException.new if abort_batch_present?
391+
end
392+
393+
# True when LOGSTASH_VERSION is 8.8.0 or newer. Versions are compared as Gem::Version
# objects rather than as plain strings.
# NOTE(review): presumably 8.8.0 is the first release shipping AbortedBatchException - confirm.
def abort_batch_present?
394+
::Gem::Version.create(LOGSTASH_VERSION) >= ::Gem::Version.create('8.8.0')
395+
end
396+
366397
def dlq_enabled?
367398
# TODO there should be a better way to query if DLQ is enabled
368399
# See more in: https://github.com/elastic/logstash/issues/8064

Diff for: logstash-output-elasticsearch.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '11.14.1'
3+
s.version = '11.15.0'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

Diff for: spec/unit/outputs/elasticsearch_spec.rb

+98
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,104 @@
5050
subject.close
5151
end
5252

53+
# Specs for aborting the in-flight batch when a pipeline shutdown is requested, both while
# waiting for a first live connection and while stuck retrying a failing bulk request.
# The whole context is only defined on Logstash >= 8.8.0 (see the guard on the closing `end`).
context "check aborting of a batch" do
54+
context "on an unreachable ES instance" do
55+
let(:events) { [ ::LogStash::Event.new("foo" => "bar1"), ::LogStash::Event.new("foo" => "bar2") ] }
56+
57+
let(:shutdown_value) { true }
58+
59+
let(:logger) { double("logger") }
60+
61+
before(:each) do
62+
allow(subject).to receive(:logger).and_return(logger)
63+
allow(logger).to receive(:info)
64+
65+
allow(subject).to receive(:pipeline_shutdown_requested?) do
66+
shutdown_value
67+
end
68+
end
69+
70+
it "the #multi_receive abort while waiting on unreachable and a shutdown is requested" do
71+
expect { subject.multi_receive(events) }.to raise_error(org.logstash.execution.AbortedBatchException)
72+
expect(logger).to have_received(:info).with(/Aborting the batch due to shutdown request while waiting for connections to become live/i)
73+
end
74+
end
75+
76+
context "when a connected ES becomes unreachable" do
77+
# let(:error) do
78+
# ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
79+
# 429, double("url").as_null_object, request_body, double("response body")
80+
# )
81+
# end
82+
83+
shared_examples 'raise an abort error' do
84+
let(:options) {
85+
{
86+
"index" => "my-index",
87+
"hosts" => ["localhost","localhost:9202"],
88+
"path" => "some-path",
89+
"manage_template" => false
90+
}
91+
}
92+
93+
let(:manticore_urls) { subject.client.pool.urls }
94+
let(:manticore_url) { manticore_urls.first }
95+
96+
let(:stub_http_client_pool!) do
97+
[:start_resurrectionist, :start_sniffer, :healthcheck!].each do |method|
98+
allow_any_instance_of(LogStash::Outputs::ElasticSearch::HttpClient::Pool).to receive(method)
99+
end
100+
end
101+
102+
let(:event) { ::LogStash::Event.new("foo" => "bar") }
103+
104+
let(:logger) { double("logger").as_null_object }
105+
let(:response) { { :errors => [], :items => [] } }
106+
107+
let(:request_body) { double(:request_body, :bytesize => 1023) }
108+
109+
before(:each) do
110+
bulk_param = [["index", anything, event.to_hash]]
111+
112+
allow(subject).to receive(:logger).and_return(logger)
113+
114+
# fail consistently for ever
115+
allow(subject.client).to receive(:bulk).with(bulk_param).and_raise(error)
116+
end
117+
118+
it "should exit the retry with an abort exception if shutdown is requested" do
119+
# execute in another thread because it blocks in a retry loop until the shutdown is triggered
120+
th = Thread.new do
121+
subject.multi_receive([event])
122+
rescue org.logstash.execution.AbortedBatchException => e
123+
# return exception's class so that it can be verified when retrieving the thread's value
124+
e.class
125+
end
126+
127+
# trigger the shutdown signal
128+
allow(subject).to receive(:pipeline_shutdown_requested?) { true }
129+
130+
expect(th.value).to eq(org.logstash.execution.AbortedBatchException)
131+
end
132+
end
133+
134+
context "with 429 error" do
135+
let(:error) do
136+
::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
137+
429, double("url").as_null_object, request_body, double("response body")
138+
)
139+
end
140+
141+
it_behaves_like 'raise an abort error'
142+
end
143+
144+
context "with 'no connections' error" do
145+
let(:error) { ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError.new }
146+
147+
it_behaves_like 'raise an abort error'
148+
end
149+
end
150+
# Compare as Gem::Version: a plain String comparison is lexicographic, so "8.10.0" >= "8.8"
# would be false and these specs would be silently skipped on Logstash 8.10+.
end if ::Gem::Version.create(LOGSTASH_VERSION) >= ::Gem::Version.create('8.8.0')
53151

54152
context "with an active instance" do
55153
let(:options) {

0 commit comments

Comments
 (0)