22
22
import com .google .common .collect .Sets ;
23
23
import org .elasticsearch .ElasticsearchException ;
24
24
import org .elasticsearch .ElasticsearchIllegalStateException ;
25
+ import org .elasticsearch .ElasticsearchWrapperException ;
25
26
import org .elasticsearch .ExceptionsHelper ;
26
27
import org .elasticsearch .action .ActionListener ;
27
28
import org .elasticsearch .action .ActionRequest ;
44
45
import org .elasticsearch .cluster .metadata .MappingMetaData ;
45
46
import org .elasticsearch .cluster .node .DiscoveryNode ;
46
47
import org .elasticsearch .cluster .routing .ShardIterator ;
48
+ import org .elasticsearch .common .Nullable ;
47
49
import org .elasticsearch .common .bytes .BytesReference ;
48
50
import org .elasticsearch .common .collect .Tuple ;
49
51
import org .elasticsearch .common .inject .Inject ;
@@ -141,7 +143,7 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
141
143
final BulkShardRequest request = shardRequest .request ;
142
144
IndexShard indexShard = indicesService .indexServiceSafe (shardRequest .request .index ()).shardSafe (shardRequest .shardId );
143
145
Engine .IndexingOperation [] ops = null ;
144
- Set <Tuple <String , String >> mappingsToUpdate = null ;
146
+ final Set <Tuple <String , String >> mappingsToUpdate = Sets . newHashSet () ;
145
147
146
148
BulkItemResponse [] responses = new BulkItemResponse [request .items ().length ];
147
149
long [] preVersions = new long [request .items ().length ];
@@ -150,22 +152,28 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
150
152
if (item .request () instanceof IndexRequest ) {
151
153
IndexRequest indexRequest = (IndexRequest ) item .request ();
152
154
try {
153
- WriteResult result = shardIndexOperation (request , indexRequest , clusterState , indexShard , true );
154
- // add the response
155
- IndexResponse indexResponse = result .response ();
156
- responses [requestIndex ] = new BulkItemResponse (item .id (), indexRequest .opType ().lowercase (), indexResponse );
157
- preVersions [requestIndex ] = result .preVersion ;
158
- if (result .mappingToUpdate != null ) {
159
- if (mappingsToUpdate == null ) {
160
- mappingsToUpdate = Sets .newHashSet ();
155
+
156
+ try {
157
+ WriteResult result = shardIndexOperation (request , indexRequest , clusterState , indexShard , true );
158
+ // add the response
159
+ IndexResponse indexResponse = result .response ();
160
+ responses [requestIndex ] = new BulkItemResponse (item .id (), indexRequest .opType ().lowercase (), indexResponse );
161
+ preVersions [requestIndex ] = result .preVersion ;
162
+ if (result .mappingToUpdate != null ) {
163
+ mappingsToUpdate .add (result .mappingToUpdate );
161
164
}
162
- mappingsToUpdate .add (result .mappingToUpdate );
163
- }
164
- if (result .op != null ) {
165
- if (ops == null ) {
166
- ops = new Engine .IndexingOperation [request .items ().length ];
165
+ if (result .op != null ) {
166
+ if (ops == null ) {
167
+ ops = new Engine .IndexingOperation [request .items ().length ];
168
+ }
169
+ ops [requestIndex ] = result .op ;
170
+ }
171
+ } catch (WriteFailure e ){
172
+ Tuple <String , String > mappingsToUpdateOnFailure = e .mappingsToUpdate ;
173
+ if (mappingsToUpdateOnFailure != null ) {
174
+ mappingsToUpdate .add (mappingsToUpdateOnFailure );
167
175
}
168
- ops [ requestIndex ] = result . op ;
176
+ throw e . getCause () ;
169
177
}
170
178
} catch (Throwable e ) {
171
179
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
@@ -174,6 +182,9 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
174
182
for (int j = 0 ; j < requestIndex ; j ++) {
175
183
applyVersion (request .items ()[j ], preVersions [j ]);
176
184
}
185
+ for (Tuple <String , String > mappingToUpdate : mappingsToUpdate ) {
186
+ updateMappingOnMaster (mappingToUpdate .v1 (), mappingToUpdate .v2 ());
187
+ }
177
188
throw (ElasticsearchException ) e ;
178
189
}
179
190
if (e instanceof ElasticsearchException && ((ElasticsearchException ) e ).status () == RestStatus .CONFLICT ) {
@@ -239,9 +250,6 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
239
250
responses [requestIndex ] = new BulkItemResponse (item .id (), "update" , updateResponse );
240
251
preVersions [requestIndex ] = result .preVersion ;
241
252
if (result .mappingToUpdate != null ) {
242
- if (mappingsToUpdate == null ) {
243
- mappingsToUpdate = Sets .newHashSet ();
244
- }
245
253
mappingsToUpdate .add (result .mappingToUpdate );
246
254
}
247
255
if (result .op != null ) {
@@ -277,8 +285,6 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
277
285
// we can't try any more
278
286
responses [requestIndex ] = new BulkItemResponse (item .id (), "update" ,
279
287
new BulkItemResponse .Failure (updateRequest .index (), updateRequest .type (), updateRequest .id (), t ));
280
- ;
281
-
282
288
request .items ()[requestIndex ] = null ; // do not send to replicas
283
289
}
284
290
} else {
@@ -331,10 +337,8 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
331
337
332
338
}
333
339
334
- if (mappingsToUpdate != null ) {
335
- for (Tuple <String , String > mappingToUpdate : mappingsToUpdate ) {
336
- updateMappingOnMaster (mappingToUpdate .v1 (), mappingToUpdate .v2 ());
337
- }
340
+ for (Tuple <String , String > mappingToUpdate : mappingsToUpdate ) {
341
+ updateMappingOnMaster (mappingToUpdate .v1 (), mappingToUpdate .v2 ());
338
342
}
339
343
340
344
if (request .refresh ()) {
@@ -369,6 +373,17 @@ <T> T response() {
369
373
370
374
}
371
375
376
+ static class WriteFailure extends ElasticsearchException implements ElasticsearchWrapperException {
377
+ @ Nullable
378
+ final Tuple <String , String > mappingsToUpdate ;
379
+
380
+ WriteFailure (Throwable cause , Tuple <String , String > mappingsToUpdate ) {
381
+ super (null , cause );
382
+ assert cause != null ;
383
+ this .mappingsToUpdate = mappingsToUpdate ;
384
+ }
385
+ }
386
+
372
387
private WriteResult shardIndexOperation (BulkShardRequest request , IndexRequest indexRequest , ClusterState clusterState ,
373
388
IndexShard indexShard , boolean processed ) {
374
389
@@ -387,30 +402,38 @@ private WriteResult shardIndexOperation(BulkShardRequest request, IndexRequest i
387
402
SourceToParse sourceToParse = SourceToParse .source (SourceToParse .Origin .PRIMARY , indexRequest .source ()).type (indexRequest .type ()).id (indexRequest .id ())
388
403
.routing (indexRequest .routing ()).parent (indexRequest .parent ()).timestamp (indexRequest .timestamp ()).ttl (indexRequest .ttl ());
389
404
405
+ // update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added
406
+ Tuple <String , String > mappingsToUpdate = null ;
407
+
390
408
long version ;
391
409
boolean created ;
392
410
Engine .IndexingOperation op ;
393
- if (indexRequest .opType () == IndexRequest .OpType .INDEX ) {
394
- Engine .Index index = indexShard .prepareIndex (sourceToParse ).version (indexRequest .version ()).versionType (indexRequest .versionType ()).origin (Engine .Operation .Origin .PRIMARY );
395
- indexShard .index (index );
396
- version = index .version ();
397
- op = index ;
398
- created = index .created ();
399
- } else {
400
- Engine .Create create = indexShard .prepareCreate (sourceToParse ).version (indexRequest .version ()).versionType (indexRequest .versionType ()).origin (Engine .Operation .Origin .PRIMARY );
401
- indexShard .create (create );
402
- version = create .version ();
403
- op = create ;
404
- created = true ;
405
- }
406
411
long preVersion = indexRequest .version ();
407
- // update the version on request so it will happen on the replicas
408
- indexRequest .version (version );
409
-
410
- // update mapping on master if needed, we won't update changes to the same type, since once its changed, it won't have mappers added
411
- Tuple <String , String > mappingsToUpdate = null ;
412
- if (op .parsedDoc ().mappingsModified ()) {
413
- mappingsToUpdate = Tuple .tuple (indexRequest .index (), indexRequest .type ());
412
+ try {
413
+ if (indexRequest .opType () == IndexRequest .OpType .INDEX ) {
414
+ Engine .Index index = indexShard .prepareIndex (sourceToParse ).version (indexRequest .version ()).versionType (indexRequest .versionType ()).origin (Engine .Operation .Origin .PRIMARY );
415
+ if (index .parsedDoc ().mappingsModified ()) {
416
+ mappingsToUpdate = Tuple .tuple (indexRequest .index (), indexRequest .type ());
417
+ }
418
+ indexShard .index (index );
419
+ version = index .version ();
420
+ op = index ;
421
+ created = index .created ();
422
+ } else {
423
+ Engine .Create create = indexShard .prepareCreate (sourceToParse ).version (indexRequest .version ()).versionType (indexRequest .versionType ()).origin (Engine .Operation .Origin .PRIMARY );
424
+ if (create .parsedDoc ().mappingsModified ()) {
425
+ mappingsToUpdate = Tuple .tuple (indexRequest .index (), indexRequest .type ());
426
+ }
427
+ indexShard .create (create );
428
+ version = create .version ();
429
+ op = create ;
430
+ created = true ;
431
+ }
432
+ // update the version on request so it will happen on the replicas
433
+ indexRequest .versionType (indexRequest .versionType ());
434
+ indexRequest .version (version );
435
+ } catch (Throwable t ) {
436
+ throw new WriteFailure (t , mappingsToUpdate );
414
437
}
415
438
416
439
IndexResponse indexResponse = new IndexResponse (indexRequest .index (), indexRequest .type (), indexRequest .id (), version , created );
0 commit comments