21
21
import org .elasticsearch .core .TimeValue ;
22
22
import org .elasticsearch .threadpool .ThreadPool ;
23
23
24
+ import java .lang .invoke .MethodHandles ;
25
+ import java .lang .invoke .VarHandle ;
24
26
import java .util .concurrent .ExecutionException ;
25
27
import java .util .concurrent .Executor ;
26
- import java .util .concurrent .atomic .AtomicReference ;
27
28
28
29
/**
29
30
* An {@link ActionListener} to which other {@link ActionListener} instances can subscribe, such that when this listener is completed it
@@ -38,16 +39,18 @@ public class SubscribableListener<T> implements ActionListener<T> {
38
39
private static final Object EMPTY = new Object ();
39
40
40
41
/**
41
- * If we are incomplete, {@code ref } may refer to one of the following depending on how many waiting subscribers there are:
42
+ * If we are incomplete, {@code state } may be one of the following depending on how many waiting subscribers there are:
42
43
* <ul>
43
- * <li>If there are no subscribers yet, {@code ref} refers to {@link #EMPTY}.
44
- * <li>If there is one subscriber, {@code ref} refers to it directly .
45
- * <li>If there are more than one subscriber , {@code ref} refers to the head of a linked list of subscribers in reverse order of their
44
+ * <li>If there are no subscribers yet, {@code state} is {@link #EMPTY}.
45
+ * <li>If there is one subscriber, {@code state} is that subscriber .
46
+ * <li>If there are multiple subscribers , {@code state} is the head of a linked list of subscribers in reverse order of their
46
47
* subscriptions.
47
48
* </ul>
48
- * If we are complete, {@code ref} refers to a {@code Result<T>} which will be used to complete any subsequent subscribers.
49
+ * If we are complete, {@code state} is the {@code SuccessResult<T>} or {@code FailureResult} which will be used to complete any
50
+ * subsequent subscribers.
49
51
*/
50
- private final AtomicReference <Object > ref = new AtomicReference <>(EMPTY );
52
+ @ SuppressWarnings ("FieldMayBeFinal" ) // updated via VH_STATE_FIELD (and _only_ via VH_STATE_FIELD)
53
+ private volatile Object state = EMPTY ;
51
54
52
55
/**
53
56
* Add a listener to this listener's collection of subscribers. If this listener is complete, this method completes the subscribing
@@ -90,12 +93,12 @@ public final void addListener(ActionListener<T> listener) {
90
93
*/
91
94
@ SuppressWarnings ({ "rawtypes" })
92
95
public final void addListener (ActionListener <T > listener , Executor executor , @ Nullable ThreadContext threadContext ) {
93
- if (tryComplete (ref . get () , listener )) {
96
+ if (tryComplete (state , listener )) {
94
97
return ;
95
98
}
96
99
97
100
final ActionListener <T > wrappedListener = fork (executor , preserveContext (threadContext , listener ));
98
- Object currentValue = ref . compareAndExchange (EMPTY , wrappedListener );
101
+ Object currentValue = compareAndExchangeState (EMPTY , wrappedListener );
99
102
if (currentValue == EMPTY ) {
100
103
return ;
101
104
}
@@ -106,7 +109,7 @@ public final void addListener(ActionListener<T> listener, Executor executor, @Nu
106
109
}
107
110
if (currentValue instanceof ActionListener firstListener ) {
108
111
final Cell tail = new Cell (firstListener , null );
109
- currentValue = ref . compareAndExchange (firstListener , tail );
112
+ currentValue = compareAndExchangeState (firstListener , tail );
110
113
if (currentValue == firstListener ) {
111
114
currentValue = tail ;
112
115
}
@@ -118,7 +121,7 @@ public final void addListener(ActionListener<T> listener, Executor executor, @Nu
118
121
} else {
119
122
newCell .next = headCell ;
120
123
}
121
- currentValue = ref . compareAndExchange (headCell , newCell );
124
+ currentValue = compareAndExchangeState (headCell , newCell );
122
125
if (currentValue == headCell ) {
123
126
return ;
124
127
}
@@ -146,7 +149,7 @@ protected Exception wrapException(Exception exception) {
146
149
* @return {@code true} if and only if this listener has been completed (either successfully or exceptionally).
147
150
*/
148
151
public final boolean isDone () {
149
- return isDone (ref . get () );
152
+ return isDone (state );
150
153
}
151
154
152
155
/**
@@ -157,10 +160,10 @@ public final boolean isDone() {
157
160
*/
158
161
@ SuppressWarnings ({ "unchecked" , "rawtypes" })
159
162
protected final T rawResult () throws Exception {
160
- final Object refValue = ref . get () ;
161
- if (refValue instanceof SuccessResult result ) {
163
+ final Object currentState = state ;
164
+ if (currentState instanceof SuccessResult result ) {
162
165
return (T ) result .result ();
163
- } else if (refValue instanceof FailureResult result ) {
166
+ } else if (currentState instanceof FailureResult result ) {
164
167
throw result .exception ();
165
168
} else {
166
169
assert false : "not done" ;
@@ -185,40 +188,40 @@ private static <T> ActionListener<T> fork(Executor executor, ActionListener<T> l
185
188
}
186
189
187
190
@ SuppressWarnings ({ "unchecked" , "rawtypes" })
188
- private static <T > boolean tryComplete (Object refValue , ActionListener <T > listener ) {
189
- if (refValue instanceof SuccessResult successResult ) {
191
+ private static <T > boolean tryComplete (Object currentState , ActionListener <T > listener ) {
192
+ if (currentState instanceof SuccessResult successResult ) {
190
193
successResult .complete (listener );
191
194
return true ;
192
195
}
193
- if (refValue instanceof FailureResult failureResult ) {
196
+ if (currentState instanceof FailureResult failureResult ) {
194
197
failureResult .complete (listener );
195
198
return true ;
196
199
}
197
200
return false ;
198
201
}
199
202
200
203
/**
201
- * If incomplete, atomically update {@link #ref } with the given result and use it to complete any pending listeners.
204
+ * If incomplete, atomically update {@link #state } with the given result and use it to complete any pending listeners.
202
205
*/
203
206
@ SuppressWarnings ("unchecked" )
204
207
private void setResult (Object result ) {
205
208
assert isDone (result );
206
209
207
- Object currentValue = ref . get () ;
210
+ Object currentState = state ;
208
211
while (true ) {
209
- if (isDone (currentValue )) {
212
+ if (isDone (currentState )) {
210
213
// already complete - nothing to do
211
214
return ;
212
215
}
213
216
214
- final Object witness = ref . compareAndExchange ( currentValue , result );
215
- if (witness == currentValue ) {
217
+ final Object witness = compareAndExchangeState ( currentState , result );
218
+ if (witness == currentState ) {
216
219
// we won the race to complete the listener
217
- if (currentValue instanceof ActionListener <?> listener ) {
220
+ if (currentState instanceof ActionListener <?> listener ) {
218
221
// unique subscriber - complete it
219
222
boolean completed = tryComplete (result , listener );
220
223
assert completed ;
221
- } else if (currentValue instanceof Cell currCell ) {
224
+ } else if (currentState instanceof Cell currCell ) {
222
225
// multiple subscribers, but they are currently in reverse order of subscription so reverse them back
223
226
Cell prevCell = null ;
224
227
while (true ) {
@@ -237,18 +240,18 @@ private void setResult(Object result) {
237
240
currCell = currCell .next ;
238
241
}
239
242
} else {
240
- assert currentValue == EMPTY : "unexpected witness: " + currentValue ;
243
+ assert currentState == EMPTY : "unexpected witness: " + currentState ;
241
244
}
242
245
return ;
243
246
}
244
247
245
248
// we lost a race with another setResult or addListener call - retry
246
- currentValue = witness ;
249
+ currentState = witness ;
247
250
}
248
251
}
249
252
250
- private static boolean isDone (Object refValue ) {
251
- return refValue instanceof SubscribableListener . SuccessResult <?> || refValue instanceof SubscribableListener . FailureResult ;
253
+ private static boolean isDone (Object currentState ) {
254
+ return currentState instanceof SuccessResult <?> || currentState instanceof FailureResult ;
252
255
}
253
256
254
257
/**
@@ -322,4 +325,20 @@ private Runnable scheduleTimeout(TimeValue timeout, ThreadPool threadPool, Strin
322
325
return () -> {};
323
326
}
324
327
}
328
+
329
+ private static final VarHandle VH_STATE_FIELD ;
330
+
331
+ static {
332
+ try {
333
+ VH_STATE_FIELD = MethodHandles .lookup ()
334
+ .in (SubscribableListener .class )
335
+ .findVarHandle (SubscribableListener .class , "state" , Object .class );
336
+ } catch (NoSuchFieldException | IllegalAccessException e ) {
337
+ throw new RuntimeException (e );
338
+ }
339
+ }
340
+
341
+ private Object compareAndExchangeState (Object expectedValue , Object newValue ) {
342
+ return VH_STATE_FIELD .compareAndExchange (this , expectedValue , newValue );
343
+ }
325
344
}
0 commit comments