@@ -1192,11 +1192,9 @@ public Store.MetadataSnapshot snapshotStoreMetadata() throws IOException {
1192
1192
synchronized (engineMutex ) {
1193
1193
// if the engine is not running, we can access the store directly, but we need to make sure no one starts
1194
1194
// the engine on us. If the engine is running, we can get a snapshot via the deletion policy of the engine.
1195
- synchronized (mutex ) {
1196
- final Engine engine = getEngineOrNull ();
1197
- if (engine != null ) {
1198
- indexCommit = engine .acquireLastIndexCommit (false );
1199
- }
1195
+ final Engine engine = getEngineOrNull ();
1196
+ if (engine != null ) {
1197
+ indexCommit = engine .acquireLastIndexCommit (false );
1200
1198
}
1201
1199
if (indexCommit == null ) {
1202
1200
return store .getMetadata (null , true );
@@ -1320,9 +1318,11 @@ public CacheHelper getReaderCacheHelper() {
1320
1318
}
1321
1319
1322
1320
public void close (String reason , boolean flushEngine ) throws IOException {
1323
- synchronized (mutex ) {
1321
+ synchronized (engineMutex ) {
1324
1322
try {
1325
- changeState (IndexShardState .CLOSED , reason );
1323
+ synchronized (mutex ) {
1324
+ changeState (IndexShardState .CLOSED , reason );
1325
+ }
1326
1326
} finally {
1327
1327
final Engine engine = this .currentEngineReference .getAndSet (null );
1328
1328
try {
@@ -1377,6 +1377,7 @@ public void prepareForIndexRecovery() {
1377
1377
* This is the first operation after the local checkpoint of the safe commit if exists.
1378
1378
*/
1379
1379
public long recoverLocallyUpToGlobalCheckpoint () {
1380
+ assert Thread .holdsLock (mutex ) == false : "recover locally under mutex" ;
1380
1381
if (state != IndexShardState .RECOVERING ) {
1381
1382
throw new IndexShardNotRecoveringException (shardId , state );
1382
1383
}
@@ -1428,7 +1429,7 @@ public long recoverLocallyUpToGlobalCheckpoint() {
1428
1429
getEngine ().recoverFromTranslog (translogRecoveryRunner , globalCheckpoint );
1429
1430
logger .trace ("shard locally recovered up to {}" , getEngine ().getSeqNoStats (globalCheckpoint ));
1430
1431
} finally {
1431
- synchronized (mutex ) {
1432
+ synchronized (engineMutex ) {
1432
1433
IOUtils .close (currentEngineReference .getAndSet (null ));
1433
1434
}
1434
1435
}
@@ -1603,23 +1604,15 @@ private void innerOpenEngineAndTranslog(LongSupplier globalCheckpointSupplier) t
1603
1604
: "expected empty set of retention leases with recovery source [" + recoveryState .getRecoverySource ()
1604
1605
+ "] but got " + getRetentionLeases ();
1605
1606
synchronized (engineMutex ) {
1607
+ assert currentEngineReference .get () == null : "engine is running" ;
1608
+ verifyNotClosed ();
1606
1609
// we must create a new engine under engineMutex (see IndexShard#snapshotStoreMetadata).
1607
1610
final Engine newEngine = engineFactory .newReadWriteEngine (config );
1608
- synchronized (mutex ) {
1609
- try {
1610
- verifyNotClosed ();
1611
- assert currentEngineReference .get () == null : "engine is running" ;
1612
- onNewEngine (newEngine );
1613
- currentEngineReference .set (newEngine );
1614
- // We set active because we are now writing operations to the engine; this way,
1615
- // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
1616
- active .set (true );
1617
- } finally {
1618
- if (currentEngineReference .get () != newEngine ) {
1619
- newEngine .close ();
1620
- }
1621
- }
1622
- }
1611
+ onNewEngine (newEngine );
1612
+ currentEngineReference .set (newEngine );
1613
+ // We set active because we are now writing operations to the engine; this way,
1614
+ // if we go idle after some time and become inactive, we still give sync'd flush a chance to run.
1615
+ active .set (true );
1623
1616
}
1624
1617
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
1625
1618
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
@@ -1650,7 +1643,8 @@ private void onNewEngine(Engine newEngine) {
1650
1643
* Called if recovery has to be restarted after a network error or delay.
1651
1644
*/
1652
1645
public void performRecoveryRestart () throws IOException {
1653
- synchronized (mutex ) {
1646
+ assert Thread .holdsLock (mutex ) == false : "restart recovery under mutex" ;
1647
+ synchronized (engineMutex ) {
1654
1648
assert refreshListeners .pendingCount () == 0 : "we can't restart with pending listeners" ;
1655
1649
IOUtils .close (currentEngineReference .getAndSet (null ));
1656
1650
resetRecoveryStage ();
@@ -3325,7 +3319,7 @@ public ParsedDocument newNoopTombstoneDoc(String reason) {
3325
3319
* Rollback the current engine to the safe commit, then replay local translog up to the global checkpoint.
3326
3320
*/
3327
3321
void resetEngineToGlobalCheckpoint () throws IOException {
3328
- assert Thread .holdsLock (engineMutex ) == false : "resetting engine under mutex" ;
3322
+ assert Thread .holdsLock (mutex ) == false : "resetting engine under mutex" ;
3329
3323
assert getActiveOperationsCount () == OPERATIONS_BLOCKED
3330
3324
: "resetting engine without blocking operations; active operations are [" + getActiveOperations () + ']' ;
3331
3325
sync (); // persist the global checkpoint to disk
@@ -3338,6 +3332,7 @@ assert getActiveOperationsCount() == OPERATIONS_BLOCKED
3338
3332
final long globalCheckpoint = getLastKnownGlobalCheckpoint ();
3339
3333
assert globalCheckpoint == getLastSyncedGlobalCheckpoint ();
3340
3334
synchronized (engineMutex ) {
3335
+ verifyNotClosed ();
3341
3336
// we must create both new read-only engine and new read-write engine under engineMutex to ensure snapshotStoreMetadata,
3342
3337
// acquireXXXCommit and close work.
3343
3338
final Engine readOnlyEngine =
@@ -3365,7 +3360,7 @@ public IndexCommitRef acquireSafeIndexCommit() {
3365
3360
3366
3361
@ Override
3367
3362
public void close () throws IOException {
3368
- assert Thread .holdsLock (mutex );
3363
+ assert Thread .holdsLock (engineMutex );
3369
3364
3370
3365
Engine newEngine = newEngineReference .get ();
3371
3366
if (newEngine == currentEngineReference .get ()) {
@@ -3375,36 +3370,17 @@ public void close() throws IOException {
3375
3370
IOUtils .close (super ::close , newEngine );
3376
3371
}
3377
3372
};
3378
- synchronized (mutex ) {
3379
- try {
3380
- verifyNotClosed ();
3381
- IOUtils .close (currentEngineReference .getAndSet (readOnlyEngine ));
3382
- } finally {
3383
- if (currentEngineReference .get () != readOnlyEngine ) {
3384
- readOnlyEngine .close ();
3385
- }
3386
- }
3387
- }
3388
- final Engine newReadWriteEngine = engineFactory .newReadWriteEngine (newEngineConfig (replicationTracker ));
3389
- synchronized (mutex ) {
3390
- try {
3391
- verifyNotClosed ();
3392
- newEngineReference .set (newReadWriteEngine );
3393
- onNewEngine (newReadWriteEngine );
3394
- } finally {
3395
- if (newEngineReference .get () != newReadWriteEngine ) {
3396
- newReadWriteEngine .close (); // shard was closed
3397
- }
3398
- }
3399
- }
3373
+ IOUtils .close (currentEngineReference .getAndSet (readOnlyEngine ));
3374
+ newEngineReference .set (engineFactory .newReadWriteEngine (newEngineConfig (replicationTracker )));
3375
+ onNewEngine (newEngineReference .get ());
3400
3376
}
3401
3377
final Engine .TranslogRecoveryRunner translogRunner = (engine , snapshot ) -> runTranslogRecovery (
3402
3378
engine , snapshot , Engine .Operation .Origin .LOCAL_RESET , () -> {
3403
3379
// TODO: add dedicated recovery stats for the reset translog
3404
3380
});
3405
3381
newEngineReference .get ().recoverFromTranslog (translogRunner , globalCheckpoint );
3406
3382
newEngineReference .get ().refresh ("reset_engine" );
3407
- synchronized (mutex ) {
3383
+ synchronized (engineMutex ) {
3408
3384
verifyNotClosed ();
3409
3385
IOUtils .close (currentEngineReference .getAndSet (newEngineReference .get ()));
3410
3386
// We set active because we are now writing operations to the engine; this way,
0 commit comments