59
59
import java .util .Map ;
60
60
import java .util .Optional ;
61
61
import java .util .Set ;
62
+ import java .util .concurrent .atomic .AtomicReference ;
62
63
import java .util .function .BiConsumer ;
63
64
import java .util .function .Function ;
64
65
import java .util .function .LongSupplier ;
@@ -84,7 +85,7 @@ public class JoinHelper {
84
85
85
86
final Set <Tuple <DiscoveryNode , JoinRequest >> pendingOutgoingJoins = ConcurrentCollections .newConcurrentSet ();
86
87
87
- private volatile FailedJoinAttempt lastFailedJoinAttempt ;
88
+ private AtomicReference < FailedJoinAttempt > lastFailedJoinAttempt = new AtomicReference <>() ;
88
89
89
90
JoinHelper (Settings settings , AllocationService allocationService , MasterService masterService ,
90
91
TransportService transportService , LongSupplier currentTermSupplier , Supplier <ClusterState > currentStateSupplier ,
@@ -217,9 +218,10 @@ void logWarnWithTimestamp() {
217
218
218
219
219
220
void logLastFailedJoinAttempt () {
220
- FailedJoinAttempt attempt = lastFailedJoinAttempt ;
221
+ FailedJoinAttempt attempt = lastFailedJoinAttempt . get () ;
221
222
if (attempt != null ) {
222
223
attempt .logWarnWithTimestamp ();
224
+ lastFailedJoinAttempt .compareAndSet (attempt , null );
223
225
}
224
226
}
225
227
@@ -241,15 +243,15 @@ public Empty read(StreamInput in) {
241
243
public void handleResponse (Empty response ) {
242
244
pendingOutgoingJoins .remove (dedupKey );
243
245
logger .debug ("successfully joined {} with {}" , destination , joinRequest );
244
- lastFailedJoinAttempt = null ;
246
+ lastFailedJoinAttempt . set ( null ) ;
245
247
}
246
248
247
249
@ Override
248
250
public void handleException (TransportException exp ) {
249
251
pendingOutgoingJoins .remove (dedupKey );
250
252
FailedJoinAttempt attempt = new FailedJoinAttempt (destination , joinRequest , exp );
251
253
attempt .logNow ();
252
- lastFailedJoinAttempt = attempt ;
254
+ lastFailedJoinAttempt . set ( attempt ) ;
253
255
}
254
256
255
257
@ Override
0 commit comments