40
40
import org .elasticsearch .Assertions ;
41
41
import org .elasticsearch .ElasticsearchException ;
42
42
import org .elasticsearch .ExceptionsHelper ;
43
- import org .elasticsearch .Version ;
44
43
import org .elasticsearch .action .ActionListener ;
45
44
import org .elasticsearch .action .admin .indices .flush .FlushRequest ;
46
45
import org .elasticsearch .action .admin .indices .forcemerge .ForceMergeRequest ;
@@ -326,17 +325,15 @@ public IndexShard(
326
325
this .pendingPrimaryTerm = primaryTerm ;
327
326
this .globalCheckpointListeners =
328
327
new GlobalCheckpointListeners (shardId , threadPool .executor (ThreadPool .Names .LISTENER ), threadPool .scheduler (), logger );
329
- final ReplicationTracker replicationTracker =
330
- new ReplicationTracker (
331
- shardId ,
332
- aId ,
333
- indexSettings ,
334
- primaryTerm ,
335
- UNASSIGNED_SEQ_NO ,
336
- globalCheckpointListeners ::globalCheckpointUpdated ,
337
- threadPool ::absoluteTimeInMillis ,
338
- (retentionLeases , listener ) -> retentionLeaseSyncer .sync (shardId , retentionLeases , listener ));
339
- this .replicationTracker = replicationTracker ;
328
+ this .replicationTracker = new ReplicationTracker (
329
+ shardId ,
330
+ aId ,
331
+ indexSettings ,
332
+ primaryTerm ,
333
+ UNASSIGNED_SEQ_NO ,
334
+ globalCheckpointListeners ::globalCheckpointUpdated ,
335
+ threadPool ::absoluteTimeInMillis ,
336
+ (retentionLeases , listener ) -> retentionLeaseSyncer .sync (shardId , retentionLeases , listener ));
340
337
341
338
// the query cache is a node-level thing, however we want the most popular filters
342
339
// to be computed on a per-shard basis
@@ -443,16 +440,17 @@ public void updateShardState(final ShardRouting newRouting,
443
440
final ShardRouting currentRouting ;
444
441
synchronized (mutex ) {
445
442
currentRouting = this .shardRouting ;
443
+ assert currentRouting != null ;
446
444
447
445
if (!newRouting .shardId ().equals (shardId ())) {
448
446
throw new IllegalArgumentException ("Trying to set a routing entry with shardId " +
449
447
newRouting .shardId () + " on a shard with shardId " + shardId ());
450
448
}
451
- if (( currentRouting == null || newRouting .isSameAllocation (currentRouting ) ) == false ) {
449
+ if (newRouting .isSameAllocation (currentRouting ) == false ) {
452
450
throw new IllegalArgumentException ("Trying to set a routing entry with a different allocation. Current " +
453
451
currentRouting + ", new " + newRouting );
454
452
}
455
- if (currentRouting != null && currentRouting .primary () && newRouting .primary () == false ) {
453
+ if (currentRouting .primary () && newRouting .primary () == false ) {
456
454
throw new IllegalArgumentException ("illegal state: trying to move shard from primary mode to replica mode. Current "
457
455
+ currentRouting + ", new " + newRouting );
458
456
}
@@ -586,7 +584,7 @@ public void onFailure(Exception e) {
586
584
: "a started primary with non-pending operation term must be in primary mode " + this .shardRouting ;
587
585
shardStateUpdated .countDown ();
588
586
}
589
- if (currentRouting != null && currentRouting .active () == false && newRouting .active ()) {
587
+ if (currentRouting .active () == false && newRouting .active ()) {
590
588
indexEventListener .afterIndexShardStarted (this );
591
589
}
592
590
if (newRouting .equals (currentRouting ) == false ) {
@@ -631,8 +629,7 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
631
629
public void relocated (final String targetAllocationId , final Consumer <ReplicationTracker .PrimaryContext > consumer )
632
630
throws IllegalIndexShardStateException , IllegalStateException , InterruptedException {
633
631
assert shardRouting .primary () : "only primaries can be marked as relocated: " + shardRouting ;
634
- final Releasable forceRefreshes = refreshListeners .forceRefreshes ();
635
- try {
632
+ try (Releasable forceRefreshes = refreshListeners .forceRefreshes ()) {
636
633
indexShardOperationPermits .blockOperations (30 , TimeUnit .MINUTES , () -> {
637
634
forceRefreshes .close ();
638
635
// no shard operation permits are being held here, move state from started to relocated
@@ -665,8 +662,6 @@ public void relocated(final String targetAllocationId, final Consumer<Replicatio
665
662
// Fail primary relocation source and target shards.
666
663
failShard ("timed out waiting for relocation hand-off to complete" , null );
667
664
throw new IndexShardClosedException (shardId (), "timed out waiting for relocation hand-off to complete" );
668
- } finally {
669
- forceRefreshes .close ();
670
665
}
671
666
}
672
667
@@ -745,7 +740,7 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o
745
740
sourceWithResolvedType = new SourceToParse (sourceToParse .index (), resolvedType , sourceToParse .id (),
746
741
sourceToParse .source (), sourceToParse .getXContentType (), sourceToParse .routing ());
747
742
}
748
- operation = prepareIndex (docMapper (resolvedType ), indexSettings . getIndexVersionCreated (), sourceWithResolvedType ,
743
+ operation = prepareIndex (docMapper (resolvedType ), sourceWithResolvedType ,
749
744
seqNo , opPrimaryTerm , version , versionType , origin , autoGeneratedTimeStamp , isRetry , ifSeqNo , ifPrimaryTerm );
750
745
Mapping update = operation .parsedDoc ().dynamicMappingsUpdate ();
751
746
if (update != null ) {
@@ -763,7 +758,7 @@ private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long o
763
758
return index (engine , operation );
764
759
}
765
760
766
- public static Engine .Index prepareIndex (DocumentMapperForType docMapper , Version indexCreatedVersion , SourceToParse source , long seqNo ,
761
+ public static Engine .Index prepareIndex (DocumentMapperForType docMapper , SourceToParse source , long seqNo ,
767
762
long primaryTerm , long version , VersionType versionType , Engine .Operation .Origin origin ,
768
763
long autoGeneratedIdTimestamp , boolean isRetry ,
769
764
long ifSeqNo , long ifPrimaryTerm ) {
@@ -1529,7 +1524,7 @@ private void innerOpenEngineAndTranslog() throws IOException {
1529
1524
// time elapses after the engine is created above (pulling the config settings) until we set the engine reference, during
1530
1525
// which settings changes could possibly have happened, so here we forcefully push any config changes to the new engine.
1531
1526
onSettingsChanged ();
1532
- assertSequenceNumbersInCommit ();
1527
+ assert assertSequenceNumbersInCommit ();
1533
1528
assert recoveryState .getStage () == RecoveryState .Stage .TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState .getStage ();
1534
1529
}
1535
1530
@@ -1546,7 +1541,7 @@ private boolean assertSequenceNumbersInCommit() throws IOException {
1546
1541
return true ;
1547
1542
}
1548
1543
1549
- protected void onNewEngine (Engine newEngine ) {
1544
+ private void onNewEngine (Engine newEngine ) {
1550
1545
refreshListeners .setCurrentRefreshLocationSupplier (newEngine ::getTranslogLastWriteLocation );
1551
1546
}
1552
1547
@@ -1858,10 +1853,6 @@ public List<Segment> segments(boolean verbose) {
1858
1853
return getEngine ().segments (verbose );
1859
1854
}
1860
1855
1861
- public void flushAndCloseEngine () throws IOException {
1862
- getEngine ().flushAndClose ();
1863
- }
1864
-
1865
1856
public String getHistoryUUID () {
1866
1857
return getEngine ().getHistoryUUID ();
1867
1858
}
@@ -2876,7 +2867,7 @@ protected void write(List<Tuple<Translog.Location, Consumer<Exception>>> candida
2876
2867
}
2877
2868
}
2878
2869
};
2879
- };
2870
+ }
2880
2871
2881
2872
/**
2882
2873
* Syncs the given location with the underlying storage unless already synced. This method might return immediately without
@@ -2988,7 +2979,7 @@ private RefreshListeners buildRefreshListeners() {
2988
2979
return new RefreshListeners (
2989
2980
indexSettings ::getMaxRefreshListeners ,
2990
2981
() -> refresh ("too_many_listeners" ),
2991
- threadPool .executor (ThreadPool .Names .LISTENER ):: execute ,
2982
+ threadPool .executor (ThreadPool .Names .LISTENER ),
2992
2983
logger , threadPool .getThreadContext (),
2993
2984
externalRefreshMetric );
2994
2985
}
0 commit comments