Skip to content

Commit e5b004f

Browse files
committed
Make default target_bulk_bytes below AWS limit, but make it configurable
1 parent 32a8a43 commit e5b004f

File tree

6 files changed

+31
-29
lines changed

6 files changed

+31
-29
lines changed

lib/logstash/outputs/opensearch/http_client.rb

+5-18
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,9 @@
1515
require 'stringio'
1616

1717
module LogStash; module Outputs; class OpenSearch;
18-
# This is a constant instead of a config option because
19-
# there really isn't a good reason to configure it.
20-
#
21-
# The criteria used are:
22-
# 1. We need a number that's less than 100MiB because OpenSearch
23-
# won't accept bulks larger than that.
24-
# 2. It must be large enough to amortize the connection constant
25-
# across multiple requests.
26-
# 3. It must be small enough that even if multiple threads hit this size
27-
# we won't use a lot of heap.
28-
#
29-
# We wound up agreeing that a number greater than 10 MiB and less than 100MiB
30-
# made sense. We picked one on the lowish side to not use too much heap.
31-
TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB
32-
3318
class HttpClient
34-
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
19+
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count, :target_bulk_bytes
20+
3521
# This is here in case we use DEFAULT_OPTIONS in the future
3622
# DEFAULT_OPTIONS = {
3723
# :setting => value
@@ -72,6 +58,8 @@ def initialize(options={})
7258
# mutex to prevent requests and sniffing to access the
7359
# connection pool at the same time
7460
@bulk_path = @options[:bulk_path]
61+
62+
@target_bulk_bytes = @options[:target_bulk_bytes]
7563
end
7664

7765
def build_url_template
@@ -104,7 +92,6 @@ def maximum_seen_major_version
10492
def bulk(actions)
10593
@action_count ||= 0
10694
@action_count += actions.size
107-
10895
return if actions.empty?
10996

11097
bulk_actions = actions.collect do |action, args, source|
@@ -131,7 +118,7 @@ def bulk(actions)
131118
action.map {|line| LogStash::Json.dump(line)}.join("\n") :
132119
LogStash::Json.dump(action)
133120
as_json << "\n"
134-
if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
121+
if (stream_writer.pos + as_json.bytesize) > @target_bulk_bytes && stream_writer.pos > 0
135122
stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
136123
logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
137124
:action_count => batch_actions.size,

lib/logstash/outputs/opensearch/http_client_builder.rb

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ def self.build(logger, hosts, params)
3535
end
3636

3737
common_options[:timeout] = params["timeout"] if params["timeout"]
38+
common_options[:target_bulk_bytes] = params["target_bulk_bytes"]
3839

3940
if params["path"]
4041
client_settings[:path] = dedup_slashes("/#{params["path"]}/")

lib/logstash/plugin_mixins/opensearch/api_configs.rb

+14
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,20 @@ module APIConfigs
3434
# this defaults to a concatenation of the path parameter and "_bulk"
3535
:bulk_path => { :validate => :string },
3636

37+
# Maximum number of bytes in bulk requests
38+
# The criteria for deciding the default value of target_bulk_bytes is:
39+
# 1. We need a number that's less than 10MiB because OpenSearch is commonly
40+
# configured (particularly in AWS OpenSearch Service) to not accept
41+
# bulks larger than that.
42+
# 2. It must be large enough to amortize the connection constant
43+
# across multiple requests.
44+
# 3. It must be small enough that even if multiple threads hit this size
45+
# we won't use a lot of heap.
46+
:target_bulk_bytes => {
47+
:validate => :number,
48+
:default => 9 * 1024 * 1024 # 9MiB
49+
},
50+
3751
# Pass a set of key value pairs as the URL query string. This query string is added
3852
# to every host listed in the 'hosts' configuration. If the 'hosts' list contains
3953
# urls that already have query strings, the one specified here will be appended.

spec/integration/outputs/index_spec.rb

