Skip to content

Commit 3b047ae

Browse files
committed
Introduce global checkpoint listeners (#32696)
This commit introduces the ability for global checkpoint listeners to be registered at the shard level. These listeners are notified when the global checkpoint is updated, and also when the shard closes. To encapsulate these listeners, we introduce a shard-level component that handles synchronization of notification and modifications to the collection of listeners.
1 parent da9e984 commit 3b047ae

File tree

4 files changed

+658
-4
lines changed

4 files changed

+658
-4
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.shard;
21+
22+
import org.apache.logging.log4j.Logger;
23+
import org.apache.logging.log4j.message.ParameterizedMessage;
24+
25+
import java.io.Closeable;
26+
import java.io.IOException;
27+
import java.util.ArrayList;
28+
import java.util.List;
29+
import java.util.Objects;
30+
import java.util.concurrent.Executor;
31+
32+
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
33+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
34+
35+
/**
36+
* Represents a collection of global checkpoint listeners. This collection can be added to, and all listeners present at the time of an
37+
* update will be notified together. All listeners will be notified when the shard is closed.
38+
*/
39+
public class GlobalCheckpointListeners implements Closeable {
40+
41+
/**
42+
* A global checkpoint listener consisting of a callback that is notified when the global checkpoint is updated or the shard is closed.
43+
*/
44+
@FunctionalInterface
45+
public interface GlobalCheckpointListener {
46+
/**
47+
* Callback when the global checkpoint is updated or the shard is closed. If the shard is closed, the value of the global checkpoint
48+
* will be set to {@link org.elasticsearch.index.seqno.SequenceNumbers#UNASSIGNED_SEQ_NO} and the exception will be non-null. If the
49+
* global checkpoint is updated, the exception will be null.
50+
*
51+
* @param globalCheckpoint the updated global checkpoint
52+
* @param e if non-null, the shard is closed
53+
*/
54+
void accept(long globalCheckpoint, IndexShardClosedException e);
55+
}
56+
57+
// guarded by this
58+
private boolean closed;
59+
private volatile List<GlobalCheckpointListener> listeners;
60+
private long lastKnownGlobalCheckpoint = UNASSIGNED_SEQ_NO;
61+
62+
private final ShardId shardId;
63+
private final Executor executor;
64+
private final Logger logger;
65+
66+
/**
67+
* Construct a global checkpoint listeners collection.
68+
*
69+
* @param shardId the shard ID on which global checkpoint updates can be listened to
70+
* @param executor the executor for listener notifications
71+
* @param logger a shard-level logger
72+
*/
73+
GlobalCheckpointListeners(
74+
final ShardId shardId,
75+
final Executor executor,
76+
final Logger logger) {
77+
this.shardId = Objects.requireNonNull(shardId);
78+
this.executor = Objects.requireNonNull(executor);
79+
this.logger = Objects.requireNonNull(logger);
80+
}
81+
82+
/**
83+
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
84+
* listener will be asynchronously notified on the executor used to construct this collection of global checkpoint listeners. If the
85+
* shard is closed then the listener will be asynchronously notified on the executor used to construct this collection of global
86+
* checkpoint listeners. The listener will only be notified of at most one event, either the global checkpoint is updated or the shard
87+
* is closed. A listener must re-register after one of these events to receive subsequent events.
88+
*
89+
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
90+
* @param listener the listener
91+
*/
92+
synchronized void add(final long currentGlobalCheckpoint, final GlobalCheckpointListener listener) {
93+
if (closed) {
94+
executor.execute(() -> notifyListener(listener, UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId)));
95+
return;
96+
}
97+
if (lastKnownGlobalCheckpoint > currentGlobalCheckpoint) {
98+
// notify directly
99+
executor.execute(() -> notifyListener(listener, lastKnownGlobalCheckpoint, null));
100+
return;
101+
} else {
102+
if (listeners == null) {
103+
listeners = new ArrayList<>();
104+
}
105+
listeners.add(listener);
106+
}
107+
}
108+
109+
@Override
110+
public synchronized void close() throws IOException {
111+
closed = true;
112+
notifyListeners(UNASSIGNED_SEQ_NO, new IndexShardClosedException(shardId));
113+
}
114+
115+
synchronized int pendingListeners() {
116+
return listeners == null ? 0 : listeners.size();
117+
}
118+
119+
/**
120+
* Invoke to notify all registered listeners of an updated global checkpoint.
121+
*
122+
* @param globalCheckpoint the updated global checkpoint
123+
*/
124+
synchronized void globalCheckpointUpdated(final long globalCheckpoint) {
125+
assert globalCheckpoint >= NO_OPS_PERFORMED;
126+
assert globalCheckpoint > lastKnownGlobalCheckpoint
127+
: "updated global checkpoint [" + globalCheckpoint + "]"
128+
+ " is not more than the last known global checkpoint [" + lastKnownGlobalCheckpoint + "]";
129+
lastKnownGlobalCheckpoint = globalCheckpoint;
130+
notifyListeners(globalCheckpoint, null);
131+
}
132+
133+
private void notifyListeners(final long globalCheckpoint, final IndexShardClosedException e) {
134+
assert Thread.holdsLock(this);
135+
assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null) || (globalCheckpoint >= NO_OPS_PERFORMED && e == null);
136+
if (listeners != null) {
137+
// capture the current listeners
138+
final List<GlobalCheckpointListener> currentListeners = listeners;
139+
listeners = null;
140+
if (currentListeners != null) {
141+
executor.execute(() -> {
142+
for (final GlobalCheckpointListener listener : currentListeners) {
143+
notifyListener(listener, globalCheckpoint, e);
144+
}
145+
});
146+
}
147+
}
148+
}
149+
150+
private void notifyListener(final GlobalCheckpointListener listener, final long globalCheckpoint, final IndexShardClosedException e) {
151+
try {
152+
listener.accept(globalCheckpoint, e);
153+
} catch (final Exception caught) {
154+
if (globalCheckpoint != UNASSIGNED_SEQ_NO) {
155+
logger.warn(
156+
new ParameterizedMessage(
157+
"error notifying global checkpoint listener of updated global checkpoint [{}]",
158+
globalCheckpoint),
159+
caught);
160+
} else {
161+
logger.warn("error notifying global checkpoint listener of closed shard", caught);
162+
}
163+
}
164+
}
165+
166+
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,8 @@
162162
import java.util.stream.StreamSupport;
163163

