Skip to content

Commit 24a99a7

Browse files
committed
Wrong SQL result processing
Fix a regression related to processing of SQL response. Fix a wrong parsing of SQL tuple result. Fixes: #141
1 parent 4715727 commit 24a99a7

File tree

6 files changed

+312
-69
lines changed

6 files changed

+312
-69
lines changed

Diff for: src/main/java/org/tarantool/SqlProtoUtils.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ public abstract class SqlProtoUtils {
1111
public static List<Map<String, Object>> readSqlResult(TarantoolPacket pack) {
1212
List<List<?>> data = (List<List<?>>) pack.getBody().get(Key.DATA.getId());
1313

14-
List<Map<String, Object>> values = new ArrayList<Map<String, Object>>(data.size());
14+
List<Map<String, Object>> values = new ArrayList<>(data.size());
1515
List<TarantoolBase.SQLMetaData> metaData = getSQLMetadata(pack);
16-
LinkedHashMap<String, Object> value = new LinkedHashMap<String, Object>();
1716
for (List row : data) {
17+
LinkedHashMap<String, Object> value = new LinkedHashMap<>();
1818
for (int i = 0; i < row.size(); i++) {
1919
value.put(metaData.get(i).getName(), row.get(i));
2020
}

Diff for: src/main/java/org/tarantool/TarantoolClientImpl.java

+44-28
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
import org.tarantool.protocol.ProtoUtils;
44
import org.tarantool.protocol.ReadableViaSelectorChannel;
5-
import org.tarantool.protocol.TarantoolPacket;
65
import org.tarantool.protocol.TarantoolGreeting;
6+
import org.tarantool.protocol.TarantoolPacket;
77

88
import java.io.IOException;
99
import java.nio.ByteBuffer;
@@ -36,7 +36,7 @@ public class TarantoolClientImpl extends TarantoolBase<Future<?>> implements Tar
3636
protected SocketChannelProvider socketProvider;
3737
protected volatile Exception thumbstone;
3838

39-
protected Map<Long, CompletableFuture<?>> futures;
39+
protected Map<Long, TarantoolOp<?>> futures;
4040
protected AtomicInteger wait = new AtomicInteger();
4141
/**
4242
* Write properties
@@ -216,38 +216,38 @@ protected Future<?> exec(Code code, Object... args) {
216216
protected CompletableFuture<?> doExec(Code code, Object[] args) {
217217
validateArgs(args);
218218
long sid = syncId.incrementAndGet();
219-
CompletableFuture<?> q = new CompletableFuture<>();
219+
TarantoolOp<?> future = new TarantoolOp<>(code);
220220

221-
if (isDead(q)) {
222-
return q;
221+
if (isDead(future)) {
222+
return future;
223223
}
224-
futures.put(sid, q);
225-
if (isDead(q)) {
224+
futures.put(sid, future);
225+
if (isDead(future)) {
226226
futures.remove(sid);
227-
return q;
227+
return future;
228228
}
229229
try {
230230
write(code, sid, null, args);
231231
} catch (Exception e) {
232232
futures.remove(sid);
233-
fail(q, e);
233+
fail(future, e);
234234
}
235-
return q;
235+
return future;
236236
}
237237

238238
protected synchronized void die(String message, Exception cause) {
239239
if (thumbstone != null) {
240240
return;
241241
}
242-
final CommunicationException err = new CommunicationException(message, cause);
243-
this.thumbstone = err;
242+
final CommunicationException error = new CommunicationException(message, cause);
243+
this.thumbstone = error;
244244
while (!futures.isEmpty()) {
245-
Iterator<Map.Entry<Long, CompletableFuture<?>>> iterator = futures.entrySet().iterator();
245+
Iterator<Map.Entry<Long, TarantoolOp<?>>> iterator = futures.entrySet().iterator();
246246
while (iterator.hasNext()) {
247-
Map.Entry<Long, CompletableFuture<?>> elem = iterator.next();
247+
Map.Entry<Long, TarantoolOp<?>> elem = iterator.next();
248248
if (elem != null) {
249-
CompletableFuture<?> future = elem.getValue();
250-
fail(future, err);
249+
TarantoolOp<?> future = elem.getValue();
250+
fail(future, error);
251251
}
252252
iterator.remove();
253253
}
@@ -345,7 +345,7 @@ protected void readThread() {
345345
Map<Integer, Object> headers = packet.getHeaders();
346346

347347
Long syncId = (Long) headers.get(Key.SYNC.getId());
348-
CompletableFuture<?> future = futures.remove(syncId);
348+
TarantoolOp<?> future = futures.remove(syncId);
349349
stats.received++;
350350
wait.decrementAndGet();
351351
complete(packet, future);
@@ -395,30 +395,29 @@ protected void fail(CompletableFuture<?> q, Exception e) {
395395
q.completeExceptionally(e);
396396
}
397397

398-
protected void complete(TarantoolPacket packet, CompletableFuture<?> q) {
399-
if (q != null) {
398+
protected void complete(TarantoolPacket packet, TarantoolOp<?> future) {
399+
if (future != null) {
400400
long code = packet.getCode();
401401
if (code == 0) {
402-
403-
if (code == Code.EXECUTE.getId()) {
404-
completeSql(q, packet);
402+
if (future.getCode() == Code.EXECUTE) {
403+
completeSql(future, packet);
405404
} else {
406-
((CompletableFuture) q).complete(packet.getBody().get(Key.DATA.getId()));
405+
((CompletableFuture) future).complete(packet.getBody().get(Key.DATA.getId()));
407406
}
408407
} else {
409408
Object error = packet.getBody().get(Key.ERROR.getId());
410-
fail(q, serverError(code, error));
409+
fail(future, serverError(code, error));
411410
}
412411
}
413412
}
414413

415-
protected void completeSql(CompletableFuture<?> q, TarantoolPacket pack) {
414+
protected void completeSql(CompletableFuture<?> future, TarantoolPacket pack) {
416415
Long rowCount = SqlProtoUtils.getSqlRowCount(pack);
417-
if (rowCount!=null) {
418-
((CompletableFuture) q).complete(rowCount);
416+
if (rowCount != null) {
417+
((CompletableFuture) future).complete(rowCount);
419418
} else {
420419
List<Map<String, Object>> values = SqlProtoUtils.readSqlResult(pack);
421-
((CompletableFuture) q).complete(values);
420+
((CompletableFuture) future).complete(values);
422421
}
423422
}
424423

@@ -715,4 +714,21 @@ private CountDownLatch getStateLatch(int state) {
715714
return null;
716715
}
717716
}
717+
718+
protected static class TarantoolOp<V> extends CompletableFuture<V> {
719+
720+
/**
721+
* Tarantool binary protocol operation code.
722+
*/
723+
final private Code code;
724+
725+
public TarantoolOp(Code code) {
726+
this.code = code;
727+
}
728+
729+
public Code getCode() {
730+
return code;
731+
}
732+
}
733+
718734
}

Diff for: src/main/java/org/tarantool/TarantoolClusterClient.java

+35-39
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
/**
1313
* Basic implementation of a client that may work with the cluster
1414
* of tarantool instances in fault-tolerant way.
15-
*
15+
* <p>
1616
* Failed operations will be retried once connection is re-established
1717
* unless the configured expiration time is over.
1818
*/
@@ -25,21 +25,21 @@ public class TarantoolClusterClient extends TarantoolClientImpl {
2525

2626
/**
2727
* @param config Configuration.
28-
* @param addrs Array of addresses in the form of [host]:[port].
28+
* @param addrs Array of addresses in the form of [host]:[port].
2929
*/
3030
public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addrs) {
3131
this(config, new RoundRobinSocketProviderImpl(addrs).setTimeout(config.operationExpiryTimeMillis));
3232
}
3333

3434
/**
3535
* @param provider Socket channel provider.
36-
* @param config Configuration.
36+
* @param config Configuration.
3737
*/
3838
public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannelProvider provider) {
3939
super(provider, config);
4040

4141
this.executor = config.executor == null ?
42-
Executors.newSingleThreadExecutor() : config.executor;
42+
Executors.newSingleThreadExecutor() : config.executor;
4343
}
4444