+4-5
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,7 @@
1010
require_relative "../../../spec/opensearch_spec_helper"
1111
require "logstash/outputs/opensearch"
1212

13-
describe "TARGET_BULK_BYTES", :integration => true do
14-
let(:target_bulk_bytes) { LogStash::Outputs::OpenSearch::TARGET_BULK_BYTES }
13+
describe "target_bulk_bytes", :integration => true do
1514
let(:event_count) { 1000 }
1615
let(:events) { event_count.times.map { event }.to_a }
1716
let(:config) {
@@ -32,11 +31,11 @@
3231
end
3332

3433
describe "batches that are too large for one" do
35-
let(:event) { LogStash::Event.new("message" => "a " * (((target_bulk_bytes/2) / event_count)+1)) }
34+
let(:event) { LogStash::Event.new("message" => "a " * (((subject.client.target_bulk_bytes/2) / event_count)+1)) }
3635

3736
it "should send in two batches" do
3837
expect(subject.client).to have_received(:bulk_send).twice do |payload|
39-
expect(payload.size).to be <= target_bulk_bytes
38+
expect(payload.size).to be <= subject.client.target_bulk_bytes
4039
end
4140
end
4241

@@ -47,7 +46,7 @@
4746

4847
it "should send in one batch" do
4948
expect(subject.client).to have_received(:bulk_send).once do |payload|
50-
expect(payload.size).to be <= target_bulk_bytes
49+
expect(payload.size).to be <= subject.client.target_bulk_bytes
5150
end
5251
end
5352
end

spec/unit/outputs/opensearch/http_client_spec.rb

+5-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
opts = {
1818
:hosts => [::LogStash::Util::SafeURI.new("127.0.0.1")],
1919
:logger => Cabin::Channel.get,
20+
:target_bulk_bytes => 9_000_000,
2021
:metric => ::LogStash::Instrument::NullMetric.new(:dummy).namespace(:alsodummy)
2122
}
2223

@@ -226,8 +227,8 @@
226227
end
227228
end
228229

229-
context "if a message is over TARGET_BULK_BYTES" do
230-
let(:target_bulk_bytes) { LogStash::Outputs::OpenSearch::TARGET_BULK_BYTES }
230+
context "if a message is over target_bulk_bytes" do
231+
let(:target_bulk_bytes) { subject.target_bulk_bytes }
231232
let(:message) { "a" * (target_bulk_bytes + 1) }
232233

233234
it "should be handled properly" do
@@ -256,8 +257,8 @@
256257
s = subject.send(:bulk, actions)
257258
end
258259

259-
context "if one exceeds TARGET_BULK_BYTES" do
260-
let(:target_bulk_bytes) { LogStash::Outputs::OpenSearch::TARGET_BULK_BYTES }
260+
context "if one exceeds target_bulk_bytes" do
261+
let(:target_bulk_bytes) { subject.target_bulk_bytes }
261262
let(:message1) { "a" * (target_bulk_bytes + 1) }
262263
it "executes two bulk_send operations" do
263264
allow(subject).to receive(:join_bulk_responses)

spec/unit/outputs/opensearch_spec.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@
325325
end
326326

327327
context '413 errors' do
328-
let(:payload_size) { LogStash::Outputs::OpenSearch::TARGET_BULK_BYTES + 1024 }
328+
let(:payload_size) { subject.client.target_bulk_bytes + 1024 }
329329
let(:event) { ::LogStash::Event.new("message" => ("a" * payload_size ) ) }
330330

331331
let(:logger_stub) { double("logger").as_null_object }
@@ -357,7 +357,7 @@
357357

358358
expect(logger_stub).to have_received(:warn)
359359
.with(a_string_matching(/413 Payload Too Large/),
360-
hash_including(:action_count => 1, :content_length => a_value > 20_000_000))
360+
hash_including(:action_count => 1, :content_length => a_value > subject.client.target_bulk_bytes))
361361
end
362362
end
363363

0 commit comments

Comments (0)