22
22
import java .util .concurrent .atomic .AtomicInteger ;
23
23
import java .util .concurrent .atomic .AtomicReference ;
24
24
import java .util .concurrent .locks .Condition ;
25
- import java .util .concurrent .locks .LockSupport ;
26
25
import java .util .concurrent .locks .ReentrantLock ;
27
26
28
27
29
28
public class TarantoolClientImpl extends TarantoolBase <Future <?>> implements TarantoolClient {
29
+
30
30
public static final CommunicationException NOT_INIT_EXCEPTION = new CommunicationException ("Not connected, initializing connection" );
31
+
31
32
protected TarantoolClientConfig config ;
32
33
33
34
/**
34
35
* External
35
36
*/
36
37
protected SocketChannelProvider socketProvider ;
38
+ protected SocketChannel channel ;
39
+ protected ReadableViaSelectorChannel readChannel ;
40
+
37
41
protected volatile Exception thumbstone ;
38
42
39
43
protected Map <Long , TarantoolOp <?>> futures ;
40
- protected AtomicInteger wait = new AtomicInteger ();
44
+ protected AtomicInteger pendingResponsesCount = new AtomicInteger ();
41
45
/**
42
46
* Write properties
43
47
*/
44
- protected SocketChannel channel ;
45
- protected ReadableViaSelectorChannel readChannel ;
46
-
47
48
protected ByteBuffer sharedBuffer ;
48
- protected ByteBuffer writerBuffer ;
49
49
protected ReentrantLock bufferLock = new ReentrantLock (false );
50
50
protected Condition bufferNotEmpty = bufferLock .newCondition ();
51
51
protected Condition bufferEmpty = bufferLock .newCondition ();
52
+
53
+ protected ByteBuffer writerBuffer ;
52
54
protected ReentrantLock writeLock = new ReentrantLock (true );
53
55
54
56
/**
@@ -66,18 +68,23 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
66
68
protected Thread reader ;
67
69
protected Thread writer ;
68
70
71
+ protected final ReentrantLock connectorLock = new ReentrantLock ();
72
+ protected final Condition reconnectRequired = connectorLock .newCondition ();
73
+
69
74
protected Thread connector = new Thread (new Runnable () {
70
75
@ Override
71
76
public void run () {
72
77
while (!Thread .currentThread ().isInterrupted ()) {
73
- if (state .compareAndSet (StateHelper .RECONNECT , 0 )) {
74
- reconnect (0 , thumbstone );
75
- }
76
- LockSupport .park (state );
78
+ reconnect (0 , thumbstone );
79
+ awaitReconnection ();
77
80
}
78
81
}
79
82
});
80
83
84
+ public TarantoolClientImpl (String address , TarantoolClientConfig config ) {
85
+ this (new SingleSocketChannelProviderImpl (address ), config );
86
+ }
87
+
81
88
public TarantoolClientImpl (SocketChannelProvider socketProvider , TarantoolClientConfig config ) {
82
89
super ();
83
90
this .thumbstone = NOT_INIT_EXCEPTION ;
@@ -99,6 +106,11 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient
99
106
this .fireAndForgetOps .setCallCode (Code .CALL );
100
107
this .composableAsyncOps .setCallCode (Code .CALL );
101
108
}
109
+
110
+ startConnector (config );
111
+ }
112
+
113
+ private void startConnector (TarantoolClientConfig config ) {
102
114
connector .start ();
103
115
try {
104
116
if (!waitAlive (config .initTimeoutMillis , TimeUnit .MILLISECONDS )) {
@@ -130,25 +142,22 @@ protected void reconnect(int retry, Throwable lastError) {
130
142
} catch (Exception e ) {
131
143
closeChannel (channel );
132
144
lastError = e ;
133
- if (e instanceof InterruptedException )
145
+ if (e instanceof InterruptedException ) {
134
146
Thread .currentThread ().interrupt ();
147
+ }
135
148
}
136
149
}
137
150
}
138
151
139
152
protected void connect (final SocketChannel channel ) throws Exception {
140
153
try {
141
- TarantoolGreeting greeting = ProtoUtils .connect (channel ,
142
- config .username , config .password );
154
+ TarantoolGreeting greeting = ProtoUtils .connect (channel , config .username , config .password );
143
155
this .serverVersion = greeting .getServerVersion ();
144
156
} catch (IOException e ) {
145
- try {
146
- channel .close ();
147
- } catch (IOException ignored ) {
148
- }
149
-
157
+ closeChannel (channel );
150
158
throw new CommunicationException ("Couldn't connect to tarantool" , e );
151
159
}
160
+
152
161
channel .configureBlocking (false );
153
162
this .channel = channel ;
154
163
this .readChannel = new ReadableViaSelectorChannel (channel );
@@ -160,11 +169,21 @@ protected void connect(final SocketChannel channel) throws Exception {
160
169
bufferLock .unlock ();
161
170
}
162
171
this .thumbstone = null ;
172
+ pendingResponsesCount .set (0 );
163
173
startThreads (channel .socket ().getRemoteSocketAddress ().toString ());
164
174
}
165
175
166
176
protected void startThreads (String threadName ) throws InterruptedException {
167
177
final CountDownLatch init = new CountDownLatch (2 );
178
+
179
+ if (reader != null ) {
180
+ reader .join (config .initTimeoutMillis / 2 );
181
+ }
182
+ if (writer != null ) {
183
+ writer .join (config .initTimeoutMillis / 2 );
184
+ }
185
+ state .release (StateHelper .RECONNECT );
186
+
168
187
reader = new Thread (new Runnable () {
169
188
@ Override
170
189
public void run () {
@@ -174,8 +193,9 @@ public void run() {
174
193
readThread ();
175
194
} finally {
176
195
state .release (StateHelper .READING );
177
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
178
- LockSupport .unpark (connector );
196
+ if (state .compareAndSet (StateHelper .UNINITIALIZED , StateHelper .RECONNECT )) {
197
+ signalForReconnection ();
198
+ }
179
199
}
180
200
}
181
201
}
@@ -189,8 +209,9 @@ public void run() {
189
209
writeThread ();
190
210
} finally {
191
211
state .release (StateHelper .WRITING );
192
- if (state .compareAndSet (0 , StateHelper .RECONNECT ))
193
- LockSupport .unpark (connector );
212
+ if (state .compareAndSet (StateHelper .UNINITIALIZED , StateHelper .RECONNECT )) {
213
+ signalForReconnection ();
214
+ }
194
215
}
195
216
}
196
217
}
@@ -300,7 +321,7 @@ protected void sharedWrite(ByteBuffer buffer) throws InterruptedException, Timeo
300
321
}
301
322
}
302
323
sharedBuffer .put (buffer );
303
- wait .incrementAndGet ();
324
+ pendingResponsesCount .incrementAndGet ();
304
325
bufferNotEmpty .signalAll ();
305
326
stats .buffered ++;
306
327
} finally {
@@ -323,7 +344,7 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
323
344
}
324
345
writeFully (channel , buffer );
325
346
stats .directWrite ++;
326
- wait .incrementAndGet ();
347
+ pendingResponsesCount .incrementAndGet ();
327
348
} finally {
328
349
writeLock .unlock ();
329
350
}
@@ -337,25 +358,21 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx
337
358
}
338
359
339
360
protected void readThread () {
340
- try {
341
- while (!Thread .currentThread ().isInterrupted ()) {
342
- try {
343
- TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
361
+ while (!Thread .currentThread ().isInterrupted ()) {
362
+ try {
363
+ TarantoolPacket packet = ProtoUtils .readPacket (readChannel );
344
364
345
- Map <Integer , Object > headers = packet .getHeaders ();
365
+ Map <Integer , Object > headers = packet .getHeaders ();
346
366
347
- Long syncId = (Long ) headers .get (Key .SYNC .getId ());
348
- TarantoolOp <?> future = futures .remove (syncId );
349
- stats .received ++;
350
- wait .decrementAndGet ();
351
- complete (packet , future );
352
- } catch (Exception e ) {
353
- die ("Cant read answer" , e );
354
- return ;
355
- }
367
+ Long syncId = (Long ) headers .get (Key .SYNC .getId ());
368
+ TarantoolOp <?> future = futures .remove (syncId );
369
+ stats .received ++;
370
+ pendingResponsesCount .decrementAndGet ();
371
+ complete (packet , future );
372
+ } catch (Exception e ) {
373
+ die ("Cant read answer" , e );
374
+ return ;
356
375
}
357
- } catch (Exception e ) {
358
- die ("Cant init thread" , e );
359
376
}
360
377
}
361
378
@@ -421,9 +438,9 @@ protected void completeSql(CompletableFuture<?> future, TarantoolPacket pack) {
421
438
}
422
439
}
423
440
424
- protected <T > T syncGet (Future <T > r ) {
441
+ protected <T > T syncGet (Future <T > result ) {
425
442
try {
426
- return r .get ();
443
+ return result .get ();
427
444
} catch (ExecutionException e ) {
428
445
if (e .getCause () instanceof CommunicationException ) {
429
446
throw (CommunicationException ) e .getCause ();
@@ -454,7 +471,6 @@ public void close() {
454
471
protected void close (Exception e ) {
455
472
if (state .close ()) {
456
473
connector .interrupt ();
457
-
458
474
die (e .getMessage (), e );
459
475
}
460
476
}
@@ -468,14 +484,44 @@ protected void stopIO() {
468
484
}
469
485
if (readChannel != null ) {
470
486
try {
471
- readChannel .close ();// also closes this.channel
487
+ readChannel .close (); // also closes this.channel
472
488
} catch (IOException ignored ) {
473
489
474
490
}
475
491
}
476
492
closeChannel (channel );
477
493
}
478
494
495
+ /**
496
+ * Blocks until a reconnection process can be carried on
497
+ * @see #signalForReconnection()
498
+ */
499
+ private void awaitReconnection () {
500
+ connectorLock .lock ();
501
+ try {
502
+ while (state .getState () != StateHelper .RECONNECT ) {
503
+ reconnectRequired .await ();
504
+ }
505
+ } catch (InterruptedException ignored ) {
506
+ Thread .currentThread ().interrupt ();
507
+ } finally {
508
+ connectorLock .unlock ();
509
+ }
510
+ }
511
+
512
+ /**
513
+ * Signals to the connector that reconnection process can be performed
514
+ * @see #awaitReconnection()
515
+ */
516
+ private void signalForReconnection () {
517
+ connectorLock .lock ();
518
+ try {
519
+ reconnectRequired .signal ();
520
+ } finally {
521
+ connectorLock .unlock ();
522
+ }
523
+ }
524
+
479
525
@ Override
480
526
public boolean isAlive () {
481
527
return state .getState () == StateHelper .ALIVE && thumbstone == null ;
@@ -498,7 +544,7 @@ public TarantoolClientOps<Integer, List<?>, Object, List<?>> syncOps() {
498
544
499
545
@ Override
500
546
public TarantoolClientOps <Integer , List <?>, Object , Future <List <?>>> asyncOps () {
501
- return (TarantoolClientOps )this ;
547
+ return (TarantoolClientOps ) this ;
502
548
}
503
549
504
550
@ Override
@@ -514,7 +560,7 @@ public TarantoolClientOps<Integer, List<?>, Object, Long> fireAndForgetOps() {
514
560
515
561
@ Override
516
562
public TarantoolSQLOps <Object , Long , List <Map <String , Object >>> sqlSyncOps () {
517
- return new TarantoolSQLOps <Object , Long , List <Map <String ,Object >>>() {
563
+ return new TarantoolSQLOps <Object , Long , List <Map <String , Object >>>() {
518
564
519
565
@ Override
520
566
public Long update (String sql , Object ... bind ) {
@@ -530,7 +576,7 @@ public List<Map<String, Object>> query(String sql, Object... bind) {
530
576
531
577
@ Override
532
578
public TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>> sqlAsyncOps () {
533
- return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String ,Object >>>>() {
579
+ return new TarantoolSQLOps <Object , Future <Long >, Future <List <Map <String , Object >>>>() {
534
580
@ Override
535
581
public Future <Long > update (String sql , Object ... bind ) {
536
582
return (Future <Long >) exec (Code .EXECUTE , Key .SQL_TEXT , sql , Key .SQL_BIND , bind );
@@ -591,7 +637,7 @@ public void close() {
591
637
}
592
638
593
639
protected boolean isDead (CompletableFuture <?> q ) {
594
- if (TarantoolClientImpl . this .thumbstone != null ) {
640
+ if (this .thumbstone != null ) {
595
641
fail (q , new CommunicationException ("Connection is dead" , thumbstone ));
596
642
return true ;
597
643
}
@@ -618,6 +664,7 @@ public TarantoolClientStats getStats() {
618
664
* Manages state changes.
619
665
*/
620
666
protected final class StateHelper {
667
+ static final int UNINITIALIZED = 0 ;
621
668
static final int READING = 1 ;
622
669
static final int WRITING = 2 ;
623
670
static final int ALIVE = READING | WRITING ;
@@ -627,7 +674,7 @@ protected final class StateHelper {
627
674
private final AtomicInteger state ;
628
675
629
676
private final AtomicReference <CountDownLatch > nextAliveLatch =
630
- new AtomicReference <CountDownLatch >(new CountDownLatch (1 ));
677
+ new AtomicReference <>(new CountDownLatch (1 ));
631
678
632
679
private final CountDownLatch closedLatch = new CountDownLatch (1 );
633
680
@@ -640,7 +687,7 @@ protected int getState() {
640
687
}
641
688
642
689
protected boolean close () {
643
- for (;; ) {
690
+ for (; ; ) {
644
691
int st = getState ();
645
692
if ((st & CLOSED ) == CLOSED )
646
693
return false ;
@@ -650,24 +697,29 @@ protected boolean close() {
650
697
}
651
698
652
699
protected boolean acquire (int mask ) {
653
- for (;; ) {
654
- int st = getState ();
655
- if ((st & CLOSED ) == CLOSED )
700
+ for (; ; ) {
701
+ int currentState = getState ();
702
+ if ((currentState & CLOSED ) == CLOSED ) {
656
703
return false ;
657
-
658
- if ((st & mask ) != 0 )
704
+ }
705
+ if ((currentState & RECONNECT ) > mask ) {
706
+ return false ;
707
+ }
708
+ if ((currentState & mask ) != 0 ) {
659
709
throw new IllegalStateException ("State is already " + mask );
660
-
661
- if (compareAndSet (st , st | mask ))
710
+ }
711
+ if (compareAndSet (currentState , currentState | mask )) {
662
712
return true ;
713
+ }
663
714
}
664
715
}
665
716
666
717
protected void release (int mask ) {
667
- for (;; ) {
718
+ for (; ; ) {
668
719
int st = getState ();
669
- if (compareAndSet (st , st & ~mask ))
720
+ if (compareAndSet (st , st & ~mask )) {
670
721
return ;
722
+ }
671
723
}
672
724
}
673
725
@@ -709,7 +761,7 @@ private CountDownLatch getStateLatch(int state) {
709
761
CountDownLatch latch = nextAliveLatch .get ();
710
762
/* It may happen so that an error is detected but the state is still alive.
711
763
Wait for the 'next' alive state in such cases. */
712
- return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
764
+ return (getState () == ALIVE && thumbstone == null ) ? null : latch ;
713
765
}
714
766
return null ;
715
767
}
0 commit comments