164164
import static org.elasticsearch.index.mapper.SourceToParse.source;
165+
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
166+
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
165167

166168
public class IndexShard extends AbstractIndexShardComponent implements IndicesClusterStateService.Shard {
167169

@@ -190,6 +192,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
190192

191193
private final SearchOperationListener searchOperationListener;
192194

195+
private final GlobalCheckpointListeners globalCheckpointListeners;
193196
private final ReplicationTracker replicationTracker;
194197

195198
protected volatile ShardRouting shardRouting;
@@ -296,8 +299,10 @@ public IndexShard(
296299
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
297300
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
298301
final String aId = shardRouting.allocationId().getId();
302+
this.globalCheckpointListeners = new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), logger);
299303
this.replicationTracker =
300-
new ReplicationTracker(shardId, aId, indexSettings, SequenceNumbers.UNASSIGNED_SEQ_NO, globalCheckpoint -> {});
304+
new ReplicationTracker(shardId, aId, indexSettings, UNASSIGNED_SEQ_NO, globalCheckpointListeners::globalCheckpointUpdated);
305+
301306
// the query cache is a node-level thing, however we want the most popular filters
302307
// to be computed on a per-shard basis
303308
if (IndexModule.INDEX_QUERY_CACHE_EVERYTHING_SETTING.get(settings)) {
@@ -1223,7 +1228,7 @@ public void close(String reason, boolean flushEngine) throws IOException {
12231228
} finally {
12241229
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
12251230
// Also closing refreshListeners to prevent us from accumulating any more listeners
1226-
IOUtils.close(engine, refreshListeners);
1231+
IOUtils.close(engine, globalCheckpointListeners, refreshListeners);
12271232
indexShardOperationPermits.close();
12281233
}
12291234
}
@@ -1764,6 +1769,19 @@ public void updateGlobalCheckpointForShard(final String allocationId, final long
17641769
replicationTracker.updateGlobalCheckpointForShard(allocationId, globalCheckpoint);
17651770
}
17661771

1772+
/**
1773+
* Add a global checkpoint listener. If the global checkpoint is above the current global checkpoint known to the listener then the
1774+
* listener will fire immediately on the calling thread.
1775+
*
1776+
* @param currentGlobalCheckpoint the current global checkpoint known to the listener
1777+
* @param listener the listener
1778+
*/
1779+
public void addGlobalCheckpointListener(
1780+
final long currentGlobalCheckpoint,
1781+
final GlobalCheckpointListeners.GlobalCheckpointListener listener) {
1782+
this.globalCheckpointListeners.add(currentGlobalCheckpoint, listener);
1783+
}
1784+
17671785
/**
17681786
* Waits for all operations up to the provided sequence number to complete.
17691787
*
@@ -2308,8 +2326,8 @@ public void acquireReplicaOperationPermit(final long opPrimaryTerm, final long g
23082326
updateGlobalCheckpointOnReplica(globalCheckpoint, "primary term transition");
23092327
final long currentGlobalCheckpoint = getGlobalCheckpoint();
23102328
final long localCheckpoint;
2311-
if (currentGlobalCheckpoint == SequenceNumbers.UNASSIGNED_SEQ_NO) {
2312-
localCheckpoint = SequenceNumbers.NO_OPS_PERFORMED;
2329+
if (currentGlobalCheckpoint == UNASSIGNED_SEQ_NO) {
2330+
localCheckpoint = NO_OPS_PERFORMED;
23132331
} else {
23142332
localCheckpoint = currentGlobalCheckpoint;
23152333
}

0 commit comments

Comments
 (0)