Make default target_bulk_bytes below AWS limit, but make it configurable #71

Merged · 1 commit · Sep 28, 2021

23 changes: 5 additions & 18 deletions lib/logstash/outputs/opensearch/http_client.rb
@@ -15,23 +15,9 @@
require 'stringio'

module LogStash; module Outputs; class OpenSearch;
# This is a constant instead of a config option because
# there really isn't a good reason to configure it.
#
# The criteria used are:
# 1. We need a number that's less than 100MiB because OpenSearch
# won't accept bulks larger than that.
# 2. It must be large enough to amortize the connection constant
# across multiple requests.
# 3. It must be small enough that even if multiple threads hit this size
# we won't use a lot of heap.
#
# We wound up agreeing that a number greater than 10 MiB and less than 100MiB
# made sense. We picked one on the lowish side to not use too much heap.
TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB

class HttpClient
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count, :target_bulk_bytes

# This is here in case we use DEFAULT_OPTIONS in the future
# DEFAULT_OPTIONS = {
# :setting => value
@@ -72,6 +58,8 @@ def initialize(options={})
# mutex to prevent requests and sniffing to access the
# connection pool at the same time
@bulk_path = @options[:bulk_path]

@target_bulk_bytes = @options[:target_bulk_bytes]
end

def build_url_template
@@ -104,7 +92,6 @@ def maximum_seen_major_version
def bulk(actions)
@action_count ||= 0
@action_count += actions.size

return if actions.empty?

bulk_actions = actions.collect do |action, args, source|
@@ -131,7 +118,7 @@ def bulk(actions)
action.map {|line| LogStash::Json.dump(line)}.join("\n") :
LogStash::Json.dump(action)
as_json << "\n"
if (stream_writer.pos + as_json.bytesize) > TARGET_BULK_BYTES && stream_writer.pos > 0
if (stream_writer.pos + as_json.bytesize) > @target_bulk_bytes && stream_writer.pos > 0
stream_writer.flush # ensure writer has sync'd buffers before reporting sizes
logger.debug("Sending partial bulk request for batch with one or more actions remaining.",
:action_count => batch_actions.size,
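
For context, a condensed Ruby sketch of the size-based splitting that this hunk switches from the TARGET_BULK_BYTES constant to the configurable @target_bulk_bytes: the buffer is flushed whenever appending the next serialized action would push it past the byte target. This is an illustration only, not the plugin's actual implementation (which writes through a stream writer as shown above); the helper name and the plain JSON.dump usage are assumptions made for the sketch.

```ruby
require 'json'

# Illustrative only: split serialized bulk actions into payloads no larger
# than target_bulk_bytes (unless a single action alone exceeds the target).
def split_into_bulks(actions, target_bulk_bytes)
  bulks  = []
  buffer = +""
  actions.each do |action|
    line = JSON.dump(action) << "\n"
    # Flush the current buffer first if adding this action would exceed the target.
    if !buffer.empty? && (buffer.bytesize + line.bytesize) > target_bulk_bytes
      bulks << buffer
      buffer = +""
    end
    buffer << line
  end
  bulks << buffer unless buffer.empty?
  bulks
end

# With the new 9 MiB default, roughly 18 MiB of actions goes out as two
# requests; raising target_bulk_bytes to 20 MiB would send it as one.
```
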
1 change: 1 addition & 0 deletions lib/logstash/outputs/opensearch/http_client_builder.rb
@@ -35,6 +35,7 @@ def self.build(logger, hosts, params)
end

common_options[:timeout] = params["timeout"] if params["timeout"]

Member: While we are here, does specifying timeout = nil make the request not time out? For another PR/work item: I'm used to seeing if params.key?(:timeout) and supporting all values with predictable behavior.

common_options[:target_bulk_bytes] = params["target_bulk_bytes"]

if params["path"]
client_settings[:path] = dedup_slashes("/#{params["path"]}/")
14 changes: 14 additions & 0 deletions lib/logstash/plugin_mixins/opensearch/api_configs.rb
@@ -34,6 +34,20 @@ module APIConfigs
# this defaults to a concatenation of the path parameter and "_bulk"
:bulk_path => { :validate => :string },

# Maximum number of bytes in bulk requests
# The criteria for deciding the default value of target_bulk_bytes are:
# 1. We need a number that's less than 10MiB because OpenSearch is commonly
# configured (particularly in AWS OpenSearch Service) to not accept
# bulks larger than that.
# 2. It must be large enough to amortize the connection constant
# across multiple requests.
# 3. It must be small enough that even if multiple threads hit this size
# we won't use a lot of heap.
:target_bulk_bytes => {
:validate => :number,
:default => 9 * 1024 * 1024 # 9MiB

Member: Why not keep the default at 20MiB for backward compatibility and allow AWS OpenSearch Service users to set the value?

Contributor Author: Because it doesn't work for many users, causing problems that are hard to resolve. I think it's better if the plugin works out of the box.

Member: Changing the default value will break backwards compatibility. Correct me if I am wrong, but if a user is sending bulk requests to a self-managed cluster configured with http.max_content_length > 9 MiB, upgrading to a version with this change would cause the plugin to fail to send all data.

Instead of changing the default, maybe a better solution would be to update this documentation.

Contributor Author (quoting the above): Not at all. The only drawback is a slightly lower maximum throughput. But with this fix a user can tweak it to their heart's content.

Member: I see. Thanks for the correction.

Still, I feel this may violate backwards compatibility. If a user is sending bulk requests to a self-managed cluster configured with http.max_content_length > 9 MiB, and they upgrade to this version, they may see performance degradation due to the decrease in maximum throughput. So I'm still in favor of keeping it at 20 MiB and letting users modify it to work with AWS OpenSearch Service. The documentation above could be updated to prevent confusion.

What are your thoughts @dblock and @VijayanB?

Member (@VijayanB, Sep 21, 2021): One problem I could see is that we might flush bulk requests more frequently than before for batch sizes greater than 9 MiB and less than 20 MiB. Let's say my total request size is 18 MB; with this change it will be sent as two requests instead of one, and the proposal is to configure the value to, say, 20 MB. @msvticket, can you correct me if my understanding is wrong?

Contributor Author: That's correct. My opinion is that preventing errors is more important than preserving behavior in cases that are probably not that common.

Member: I think the only thing you're arguing about is whether to increment a minor version for this change, or a major version.

Sounds like the default was poorly chosen. If it causes bugs, then it's a bug fix, and changing the value of the default in a minor version is OK. Otherwise, do the change in a major version increment. My take for this case is that it's OK to do it in a minor version increment.

Since it's configurable, you are preserving backwards compatibility. Behavior may change, but it's not broken. You will surprise some people who didn't read the release notes.

},

# Pass a set of key value pairs as the URL query string. This query string is added
# to every host listed in the 'hosts' configuration. If the 'hosts' list contains
# urls that already have query strings, the one specified here will be appended.
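
For users weighing the trade-off discussed above, a minimal pipeline sketch showing how the new option is set from a Logstash config; the hosts and index values are placeholders, not part of this PR. A self-managed cluster whose http.max_content_length allows larger payloads could restore the previous 20 MiB target like this:

```
output {
  opensearch {
    hosts => ["https://localhost:9200"]   # placeholder host
    index => "logs-%{+YYYY.MM.dd}"        # placeholder index
    # Value is in bytes; the new default is 9 MiB (9437184).
    target_bulk_bytes => 20971520         # 20 MiB, the pre-change default
  }
}
```
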
9 changes: 4 additions & 5 deletions spec/integration/outputs/index_spec.rb
@@ -10,8 +10,7 @@
require_relative "../../../spec/opensearch_spec_helper"
require "logstash/outputs/opensearch"

describe "TARGET_BULK_BYTES", :integration => true do
let(:target_bulk_bytes) { LogStash::Outputs::OpenSearch::TARGET_BULK_BYTES }
describe "target_bulk_bytes", :integration => true do
let(:event_count) { 1000 }
let(:events) { event_count.times.map { event }.to_a }
let(:config) {
@@ -32,11 +31,11 @@
end

describe "batches that are too large for one" do
let(:event) { LogStash::Event.new("message" => "a " * (((target_bulk_bytes/2) / event_count)+1)) }
let(:event) { LogStash::Event.new("message" => "a " * (((subject.client.target_bulk_bytes/2) / event_count)+1)) }

it "should send in two batches" do
expect(subject.client).to have_received(:bulk_send).twice do |payload|
expect(payload.size).to be <= target_bulk_bytes
expect(payload.size).to be <= subject.client.target_bulk_bytes
end
end

@@ -47,7 +46,7 @@

it "should send in one batch" do
expect(subject.client).to have_received(:bulk_send).once do |payload|
expect(payload.size).to be <= target_bulk_bytes
expect(payload.size).to be <= subject.client.target_bulk_bytes
end
end
end
9 changes: 5 additions & 4 deletions spec/unit/outputs/opensearch/http_client_spec.rb
@@ -17,6 +17,7 @@
opts = {
:hosts => [::LogStash::Util::SafeURI.new("127.0.0.1")],
:logger => Cabin::Channel.get,
:target_bulk_bytes => 9_000_000,
:metric => ::LogStash::Instrument::NullMetric.new(:dummy).namespace(:alsodummy)
}

@@ -226,8 +227,8 @@
end
end

context "if a message is over TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::OpenSearch::TARGET_BULK_BYTES }
context "if a message is over target_bulk_bytes" do
let(:target_bulk_bytes) { subject.target_bulk_bytes }
let(:message) { "a" * (target_bulk_bytes + 1) }

it "should be handled properly" do
@@ -256,8 +257,8 @@
s = subject.send(:bulk, actions)
end

context "if one exceeds TARGET_BULK_BYTES" do
let(:target_bulk_bytes) { LogStash::Outputs::OpenSearch::TARGET_BULK_BYTES }
context "if one exceeds target_bulk_bytes" do
let(:target_bulk_bytes) { subject.target_bulk_bytes }
let(:message1) { "a" * (target_bulk_bytes + 1) }
it "executes two bulk_send operations" do
allow(subject).to receive(:join_bulk_responses)
4 changes: 2 additions & 2 deletions spec/unit/outputs/opensearch_spec.rb
@@ -325,7 +325,7 @@
end

context '413 errors' do
let(:payload_size) { LogStash::Outputs::OpenSearch::TARGET_BULK_BYTES + 1024 }
let(:payload_size) { subject.client.target_bulk_bytes + 1024 }
let(:event) { ::LogStash::Event.new("message" => ("a" * payload_size ) ) }

let(:logger_stub) { double("logger").as_null_object }
@@ -357,7 +357,7 @@

expect(logger_stub).to have_received(:warn)
.with(a_string_matching(/413 Payload Too Large/),
hash_including(:action_count => 1, :content_length => a_value > 20_000_000))
hash_including(:action_count => 1, :content_length => a_value > subject.client.target_bulk_bytes))
end
end
