21
21
22
22
import org .apache .logging .log4j .Logger ;
23
23
import org .apache .logging .log4j .message .ParameterizedMessage ;
24
+ import org .elasticsearch .Assertions ;
24
25
import org .elasticsearch .common .collect .Tuple ;
25
26
import org .elasticsearch .common .unit .TimeValue ;
26
27
import org .elasticsearch .common .util .concurrent .FutureUtils ;
@@ -150,6 +151,9 @@ synchronized void add(final long waitingForGlobalCheckpoint, final GlobalCheckpo
150
151
151
152
@ Override
152
153
public synchronized void close () throws IOException {
154
+ if (closed ) {
155
+ assert listeners .isEmpty () : listeners ;
156
+ }
153
157
closed = true ;
154
158
notifyListeners (UNASSIGNED_SEQ_NO , new IndexShardClosedException (shardId ));
155
159
}
@@ -188,8 +192,8 @@ synchronized void globalCheckpointUpdated(final long globalCheckpoint) {
188
192
}
189
193
190
194
private void notifyListeners (final long globalCheckpoint , final IndexShardClosedException e ) {
191
- assert Thread .holdsLock (this );
192
- assert (globalCheckpoint == UNASSIGNED_SEQ_NO && e != null ) || ( globalCheckpoint >= NO_OPS_PERFORMED && e == null );
195
+ assert Thread .holdsLock (this ) : Thread . currentThread () ;
196
+ assertNotification (globalCheckpoint , e );
193
197
194
198
final Map <GlobalCheckpointListener , Tuple <Long , ScheduledFuture <?>>> listenersToNotify ;
195
199
if (globalCheckpoint != UNASSIGNED_SEQ_NO ) {
@@ -219,6 +223,8 @@ private void notifyListeners(final long globalCheckpoint, final IndexShardClosed
219
223
}
220
224
221
225
private void notifyListener (final GlobalCheckpointListener listener , final long globalCheckpoint , final Exception e ) {
226
+ assertNotification (globalCheckpoint , e );
227
+
222
228
try {
223
229
listener .accept (globalCheckpoint , e );
224
230
} catch (final Exception caught ) {
@@ -231,10 +237,21 @@ private void notifyListener(final GlobalCheckpointListener listener, final long
231
237
} else if (e instanceof IndexShardClosedException ) {
232
238
logger .warn ("error notifying global checkpoint listener of closed shard" , caught );
233
239
} else {
234
- assert e instanceof TimeoutException : e ;
235
240
logger .warn ("error notifying global checkpoint listener of timeout" , caught );
236
241
}
237
242
}
238
243
}
239
244
245
+ private void assertNotification (final long globalCheckpoint , final Exception e ) {
246
+ if (Assertions .ENABLED ) {
247
+ assert globalCheckpoint >= UNASSIGNED_SEQ_NO : globalCheckpoint ;
248
+ if (globalCheckpoint != UNASSIGNED_SEQ_NO ) {
249
+ assert e == null : e ;
250
+ } else {
251
+ assert e != null ;
252
+ assert e instanceof IndexShardClosedException || e instanceof TimeoutException : e ;
253
+ }
254
+ }
255
+ }
256
+
240
257
}
0 commit comments