Skip to content

Commit 2cdce69

Browse files
mp911dechristophstrobl
authored andcommitted
Use Lettuce functionality for Cluster commands where possible.
We now remove our own code in favor of Lettuce's advanced cluster support to leverage asynchronous functionality in pipelining. Document pipelining restrictions regarding Redis Cluster. Original Pull Request: #2889
1 parent d785b5f commit 2cdce69

File tree

8 files changed

+61
-189
lines changed

8 files changed

+61
-189
lines changed

Diff for: src/main/antora/modules/ROOT/pages/redis/cluster.adoc

+3
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,6 @@ clusterOps.shutdown(NODE_7379); <1>
129129
130130
<1> Shut down node at 7379 and cross fingers there is a replica in place that can take over.
131131
====
132+
133+
NOTE: Redis Cluster pipelining is currently only supported throug the Lettuce driver except for the following commands when using cross-slot keys: `rename`, `renameNX`, `sort`, `bLPop`, `bRPop`, `rPopLPush`, `bRPopLPush`, `info`, `sMove`, `sInter`, `sInterStore`, `sUnion`, `sUnionStore`, `sDiff`, `sDiffStore`.
134+
Same-slot keys are fully supported.

Diff for: src/main/antora/modules/ROOT/pages/redis/pipelining.adoc

+7-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ List<Object> results = stringRedisTemplate.executePipelined(
2020
});
2121
----
2222

23-
The preceding example runs a bulk right pop of items from a queue in a pipeline. The `results` `List` contains all of the popped items. `RedisTemplate` uses its value, hash key, and hash value serializers to deserialize all results before returning, so the returned items in the preceding example are Strings. There are additional `executePipelined` methods that let you pass a custom serializer for pipelined results.
23+
The preceding example runs a bulk right pop of items from a queue in a pipeline.
24+
The `results` `List` contains all the popped items. `RedisTemplate` uses its value, hash key, and hash value serializers to deserialize all results before returning, so the returned items in the preceding example are Strings.
25+
There are additional `executePipelined` methods that let you pass a custom serializer for pipelined results.
2426

2527
Note that the value returned from the `RedisCallback` is required to be `null`, as this value is discarded in favor of returning the results of the pipelined commands.
2628

@@ -35,3 +37,7 @@ factory.setPipeliningFlushPolicy(PipeliningFlushPolicy.buffered(3)); <1>
3537
----
3638
<1> Buffer locally and flush after every 3rd command.
3739
====
40+
41+
NOTE: Pipelining is limited to Redis Standalone.
42+
Redis Cluster is currently only supported through the Lettuce driver except for the following commands when using cross-slot keys: `rename`, `renameNX`, `sort`, `bLPop`, `bRPop`, `rPopLPush`, `bRPopLPush`, `info`, `sMove`, `sInter`, `sInterStore`, `sUnion`, `sUnionStore`, `sDiff`, `sDiffStore`.
43+
Same-slot keys are fully supported.

Diff for: src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyCommands.java

-44
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,8 @@
1818
import io.lettuce.core.KeyScanCursor;
1919
import io.lettuce.core.ScanArgs;
2020

21-
import java.util.Collection;
22-
import java.util.HashSet;
2321
import java.util.List;
2422
import java.util.Set;
25-
import java.util.concurrent.ThreadLocalRandom;
2623

2724
import org.springframework.dao.InvalidDataAccessApiUsageException;
2825
import org.springframework.data.redis.connection.ClusterSlotHashUtil;
@@ -50,47 +47,6 @@ class LettuceClusterKeyCommands extends LettuceKeyCommands {
5047
this.connection = connection;
5148
}
5249

