22
22
import java .util .concurrent .atomic .AtomicInteger ;
23
23
import java .util .concurrent .atomic .AtomicReference ;
24
24
import java .util .concurrent .locks .Condition ;
25
- import java .util .concurrent .locks .LockSupport ;
26
25
import java .util .concurrent .locks .ReentrantLock ;
27
26
28
27
@@ -70,10 +69,12 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
70
69
@ Override
71
70
public void run () {
72
71
while (!Thread .currentThread ().isInterrupted ()) {
73
- if (state .compareAndSet (StateHelper .RECONNECT , 0 )) {
74
- reconnect (0 , thumbstone );
72
+ reconnect (0 , thumbstone );
73
+ try {
74
+ state .awaitReconnection ();
75
+ } catch (InterruptedException e ) {
76
+ Thread .currentThread ().interrupt ();
75
77
}
76
- LockSupport .park (state );
77
78
}
78
79
}
79
80
});
@@ -139,16 +140,13 @@ protected void reconnect(int retry, Throwable lastError) {
139
140
protected void connect (final SocketChannel channel ) throws Exception {
140
141
try {
141
142
TarantoolGreeting greeting = ProtoUtils .connect (channel ,
142
- config .username , config .password );
143
+ config .username , config .password );
143
144
this .serverVersion = greeting .getServerVersion ();
144
145
} catch (IOException e ) {
145
- try {
146
- channel .close ();
147
- } catch (IOException ignored ) {
148
- }
149
-
146
+ closeChannel (channel );
150
147
throw new CommunicationException ("Couldn't connect to tarantool" , e );
151
148
}
149
+
152
150
channel .configureBlocking (false );
153
151
this .channel = channel ;
154
152
this .readChannel = new ReadableViaSelectorChannel (channel );
@@ -164,42 +162,42 @@ protected void connect(final SocketChannel channel) throws Exception {
164
162
}
165
163
166
164
protected void startThreads (String threadName ) throws InterruptedException {
167
- final CountDownLatch init = new CountDownLatch (2 );
168
- reader = new Thread ( new Runnable () {
169
- @ Override
170
- public void run () {
171
- init . countDown ();
172
- if ( state . acquire ( StateHelper . READING )) {
173
- try {
174
- readThread ();
175
- } finally {
176
- state . release ( StateHelper . READING );
177
- if (state . compareAndSet ( 0 , StateHelper . RECONNECT ))
178
- LockSupport . unpark ( connector );
165
+ final CountDownLatch ioThreadStarted = new CountDownLatch (2 );
166
+ final AtomicInteger leftIoThreads = new AtomicInteger ( 2 );
167
+ reader = new Thread (() -> {
168
+ ioThreadStarted . countDown ();
169
+ if ( state . acquire ( StateHelper . READING )) {
170
+ try {
171
+ readThread ();
172
+ } finally {
173
+ state . release ( StateHelper . READING );
174
+ // only last of two IO-threads can signal for reconnection
175
+ if (leftIoThreads . decrementAndGet () == 0 ) {
176
+ state . trySignalForReconnection ( );
179
177
}
180
178
}
181
179
}
182
180
});
183
- writer = new Thread (new Runnable () {
184
- @ Override
185
- public void run () {
186
- init .countDown ();
187
- if (state .acquire (StateHelper .WRITING )) {
188
- try {
189
- writeThread ();
190
- } finally {
191
- state .release (StateHelper .WRITING );
192
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
193
- LockSupport .unpark (connector );
181
+ writer = new Thread (() -> {
182
+ ioThreadStarted .countDown ();
183
+ if (state .acquire (StateHelper .WRITING )) {
184
+ try {
185
+ writeThread ();
186
+ } finally {
187
+ state .release (StateHelper .WRITING );
188
+ // only last of two IO-threads can signal for reconnection
189
+ if (leftIoThreads .decrementAndGet () == 0 ) {
190
+ state .trySignalForReconnection ();
194
191
}
195
192
}
196
193
}
197
194
});
195
+ state .release (StateHelper .RECONNECT );
198
196
199
197
configureThreads (threadName );
200
198
reader .start ();
201
199
writer .start ();
202
- init .await ();
200
+ ioThreadStarted .await ();
203
201
}
204
202
205
203
protected void configureThreads (String threadName ) {
@@ -337,25 +335,21 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
337
335
}
338
336
339
337
protected void readThread () {
340
- try {
341
- while (!Thread .currentThread ().isInterrupted ()) {
342
- try {
343
- TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
338
+ while (!Thread .currentThread ().isInterrupted ()) {
339
+ try {
340
+ TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
344
341
345
- Map <Integer , Object > headers = packet .getHeaders ();
342
+ Map <Integer , Object > headers = packet .getHeaders ();
346
343
347
- Long syncId = (Long ) headers .get (Key .SYNC .getId ());
348
- TarantoolOp <?> future = futures .remove (syncId );
349
- stats .received ++;
350
- wait .decrementAndGet ();
351
- complete (packet , future );
352
- } catch (Exception e ) {
353
- die ("Cant read answer" , e );
354
- return ;
355
- }
344
+ Long syncId = (Long ) headers .get (Key .SYNC .getId ());
345
+ TarantoolOp <?> future = futures .remove (syncId );
346
+ stats .received ++;
347
+ wait .decrementAndGet ();
348
+ complete (packet , future );
349
+ } catch (Exception e ) {
350
+ die ("Cant read answer" , e );
351
+ return ;
356
352
}
357
- } catch (Exception e ) {
358
- die ("Cant init thread" , e );
359
353
}
360
354
}
361
355
@@ -498,7 +492,7 @@ public TarantoolClientOps<Integer, List<?>, Object, List<?>> syncOps() {
498
492
499
493
@ Override
500
494
public TarantoolClientOps <Integer , List <?>, Object , Future <List <?>>> asyncOps () {
501
- return (TarantoolClientOps )this ;
495
+ return (TarantoolClientOps ) this ;
502
496
}
503
497
504
498
@ Override
@@ -514,7 +508,7 @@ public TarantoolClientOps<Integer, List<?>, Object, Long> fireAndForgetOps() {
514
508
515
509
@ Override
516
510
public TarantoolSQLOps <Object , Long , List <Map <String , Object >>> sqlSyncOps () {
517
- return new TarantoolSQLOps <Object , Long , List <Map <String ,Object >>>() {
511
+ return new TarantoolSQLOps <Object , Long , List <Map <String , Object >>>() {
518
512
519
513
@ Override
520
514
public Long update (String sql , Object ... bind ) {
@@ -530,7 +524,7 @@ public List<Map<String, Object>> query(String sql, Object... bind) {
530
524
531
525
@ Override
532
526
public TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>> sqlAsyncOps () {
533
- return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String ,Object >>>>() {
527
+ return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>>() {
534
528
@ Override
535
529
public Future <Long > update (String sql , Object ... bind ) {
536
530
return (Future <Long >) exec (Code .EXECUTE , Key .SQL_TEXT , sql , Key .SQL_BIND , bind );
@@ -618,6 +612,7 @@ public TarantoolClientStats getStats() {
618
612
* Manages state changes.
619
613
*/
620
614
protected final class StateHelper {
615
+ static final int UNINITIALIZED = 0 ;
621
616
static final int READING = 1 ;
622
617
static final int WRITING = 2 ;
623
618
static final int ALIVE = READING | WRITING ;
@@ -627,10 +622,22 @@ protected final class StateHelper {
627
622
private final AtomicInteger state ;
628
623
629
624
private final AtomicReference <CountDownLatch > nextAliveLatch =
630
- new AtomicReference <CountDownLatch >(new CountDownLatch (1 ));
625
+ new AtomicReference <>(new CountDownLatch (1 ));
631
626
632
627
private final CountDownLatch closedLatch = new CountDownLatch (1 );
633
628
629
+ /**
630
+ * The condition variable to signal a reconnection is needed from reader /
631
+ * writer threads and waiting for that signal from the reconnection thread.
632
+ *
633
+ * The lock variable to access this condition.
634
+ *
635
+ * @see #awaitReconnection()
636
+ * @see #trySignalForReconnection()
637
+ */
638
+ protected final ReentrantLock connectorLock = new ReentrantLock ();
639
+ protected final Condition reconnectRequired = connectorLock .newCondition ();
640
+
634
641
protected StateHelper (int state ) {
635
642
this .state = new AtomicInteger (state );
636
643
}
@@ -639,35 +646,60 @@ protected int getState() {
639
646
return state .get ();
640
647
}
641
648
649
+ /**
650
+ * Set CLOSED state, drop RECONNECT state.
651
+ */
642
652
protected boolean close () {
643
- for (;; ) {
653
+ for (; ; ) {
644
654
int st = getState ();
655
+
656
+ /* CLOSED is the terminal state. */
645
657
if ((st & CLOSED ) == CLOSED )
646
658
return false ;
659
+
660
+ /* Drop RECONNECT, set CLOSED. */
647
661
if (compareAndSet (st , (st & ~RECONNECT ) | CLOSED ))
648
662
return true ;
649
663
}
650
664
}
651
665
666
+ /**
667
+ * Move from a current state to a give one.
668
+ *
669
+ * Some moves are forbidden.
670
+ */
652
671
protected boolean acquire (int mask ) {
653
- for (;;) {
654
- int st = getState ();
655
- if ((st & CLOSED ) == CLOSED )
672
+ for (; ; ) {
673
+ int currentState = getState ();
674
+
675
+ /* CLOSED is the terminal state. */
676
+ if ((currentState & CLOSED ) == CLOSED ) {
656
677
return false ;
678
+ }
679
+
680
+ /* Don't move to READING, WRITING or ALIVE from RECONNECT. */
681
+ if ((currentState & RECONNECT ) > mask ) {
682
+ return false ;
683
+ }
657
684
658
- if ((st & mask ) != 0 )
685
+ /* Cannot move from a state to the same state. */
686
+ if ((currentState & mask ) != 0 ) {
659
687
throw new IllegalStateException ("State is already " + mask );
688
+ }
660
689
661
- if (compareAndSet (st , st | mask ))
690
+ /* Set acquired state. */
691
+ if (compareAndSet (currentState , currentState | mask )) {
662
692
return true ;
693
+ }
663
694
}
664
695
}
665
696
666
697
protected void release (int mask ) {
667
- for (;; ) {
698
+ for (; ; ) {
668
699
int st = getState ();
669
- if (compareAndSet (st , st & ~mask ))
700
+ if (compareAndSet (st , st & ~mask )) {
670
701
return ;
702
+ }
671
703
}
672
704
}
673
705
@@ -686,10 +718,18 @@ protected boolean compareAndSet(int expect, int update) {
686
718
return true ;
687
719
}
688
720
721
+ /**
722
+ * Reconnection uses another way to await state via receiving a signal
723
+ * instead of latches.
724
+ */
689
725
protected void awaitState (int state ) throws InterruptedException {
690
- CountDownLatch latch = getStateLatch (state );
691
- if (latch != null ) {
692
- latch .await ();
726
+ if (state == RECONNECT ) {
727
+ awaitReconnection ();
728
+ } else {
729
+ CountDownLatch latch = getStateLatch (state );
730
+ if (latch != null ) {
731
+ latch .await ();
732
+ }
693
733
}
694
734
}
695
735
@@ -709,10 +749,42 @@ private CountDownLatch getStateLatch(int state) {
709
749
CountDownLatch latch = nextAliveLatch .get ();
710
750
/* It may happen so that an error is detected but the state is still alive.
711
751
Wait for the 'next' alive state in such cases. */
712
- return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
752
+ return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
713
753
}
714
754
return null ;
715
755
}
756
+
757
+ /**
758
+ * Blocks until a reconnection signal will be received.
759
+ *
760
+ * @see #trySignalForReconnection()
761
+ */
762
+ private void awaitReconnection () throws InterruptedException {
763
+ connectorLock .lock ();
764
+ try {
765
+ while (getState () != StateHelper .RECONNECT ) {
766
+ reconnectRequired .await ();
767
+ }
768
+ } finally {
769
+ connectorLock .unlock ();
770
+ }
771
+ }
772
+
773
+ /**
774
+ * Signals to the connector that reconnection process can be performed.
775
+ *
776
+ * @see #awaitReconnection()
777
+ */
778
+ private void trySignalForReconnection () {
779
+ if (compareAndSet (StateHelper .UNINITIALIZED , StateHelper .RECONNECT )) {
780
+ connectorLock .lock ();
781
+ try {
782
+ reconnectRequired .signal ();
783
+ } finally {
784
+ connectorLock .unlock ();
785
+ }
786
+ }
787
+ }
716
788
}
717
789
718
790
protected static class TarantoolOp <V > extends CompletableFuture <V > {
0 commit comments