81
81
import org .elasticsearch .index .engine .EngineConfig ;
82
82
import org .elasticsearch .index .engine .EngineException ;
83
83
import org .elasticsearch .index .engine .EngineFactory ;
84
+ import org .elasticsearch .index .engine .ReadOnlyEngine ;
84
85
import org .elasticsearch .index .engine .RefreshFailedEngineException ;
85
86
import org .elasticsearch .index .engine .Segment ;
86
87
import org .elasticsearch .index .engine .SegmentsStats ;
155
156
import java .util .concurrent .atomic .AtomicReference ;
156
157
import java .util .function .BiConsumer ;
157
158
import java .util .function .Consumer ;
159
+ import java .util .function .Function ;
158
160
import java .util .function .Supplier ;
159
161
import java .util .stream .Collectors ;
160
162
import java .util .stream .StreamSupport ;
@@ -686,20 +688,20 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
686
688
public Engine .IndexResult applyIndexOperationOnPrimary (long version , VersionType versionType , SourceToParse sourceToParse ,
687
689
long autoGeneratedTimestamp , boolean isRetry ) throws IOException {
688
690
assert versionType .validateVersionForWrites (version );
689
- return applyIndexOperation (UNASSIGNED_SEQ_NO , operationPrimaryTerm , version , versionType , autoGeneratedTimestamp ,
691
+ return applyIndexOperation (getEngine (), UNASSIGNED_SEQ_NO , operationPrimaryTerm , version , versionType , autoGeneratedTimestamp ,
690
692
isRetry , Engine .Operation .Origin .PRIMARY , sourceToParse );
691
693
}
692
694
693
695
public Engine .IndexResult applyIndexOperationOnReplica (long seqNo , long version , long autoGeneratedTimeStamp ,
694
696
boolean isRetry , SourceToParse sourceToParse )
695
697
throws IOException {
696
- return applyIndexOperation (seqNo , operationPrimaryTerm , version , null , autoGeneratedTimeStamp , isRetry ,
698
+ return applyIndexOperation (getEngine (), seqNo , operationPrimaryTerm , version , null , autoGeneratedTimeStamp , isRetry ,
697
699
Engine .Operation .Origin .REPLICA , sourceToParse );
698
700
}
699
701
700
- private Engine .IndexResult applyIndexOperation (long seqNo , long opPrimaryTerm , long version , @ Nullable VersionType versionType ,
701
- long autoGeneratedTimeStamp , boolean isRetry , Engine . Operation . Origin origin ,
702
- SourceToParse sourceToParse ) throws IOException {
702
+ private Engine .IndexResult applyIndexOperation (Engine engine , long seqNo , long opPrimaryTerm , long version ,
703
+ @ Nullable VersionType versionType , long autoGeneratedTimeStamp , boolean isRetry ,
704
+ Engine . Operation . Origin origin , SourceToParse sourceToParse ) throws IOException {
703
705
assert opPrimaryTerm <= this .operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this .operationPrimaryTerm
704
706
+ "]" ;
705
707
ensureWriteAllowed (origin );
@@ -721,7 +723,7 @@ private Engine.IndexResult applyIndexOperation(long seqNo, long opPrimaryTerm, l
721
723
return new Engine .IndexResult (e , version , opPrimaryTerm , seqNo );
722
724
}
723
725
724
- return index (getEngine () , operation );
726
+ return index (engine , operation );
725
727
}
726
728
727
729
public static Engine .Index prepareIndex (DocumentMapperForType docMapper , Version indexCreatedVersion , SourceToParse source , long seqNo ,
@@ -755,17 +757,17 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
755
757
}
756
758
757
759
public Engine .NoOpResult markSeqNoAsNoop (long seqNo , String reason ) throws IOException {
758
- return markSeqNoAsNoop (seqNo , operationPrimaryTerm , reason , Engine .Operation .Origin .REPLICA );
760
+ return markSeqNoAsNoop (getEngine (), seqNo , operationPrimaryTerm , reason , Engine .Operation .Origin .REPLICA );
759
761
}
760
762
761
- private Engine .NoOpResult markSeqNoAsNoop (long seqNo , long opPrimaryTerm , String reason ,
763
+ private Engine .NoOpResult markSeqNoAsNoop (Engine engine , long seqNo , long opPrimaryTerm , String reason ,
762
764
Engine .Operation .Origin origin ) throws IOException {
763
765
assert opPrimaryTerm <= this .operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this .operationPrimaryTerm
764
766
+ "]" ;
765
767
long startTime = System .nanoTime ();
766
768
ensureWriteAllowed (origin );
767
769
final Engine .NoOp noOp = new Engine .NoOp (seqNo , opPrimaryTerm , origin , startTime , reason );
768
- return noOp (getEngine () , noOp );
770
+ return noOp (engine , noOp );
769
771
}
770
772
771
773
private Engine .NoOpResult noOp (Engine engine , Engine .NoOp noOp ) {
@@ -787,15 +789,15 @@ public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) {
787
789
public Engine .DeleteResult applyDeleteOperationOnPrimary (long version , String type , String id , VersionType versionType )
788
790
throws IOException {
789
791
assert versionType .validateVersionForWrites (version );
790
- return applyDeleteOperation (UNASSIGNED_SEQ_NO , operationPrimaryTerm , version , type , id , versionType ,
792
+ return applyDeleteOperation (getEngine (), UNASSIGNED_SEQ_NO , operationPrimaryTerm , version , type , id , versionType ,
791
793
Engine .Operation .Origin .PRIMARY );
792
794
}
793
795
794
796
public Engine .DeleteResult applyDeleteOperationOnReplica (long seqNo , long version , String type , String id ) throws IOException {
795
- return applyDeleteOperation (seqNo , operationPrimaryTerm , version , type , id , null , Engine .Operation .Origin .REPLICA );
797
+ return applyDeleteOperation (getEngine (), seqNo , operationPrimaryTerm , version , type , id , null , Engine .Operation .Origin .REPLICA );
796
798
}
797
799
798
- private Engine .DeleteResult applyDeleteOperation (long seqNo , long opPrimaryTerm , long version , String type , String id ,
800
+ private Engine .DeleteResult applyDeleteOperation (Engine engine , long seqNo , long opPrimaryTerm , long version , String type , String id ,
799
801
@ Nullable VersionType versionType , Engine .Operation .Origin origin ) throws IOException {
800
802
assert opPrimaryTerm <= this .operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this .operationPrimaryTerm
801
803
+ "]" ;
@@ -826,7 +828,7 @@ private Engine.DeleteResult applyDeleteOperation(long seqNo, long opPrimaryTerm,
826
828
final Term uid = new Term (IdFieldMapper .NAME , Uid .encodeId (id ));
827
829
final Engine .Delete delete = prepareDelete (type , id , uid , seqNo , opPrimaryTerm , version ,
828
830
versionType , origin );
829
- return delete (getEngine () , delete );
831
+ return delete (engine , delete );
830
832
}
831
833
832
834
private Engine .Delete prepareDelete (String type , String id , Term uid , long seqNo , long primaryTerm , long version ,
@@ -1265,6 +1267,11 @@ public void updateMaxUnsafeAutoIdTimestamp(long maxSeenAutoIdTimestampFromPrimar
1265
1267
}
1266
1268
1267
1269
public Engine .Result applyTranslogOperation (Translog .Operation operation , Engine .Operation .Origin origin ) throws IOException {
1270
+ return applyTranslogOperation (getEngine (), operation , origin );
1271
+ }
1272
+
1273
+ private Engine .Result applyTranslogOperation (Engine engine , Translog .Operation operation ,
1274
+ Engine .Operation .Origin origin ) throws IOException {
1268
1275
// If a translog op is replayed on the primary (eg. ccr), we need to use external instead of null for its version type.
1269
1276
final VersionType versionType = (origin == Engine .Operation .Origin .PRIMARY ) ? VersionType .EXTERNAL : null ;
1270
1277
final Engine .Result result ;
@@ -1273,19 +1280,19 @@ public Engine.Result applyTranslogOperation(Translog.Operation operation, Engine
1273
1280
final Translog .Index index = (Translog .Index ) operation ;
1274
1281
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
1275
1282
// autoGeneratedID docs that are coming from the primary are updated correctly.
1276
- result = applyIndexOperation (index .seqNo (), index .primaryTerm (), index .version (),
1283
+ result = applyIndexOperation (engine , index .seqNo (), index .primaryTerm (), index .version (),
1277
1284
versionType , index .getAutoGeneratedIdTimestamp (), true , origin ,
1278
1285
source (shardId .getIndexName (), index .type (), index .id (), index .source (),
1279
1286
XContentHelper .xContentType (index .source ())).routing (index .routing ()));
1280
1287
break ;
1281
1288
case DELETE :
1282
1289
final Translog .Delete delete = (Translog .Delete ) operation ;
1283
- result = applyDeleteOperation (delete .seqNo (), delete .primaryTerm (), delete .version (), delete .type (), delete .id (),
1290
+ result = applyDeleteOperation (engine , delete .seqNo (), delete .primaryTerm (), delete .version (), delete .type (), delete .id (),
1284
1291
versionType , origin );
1285
1292
break ;
1286
1293
case NO_OP :
1287
1294
final Translog .NoOp noOp = (Translog .NoOp ) operation ;
1288
- result = markSeqNoAsNoop (noOp .seqNo (), noOp .primaryTerm (), noOp .reason (), origin );
1295
+ result = markSeqNoAsNoop (engine , noOp .seqNo (), noOp .primaryTerm (), noOp .reason (), origin );
1289
1296
break ;
1290
1297
default :
1291
1298
throw new IllegalStateException ("No operation defined for [" + operation + "]" );
@@ -1304,7 +1311,7 @@ int runTranslogRecovery(Engine engine, Translog.Snapshot snapshot, Engine.Operat
1304
1311
while ((operation = snapshot .next ()) != null ) {
1305
1312
try {
1306
1313
logger .trace ("[translog] recover op {}" , operation );
1307
- Engine .Result result = applyTranslogOperation (operation , origin );
1314
+ Engine .Result result = applyTranslogOperation (engine , operation , origin );
1308
1315
switch (result .getResultType ()) {
1309
1316
case FAILURE :
1310
1317
throw result .getFailure ();
@@ -1384,18 +1391,26 @@ private void innerOpenEngineAndTranslog() throws IOException {
1384
1391
final long globalCheckpoint = Translog .readGlobalCheckpoint (translogConfig .getTranslogPath (), translogUUID );
1385
1392
replicationTracker .updateGlobalCheckpointOnReplica (globalCheckpoint , "read from translog checkpoint" );
1386
1393
trimUnsafeCommits ();
1387
-
1388
- createNewEngine (config );
1389
- verifyNotClosed ();
1390
- // We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
1391
- // we still give sync'd flush a chance to run:
1392
- active .set (true );
1394
+ synchronized (mutex ) {
1395
+ verifyNotClosed ();
1396
+ assert currentEngineReference .get () == null : "engine is running" ;
1397
+ // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
1398
+ final Engine newEngine = engineFactory .newReadWriteEngine (config );
1399
+ onNewEngine (newEngine );
1400
+ currentEngineReference .set (newEngine );
1401
+ // We set active because we are now writing operations to the engine; this way,
1402
+ // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
1403
+ active .set (true );
1404
+ }
1405
+ // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
1406
+ // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
1407
+ onSettingsChanged ();
1393
1408
assertSequenceNumbersInCommit ();
1394
1409
assert recoveryState .getStage () == RecoveryState .Stage .TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState .getStage ();
1395
1410
}
1396
1411
1397
1412
private void trimUnsafeCommits () throws IOException {
1398
- assert currentEngineReference .get () == null : "engine is running" ;
1413
+ assert currentEngineReference .get () == null || currentEngineReference . get () instanceof ReadOnlyEngine : "a write engine is running" ;
1399
1414
final String translogUUID = store .readLastCommittedSegmentsInfo ().getUserData ().get (Translog .TRANSLOG_UUID_KEY );
1400
1415
final long globalCheckpoint = Translog .readGlobalCheckpoint (translogConfig .getTranslogPath (), translogUUID );
1401
1416
final long minRetainedTranslogGen = Translog .readMinTranslogGeneration (translogConfig .getTranslogPath (), translogUUID );
@@ -2224,31 +2239,6 @@ public void onFailedEngine(String reason, @Nullable Exception failure) {
2224
2239
}
2225
2240
}
2226
2241
2227
- private Engine createNewEngine (EngineConfig config ) {
2228
- synchronized (mutex ) {
2229
- verifyNotClosed ();
2230
- assert this .currentEngineReference .get () == null ;
2231
- Engine engine = newEngine (config );
2232
- onNewEngine (engine ); // call this before we pass the memory barrier otherwise actions that happen
2233
- // inside the callback are not visible. This one enforces happens-before
2234
- this .currentEngineReference .set (engine );
2235
- }
2236
-
2237
- // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during which
2238
- // settings changes could possibly have happened, so here we forcefully push any config changes to the new engine:
2239
- Engine engine = getEngineOrNull ();
2240
-
2241
- // engine could perhaps be null if we were e.g. concurrently closed:
2242
- if (engine != null ) {
2243
- engine .onSettingsChanged ();
2244
- }
2245
- return engine ;
2246
- }
2247
-
2248
- protected Engine newEngine (EngineConfig config ) {
2249
- return engineFactory .newReadWriteEngine (config );
2250
- }
2251
-
2252
2242
private static void persistMetadata (
2253
2243
final ShardPath shardPath ,
2254
2244
final IndexSettings indexSettings ,
@@ -2852,21 +2842,47 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
2852
2842
void resetEngineToGlobalCheckpoint () throws IOException {
2853
2843
assert getActiveOperationsCount () == 0 : "Ongoing writes [" + getActiveOperations () + "]" ;
2854
2844
sync (); // persist the global checkpoint to disk
2855
- final long globalCheckpoint = getGlobalCheckpoint ();
2856
- final Engine newEngine ;
2845
+ final SeqNoStats seqNoStats = seqNoStats ();
2846
+ final TranslogStats translogStats = translogStats ();
2847
+ // flush to make sure the latest commit, which will be opened by the read-only engine, includes all operations.
2848
+ flush (new FlushRequest ().waitIfOngoing (true ));
2857
2849
synchronized (mutex ) {
2858
2850
verifyNotClosed ();
2859
- IOUtils .close (currentEngineReference .getAndSet (null ));
2851
+ // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
2852
+ final Engine readOnlyEngine = new ReadOnlyEngine (newEngineConfig (), seqNoStats , translogStats , false , Function .identity ());
2853
+ IOUtils .close (currentEngineReference .getAndSet (readOnlyEngine ));
2854
+ }
2855
+
2856
+ Engine newEngine = null ;
2857
+ try {
2858
+ final long globalCheckpoint = getGlobalCheckpoint ();
2860
2859
trimUnsafeCommits ();
2861
- newEngine = createNewEngine (newEngineConfig ());
2862
- active .set (true );
2860
+ synchronized (mutex ) {
2861
+ verifyNotClosed ();
2862
+ // we must create a new engine under mutex (see IndexShard#snapshotStoreMetadata).
2863
+ newEngine = engineFactory .newReadWriteEngine (newEngineConfig ());
2864
+ onNewEngine (newEngine );
2865
+ }
2866
+ newEngine .advanceMaxSeqNoOfUpdatesOrDeletes (globalCheckpoint );
2867
+ final Engine .TranslogRecoveryRunner translogRunner = (engine , snapshot ) -> runTranslogRecovery (
2868
+ engine , snapshot , Engine .Operation .Origin .LOCAL_RESET , () -> {
2869
+ // TODO: add a dedicate recovery stats for the reset translog
2870
+ });
2871
+ newEngine .recoverFromTranslog (translogRunner , globalCheckpoint );
2872
+ synchronized (mutex ) {
2873
+ verifyNotClosed ();
2874
+ IOUtils .close (currentEngineReference .getAndSet (newEngine ));
2875
+ // We set active because we are now writing operations to the engine; this way,
2876
+ // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
2877
+ active .set (true );
2878
+ newEngine = null ;
2879
+ }
2880
+ // time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
2881
+ // which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
2882
+ onSettingsChanged ();
2883
+ } finally {
2884
+ IOUtils .close (newEngine );
2863
2885
}
2864
- newEngine .advanceMaxSeqNoOfUpdatesOrDeletes (globalCheckpoint );
2865
- final Engine .TranslogRecoveryRunner translogRunner = (engine , snapshot ) -> runTranslogRecovery (
2866
- engine , snapshot , Engine .Operation .Origin .LOCAL_RESET , () -> {
2867
- // TODO: add a dedicate recovery stats for the reset translog
2868
- });
2869
- newEngine .recoverFromTranslog (translogRunner , globalCheckpoint );
2870
2886
}
2871
2887
2872
2888
/**
0 commit comments