From 1886b065c31915bf3ca089e6177dee930140a395 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Fco=2E=20P=C3=A9rez=20Hidalgo?= Date: Thu, 9 Feb 2023 23:24:44 +0100 Subject: [PATCH 1/3] Break the termination deadlock when whole bulk requests are failing in a retry loop during pipeline terminations --- .../plugin_mixins/elasticsearch/common.rb | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 7a4dc966..f34b31c6 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -154,12 +154,17 @@ def successful_connection? !!maximum_seen_major_version && alive_urls_count > 0 end + def shutting_down? + @stopping.true? || (execution_context.pipeline.shutdown_requested? && !execution_context.pipeline.worker_threads_draining?) + end + # launch a thread that waits for an initial successful connection to the ES cluster to call the given block # @param block [Proc] the block to execute upon initial successful connection # @return [Thread] the successful connection wait thread def after_successful_connection(&block) Thread.new do sleep_interval = @retry_initial_interval + # Using `@stopping` instead of `shutting_down` as no batches are being processed yet until successful_connection? || @stopping.true? @logger.debug("Waiting for connectivity to Elasticsearch cluster, retrying in #{sleep_interval}s") sleep_interval = sleep_for_interval(sleep_interval) @@ -211,7 +216,7 @@ def sleep_for_interval(sleep_interval) end def stoppable_sleep(interval) - Stud.stoppable_sleep(interval) { @stopping.true? } + Stud.stoppable_sleep(interval) { shutting_down? } end def next_sleep_interval(current_interval) @@ -322,7 +327,7 @@ def safe_bulk(actions) # We retry until there are no errors! Errors should all go to the retry queue sleep_interval = sleep_for_interval(sleep_interval) @bulk_request_metrics.increment(:failures) - retry unless @stopping.true? + retry unless shutting_down? rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e @logger.error( "Attempted to send a bulk request but there are no living connections in the pool " + @@ -332,7 +337,7 @@ def safe_bulk(actions) sleep_interval = sleep_for_interval(sleep_interval) @bulk_request_metrics.increment(:failures) - retry unless @stopping.true? + retry unless shutting_down? rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e @bulk_request_metrics.increment(:failures) log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s, @@ -350,7 +355,7 @@ def safe_bulk(actions) end sleep_interval = sleep_for_interval(sleep_interval) - retry + retry unless shutting_down? rescue => e # Stuff that should never happen - print out full connection issues @logger.error( "An unknown error occurred sending a bulk request to Elasticsearch (will retry indefinitely)", @@ -360,7 +365,7 @@ def safe_bulk(actions) sleep_interval = sleep_for_interval(sleep_interval) @bulk_request_metrics.increment(:failures) - retry unless @stopping.true? + retry unless shutting_down? end end @@ -378,4 +383,4 @@ def dig_value(val, first_key, *rest_keys) dig_value(val, *rest_keys) end end -end; end; end +end; end; end \ No newline at end of file From 26522048f4852edaf15fe28edecb1a68c77b2f76 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Fco=2E=20P=C3=A9rez=20Hidalgo?= Date: Fri, 10 Feb 2023 16:25:37 +0100 Subject: [PATCH 2/3] Check if pipeline is `nil` to harden against incomplete mocks --- lib/logstash/plugin_mixins/elasticsearch/common.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index f34b31c6..09801818 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -155,7 +155,7 @@ def successful_connection? end def shutting_down? - @stopping.true? || (execution_context.pipeline.shutdown_requested? && !execution_context.pipeline.worker_threads_draining?) + @stopping.true? || (!execution_context.pipeline.nil? && execution_context.pipeline.shutdown_requested? && !execution_context.pipeline.worker_threads_draining?) end # launch a thread that waits for an initial successful connection to the ES cluster to call the given block From 08d4c9511ff952284e5f128dfa763c6d8c050318 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pablo=20Fco=2E=20P=C3=A9rez=20Hidalgo?= Date: Mon, 20 Feb 2023 13:13:38 +0100 Subject: [PATCH 3/3] Add check on `execution_context` not being nil --- lib/logstash/plugin_mixins/elasticsearch/common.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/elasticsearch/common.rb b/lib/logstash/plugin_mixins/elasticsearch/common.rb index 09801818..4fb9cce5 100644 --- a/lib/logstash/plugin_mixins/elasticsearch/common.rb +++ b/lib/logstash/plugin_mixins/elasticsearch/common.rb @@ -155,7 +155,7 @@ def successful_connection? end def shutting_down? - @stopping.true? || (!execution_context.pipeline.nil? && execution_context.pipeline.shutdown_requested? && !execution_context.pipeline.worker_threads_draining?) + @stopping.true? || (!execution_context.nil? && !execution_context.pipeline.nil? && execution_context.pipeline.shutdown_requested? && !execution_context.pipeline.worker_threads_draining?) end # launch a thread that waits for an initial successful connection to the ES cluster to call the given block