Skip to content

Commit 3ef3c0c

Browse files
donoghucjsvd
andauthored
Properly handle 413 Payload Too Large errors (#1199)
* Properly handle `413` Payload Too Large errors Previously when Elasticsearch responds with a 413 (Payload Too Large) status, the manticore adapter raises an error before the response can be processed by the bulk_send error handling. This commit refactors the way `BadErrorResponse` codes are handled. Previously we had logic in the manticore adaptor which special cased raising errors on some codes. This commit refactors such that the adaptor raises on any error status and the caller is now responsible for special case handling the code. * 12.0.2 release prep * Use `error_code` instead of `code` when handling BadResponseCodeError Previously a few bugs spotted in code review were being obfuscated by the combinations of tests not running in CI and the incorrect method for retrieving a code from a BadResponseCodeError. This commit updates the method names and addresses the feedback from code review. --------- Co-authored-by: João Duarte <[email protected]>
1 parent ec27add commit 3ef3c0c

File tree

7 files changed

+62
-45
lines changed

7 files changed

+62
-45
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 12.0.2
2+
- Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199)
3+
14
## 12.0.1
25
- Remove irrelevant log warning about elastic stack version [#1200](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1200)
36

docs/index.asciidoc

+16-1
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,22 @@ This plugin uses the Elasticsearch bulk API to optimize its imports into Elastic
196196
either partial or total failures. The bulk API sends batches of requests to an HTTP endpoint. Error codes for the HTTP
197197
request are handled differently than error codes for individual documents.
198198

199-
HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely.
199+
200+
HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely,
201+
including 413 (Payload Too Large) responses.
202+
203+
If you want to handle large payloads differently, you can configure 413 responses to go to the Dead Letter Queue instead:
204+
205+
[source,ruby]
206+
-----
207+
output {
208+
elasticsearch {
209+
hosts => ["localhost:9200"]
210+
dlq_custom_codes => [413] # Send 413 errors to DLQ instead of retrying
211+
}
212+
-----
213+
214+
This will capture oversized payloads in the DLQ for analysis rather than retrying them.
200215

201216
The following document errors are handled as follows:
202217

lib/logstash/outputs/elasticsearch/http_client.rb

+25-24
Original file line numberDiff line numberDiff line change
@@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses)
182182
def bulk_send(body_stream, batch_actions)
183183
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
184184

185-
response = @pool.post(@bulk_path, params, body_stream.string)
186-
187-
@bulk_response_metrics.increment(response.code.to_s)
188-
189-
case response.code
190-
when 200 # OK
191-
LogStash::Json.load(response.body)
192-
when 413 # Payload Too Large
185+
begin
186+
response = @pool.post(@bulk_path, params, body_stream.string)
187+
@bulk_response_metrics.increment(response.code.to_s)
188+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
189+
@bulk_response_metrics.increment(e.response_code.to_s)
190+
raise e unless e.response_code == 413
191+
# special handling for 413, treat it as a document level issue
193192
logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
194-
emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
195-
else
196-
url = ::LogStash::Util::SafeURI.new(response.final_url)
197-
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
198-
response.code, url, body_stream.to_s, response.body
199-
)
193+
return emulate_batch_error_response(batch_actions, 413, 'payload_too_large')
194+
rescue => e # it may be a network issue instead, re-raise
195+
raise e
200196
end
197+
198+
LogStash::Json.load(response.body)
201199
end
202200

203201
def emulate_batch_error_response(actions, http_code, reason)
@@ -411,6 +409,9 @@ def host_to_url(h)
411409
def exists?(path, use_get=false)
412410
response = use_get ? @pool.get(path) : @pool.head(path)
413411
response.code >= 200 && response.code <= 299
412+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
413+
return false if e.response_code == 404
414+
raise e
414415
end
415416

416417
def template_exists?(template_endpoint, name)
@@ -421,6 +422,8 @@ def template_put(template_endpoint, name, template)
421422
path = "#{template_endpoint}/#{name}"
422423
logger.info("Installing Elasticsearch template", name: name)
423424
@pool.put(path, nil, LogStash::Json.dump(template))
425+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
426+
raise e unless e.response_code == 404
424427
end
425428

426429
# ILM methods
@@ -432,17 +435,15 @@ def rollover_alias_exists?(name)
432435

433436
# Create a new rollover alias
434437
def rollover_alias_put(alias_name, alias_definition)
435-
begin
436-
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
437-
logger.info("Created rollover alias", name: alias_name)
438-
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
439-
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
440-
if e.response_code == 400
441-
logger.info("Rollover alias already exists, skipping", name: alias_name)
442-
return
443-
end
444-
raise e
438+
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
439+
logger.info("Created rollover alias", name: alias_name)
440+
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
441+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
442+
if e.response_code == 400
443+
logger.info("Rollover alias already exists, skipping", name: alias_name)
444+
return
445445
end
446+
raise e
446447
end
447448

448449
def get_xpack_info

lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb

+1-4
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil)
7676
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
7777
end
7878

79-
# 404s are excluded because they are valid codes in the case of
80-
# template installation. We might need a better story around this later
81-
# but for our current purposes this is correct
8279
code = resp.code
83-
if code < 200 || code > 299 && code != 404
80+
if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret
8481
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
8582
end
8683

lib/logstash/outputs/elasticsearch/http_client/pool.rb

+10-14
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,11 @@ def get_license(url)
252252
def health_check_request(url)
253253
logger.debug("Running health check to see if an Elasticsearch connection is working",
254254
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
255-
begin
256-
response = perform_request_to_url(url, :head, @healthcheck_path)
257-
return response, nil
258-
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
259-
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
260-
return nil, e
261-
end
255+
response = perform_request_to_url(url, :head, @healthcheck_path)
256+
return response, nil
257+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
258+
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
259+
return nil, e
262260
end
263261

264262
def healthcheck!(register_phase = true)
@@ -311,13 +309,11 @@ def healthcheck!(register_phase = true)
311309
end
312310

313311
def get_root_path(url, params={})
314-
begin
315-
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
316-
return resp, nil
317-
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
318-
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
319-
return nil, e
320-
end
312+
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
313+
return resp, nil
314+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
315+
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
316+
return nil, e
321317
end
322318

323319
def test_serverless_connection(url, root_response)

logstash-output-elasticsearch.gemspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-elasticsearch'
3-
s.version = '12.0.1'
3+
s.version = '12.0.2'
44
s.licenses = ['apache-2.0']
55
s.summary = "Stores logs in Elasticsearch"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/unit/outputs/elasticsearch_spec.rb

+6-1
Original file line numberDiff line numberDiff line change
@@ -908,7 +908,12 @@
908908
allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
909909
if body.length > max_bytes
910910
max_bytes *= 2 # ensure a successful retry
911-
double("Response", :code => 413, :body => "")
911+
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
912+
413,
913+
"test-url",
914+
body,
915+
""
916+
)
912917
else
913918
double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
914919
end

0 commit comments

Comments
 (0)