53-
@Override
54-
public byte[] randomKey() {
55-
56-
List<RedisClusterNode> nodes = connection.clusterGetNodes();
57-
Set<RedisClusterNode> inspectedNodes = new HashSet<>(nodes.size());
58-
59-
do {
60-
61-
RedisClusterNode node = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
62-
63-
while (inspectedNodes.contains(node)) {
64-
node = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
65-
}
66-
inspectedNodes.add(node);
67-
byte[] key = randomKey(node);
68-
69-
if (key != null && key.length > 0) {
70-
return key;
71-
}
72-
} while (nodes.size() != inspectedNodes.size());
73-
74-
return null;
75-
}
76-
77-
@Override
78-
public Set<byte[]> keys(byte[] pattern) {
79-
80-
Assert.notNull(pattern, "Pattern must not be null");
81-
82-
Collection<List<byte[]>> keysPerNode = connection.getClusterCommandExecutor()
83-
.executeCommandOnAllNodes((LettuceClusterCommandCallback<List<byte[]>>) connection -> connection.keys(pattern))
84-
.resultsAsList();
85-
86-
Set<byte[]> keys = new HashSet<>();
87-
88-
for (List<byte[]> keySet : keysPerNode) {
89-
keys.addAll(keySet);
90-
}
91-
return keys;
92-
}
93-
9450
@Override
9551
public void rename(byte[] oldKey, byte[] newKey) {
9652

Diff for: src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterServerCommands.java

-38
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import io.lettuce.core.api.sync.RedisServerCommands;
1919

2020
import java.util.ArrayList;
21-
import java.util.Collection;
2221
import java.util.List;
2322
import java.util.Map;
2423
import java.util.Map.Entry;
@@ -34,7 +33,6 @@
3433
import org.springframework.data.redis.connection.lettuce.LettuceClusterConnection.LettuceClusterCommandCallback;
3534
import org.springframework.data.redis.core.types.RedisClientInfo;
3635
import org.springframework.util.Assert;
37-
import org.springframework.util.CollectionUtils;
3836

3937
/**
4038
* @author Mark Paluch
@@ -71,37 +69,11 @@ public void save(RedisClusterNode node) {
7169
executeCommandOnSingleNode(RedisServerCommands::save, node);
7270
}
7371

74-
@Override
75-
public Long dbSize() {
76-
77-
Collection<Long> dbSizes = executeCommandOnAllNodes(RedisServerCommands::dbsize).resultsAsList();
78-
79-
if (CollectionUtils.isEmpty(dbSizes)) {
80-
return 0L;
81-
}
82-
83-
Long size = 0L;
84-
for (Long value : dbSizes) {
85-
size += value;
86-
}
87-
return size;
88-
}
89-
9072
@Override
9173
public Long dbSize(RedisClusterNode node) {
9274
return executeCommandOnSingleNode(RedisServerCommands::dbsize, node).getValue();
9375
}
9476

95-
@Override
96-
public void flushDb() {
97-
executeCommandOnAllNodes(RedisServerCommands::flushdb);
98-
}
99-
100-
@Override
101-
public void flushDb(FlushOption option) {
102-
executeCommandOnAllNodes(it -> it.flushdb(LettuceConverters.toFlushMode(option)));
103-
}
104-
10577
@Override
10678
public void flushDb(RedisClusterNode node) {
10779
executeCommandOnSingleNode(RedisServerCommands::flushdb, node);
@@ -112,16 +84,6 @@ public void flushDb(RedisClusterNode node, FlushOption option) {
11284
executeCommandOnSingleNode(it -> it.flushdb(LettuceConverters.toFlushMode(option)), node);
11385
}
11486

115-
@Override
116-
public void flushAll() {
117-
executeCommandOnAllNodes(RedisServerCommands::flushall);
118-
}
119-
120-
@Override
121-
public void flushAll(FlushOption option) {
122-
executeCommandOnAllNodes(it -> it.flushall(LettuceConverters.toFlushMode(option)));
123-
}
124-
12587
@Override
12688
public void flushAll(RedisClusterNode node) {
12789
executeCommandOnSingleNode(RedisServerCommands::flushall, node);

Diff for: src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterStringCommands.java

-22
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,6 @@
1515
*/
1616
package org.springframework.data.redis.connection.lettuce;
1717

18-
import java.util.Map;
19-
20-
import org.springframework.data.redis.connection.ClusterSlotHashUtil;
21-
import org.springframework.util.Assert;
22-
2318
/**
2419
* @author Christoph Strobl
2520
* @author Mark Paluch
@@ -31,21 +26,4 @@ class LettuceClusterStringCommands extends LettuceStringCommands {
3126
super(connection);
3227
}
3328

34-
@Override
35-
public Boolean mSetNX(Map<byte[], byte[]> tuples) {
36-
37-
Assert.notNull(tuples, "Tuples must not be null");
38-
39-
if (ClusterSlotHashUtil.isSameSlotForAllKeys(tuples.keySet().toArray(new byte[tuples.keySet().size()][]))) {
40-
return super.mSetNX(tuples);
41-
}
42-
43-
boolean result = true;
44-
for (Map.Entry<byte[], byte[]> entry : tuples.entrySet()) {
45-
if (!setNX(entry.getKey(), entry.getValue()) && result) {
46-
result = false;
47-
}
48-
}
49-
return result;
50-
}
5129
}

Diff for: src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java

+36-26
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@
4949
import java.util.List;
5050
import java.util.Map;
5151
import java.util.Queue;
52+
import java.util.concurrent.CompletableFuture;
5253
import java.util.concurrent.ConcurrentHashMap;
54+
import java.util.concurrent.ExecutionException;
5355
import java.util.concurrent.Future;
5456
import java.util.concurrent.TimeUnit;
5557
import java.util.concurrent.atomic.AtomicLong;
@@ -102,8 +104,8 @@
102104
*/
103105
public class LettuceConnection extends AbstractRedisConnection {
104106

105-
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION =
106-
new FallbackExceptionTranslationStrategy(LettuceExceptionConverter.INSTANCE);
107+
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new FallbackExceptionTranslationStrategy(
108+
LettuceExceptionConverter.INSTANCE);
107109

108110
static final RedisCodec<byte[], byte[]> CODEC = ByteArrayCodec.INSTANCE;
109111

@@ -189,8 +191,8 @@ public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> share
189191
/**
190192
* Creates a new {@link LettuceConnection}.
191193
*
192-
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s.
193-
* Should not be used for transactions or blocking operations.
194+
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
195+
* used for transactions or blocking operations.
194196
* @param timeout The connection timeout (in milliseconds)
195197
* @param client The {@link RedisClient} to use when making pub/sub connections.
196198
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
@@ -209,8 +211,8 @@ public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> share
209211
/**
210212
* Creates a new {@link LettuceConnection}.
211213
*
212-
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s.
213-
* Should not be used for transactions or blocking operations.
214+
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
215+
* used for transactions or blocking operations.
214216
* @param connectionProvider connection provider to obtain and release native connections.
215217
* @param timeout The connection timeout (in milliseconds)
216218
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
@@ -225,8 +227,8 @@ public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> share
225227
/**
226228
* Creates a new {@link LettuceConnection}.
227229
*
228-
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s.
229-
* Should not be used for transactions or blocking operations.
230+
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
231+
* used for transactions or blocking operations.
230232
* @param connectionProvider connection provider to obtain and release native connections.
231233
* @param timeout The connection timeout (in milliseconds)
232234
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
@@ -453,24 +455,19 @@ private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connec
453455

454456
<T, R> LettuceResult<T, R> newLettuceResult(Future<T> resultHolder, Converter<T, R> converter) {
455457

456-
return LettuceResultBuilder.<T, R>forResponse(resultHolder)
457-
.mappedWith(converter)
458-
.convertPipelineAndTxResults(this.convertPipelineAndTxResults)
459-
.build();
458+
return LettuceResultBuilder.<T, R> forResponse(resultHolder).mappedWith(converter)
459+
.convertPipelineAndTxResults(this.convertPipelineAndTxResults).build();
460460
}
461461

462462
<T, R> LettuceResult<T, R> newLettuceResult(Future<T> resultHolder, Converter<T, R> converter,
463463
Supplier<R> defaultValue) {
464464

465-
return LettuceResultBuilder.<T, R>forResponse(resultHolder)
466-
.mappedWith(converter)
467-
.convertPipelineAndTxResults(this.convertPipelineAndTxResults)
468-
.defaultNullTo(defaultValue)
469-
.build();
465+
return LettuceResultBuilder.<T, R> forResponse(resultHolder).mappedWith(converter)
466+
.convertPipelineAndTxResults(this.convertPipelineAndTxResults).defaultNullTo(defaultValue).build();
470467
}
471468

472469
<T, R> LettuceResult<T, R> newLettuceStatusResult(Future<T> resultHolder) {
473-
return LettuceResultBuilder.<T, R>forResponse(resultHolder).buildStatusResult();
470+
return LettuceResultBuilder.<T, R> forResponse(resultHolder).buildStatusResult();
474471
}
475472

476473
void pipeline(LettuceResult<?, ?> result) {
@@ -583,7 +580,7 @@ public List<Object> closePipeline() {
583580
pipeliningFlushState = null;
584581
isPipelined = false;
585582

586-
List<io.lettuce.core.protocol.RedisCommand<?, ?, ?>> futures = new ArrayList<>(ppline.size());
583+
List<CompletableFuture<?>> futures = new ArrayList<>(ppline.size());
587584

588585
for (LettuceResult<?, ?> result : ppline) {
589586
futures.add(result.getResultHolder());
@@ -600,10 +597,24 @@ public List<Object> closePipeline() {
600597
if (done) {
601598
for (LettuceResult<?, ?> result : ppline) {
602599

603-
if (result.getResultHolder().getOutput().hasError()) {
600+
CompletableFuture<?> resultHolder = result.getResultHolder();
601+
if (resultHolder.isCompletedExceptionally()) {
602+
603+
String message;
604+
if (resultHolder instanceof io.lettuce.core.protocol.RedisCommand<?, ?, ?> rc) {
605+
message = rc.getOutput().getError();
606+
} else {
607+
try {
608+
resultHolder.get();
609+
message = "";
610+
} catch (InterruptedException ignore) {
611+
message = "";
612+
} catch (ExecutionException e) {
613+
message = e.getCause().getMessage();
614+
}
615+
}
604616

605-
Exception exception = new InvalidDataAccessApiUsageException(result.getResultHolder()
606-
.getOutput().getError());
617+
Exception exception = new InvalidDataAccessApiUsageException(message);
607618

608619
// remember only the first error
609620
if (problem == null) {
@@ -684,8 +695,8 @@ public List<Object> exec() {
684695
LettuceTransactionResultConverter resultConverter = new LettuceTransactionResultConverter(
685696
new LinkedList<>(txResults), exceptionConverter);
686697

687-
pipeline(newLettuceResult(exec, source ->
688-
resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
698+
pipeline(newLettuceResult(exec,
699+
source -> resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
689700

690701
return null;
691702
}
@@ -837,8 +848,7 @@ <T> T failsafeReadScanValues(List<?> source, @SuppressWarnings("rawtypes") @Null
837848

838849
try {
839850
return (T) (converter != null ? converter.convert(source) : source);
840-
} catch (IndexOutOfBoundsException ignore) {
841-
}
851+
} catch (IndexOutOfBoundsException ignore) {}
842852

843853
return null;
844854
}

0 commit comments

Comments
 (0)