Fix termination deadlock when the output is retrying whole failed bulk requests #1117

Closed
lib/logstash/plugin_mixins/elasticsearch/common.rb (17 changes: 11 additions & 6 deletions)
@@ -154,12 +154,17 @@ def successful_connection?
!!maximum_seen_major_version && alive_urls_count > 0
end

+def shutting_down?
+  @stopping.true? || (!execution_context.nil? && !execution_context.pipeline.nil? && execution_context.pipeline.shutdown_requested? && !execution_context.pipeline.worker_threads_draining?)
Contributor:

To avoid the cascading nil checks we could use the &. safe navigation operator. That would simplify it to:

Suggested change:
-@stopping.true? || (!execution_context.nil? && !execution_context.pipeline.nil? && execution_context.pipeline.shutdown_requested? && !execution_context.pipeline.worker_threads_draining?)
+@stopping.true? || (execution_context&.pipeline&.shutdown_requested? && !execution_context&.pipeline&.worker_threads_draining?)
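For illustration, a minimal self-contained sketch of how &. short-circuits on nil; the Struct stand-ins below are hypothetical, not the plugin's actual classes:

# Hypothetical stand-ins for the plugin's execution context and pipeline.
Pipeline = Struct.new(:shutdown, :draining) do
  def shutdown_requested?; shutdown; end
  def worker_threads_draining?; draining; end
end
ExecutionContext = Struct.new(:pipeline)

def shutting_down?(stopping, execution_context)
  stopping || (execution_context&.pipeline&.shutdown_requested? &&
               !execution_context&.pipeline&.worker_threads_draining?)
end

p shutting_down?(false, nil)                                              # => nil (falsey): &. short-circuits, no NoMethodError
p shutting_down?(false, ExecutionContext.new(Pipeline.new(true, false)))  # => true: shutdown requested, workers not draining
p shutting_down?(false, ExecutionContext.new(Pipeline.new(true, true)))   # => false: workers still draining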

+end

# launch a thread that waits for an initial successful connection to the ES cluster to call the given block
# @param block [Proc] the block to execute upon initial successful connection
# @return [Thread] the successful connection wait thread
def after_successful_connection(&block)
Thread.new do
sleep_interval = @retry_initial_interval
+# Using `@stopping` instead of `shutting_down?` as no batches are being processed yet
until successful_connection? || @stopping.true?
@logger.debug("Waiting for connectivity to Elasticsearch cluster, retrying in #{sleep_interval}s")
sleep_interval = sleep_for_interval(sleep_interval)
@@ -211,7 +216,7 @@ def sleep_for_interval(sleep_interval)
end

def stoppable_sleep(interval)
-Stud.stoppable_sleep(interval) { @stopping.true? }
+Stud.stoppable_sleep(interval) { shutting_down? }
end
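Stud.stoppable_sleep sleeps in short slices and wakes early once its block returns true, so swapping in shutting_down? lets a pending shutdown interrupt a long backoff promptly. A conceptual sketch of that pattern (a simplified illustration, not Stud's actual source):

# Sleep up to `duration` seconds in short slices, polling the block each
# time; bail out early as soon as the block (e.g. shutting_down?) is true.
def stoppable_sleep(duration, slice = 0.5)
  slept = 0.0
  while slept < duration
    return slept if yield
    sleep(slice)
    slept += slice
  end
  slept
end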

def next_sleep_interval(current_interval)
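The body of next_sleep_interval is elided in this diff; for context, it supplies the exponential backoff used by the retry loops below. A minimal sketch of the usual shape, assuming the plugin's @retry_max_interval setting as the ceiling (an illustration, not necessarily the actual body):

# Double the interval on each attempt, capped at the configured maximum.
def next_sleep_interval(current_interval)
  doubled = current_interval * 2
  doubled > @retry_max_interval ? @retry_max_interval : doubled
end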
@@ -322,7 +327,7 @@ def safe_bulk(actions)
# We retry until there are no errors! Errors should all go to the retry queue
sleep_interval = sleep_for_interval(sleep_interval)
@bulk_request_metrics.increment(:failures)
-retry unless @stopping.true?
+retry unless shutting_down?
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::NoConnectionAvailableError => e
@logger.error(
"Attempted to send a bulk request but there are no living connections in the pool " +
@@ -332,7 +337,7 @@

sleep_interval = sleep_for_interval(sleep_interval)
@bulk_request_metrics.increment(:failures)
-retry unless @stopping.true?
+retry unless shutting_down?
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
@bulk_request_metrics.increment(:failures)
log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s,
@@ -350,7 +355,7 @@
end

sleep_interval = sleep_for_interval(sleep_interval)
-retry
+retry unless shutting_down?
rescue => e # Stuff that should never happen - print out full connection issues
@logger.error(
"An unknown error occurred sending a bulk request to Elasticsearch (will retry indefinitely)",
@@ -360,7 +365,7 @@

sleep_interval = sleep_for_interval(sleep_interval)
@bulk_request_metrics.increment(:failures)
-retry unless @stopping.true?
+retry unless shutting_down?
end
end
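This rescue chain is where the termination deadlock lived: the BadResponseCodeError branch retried unconditionally, so a shutdown request could never interrupt a bulk request that kept failing. A self-contained sketch of the failure mode and the fix (all names here are illustrative, not plugin code):

# With a bare `retry`, this loop spins for as long as the error persists
# and the shutdown flag is never honored; gating the retry on the flag
# lets the worker exit so the pipeline can terminate.
shutting_down = false
Thread.new { sleep 2; shutting_down = true }   # simulated shutdown request

attempts = 0
begin
  attempts += 1
  raise "HTTP 503"                             # simulated persistent bulk failure
rescue
  sleep 0.5                                    # stand-in for sleep_for_interval
  retry unless shutting_down                   # a bare `retry` would never exit
end
puts "gave up cleanly after #{attempts} attempts"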

@@ -378,4 +383,4 @@ def dig_value(val, first_key, *rest_keys)
dig_value(val, *rest_keys)
end
end
-end; end; end
\ No newline at end of file
+end; end; end