26
26
import org .elasticsearch .common .io .stream .NamedWriteableAwareStreamInput ;
27
27
import org .elasticsearch .common .io .stream .NamedWriteableRegistry ;
28
28
import org .elasticsearch .common .io .stream .OutputStreamStreamOutput ;
29
+ import org .elasticsearch .common .io .stream .ReleasableBytesStreamOutput ;
29
30
import org .elasticsearch .common .io .stream .StreamInput ;
30
31
import org .elasticsearch .common .io .stream .Writeable ;
31
32
import org .elasticsearch .common .settings .Settings ;
33
+ import org .elasticsearch .common .util .BigArrays ;
32
34
import org .elasticsearch .common .util .concurrent .ThreadContext ;
33
35
import org .elasticsearch .common .xcontent .XContentBuilder ;
34
36
import org .elasticsearch .common .xcontent .XContentFactory ;
@@ -136,20 +138,23 @@ public static SystemIndexDescriptor getSystemIndexDescriptor() {
136
138
private final SecurityContext securityContext ;
137
139
private final NamedWriteableRegistry registry ;
138
140
private final Writeable .Reader <R > reader ;
141
+ private final BigArrays bigArrays ;
139
142
140
143
public AsyncTaskIndexService (String index ,
141
144
ClusterService clusterService ,
142
145
ThreadContext threadContext ,
143
146
Client client ,
144
147
String origin ,
145
148
Writeable .Reader <R > reader ,
146
- NamedWriteableRegistry registry ) {
149
+ NamedWriteableRegistry registry ,
150
+ BigArrays bigArrays ) {
147
151
this .index = index ;
148
152
this .securityContext = new SecurityContext (clusterService .getSettings (), threadContext );
149
153
this .client = client ;
150
154
this .clientWithOrigin = new OriginSettingClient (client , origin );
151
155
this .registry = registry ;
152
156
this .reader = reader ;
157
+ this .bigArrays = bigArrays ;
153
158
}
154
159
155
160
/**
@@ -182,18 +187,22 @@ public void createResponse(String docId,
182
187
R response ,
183
188
ActionListener <IndexResponse > listener ) throws IOException {
184
189
try {
185
- // TODO: Integrate with circuit breaker
186
- final XContentBuilder source = XContentFactory .jsonBuilder ()
190
+ final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput (0 , bigArrays .withCircuitBreaking ());
191
+ final XContentBuilder source = XContentFactory .jsonBuilder (buffer );
192
+ listener = ActionListener .runBefore (listener , source ::close );
193
+ source
187
194
.startObject ()
188
195
.field (HEADERS_FIELD , headers )
189
196
.field (EXPIRATION_TIME_FIELD , response .getExpirationTime ())
190
197
.directFieldAsBase64 (RESULT_FIELD , os -> writeResponse (response , os ))
191
198
.endObject ();
192
-
199
+ // do not close the buffer or the XContentBuilder until the IndexRequest is completed (i.e., listener is notified);
200
+ // otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage.
201
+ source .flush ();
193
202
final IndexRequest indexRequest = new IndexRequest (index )
194
203
.create (true )
195
204
.id (docId )
196
- .source (source );
205
+ .source (buffer . bytes (), source . contentType () );
197
206
clientWithOrigin .index (indexRequest , listener );
198
207
} catch (Exception e ) {
199
208
listener .onFailure (e );
@@ -204,20 +213,25 @@ public void createResponse(String docId,
204
213
* Stores the final response if the place-holder document is still present (update).
205
214
*/
206
215
public void updateResponse (String docId ,
207
- Map <String , List <String >> responseHeaders ,
208
- R response ,
209
- ActionListener <UpdateResponse > listener ) {
216
+ Map <String , List <String >> responseHeaders ,
217
+ R response ,
218
+ ActionListener <UpdateResponse > listener ) {
210
219
try {
211
- // TODO: Integrate with circuit breaker
212
- final XContentBuilder source = XContentFactory .jsonBuilder ()
220
+ final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput (0 , bigArrays .withCircuitBreaking ());
221
+ final XContentBuilder source = XContentFactory .jsonBuilder (buffer );
222
+ listener = ActionListener .runBefore (listener , source ::close );
223
+ source
213
224
.startObject ()
214
225
.field (RESPONSE_HEADERS_FIELD , responseHeaders )
215
226
.directFieldAsBase64 (RESULT_FIELD , os -> writeResponse (response , os ))
216
227
.endObject ();
217
- UpdateRequest request = new UpdateRequest ()
228
+ // do not close the buffer or the XContentBuilder until the UpdateRequest is completed (i.e., listener is notified);
229
+ // otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage.
230
+ source .flush ();
231
+ final UpdateRequest request = new UpdateRequest ()
218
232
.index (index )
219
233
.id (docId )
220
- .doc (source )
234
+ .doc (buffer . bytes (), source . contentType () )
221
235
.retryOnConflict (5 );
222
236
clientWithOrigin .update (request , listener );
223
237
} catch (Exception e ) {
0 commit comments