6
6
package org .elasticsearch .xpack .monitoring .exporter .http ;
7
7
8
8
import org .apache .http .entity .ContentType ;
9
- import org .apache .http .nio . entity .NByteArrayEntity ;
9
+ import org .apache .http .entity .InputStreamEntity ;
10
10
import org .apache .logging .log4j .LogManager ;
11
11
import org .apache .logging .log4j .Logger ;
12
- import org .apache .logging .log4j .message .ParameterizedMessage ;
13
- import org .apache .logging .log4j .util .Supplier ;
14
- import org .apache .lucene .util .BytesRef ;
15
12
import org .elasticsearch .action .ActionListener ;
16
13
import org .elasticsearch .client .Request ;
17
14
import org .elasticsearch .client .Response ;
18
15
import org .elasticsearch .client .ResponseListener ;
19
16
import org .elasticsearch .client .RestClient ;
20
17
import org .elasticsearch .common .bytes .BytesReference ;
21
18
import org .elasticsearch .common .io .stream .BytesStreamOutput ;
19
+ import org .elasticsearch .common .io .stream .StreamOutput ;
22
20
import org .elasticsearch .common .time .DateFormatter ;
23
21
import org .elasticsearch .common .util .concurrent .ThreadContext ;
22
+ import org .elasticsearch .common .xcontent .ToXContent ;
24
23
import org .elasticsearch .common .xcontent .XContent ;
25
24
import org .elasticsearch .common .xcontent .XContentBuilder ;
26
- import org .elasticsearch .common .xcontent .XContentHelper ;
27
25
import org .elasticsearch .common .xcontent .XContentType ;
28
26
import org .elasticsearch .xpack .core .monitoring .exporter .MonitoringDoc ;
29
27
import org .elasticsearch .xpack .core .monitoring .exporter .MonitoringTemplateUtils ;
@@ -60,7 +58,7 @@ class HttpExportBulk extends ExportBulk {
60
58
/**
61
59
* The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
62
60
*/
63
- private byte [] payload = null ;
61
+ private BytesReference payload = null ;
64
62
65
63
HttpExportBulk (final String name , final RestClient client , final Map <String , String > parameters ,
66
64
final DateFormatter dateTimeFormatter , final ThreadContext threadContext ) {
@@ -77,12 +75,11 @@ public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
77
75
if (docs != null && docs .isEmpty () == false ) {
78
76
try (BytesStreamOutput payload = new BytesStreamOutput ()) {
79
77
for (MonitoringDoc monitoringDoc : docs ) {
80
- // any failure caused by an individual doc will be written as an empty byte[], thus not impacting the rest
81
- payload .write (toBulkBytes (monitoringDoc ));
78
+ writeDocument (monitoringDoc , payload );
82
79
}
83
80
84
81
// store the payload until we flush
85
- this .payload = BytesReference . toBytes ( payload .bytes () );
82
+ this .payload = payload .bytes ();
86
83
}
87
84
}
88
85
} catch (Exception e ) {
@@ -94,12 +91,19 @@ public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
94
91
public void doFlush (ActionListener <Void > listener ) throws ExportException {
95
92
if (payload == null ) {
96
93
listener .onFailure (new ExportException ("unable to send documents because none were loaded for export bulk [{}]" , name ));
97
- } else if (payload .length != 0 ) {
94
+ } else if (payload .length () != 0 ) {
98
95
final Request request = new Request ("POST" , "/_bulk" );
99
96
for (Map .Entry <String , String > param : params .entrySet ()) {
100
97
request .addParameter (param .getKey (), param .getValue ());
101
98
}
102
- request .setEntity (new NByteArrayEntity (payload , ContentType .APPLICATION_JSON ));
99
+ try {
100
+ request .setEntity (new InputStreamEntity (payload .streamInput (), payload .length (), ContentType .APPLICATION_JSON ));
101
+ } catch (IOException e ) {
102
+ listener .onFailure (e );
103
+ return ;
104
+ }
105
+ // null out serialized docs to make things easier on the GC
106
+ payload = null ;
103
107
104
108
client .performRequestAsync (request , new ResponseListener () {
105
109
@ Override
@@ -123,51 +127,43 @@ public void onFailure(Exception exception) {
123
127
}
124
128
}
125
129
126
- private byte [] toBulkBytes ( final MonitoringDoc doc ) throws IOException {
130
+ private void writeDocument ( MonitoringDoc doc , StreamOutput out ) throws IOException {
127
131
final XContentType xContentType = XContentType .JSON ;
128
132
final XContent xContent = xContentType .xContent ();
129
133
130
134
final String index = MonitoringTemplateUtils .indexName (formatter , doc .getSystem (), doc .getTimestamp ());
131
135
final String id = doc .getId ();
132
136
133
- try (BytesStreamOutput out = new BytesStreamOutput ()) {
134
- try (XContentBuilder builder = new XContentBuilder (xContent , out )) {
135
- // Builds the bulk action metadata line
136
- builder .startObject ();
137
+ try (XContentBuilder builder = new XContentBuilder (xContent , out )) {
138
+ // Builds the bulk action metadata line
139
+ builder .startObject ();
140
+ {
141
+ builder .startObject ("index" );
137
142
{
138
- builder .startObject ("index" );
139
- {
140
- builder .field ("_index" , index );
141
- if (id != null ) {
142
- builder .field ("_id" , id );
143
- }
143
+ builder .field ("_index" , index );
144
+ if (id != null ) {
145
+ builder .field ("_id" , id );
144
146
}
145
- builder .endObject ();
146
147
}
147
148
builder .endObject ();
148
149
}
150
+ builder .endObject ();
151
+ }
149
152
150
- // Adds action metadata line bulk separator
151
- out .write (xContent .streamSeparator ());
152
-
153
- // Adds the source of the monitoring document
154
- final BytesRef source = XContentHelper .toXContent (doc , xContentType , false ).toBytesRef ();
155
- out .write (source .bytes , source .offset , source .length );
156
-
157
- // Adds final bulk separator
158
- out .write (xContent .streamSeparator ());
153
+ // Adds action metadata line bulk separator
154
+ out .write (xContent .streamSeparator ());
159
155
160
- logger . trace (
161
- "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]" ,
162
- name , index , id , doc .getType ()
163
- );
156
+ // Adds the source of the monitoring document
157
+ try ( XContentBuilder builder = new XContentBuilder ( xContent , out )) {
158
+ doc .toXContent ( builder , ToXContent . EMPTY_PARAMS );
159
+ }
164
160
165
- return BytesReference .toBytes (out .bytes ());
166
- } catch (Exception e ) {
167
- logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("failed to render document [{}], skipping it [{}]" , doc , name ), e );
161
+ // Adds final bulk separator
162
+ out .write (xContent .streamSeparator ());
168
163
169
- return BytesRef .EMPTY_BYTES ;
170
- }
164
+ logger .trace (
165
+ "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]" ,
166
+ name , index , id , doc .getType ()
167
+ );
171
168
}
172
-
173
169
}
0 commit comments