4545
@Override
@@ -59,23 +59,23 @@ protected boolean isDead(CompletableFuture<?> q) {
5959
protected CompletableFuture<?> doExec(Code code, Object[] args) {
6060
validateArgs(args);
6161
long sid = syncId.incrementAndGet();
62-
CompletableFuture<?> q = makeFuture(sid, code, args);
62+
ExpirableOp<?> future = makeFuture(sid, code, args);
6363

64-
if (isDead(q)) {
65-
return q;
64+
if (isDead(future)) {
65+
return future;
6666
}
67-
futures.put(sid, q);
68-
if (isDead(q)) {
67+
futures.put(sid, future);
68+
if (isDead(future)) {
6969
futures.remove(sid);
70-
return q;
70+
return future;
7171
}
7272
try {
7373
write(code, sid, null, args);
7474
} catch (Exception e) {
7575
futures.remove(sid);
76-
fail(q, e);
76+
fail(future, e);
7777
}
78-
return q;
78+
return future;
7979
}
8080

8181
@Override
@@ -85,12 +85,12 @@ protected void fail(CompletableFuture<?> q, Exception e) {
8585

8686
protected boolean checkFail(CompletableFuture<?> q, Exception e) {
8787
assert q instanceof ExpirableOp<?>;
88-
if (!isTransientError(e) || ((ExpirableOp<?>)q).hasExpired(System.currentTimeMillis())) {
88+
if (!isTransientError(e) || ((ExpirableOp<?>) q).hasExpired(System.currentTimeMillis())) {
8989
q.completeExceptionally(e);
9090
return true;
9191
} else {
9292
assert retries != null;
93-
retries.put(((ExpirableOp<?>) q).getId(), (ExpirableOp<?>)q);
93+
retries.put(((ExpirableOp<?>) q).getId(), (ExpirableOp<?>) q);
9494
return false;
9595
}
9696
}
@@ -114,12 +114,12 @@ protected boolean isTransientError(Exception e) {
114114
return true;
115115
}
116116
if (e instanceof TarantoolException) {
117-
return ((TarantoolException)e).isTransient();
117+
return ((TarantoolException) e).isTransient();
118118
}
119119
return false;
120120
}
121121

122-
protected CompletableFuture<?> makeFuture(long id, Code code, Object...args) {
122+
private ExpirableOp<?> makeFuture(long id, Code code, Object... args) {
123123
int expireTime = ((TarantoolClusterClientConfig) config).operationExpiryTimeMillis;
124124
return new ExpirableOp(id, expireTime, code, args);
125125
}
@@ -133,20 +133,20 @@ protected void onReconnect() {
133133
// First call is before the constructor finished. Skip it.
134134
return;
135135
}
136-
Collection<ExpirableOp<?>> futsToRetry = new ArrayList<ExpirableOp<?>>(retries.values());
136+
Collection<ExpirableOp<?>> futuresToRetry = new ArrayList<ExpirableOp<?>>(retries.values());
137137
retries.clear();
138138
long now = System.currentTimeMillis();
139-
for (final ExpirableOp<?> fut : futsToRetry) {
140-
if (!fut.hasExpired(now)) {
139+
for (final ExpirableOp<?> future : futuresToRetry) {
140+
if (!future.hasExpired(now)) {
141141
executor.execute(new Runnable() {
142142
@Override
143143
public void run() {
144-
futures.put(fut.getId(), fut);
144+
futures.put(future.getId(), future);
145145
try {
146-
write(fut.getCode(), fut.getId(), null, fut.getArgs());
146+
write(future.getCode(), future.getId(), null, future.getArgs());
147147
} catch (Exception e) {
148-
futures.remove(fut.getId());
149-
fail(fut, e);
148+
futures.remove(future.getId());
149+
fail(future, e);
150150
}
151151
}
152152
});
@@ -157,8 +157,11 @@ public void run() {
157157
/**
158158
* Holds operation code and arguments for retry.
159159
*/
160-
private class ExpirableOp<V> extends CompletableFuture<V> {
161-
/** Moment in time when operation is not considered for retry. */
160+
private class ExpirableOp<V> extends TarantoolOp<V> {
161+
162+
/**
163+
* Moment in time when operation is not considered for retry.
164+
*/
162165
final private long deadline;
163166

164167
/**
@@ -167,24 +170,20 @@ private class ExpirableOp<V> extends CompletableFuture<V> {
167170
final private long id;
168171

169172
/**
170-
* Tarantool binary protocol operation code.
173+
* Arguments of operation.
171174
*/
172-
final private Code code;
173-
174-
/** Arguments of operation. */
175175
final private Object[] args;
176176

177177
/**
178-
*
179-
* @param id Sync.
178+
* @param id Sync.
180179
* @param expireTime Expiration time (relative) in ms.
181-
* @param code Tarantool operation code.
182-
* @param args Operation arguments.
180+
* @param code Tarantool operation code.
181+
* @param args Operation arguments.
183182
*/
184-
ExpirableOp(long id, int expireTime, Code code, Object...args) {
183+
ExpirableOp(long id, int expireTime, Code code, Object... args) {
184+
super(code);
185185
this.id = id;
186186
this.deadline = System.currentTimeMillis() + expireTime;
187-
this.code = code;
188187
this.args = args;
189188
}
190189

@@ -196,12 +195,9 @@ public long getId() {
196195
return id;
197196
}
198197

199-
public Code getCode() {
200-
return code;
201-
}
202-
203198
public Object[] getArgs() {
204199
return args;
205200
}
201+
206202
}
207203
}

0 commit comments

Comments
 (0)