From cfc2b9b5b820ffed0f5c0fdf9a36bae067a08509 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Duarte?= Date: Wed, 15 Jan 2025 12:05:35 +0000 Subject: [PATCH 1/3] Properly handle `413` Payload Too Large errors Previously when Elasticsearch responds with a 413 (Payload Too Large) status, the manticore adapter raises an error before the response can be processed by the bulk_send error handling. This commit refactors the way `BadErrorResponse` codes are handled. Previously we had logic in the manticore adaptor which special cased raising errors on some codes. This commit refactors such that the adaptor raises on any error status and the caller is now responsible for special case handling the code. --- docs/index.asciidoc | 17 +++++- .../outputs/elasticsearch/http_client.rb | 52 ++++++++++--------- .../http_client/manticore_adapter.rb | 5 +- .../outputs/elasticsearch/http_client/pool.rb | 24 ++++----- spec/unit/outputs/elasticsearch_spec.rb | 7 ++- 5 files changed, 60 insertions(+), 45 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index a4141066..b6dcce8b 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -196,7 +196,22 @@ This plugin uses the Elasticsearch bulk API to optimize its imports into Elastic either partial or total failures. The bulk API sends batches of requests to an HTTP endpoint. Error codes for the HTTP request are handled differently than error codes for individual documents. -HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely. + +HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely, +including 413 (Payload Too Large) responses. + +If you want to handle large payloads differently, you can configure 413 responses to go to the Dead Letter Queue instead: + +[source,ruby] +----- +output { + elasticsearch { + hosts => ["localhost:9200"] + dlq_custom_codes => [413] # Send 413 errors to DLQ instead of retrying + } +----- + +This will capture oversized payloads in the DLQ for analysis rather than retrying them. The following document errors are handled as follows: diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index e0b70e36..077f520d 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses) def bulk_send(body_stream, batch_actions) params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {} - response = @pool.post(@bulk_path, params, body_stream.string) - - @bulk_response_metrics.increment(response.code.to_s) - - case response.code - when 200 # OK - LogStash::Json.load(response.body) - when 413 # Payload Too Large + begin + response = @pool.post(@bulk_path, params, body_stream.string) + @bulk_response_metrics.increment(response.code.to_s) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + @bulk_response_metrics.increment(e.response_code.to_s) + raise e unless e.response_code == 413 + # special handling for 413, treat it as a document level issue logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) - emulate_batch_error_response(batch_actions, response.code, 'payload_too_large') - else - url = ::LogStash::Util::SafeURI.new(response.final_url) - raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( - response.code, url, body_stream.to_s, response.body - ) + return emulate_batch_error_response(batch_actions, 413, 'payload_too_large') + rescue => e # it may be a network issue instead, re-raise + raise e end + + LogStash::Json.load(response.body) end def emulate_batch_error_response(actions, http_code, reason) @@ -411,6 +409,9 @@ def host_to_url(h) def exists?(path, use_get=false) response = use_get ? @pool.get(path) : @pool.head(path) response.code >= 200 && response.code <= 299 + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + return true if e.code == 404 + raise e end def template_exists?(template_endpoint, name) @@ -420,7 +421,10 @@ def template_exists?(template_endpoint, name) def template_put(template_endpoint, name, template) path = "#{template_endpoint}/#{name}" logger.info("Installing Elasticsearch template", name: name) - @pool.put(path, nil, LogStash::Json.dump(template)) + response = @pool.put(path, nil, LogStash::Json.dump(template)) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + return response if e.code == 404 + raise e end # ILM methods @@ -432,17 +436,15 @@ def rollover_alias_exists?(name) # Create a new rollover alias def rollover_alias_put(alias_name, alias_definition) - begin - @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) - logger.info("Created rollover alias", name: alias_name) - # If the rollover alias already exists, ignore the error that comes back from Elasticsearch - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - if e.response_code == 400 - logger.info("Rollover alias already exists, skipping", name: alias_name) - return - end - raise e + @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) + logger.info("Created rollover alias", name: alias_name) + # If the rollover alias already exists, ignore the error that comes back from Elasticsearch + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + if e.response_code == 400 + logger.info("Rollover alias already exists, skipping", name: alias_name) + return end + raise e end def get_xpack_info diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index c9e49ec7..11f85b53 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil) raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string) end - # 404s are excluded because they are valid codes in the case of - # template installation. We might need a better story around this later - # but for our current purposes this is correct code = resp.code - if code < 200 || code > 299 && code != 404 + if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body) end diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 68715066..1ef9d0f9 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -253,13 +253,11 @@ def get_license(url) def health_check_request(url) logger.debug("Running health check to see if an Elasticsearch connection is working", :healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path) - begin - response = perform_request_to_url(url, :head, @healthcheck_path) - return response, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) - return nil, e - end + response = perform_request_to_url(url, :head, @healthcheck_path) + return response, nil + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) + return nil, e end def healthcheck!(register_phase = true) @@ -312,13 +310,11 @@ def healthcheck!(register_phase = true) end def get_root_path(url, params={}) - begin - resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params) - return resp, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body) - return nil, e - end + resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params) + return resp, nil + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body) + return nil, e end def test_serverless_connection(url, root_response) diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 95cffa85..fa183f52 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -915,7 +915,12 @@ allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body| if body.length > max_bytes max_bytes *= 2 # ensure a successful retry - double("Response", :code => 413, :body => "") + raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( + 413, + "test-url", + body, + "" + ) else double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}') end From c903a4bc5a6c00a486feb75920f5f3f57c209cc4 Mon Sep 17 00:00:00 2001 From: donoghuc Date: Thu, 16 Jan 2025 11:31:24 -0800 Subject: [PATCH 2/3] 12.0.2 release prep --- CHANGELOG.md | 3 +++ logstash-output-elasticsearch.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 06d1cb1e..4c3469b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 12.0.2 + - Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199) + ## 12.0.1 - Remove irrelevant log warning about elastic stack version [#1200](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1200) diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index 090b1064..a041fe6e 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '12.0.1' + s.version = '12.0.2' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From 93eddbc420445b11fba2d10906a9e5c162e40f5b Mon Sep 17 00:00:00 2001 From: donoghuc Date: Tue, 21 Jan 2025 15:31:35 -0800 Subject: [PATCH 3/3] Use `error_code` instead of `code` when handling BadResponseCodeError Previously a few bugs spotted in code review were being obfuscated by the combinations of tests not running in CI and the incorrect method for retrieving a code from a BadResponseCodeError. This commit updates the method names and addresses the feedback from code review. --- lib/logstash/outputs/elasticsearch/http_client.rb | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 077f520d..120d3e67 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -410,7 +410,7 @@ def exists?(path, use_get=false) response = use_get ? @pool.get(path) : @pool.head(path) response.code >= 200 && response.code <= 299 rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - return true if e.code == 404 + return false if e.response_code == 404 raise e end @@ -421,10 +421,9 @@ def template_exists?(template_endpoint, name) def template_put(template_endpoint, name, template) path = "#{template_endpoint}/#{name}" logger.info("Installing Elasticsearch template", name: name) - response = @pool.put(path, nil, LogStash::Json.dump(template)) + @pool.put(path, nil, LogStash::Json.dump(template)) rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - return response if e.code == 404 - raise e + raise e unless e.response_code == 404 end # ILM methods