Skip to content

Commit 46f1e30

Browse files
committed
Recovery from local gateway should re-introduce new mappings
The delayed mapping intro tests exposed a bug where if a new mapping is introduced, yet not updated on the master, and a full restart occurs, reply of the transaction log will not cause the new mapping to be re-introduced. closes elastic#6659 add comment on the method
1 parent e851908 commit 46f1e30

File tree

4 files changed

+71
-73
lines changed

4 files changed

+71
-73
lines changed

src/main/java/org/elasticsearch/index/gateway/local/LocalIndexShardGateway.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,16 @@
2424
import org.apache.lucene.index.SegmentInfos;
2525
import org.elasticsearch.ElasticsearchException;
2626
import org.elasticsearch.ExceptionsHelper;
27+
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
2728
import org.elasticsearch.common.inject.Inject;
2829
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
2930
import org.elasticsearch.common.lucene.Lucene;
3031
import org.elasticsearch.common.settings.Settings;
3132
import org.elasticsearch.common.unit.TimeValue;
33+
import org.elasticsearch.index.engine.Engine;
3234
import org.elasticsearch.index.gateway.IndexShardGateway;
3335
import org.elasticsearch.index.gateway.IndexShardGatewayRecoveryException;
36+
import org.elasticsearch.index.service.IndexService;
3437
import org.elasticsearch.index.settings.IndexSettings;
3538
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
3639
import org.elasticsearch.index.shard.IndexShardState;
@@ -57,7 +60,8 @@
5760
public class LocalIndexShardGateway extends AbstractIndexShardComponent implements IndexShardGateway {
5861

5962
private final ThreadPool threadPool;
60-
63+
private final MappingUpdatedAction mappingUpdatedAction;
64+
private final IndexService indexService;
6165
private final InternalIndexShard indexShard;
6266

6367
private final RecoveryState recoveryState = new RecoveryState();
@@ -66,9 +70,12 @@ public class LocalIndexShardGateway extends AbstractIndexShardComponent implemen
6670
private final TimeValue syncInterval;
6771

6872
@Inject
69-
public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexShard indexShard) {
73+
public LocalIndexShardGateway(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, MappingUpdatedAction mappingUpdatedAction,
74+
IndexService indexService, IndexShard indexShard) {
7075
super(shardId, indexSettings);
7176
this.threadPool = threadPool;
77+
this.mappingUpdatedAction = mappingUpdatedAction;
78+
this.indexService = indexService;
7279
this.indexShard = (InternalIndexShard) indexShard;
7380

7481
syncInterval = componentSettings.getAsTime("sync", TimeValue.timeValueSeconds(5));
@@ -224,7 +231,10 @@ public void recover(boolean indexShouldExists, RecoveryState recoveryState) thro
224231
break;
225232
}
226233
try {
227-
indexShard.performRecoveryOperation(operation);
234+
Engine.IndexingOperation potentialIndexOperation = indexShard.performRecoveryOperation(operation);
235+
if (potentialIndexOperation != null) {
236+
mappingUpdatedAction.updateMappingOnMaster(indexService.index().name(), potentialIndexOperation.docMapper(), indexService.indexUUID());
237+
}
228238
recoveryState.getTranslog().addTranslogOperations(1);
229239
} catch (ElasticsearchException e) {
230240
if (e.status() == RestStatus.BAD_REQUEST) {

src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java

+17-7
Original file line numberDiff line numberDiff line change
@@ -736,24 +736,33 @@ public void performRecoveryFinalization(boolean withFlush) throws ElasticsearchE
736736
engine.enableGcDeletes(true);
737737
}
738738

739-
public void performRecoveryOperation(Translog.Operation operation) throws ElasticsearchException {
739+
/**
740+
* Performs a single recovery operation, and returns the indexing operation (or null if its not an indexing operation)
741+
* that can then be used for mapping updates (for example) if needed.
742+
*/
743+
public Engine.IndexingOperation performRecoveryOperation(Translog.Operation operation) throws ElasticsearchException {
740744
if (state != IndexShardState.RECOVERING) {
741745
throw new IndexShardNotRecoveringException(shardId, state);
742746
}
747+
Engine.IndexingOperation indexOperation = null;
743748
try {
744749
switch (operation.opType()) {
745750
case CREATE:
746751
Translog.Create create = (Translog.Create) operation;
747-
engine.create(prepareCreate(
752+
Engine.Create engineCreate = prepareCreate(
748753
source(create.source()).type(create.type()).id(create.id())
749-
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
750-
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false));
754+
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
755+
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
756+
engine.create(engineCreate);
757+
indexOperation = engineCreate;
751758
break;
752759
case SAVE:
753760
Translog.Index index = (Translog.Index) operation;
754-
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())
755-
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
756-
index.version(),index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true));
761+
Engine.Index engineIndex = prepareIndex(source(index.source()).type(index.type()).id(index.id())
762+
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
763+
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true);
764+
engine.index(engineIndex);
765+
indexOperation = engineIndex;
757766
break;
758767
case DELETE:
759768
Translog.Delete delete = (Translog.Delete) operation;
@@ -786,6 +795,7 @@ public void performRecoveryOperation(Translog.Operation operation) throws Elasti
786795
throw e;
787796
}
788797
}
798+
return indexOperation;
789799
}
790800

791801
/**

0 commit comments

Comments
 (0)