22
22
import java .util .concurrent .atomic .AtomicInteger ;
23
23
import java .util .concurrent .atomic .AtomicReference ;
24
24
import java .util .concurrent .locks .Condition ;
25
- import java .util .concurrent .locks .LockSupport ;
26
25
import java .util .concurrent .locks .ReentrantLock ;
27
26
28
27
@@ -70,10 +69,12 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
70
69
@ Override
71
70
public void run () {
72
71
while (!Thread .currentThread ().isInterrupted ()) {
73
- if (state .compareAndSet (StateHelper .RECONNECT , 0 )) {
74
- reconnect (0 , thumbstone );
72
+ reconnect (0 , thumbstone );
73
+ try {
74
+ state .awaitReconnection ();
75
+ } catch (InterruptedException e ) {
76
+ Thread .currentThread ().interrupt ();
75
77
}
76
- LockSupport .park (state );
77
78
}
78
79
}
79
80
});
@@ -139,16 +140,13 @@ protected void reconnect(int retry, Throwable lastError) {
139
140
protected void connect (final SocketChannel channel ) throws Exception {
140
141
try {
141
142
TarantoolGreeting greeting = ProtoUtils .connect (channel ,
142
- config .username , config .password );
143
+ config .username , config .password );
143
144
this .serverVersion = greeting .getServerVersion ();
144
145
} catch (IOException e ) {
145
- try {
146
- channel .close ();
147
- } catch (IOException ignored ) {
148
- }
149
-
146
+ closeChannel (channel );
150
147
throw new CommunicationException ("Couldn't connect to tarantool" , e );
151
148
}
149
+
152
150
channel .configureBlocking (false );
153
151
this .channel = channel ;
154
152
this .readChannel = new ReadableViaSelectorChannel (channel );
@@ -165,6 +163,7 @@ protected void connect(final SocketChannel channel) throws Exception {
165
163
166
164
protected void startThreads (String threadName ) throws InterruptedException {
167
165
final CountDownLatch init = new CountDownLatch (2 );
166
+ final AtomicInteger generationSync = new AtomicInteger (2 );
168
167
reader = new Thread (new Runnable () {
169
168
@ Override
170
169
public void run () {
@@ -174,8 +173,11 @@ public void run() {
174
173
readThread ();
175
174
} finally {
176
175
state .release (StateHelper .READING );
177
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
178
- LockSupport .unpark (connector );
176
+ // avoid a case when this thread falls asleep here
177
+ // after READING flag released and then can pollute the state
178
+ if (generationSync .decrementAndGet () == 0 ) {
179
+ state .trySignalForReconnection ();
180
+ }
179
181
}
180
182
}
181
183
}
@@ -189,13 +191,23 @@ public void run() {
189
191
writeThread ();
190
192
} finally {
191
193
state .release (StateHelper .WRITING );
192
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
193
- LockSupport .unpark (connector );
194
+ // avoid a case when this thread falls asleep here
195
+ // after WRITING flag released and then can pollute the state
196
+ if (generationSync .decrementAndGet () == 0 ) {
197
+ state .trySignalForReconnection ();
198
+ }
194
199
}
195
200
}
196
201
}
197
202
});
198
203
204
+ // reconnection preparation is done
205
+ // before reconnection the state will be released
206
+ // reader/writer threads have been replaced by new ones
207
+ // it's required to be sure that old r/w threads see correct
208
+ // client's r/w references
209
+ state .release (StateHelper .RECONNECT );
210
+
199
211
configureThreads (threadName );
200
212
reader .start ();
201
213
writer .start ();
@@ -337,25 +349,21 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
337
349
}
338
350
339
351
protected void readThread () {
340
- try {
341
- while (!Thread .currentThread ().isInterrupted ()) {
342
- try {
343
- TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
352
+ while (!Thread .currentThread ().isInterrupted ()) {
353
+ try {
354
+ TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
344
355
345
- Map <Integer , Object > headers = packet .getHeaders ();
356
+ Map <Integer , Object > headers = packet .getHeaders ();
346
357
347
- Long syncId = (Long ) headers .get (Key .SYNC .getId ());
348
- TarantoolOp <?> future = futures .remove (syncId );
349
- stats .received ++;
350
- wait .decrementAndGet ();
351
- complete (packet , future );
352
- } catch (Exception e ) {
353
- die ("Cant read answer" , e );
354
- return ;
355
- }
358
+ Long syncId = (Long ) headers .get (Key .SYNC .getId ());
359
+ TarantoolOp <?> future = futures .remove (syncId );
360
+ stats .received ++;
361
+ wait .decrementAndGet ();
362
+ complete (packet , future );
363
+ } catch (Exception e ) {
364
+ die ("Cant read answer" , e );
365
+ return ;
356
366
}
357
- } catch (Exception e ) {
358
- die ("Cant init thread" , e );
359
367
}
360
368
}
361
369
@@ -498,7 +506,7 @@ public TarantoolClientOps<Integer, List<?>, Object, List<?>> syncOps() {
498
506
499
507
@ Override
500
508
public TarantoolClientOps <Integer , List <?>, Object , Future <List <?>>> asyncOps () {
501
- return (TarantoolClientOps )this ;
509
+ return (TarantoolClientOps ) this ;
502
510
}
503
511
504
512
@ Override
@@ -514,7 +522,7 @@ public TarantoolClientOps<Integer, List<?>, Object, Long> fireAndForgetOps() {
514
522
515
523
@ Override
516
524
public TarantoolSQLOps <Object , Long , List <Map <String , Object >>> sqlSyncOps () {
517
- return new TarantoolSQLOps <Object , Long , List <Map <String ,Object >>>() {
525
+ return new TarantoolSQLOps <Object , Long , List <Map <String , Object >>>() {
518
526
519
527
@ Override
520
528
public Long update (String sql , Object ... bind ) {
@@ -530,7 +538,7 @@ public List<Map<String, Object>> query(String sql, Object... bind) {
530
538
531
539
@ Override
532
540
public TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>> sqlAsyncOps () {
533
- return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String ,Object >>>>() {
541
+ return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>>() {
534
542
@ Override
535
543
public Future <Long > update (String sql , Object ... bind ) {
536
544
return (Future <Long >) exec (Code .EXECUTE , Key .SQL_TEXT , sql , Key .SQL_BIND , bind );
@@ -618,6 +626,7 @@ public TarantoolClientStats getStats() {
618
626
* Manages state changes.
619
627
*/
620
628
protected final class StateHelper {
629
+ static final int UNINITIALIZED = 0 ;
621
630
static final int READING = 1 ;
622
631
static final int WRITING = 2 ;
623
632
static final int ALIVE = READING | WRITING ;
@@ -627,10 +636,22 @@ protected final class StateHelper {
627
636
private final AtomicInteger state ;
628
637
629
638
private final AtomicReference <CountDownLatch > nextAliveLatch =
630
- new AtomicReference <CountDownLatch >(new CountDownLatch (1 ));
639
+ new AtomicReference <>(new CountDownLatch (1 ));
631
640
632
641
private final CountDownLatch closedLatch = new CountDownLatch (1 );
633
642
643
+ /**
644
+ * The condition variable to signal a reconnection is needed from reader /
645
+ * writer threads and waiting for that signal from the reconnection thread.
646
+ *
647
+ * The lock variable to access this condition.
648
+ *
649
+ * @see #awaitReconnection()
650
+ * @see #trySignalForReconnection()
651
+ */
652
+ protected final ReentrantLock connectorLock = new ReentrantLock ();
653
+ protected final Condition reconnectRequired = connectorLock .newCondition ();
654
+
634
655
protected StateHelper (int state ) {
635
656
this .state = new AtomicInteger (state );
636
657
}
@@ -639,35 +660,60 @@ protected int getState() {
639
660
return state .get ();
640
661
}
641
662
663
+ /**
664
+ * Set CLOSED state, drop RECONNECT state.
665
+ */
642
666
protected boolean close () {
643
- for (;; ) {
667
+ for (; ; ) {
644
668
int st = getState ();
669
+
670
+ /* CLOSED is the terminal state. */
645
671
if ((st & CLOSED ) == CLOSED )
646
672
return false ;
673
+
674
+ /* Drop RECONNECT, set CLOSED. */
647
675
if (compareAndSet (st , (st & ~RECONNECT ) | CLOSED ))
648
676
return true ;
649
677
}
650
678
}
651
679
680
+ /**
681
+ * Move from a current state to a give one.
682
+ *
683
+ * Some moves are forbidden.
684
+ */
652
685
protected boolean acquire (int mask ) {
653
- for (;;) {
654
- int st = getState ();
655
- if ((st & CLOSED ) == CLOSED )
686
+ for (; ; ) {
687
+ int currentState = getState ();
688
+
689
+ /* CLOSED is the terminal state. */
690
+ if ((currentState & CLOSED ) == CLOSED ) {
691
+ return false ;
692
+ }
693
+
694
+ /* Don't move to READING, WRITING or ALIVE from RECONNECT. */
695
+ if ((currentState & RECONNECT ) > mask ) {
656
696
return false ;
697
+ }
657
698
658
- if ((st & mask ) != 0 )
699
+ /* Cannot move from a state to the same state. */
700
+ if ((currentState & mask ) != 0 ) {
659
701
throw new IllegalStateException ("State is already " + mask );
702
+ }
660
703
661
- if (compareAndSet (st , st | mask ))
704
+ /* Set acquired state. */
705
+ if (compareAndSet (currentState , currentState | mask )) {
662
706
return true ;
707
+ }
663
708
}
664
709
}
665
710
666
711
protected void release (int mask ) {
667
- for (;; ) {
712
+ for (; ; ) {
668
713
int st = getState ();
669
- if (compareAndSet (st , st & ~mask ))
714
+ if (compareAndSet (st , st & ~mask )) {
670
715
return ;
716
+ }
671
717
}
672
718
}
673
719
@@ -686,10 +732,18 @@ protected boolean compareAndSet(int expect, int update) {
686
732
return true ;
687
733
}
688
734
735
+ /**
736
+ * Reconnection uses another way to await state via receiving a signal
737
+ * instead of latches.
738
+ */
689
739
protected void awaitState (int state ) throws InterruptedException {
690
- CountDownLatch latch = getStateLatch (state );
691
- if (latch != null ) {
692
- latch .await ();
740
+ if (state == RECONNECT ) {
741
+ awaitReconnection ();
742
+ } else {
743
+ CountDownLatch latch = getStateLatch (state );
744
+ if (latch != null ) {
745
+ latch .await ();
746
+ }
693
747
}
694
748
}
695
749
@@ -709,10 +763,42 @@ private CountDownLatch getStateLatch(int state) {
709
763
CountDownLatch latch = nextAliveLatch .get ();
710
764
/* It may happen so that an error is detected but the state is still alive.
711
765
Wait for the 'next' alive state in such cases. */
712
- return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
766
+ return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
713
767
}
714
768
return null ;
715
769
}
770
+
771
+ /**
772
+ * Blocks until a reconnection signal will be received.
773
+ *
774
+ * @see #trySignalForReconnection()
775
+ */
776
+ private void awaitReconnection () throws InterruptedException {
777
+ connectorLock .lock ();
778
+ try {
779
+ while (getState () != StateHelper .RECONNECT ) {
780
+ reconnectRequired .await ();
781
+ }
782
+ } finally {
783
+ connectorLock .unlock ();
784
+ }
785
+ }
786
+
787
+ /**
788
+ * Signals to the connector that reconnection process can be performed.
789
+ *
790
+ * @see #awaitReconnection()
791
+ */
792
+ private void trySignalForReconnection () {
793
+ if (compareAndSet (StateHelper .UNINITIALIZED , StateHelper .RECONNECT )) {
794
+ connectorLock .lock ();
795
+ try {
796
+ reconnectRequired .signal ();
797
+ } finally {
798
+ connectorLock .unlock ();
799
+ }
800
+ }
801
+ }
716
802
}
717
803
718
804
protected static class TarantoolOp <V > extends CompletableFuture <V > {
0 commit comments