Skip to content

Commit 3c3c14f

Browse files
authored
Introduce GlobalCheckpointSyncer interface (#96345)
This commit replaces the bare `Consumer` and/or `Runnable` used to represent the global checkpoint sync action with a dedicated interface so that we can document it, and to ease a future change that will need to add more parameters to this interface.
1 parent e0852a5 commit 3c3c14f

File tree

15 files changed

+79
-35
lines changed

15 files changed

+79
-35
lines changed

server/src/internalClusterTest/java/org/elasticsearch/index/shard/IndexShardIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ public static final IndexShard newIndexShard(
647647
null,
648648
Collections.emptyList(),
649649
Arrays.asList(listeners),
650-
() -> {},
650+
IndexShardTestCase.NOOP_GCP_SYNCER,
651651
RetentionLeaseSyncer.EMPTY,
652652
cbs,
653653
IndexModule.DEFAULT_SNAPSHOT_COMMIT_SUPPLIER,

server/src/main/java/org/elasticsearch/index/IndexService.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.elasticsearch.index.query.SearchExecutionContext;
5656
import org.elasticsearch.index.query.SearchIndexNameMatcher;
5757
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
58+
import org.elasticsearch.index.shard.GlobalCheckpointSyncer;
5859
import org.elasticsearch.index.shard.IndexEventListener;
5960
import org.elasticsearch.index.shard.IndexShard;
6061
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -412,7 +413,7 @@ private long getAvgShardSizeInBytes() throws IOException {
412413

413414
public synchronized IndexShard createShard(
414415
final ShardRouting routing,
415-
final Consumer<ShardId> globalCheckpointSyncer,
416+
final GlobalCheckpointSyncer globalCheckpointSyncer,
416417
final RetentionLeaseSyncer retentionLeaseSyncer
417418
) throws IOException {
418419
Objects.requireNonNull(retentionLeaseSyncer);
@@ -520,7 +521,7 @@ public synchronized IndexShard createShard(
520521
engineWarmer,
521522
searchOperationListeners,
522523
indexingOperationListeners,
523-
() -> globalCheckpointSyncer.accept(shardId),
524+
globalCheckpointSyncer,
524525
retentionLeaseSyncer,
525526
circuitBreakerService,
526527
snapshotCommitSupplier,
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.index.shard;
10+
11+
public interface GlobalCheckpointSyncer {
12+
/**
13+
* Synchronize the global checkpoints across the replication group. This is used when indexing traffic stops and the primary's global
14+
* checkpoint reaches the max seqno, because in this state the replicas will have an older global checkpoint as carried by the earlier
15+
* indexing traffic, and may not receive any further updates without the explicit sync that this method triggers.
16+
* <p>
17+
* It's also used if {@link org.elasticsearch.index.translog.Translog.Durability#ASYNC} is selected, because in that case indexing
18+
* traffic does not advance the persisted global checkpoint.
19+
* <p>
20+
* In production this triggers a {@link org.elasticsearch.index.seqno.GlobalCheckpointSyncAction}.
21+
*
22+
* @param shardId The ID of the shard to synchronize.
23+
*/
24+
void syncGlobalCheckpoints(ShardId shardId);
25+
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -231,11 +231,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
231231
final EngineFactory engineFactory;
232232

233233
private final IndexingOperationListener indexingOperationListeners;
234-
private final Runnable globalCheckpointSyncer;
235-
236-
Runnable getGlobalCheckpointSyncer() {
237-
return globalCheckpointSyncer;
238-
}
234+
private final GlobalCheckpointSyncer globalCheckpointSyncer;
239235

240236
private final RetentionLeaseSyncer retentionLeaseSyncer;
241237

@@ -307,7 +303,7 @@ public IndexShard(
307303
final Engine.Warmer warmer,
308304
final List<SearchOperationListener> searchOperationListener,
309305
final List<IndexingOperationListener> listeners,
310-
final Runnable globalCheckpointSyncer,
306+
final GlobalCheckpointSyncer globalCheckpointSyncer,
311307
final RetentionLeaseSyncer retentionLeaseSyncer,
312308
final CircuitBreakerService circuitBreakerService,
313309
final IndexStorePlugin.SnapshotCommitSupplier snapshotCommitSupplier,
@@ -2749,12 +2745,21 @@ public void maybeSyncGlobalCheckpoint(final String reason) {
27492745
|| trackedGlobalCheckpointsNeedSync;
27502746
// only sync if index is not closed and there is a shard lagging the primary
27512747
if (syncNeeded && indexSettings.getIndexMetadata().getState() == IndexMetadata.State.OPEN) {
2752-
logger.trace("syncing global checkpoint for [{}]", reason);
2753-
globalCheckpointSyncer.run();
2748+
syncGlobalCheckpoints(reason);
27542749
}
27552750
}
27562751
}
27572752

2753+
private void syncGlobalCheckpoints(String reason) {
2754+
logger.trace("syncing global checkpoint for [{}]", reason);
2755+
globalCheckpointSyncer.syncGlobalCheckpoints(shardId);
2756+
}
2757+
2758+
// exposed for tests
2759+
GlobalCheckpointSyncer getGlobalCheckpointSyncer() {
2760+
return globalCheckpointSyncer;
2761+
}
2762+
27582763
/**
27592764
* Returns the current replication group for the shard.
27602765
*

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@
108108
import org.elasticsearch.index.seqno.RetentionLeaseStats;
109109
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
110110
import org.elasticsearch.index.seqno.SeqNoStats;
111+
import org.elasticsearch.index.shard.GlobalCheckpointSyncer;
111112
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
112113
import org.elasticsearch.index.shard.IndexEventListener;
113114
import org.elasticsearch.index.shard.IndexShard;
@@ -848,7 +849,7 @@ public IndexShard createShard(
848849
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
849850
final RepositoriesService repositoriesService,
850851
final Consumer<IndexShard.ShardFailure> onShardFailure,
851-
final Consumer<ShardId> globalCheckpointSyncer,
852+
final GlobalCheckpointSyncer globalCheckpointSyncer,
852853
final RetentionLeaseSyncer retentionLeaseSyncer,
853854
final DiscoveryNode targetNode,
854855
final DiscoveryNode sourceNode

server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
5252
import org.elasticsearch.index.seqno.ReplicationTracker;
5353
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
54+
import org.elasticsearch.index.shard.GlobalCheckpointSyncer;
5455
import org.elasticsearch.index.shard.IndexEventListener;
5556
import org.elasticsearch.index.shard.IndexShard;
5657
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -1097,7 +1098,7 @@ T createShard(
10971098
PeerRecoveryTargetService.RecoveryListener recoveryListener,
10981099
RepositoriesService repositoriesService,
10991100
Consumer<IndexShard.ShardFailure> onShardFailure,
1100-
Consumer<ShardId> globalCheckpointSyncer,
1101+
GlobalCheckpointSyncer globalCheckpointSyncer,
11011102
RetentionLeaseSyncer retentionLeaseSyncer,
11021103
DiscoveryNode targetNode,
11031104
@Nullable DiscoveryNode sourceNode

server/src/test/java/org/elasticsearch/index/IndexModuleTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
6868
import org.elasticsearch.index.shard.IndexEventListener;
6969
import org.elasticsearch.index.shard.IndexShard;
70+
import org.elasticsearch.index.shard.IndexShardTestCase;
7071
import org.elasticsearch.index.shard.IndexingOperationListener;
7172
import org.elasticsearch.index.shard.SearchOperationListener;
7273
import org.elasticsearch.index.shard.ShardId;
@@ -691,7 +692,7 @@ public void onIndexCommitDelete(ShardId shardId, IndexCommit deletedCommit) {
691692
IndexService indexService = newIndexService(module);
692693
closeables.add(() -> indexService.close("close index service at end of test", false));
693694

694-
IndexShard indexShard = indexService.createShard(shardRouting, s -> {}, RetentionLeaseSyncer.EMPTY);
695+
IndexShard indexShard = indexService.createShard(shardRouting, IndexShardTestCase.NOOP_GCP_SYNCER, RetentionLeaseSyncer.EMPTY);
695696
closeables.add(() -> indexShard.close("close shard at end of test", true));
696697
indexShard.markAsRecovering("test", new RecoveryState(shardRouting, TestDiscoveryNode.create("_node_id", "_node_id"), null));
697698

server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,7 +1185,7 @@ public void testGlobalCheckpointSync() throws IOException {
11851185
indexMetadata.build(),
11861186
null,
11871187
new InternalEngineFactory(),
1188-
() -> synced.set(true),
1188+
ignoredShardId -> synced.set(true),
11891189
RetentionLeaseSyncer.EMPTY
11901190
);
11911191
// add a replica
@@ -1254,7 +1254,7 @@ public void testClosedIndicesSkipSyncGlobalCheckpoint() throws Exception {
12541254
indexMetadata.build(),
12551255
null,
12561256
new InternalEngineFactory(),
1257-
() -> synced.set(true),
1257+
ignoredShardId -> synced.set(true),
12581258
RetentionLeaseSyncer.EMPTY
12591259
);
12601260
recoverShardFromStore(primaryShard);
@@ -1630,7 +1630,7 @@ public String[] listAll() throws IOException {
16301630
i -> store,
16311631
null,
16321632
new InternalEngineFactory(),
1633-
() -> {},
1633+
NOOP_GCP_SYNCER,
16341634
RetentionLeaseSyncer.EMPTY,
16351635
EMPTY_EVENT_LISTENER
16361636
);
@@ -2622,7 +2622,7 @@ public void testReaderWrapperIsUsed() throws IOException {
26222622
null,
26232623
wrapper,
26242624
new InternalEngineFactory(),
2625-
() -> {},
2625+
NOOP_GCP_SYNCER,
26262626
RetentionLeaseSyncer.EMPTY,
26272627
EMPTY_EVENT_LISTENER
26282628
);
@@ -2757,7 +2757,7 @@ public void testSearchIsReleaseIfWrapperFails() throws IOException {
27572757
null,
27582758
wrapper,
27592759
new InternalEngineFactory(),
2760-
() -> {},
2760+
NOOP_GCP_SYNCER,
27612761
RetentionLeaseSyncer.EMPTY,
27622762
EMPTY_EVENT_LISTENER
27632763
);
@@ -4538,7 +4538,7 @@ public void testShardExposesWriteLoadStats() throws Exception {
45384538
null,
45394539
null,
45404540
new InternalEngineFactory(),
4541-
() -> {},
4541+
NOOP_GCP_SYNCER,
45424542
RetentionLeaseSyncer.EMPTY,
45434543
EMPTY_EVENT_LISTENER,
45444544
fakeClock

server/src/test/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommandTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void setup() throws IOException {
170170
null,
171171
null,
172172
new InternalEngineFactory(),
173-
() -> {},
173+
NOOP_GCP_SYNCER,
174174
RetentionLeaseSyncer.EMPTY,
175175
EMPTY_EVENT_LISTENER
176176
),

server/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerSingleNodeTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ public void afterIndexRemoved(Index index, IndexSettings indexSettings, IndexRem
117117
newRouting = newRouting.moveToUnassigned(unassignedInfo)
118118
.updateUnassigned(unassignedInfo, RecoverySource.EmptyStoreRecoverySource.INSTANCE);
119119
newRouting = ShardRoutingHelper.initialize(newRouting, nodeId);
120-
IndexShard shard = index.createShard(newRouting, s -> {}, RetentionLeaseSyncer.EMPTY);
120+
IndexShard shard = index.createShard(newRouting, IndexShardTestCase.NOOP_GCP_SYNCER, RetentionLeaseSyncer.EMPTY);
121121
IndexShardTestCase.updateRoutingEntry(shard, newRouting);
122122
assertEquals(5, counter.get());
123123
final DiscoveryNode localNode = TestDiscoveryNode.create("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet());

server/src/test/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.index.IndexService;
2424
import org.elasticsearch.index.IndexSettings;
2525
import org.elasticsearch.index.seqno.RetentionLeaseSyncer;
26+
import org.elasticsearch.index.shard.GlobalCheckpointSyncer;
2627
import org.elasticsearch.index.shard.IndexEventListener;
2728
import org.elasticsearch.index.shard.IndexShard;
2829
import org.elasticsearch.index.shard.IndexShardState;
@@ -234,7 +235,7 @@ public MockIndexShard createShard(
234235
final PeerRecoveryTargetService.RecoveryListener recoveryListener,
235236
final RepositoriesService repositoriesService,
236237
final Consumer<IndexShard.ShardFailure> onShardFailure,
237-
final Consumer<ShardId> globalCheckpointSyncer,
238+
final GlobalCheckpointSyncer globalCheckpointSyncer,
238239
final RetentionLeaseSyncer retentionLeaseSyncer,
239240
final DiscoveryNode targetNode,
240241
final DiscoveryNode sourceNode

server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException {
113113
null,
114114
null,
115115
new InternalEngineFactory(),
116-
() -> {},
116+
NOOP_GCP_SYNCER,
117117
RetentionLeaseSyncer.EMPTY,
118118
EMPTY_EVENT_LISTENER
119119
);

test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,14 @@ protected class ReplicationGroup implements AutoCloseable, Iterable<IndexShard>
203203

204204
protected ReplicationGroup(final IndexMetadata indexMetadata) throws IOException {
205205
final ShardRouting primaryRouting = this.createShardRouting("s0", true);
206-
primary = newShard(primaryRouting, indexMetadata, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer);
206+
primary = newShard(
207+
primaryRouting,
208+
indexMetadata,
209+
null,
210+
getEngineFactory(primaryRouting),
211+
NOOP_GCP_SYNCER,
212+
retentionLeaseSyncer
213+
);
207214
replicas = new CopyOnWriteArrayList<>();
208215
this.indexMetadata = indexMetadata;
209216
updateAllocationIDsOnPrimary();
@@ -324,7 +331,7 @@ public IndexShard addReplica() throws IOException {
324331
indexMetadata,
325332
null,
326333
getEngineFactory(replicaRouting),
327-
() -> {},
334+
NOOP_GCP_SYNCER,
328335
retentionLeaseSyncer
329336
);
330337
addReplica(replica);
@@ -363,7 +370,7 @@ public synchronized IndexShard addReplicaWithExistingPath(final ShardPath shardP
363370
null,
364371
null,
365372
getEngineFactory(shardRouting),
366-
() -> {},
373+
NOOP_GCP_SYNCER,
367374
retentionLeaseSyncer,
368375
EMPTY_EVENT_LISTENER
369376
);

test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
112112
public static final IndexEventListener EMPTY_EVENT_LISTENER = new IndexEventListener() {
113113
};
114114

115+
public static final GlobalCheckpointSyncer NOOP_GCP_SYNCER = shardId -> {};
116+
115117
private static final AtomicBoolean failOnShardFailures = new AtomicBoolean(true);
116118

117119
private static final Consumer<IndexShard.ShardFailure> DEFAULT_SHARD_FAILURE_HANDLER = failure -> {
@@ -265,7 +267,7 @@ protected IndexShard newShard(
265267
.settings(indexSettings)
266268
.primaryTerm(0, primaryTerm)
267269
.putMapping("{ \"properties\": {} }");
268-
return newShard(shardRouting, metadata.build(), null, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, listeners);
270+
return newShard(shardRouting, metadata.build(), null, engineFactory, NOOP_GCP_SYNCER, RetentionLeaseSyncer.EMPTY, listeners);
269271
}
270272

271273
/**
@@ -302,7 +304,7 @@ protected IndexShard newShard(
302304
IndexMetadata indexMetadata,
303305
@Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper
304306
) throws IOException {
305-
return newShard(shardId, primary, nodeId, indexMetadata, readerWrapper, () -> {});
307+
return newShard(shardId, primary, nodeId, indexMetadata, readerWrapper, NOOP_GCP_SYNCER);
306308
}
307309

308310
/**
@@ -319,7 +321,7 @@ protected IndexShard newShard(
319321
String nodeId,
320322
IndexMetadata indexMetadata,
321323
@Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> readerWrapper,
322-
Runnable globalCheckpointSyncer
324+
GlobalCheckpointSyncer globalCheckpointSyncer
323325
) throws IOException {
324326
ShardRouting shardRouting = TestShardRouting.newShardRouting(
325327
shardId,
@@ -353,7 +355,7 @@ protected IndexShard newShard(
353355
EngineFactory engineFactory,
354356
IndexingOperationListener... listeners
355357
) throws IOException {
356-
return newShard(routing, indexMetadata, indexReaderWrapper, engineFactory, () -> {}, RetentionLeaseSyncer.EMPTY, listeners);
358+
return newShard(routing, indexMetadata, indexReaderWrapper, engineFactory, NOOP_GCP_SYNCER, RetentionLeaseSyncer.EMPTY, listeners);
357359
}
358360

359361
/**
@@ -370,7 +372,7 @@ protected IndexShard newShard(
370372
IndexMetadata indexMetadata,
371373
@Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
372374
@Nullable EngineFactory engineFactory,
373-
Runnable globalCheckpointSyncer,
375+
GlobalCheckpointSyncer globalCheckpointSyncer,
374376
RetentionLeaseSyncer retentionLeaseSyncer,
375377
IndexingOperationListener... listeners
376378
) throws IOException {
@@ -410,7 +412,7 @@ protected IndexShard newShard(
410412
@Nullable CheckedFunction<IndexSettings, Store, IOException> storeProvider,
411413
@Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
412414
@Nullable EngineFactory engineFactory,
413-
Runnable globalCheckpointSyncer,
415+
GlobalCheckpointSyncer globalCheckpointSyncer,
414416
RetentionLeaseSyncer retentionLeaseSyncer,
415417
IndexEventListener indexEventListener,
416418
IndexingOperationListener... listeners
@@ -449,7 +451,7 @@ protected IndexShard newShard(
449451
@Nullable CheckedFunction<IndexSettings, Store, IOException> storeProvider,
450452
@Nullable CheckedFunction<DirectoryReader, DirectoryReader, IOException> indexReaderWrapper,
451453
@Nullable EngineFactory engineFactory,
452-
Runnable globalCheckpointSyncer,
454+
GlobalCheckpointSyncer globalCheckpointSyncer,
453455
RetentionLeaseSyncer retentionLeaseSyncer,
454456
IndexEventListener indexEventListener,
455457
LongSupplier relativeTimeSupplier,

x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/sourceonly/SourceOnlySnapshotShardTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ public void onFailure(Exception e) {
370370
metadata,
371371
null,
372372
SourceOnlySnapshotRepository.getEngineFactory(),
373-
() -> {},
373+
NOOP_GCP_SYNCER,
374374
RetentionLeaseSyncer.EMPTY
375375
);
376376
DiscoveryNode discoveryNode = TestDiscoveryNode.create("node_g");

0 commit comments

Comments
 (0)