22
22
import java .util .concurrent .atomic .AtomicInteger ;
23
23
import java .util .concurrent .atomic .AtomicReference ;
24
24
import java .util .concurrent .locks .Condition ;
25
- import java .util .concurrent .locks .LockSupport ;
26
25
import java .util .concurrent .locks .ReentrantLock ;
27
26
28
27
@@ -63,17 +62,19 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
63
62
*/
64
63
protected TarantoolClientStats stats ;
65
64
protected StateHelper state = new StateHelper (StateHelper .RECONNECT );
66
- protected Thread reader ;
67
- protected Thread writer ;
65
+ protected volatile Thread reader ;
66
+ protected volatile Thread writer ;
68
67
69
68
protected Thread connector = new Thread (new Runnable () {
70
69
@ Override
71
70
public void run () {
72
71
while (!Thread .currentThread ().isInterrupted ()) {
73
- if (state .compareAndSet (StateHelper .RECONNECT , 0 )) {
74
- reconnect (0 , thumbstone );
72
+ reconnect (0 , thumbstone );
73
+ try {
74
+ state .awaitReconnection ();
75
+ } catch (InterruptedException e ) {
76
+ Thread .currentThread ().interrupt ();
75
77
}
76
- LockSupport .park (state );
77
78
}
78
79
}
79
80
});
@@ -139,16 +140,13 @@ protected void reconnect(int retry, Throwable lastError) {
139
140
protected void connect (final SocketChannel channel ) throws Exception {
140
141
try {
141
142
TarantoolGreeting greeting = ProtoUtils .connect (channel ,
142
- config .username , config .password );
143
+ config .username , config .password );
143
144
this .serverVersion = greeting .getServerVersion ();
144
145
} catch (IOException e ) {
145
- try {
146
- channel .close ();
147
- } catch (IOException ignored ) {
148
- }
149
-
146
+ closeChannel (channel );
150
147
throw new CommunicationException ("Couldn't connect to tarantool" , e );
151
148
}
149
+
152
150
channel .configureBlocking (false );
153
151
this .channel = channel ;
154
152
this .readChannel = new ReadableViaSelectorChannel (channel );
@@ -174,8 +172,16 @@ public void run() {
174
172
readThread ();
175
173
} finally {
176
174
state .release (StateHelper .READING );
177
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
178
- LockSupport .unpark (connector );
175
+ // there're two cases when a read thread is here
176
+ // 1. it's a new generation thread inside/outside
177
+ // a reconnection process (currentThread == reader)
178
+ // 2. It's an old generation thread inside
179
+ // a reconnection process (currentThread != reader)
180
+ // Skip the old gen. attempt to reconnect
181
+ if (state .getState () == StateHelper .UNINITIALIZED
182
+ && Thread .currentThread () == reader ) {
183
+ state .trySignalForReconnection ();
184
+ }
179
185
}
180
186
}
181
187
}
@@ -189,13 +195,28 @@ public void run() {
189
195
writeThread ();
190
196
} finally {
191
197
state .release (StateHelper .WRITING );
192
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
193
- LockSupport .unpark (connector );
198
+ // there're two cases when a write thread is here
199
+ // 1. it's a new generation thread inside/outside
200
+ // a reconnection process (currentThread == writer)
201
+ // 2. It's an old generation thread inside
202
+ // a reconnection process (currentThread != writer)
203
+ // Skip the old gen. attempt to reconnect
204
+ if (state .getState () == StateHelper .UNINITIALIZED
205
+ && Thread .currentThread () == writer ) {
206
+ state .trySignalForReconnection ();
207
+ }
194
208
}
195
209
}
196
210
}
197
211
});
198
212
213
+ // reconnection preparation is done
214
+ // before reconnection state will be released
215
+ // reader/writer threads have been replaced by new ones
216
+ // it's required to be sure that old r/w threads
217
+ // won't affect new r/w threads.
218
+ state .release (StateHelper .RECONNECT );
219
+
199
220
configureThreads (threadName );
200
221
reader .start ();
201
222
writer .start ();
@@ -337,25 +358,21 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
337
358
}
338
359
339
360
protected void readThread () {
340
- try {
341
- while (!Thread .currentThread ().isInterrupted ()) {
342
- try {
343
- TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
361
+ while (!Thread .currentThread ().isInterrupted ()) {
362
+ try {
363
+ TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
344
364
345
- Map <Integer , Object > headers = packet .getHeaders ();
365
+ Map <Integer , Object > headers = packet .getHeaders ();
346
366
347
- Long syncId = (Long ) headers .get (Key .SYNC .getId ());
348
- TarantoolOp <?> future = futures .remove (syncId );
349
- stats .received ++;
350
- wait .decrementAndGet ();
351
- complete (packet , future );
352
- } catch (Exception e ) {
353
- die ("Cant read answer" , e );
354
- return ;
355
- }
367
+ Long syncId = (Long ) headers .get (Key .SYNC .getId ());
368
+ TarantoolOp <?> future = futures .remove (syncId );
369
+ stats .received ++;
370
+ wait .decrementAndGet ();
371
+ complete (packet , future );
372
+ } catch (Exception e ) {
373
+ die ("Cant read answer" , e );
374
+ return ;
356
375
}
357
- } catch (Exception e ) {
358
- die ("Cant init thread" , e );
359
376
}
360
377
}
361
378
@@ -498,7 +515,7 @@ public TarantoolClientOps<Integer, List<?>, Object, List<?>> syncOps() {
498
515
499
516
@ Override
500
517
public TarantoolClientOps <Integer , List <?>, Object , Future <List <?>>> asyncOps () {
501
- return (TarantoolClientOps )this ;
518
+ return (TarantoolClientOps ) this ;
502
519
}
503
520
504
521
@ Override
@@ -514,7 +531,7 @@ public TarantoolClientOps<Integer, List<?>, Object, Long> fireAndForgetOps() {
514
531
515
532
@ Override
516
533
public TarantoolSQLOps <Object , Long , List <Map <String , Object >>> sqlSyncOps () {
517
- return new TarantoolSQLOps <Object , Long , List <Map <String ,Object >>>() {
534
+ return new TarantoolSQLOps <Object , Long , List <Map <String , Object >>>() {
518
535
519
536
@ Override
520
537
public Long update (String sql , Object ... bind ) {
@@ -530,7 +547,7 @@ public List<Map<String, Object>> query(String sql, Object... bind) {
530
547
531
548
@ Override
532
549
public TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>> sqlAsyncOps () {
533
- return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String ,Object >>>>() {
550
+ return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>>() {
534
551
@ Override
535
552
public Future <Long > update (String sql , Object ... bind ) {
536
553
return (Future <Long >) exec (Code .EXECUTE , Key .SQL_TEXT , sql , Key .SQL_BIND , bind );
@@ -618,6 +635,7 @@ public TarantoolClientStats getStats() {
618
635
* Manages state changes.
619
636
*/
620
637
protected final class StateHelper {
638
+ static final int UNINITIALIZED = 0 ;
621
639
static final int READING = 1 ;
622
640
static final int WRITING = 2 ;
623
641
static final int ALIVE = READING | WRITING ;
@@ -627,10 +645,22 @@ protected final class StateHelper {
627
645
private final AtomicInteger state ;
628
646
629
647
private final AtomicReference <CountDownLatch > nextAliveLatch =
630
- new AtomicReference <CountDownLatch >(new CountDownLatch (1 ));
648
+ new AtomicReference <>(new CountDownLatch (1 ));
631
649
632
650
private final CountDownLatch closedLatch = new CountDownLatch (1 );
633
651
652
+ /**
653
+ * The condition variable to signal a reconnection is needed from reader /
654
+ * writer threads and waiting for that signal from the reconnection thread.
655
+ *
656
+ * The lock variable to access this condition.
657
+ *
658
+ * @see #awaitReconnection()
659
+ * @see #trySignalForReconnection()
660
+ */
661
+ protected final ReentrantLock connectorLock = new ReentrantLock ();
662
+ protected final Condition reconnectRequired = connectorLock .newCondition ();
663
+
634
664
protected StateHelper (int state ) {
635
665
this .state = new AtomicInteger (state );
636
666
}
@@ -639,35 +669,60 @@ protected int getState() {
639
669
return state .get ();
640
670
}
641
671
672
+ /**
673
+ * Set CLOSED state, drop RECONNECT state.
674
+ */
642
675
protected boolean close () {
643
- for (;; ) {
676
+ for (; ; ) {
644
677
int st = getState ();
678
+
679
+ /* CLOSED is the terminal state. */
645
680
if ((st & CLOSED ) == CLOSED )
646
681
return false ;
682
+
683
+ /* Drop RECONNECT, set CLOSED. */
647
684
if (compareAndSet (st , (st & ~RECONNECT ) | CLOSED ))
648
685
return true ;
649
686
}
650
687
}
651
688
689
+ /**
690
+ * Move from a current state to a give one.
691
+ *
692
+ * Some moves are forbidden.
693
+ */
652
694
protected boolean acquire (int mask ) {
653
- for (;;) {
654
- int st = getState ();
655
- if ((st & CLOSED ) == CLOSED )
695
+ for (; ; ) {
696
+ int currentState = getState ();
697
+
698
+ /* CLOSED is the terminal state. */
699
+ if ((currentState & CLOSED ) == CLOSED ) {
700
+ return false ;
701
+ }
702
+
703
+ /* Don't move to READING, WRITING or ALIVE from RECONNECT. */
704
+ if ((currentState & RECONNECT ) > mask ) {
656
705
return false ;
706
+ }
657
707
658
- if ((st & mask ) != 0 )
708
+ /* Cannot move from a state to the same state. */
709
+ if ((currentState & mask ) != 0 ) {
659
710
throw new IllegalStateException ("State is already " + mask );
711
+ }
660
712
661
- if (compareAndSet (st , st | mask ))
713
+ /* Set acquired state. */
714
+ if (compareAndSet (currentState , currentState | mask )) {
662
715
return true ;
716
+ }
663
717
}
664
718
}
665
719
666
720
protected void release (int mask ) {
667
- for (;; ) {
721
+ for (; ; ) {
668
722
int st = getState ();
669
- if (compareAndSet (st , st & ~mask ))
723
+ if (compareAndSet (st , st & ~mask )) {
670
724
return ;
725
+ }
671
726
}
672
727
}
673
728
@@ -686,10 +741,18 @@ protected boolean compareAndSet(int expect, int update) {
686
741
return true ;
687
742
}
688
743
744
+ /**
745
+ * Reconnection uses another way to await state via receiving a signal
746
+ * instead of latches.
747
+ */
689
748
protected void awaitState (int state ) throws InterruptedException {
690
- CountDownLatch latch = getStateLatch (state );
691
- if (latch != null ) {
692
- latch .await ();
749
+ if (state == RECONNECT ) {
750
+ awaitReconnection ();
751
+ } else {
752
+ CountDownLatch latch = getStateLatch (state );
753
+ if (latch != null ) {
754
+ latch .await ();
755
+ }
693
756
}
694
757
}
695
758
@@ -709,10 +772,42 @@ private CountDownLatch getStateLatch(int state) {
709
772
CountDownLatch latch = nextAliveLatch .get ();
710
773
/* It may happen so that an error is detected but the state is still alive.
711
774
Wait for the 'next' alive state in such cases. */
712
- return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
775
+ return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
713
776
}
714
777
return null ;
715
778
}
779
+
780
+ /**
781
+ * Blocks until a reconnection signal will be received.
782
+ *
783
+ * @see #trySignalForReconnection()
784
+ */
785
+ private void awaitReconnection () throws InterruptedException {
786
+ connectorLock .lock ();
787
+ try {
788
+ while (getState () != StateHelper .RECONNECT ) {
789
+ reconnectRequired .await ();
790
+ }
791
+ } finally {
792
+ connectorLock .unlock ();
793
+ }
794
+ }
795
+
796
+ /**
797
+ * Signals to the connector that reconnection process can be performed.
798
+ *
799
+ * @see #awaitReconnection()
800
+ */
801
+ private void trySignalForReconnection () {
802
+ if (compareAndSet (StateHelper .UNINITIALIZED , StateHelper .RECONNECT )) {
803
+ connectorLock .lock ();
804
+ try {
805
+ reconnectRequired .signal ();
806
+ } finally {
807
+ connectorLock .unlock ();
808
+ }
809
+ }
810
+ }
716
811
}
717
812
718
813
protected static class TarantoolOp <V > extends CompletableFuture <V > {
0 commit comments