15
15
require 'stringio'
16
16
17
17
module LogStash ; module Outputs ; class OpenSearch ;
18
- # This is a constant instead of a config option because
19
- # there really isn't a good reason to configure it.
20
- #
21
- # The criteria used are:
22
- # 1. We need a number that's less than 100MiB because OpenSearch
23
- # won't accept bulks larger than that.
24
- # 2. It must be large enough to amortize the connection constant
25
- # across multiple requests.
26
- # 3. It must be small enough that even if multiple threads hit this size
27
- # we won't use a lot of heap.
28
- #
29
- # We wound up agreeing that a number greater than 10 MiB and less than 100MiB
30
- # made sense. We picked one on the lowish side to not use too much heap.
31
- TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB
32
-
33
18
class HttpClient
34
- attr_reader :client , :options , :logger , :pool , :action_count , :recv_count
19
+ attr_reader :client , :options , :logger , :pool , :action_count , :recv_count , :target_bulk_bytes
20
+
35
21
# This is here in case we use DEFAULT_OPTIONS in the future
36
22
# DEFAULT_OPTIONS = {
37
23
# :setting => value
@@ -73,6 +59,8 @@ def initialize(options={})
73
59
# mutex to prevent requests and sniffing to access the
74
60
# connection pool at the same time
75
61
@bulk_path = @options [ :bulk_path ]
62
+
63
+ @target_bulk_bytes = @options [ :target_bulk_bytes ]
76
64
end
77
65
78
66
def build_url_template
@@ -105,7 +93,6 @@ def maximum_seen_major_version
105
93
def bulk ( actions )
106
94
@action_count ||= 0
107
95
@action_count += actions . size
108
-
109
96
return if actions . empty?
110
97
111
98
bulk_actions = actions . collect do |action , args , source |
@@ -132,7 +119,7 @@ def bulk(actions)
132
119
action . map { |line | LogStash ::Json . dump ( line ) } . join ( "\n " ) :
133
120
LogStash ::Json . dump ( action )
134
121
as_json << "\n "
135
- if ( stream_writer . pos + as_json . bytesize ) > TARGET_BULK_BYTES && stream_writer . pos > 0
122
+ if ( stream_writer . pos + as_json . bytesize ) > @target_bulk_bytes && stream_writer . pos > 0
136
123
stream_writer . flush # ensure writer has sync'd buffers before reporting sizes
137
124
logger . debug ( "Sending partial bulk request for batch with one or more actions remaining." ,
138
125
:action_count => batch_actions . size ,
0 commit comments