From 02dd234054b91bacdd2e055335db6c5bd1a9aeca Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 4 Apr 2024 09:07:04 +0200 Subject: [PATCH 1/3] Prepare issue branch. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 507b76dd15..e98ea6a08d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-redis - 3.3.0-SNAPSHOT + 3.3.0-GH-2888-SNAPSHOT Spring Data Redis Spring Data module for Redis From d220cab94981911a009b8492654edb656944a1aa Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 4 Apr 2024 09:15:26 +0200 Subject: [PATCH 2/3] Accept `CompletableFuture` subtypes for Lettuce pipelining. We now no longer require RedisCommand but resort to CompletableFuture as the general asynchronous result type for Lettuce pipelining to allow subtypes such as PipelinedRedisFuture. --- .../connection/lettuce/LettuceResult.java | 9 ++-- .../connection/ClusterTestVariables.java | 1 + .../LettuceClusterConnectionTests.java | 45 +++++++++++++++---- 3 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceResult.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceResult.java index 86cbd8c93f..4c0a825c85 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceResult.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceResult.java @@ -15,8 +15,7 @@ */ package org.springframework.data.redis.connection.lettuce; -import io.lettuce.core.protocol.RedisCommand; - +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Supplier; @@ -34,7 +33,7 @@ * @since 2.1 */ @SuppressWarnings("rawtypes") -class LettuceResult extends FutureResult> { +class LettuceResult extends FutureResult> { private final boolean convertPipelineAndTxResults; @@ -51,7 +50,7 @@ class LettuceResult extends FutureResult> { LettuceResult(Future resultHolder, Supplier defaultReturnValue, boolean convertPipelineAndTxResults, @Nullable Converter converter) { - super((RedisCommand) resultHolder, converter, defaultReturnValue); + super((CompletableFuture) resultHolder, converter, defaultReturnValue); this.convertPipelineAndTxResults = convertPipelineAndTxResults; } @@ -59,7 +58,7 @@ class LettuceResult extends FutureResult> { @Override @SuppressWarnings("unchecked") public T get() { - return (T) getResultHolder().getOutput().get(); + return (T) getResultHolder().join(); } @Override diff --git a/src/test/java/org/springframework/data/redis/connection/ClusterTestVariables.java b/src/test/java/org/springframework/data/redis/connection/ClusterTestVariables.java index 0ecd097bab..cb33e10de2 100644 --- a/src/test/java/org/springframework/data/redis/connection/ClusterTestVariables.java +++ b/src/test/java/org/springframework/data/redis/connection/ClusterTestVariables.java @@ -25,6 +25,7 @@ public abstract class ClusterTestVariables { public static final String KEY_1 = "key1"; public static final String KEY_2 = "key2"; public static final String KEY_3 = "key3"; + public static final String KEY_4 = "key4"; public static final String VALUE_1 = "value1"; public static final String VALUE_2 = "value2"; diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java index 2fea39cc94..67c9b066ba 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionTests.java @@ -83,6 +83,7 @@ public class LettuceClusterConnectionTests implements ClusterConnectionTests { private static final byte[] KEY_1_BYTES = LettuceConverters.toBytes(KEY_1); private static final byte[] KEY_2_BYTES = LettuceConverters.toBytes(KEY_2); private static final byte[] KEY_3_BYTES = LettuceConverters.toBytes(KEY_3); + private static final byte[] KEY_4_BYTES = LettuceConverters.toBytes(KEY_4); private static final byte[] SAME_SLOT_KEY_1_BYTES = LettuceConverters.toBytes(SAME_SLOT_KEY_1); private static final byte[] SAME_SLOT_KEY_2_BYTES = LettuceConverters.toBytes(SAME_SLOT_KEY_2); @@ -91,6 +92,7 @@ public class LettuceClusterConnectionTests implements ClusterConnectionTests { private static final byte[] VALUE_1_BYTES = LettuceConverters.toBytes(VALUE_1); private static final byte[] VALUE_2_BYTES = LettuceConverters.toBytes(VALUE_2); private static final byte[] VALUE_3_BYTES = LettuceConverters.toBytes(VALUE_3); + private static final byte[] VALUE_4_BYTES = LettuceConverters.toBytes(VALUE_4); private static final GeoLocation ARIGENTO = new GeoLocation<>("arigento", POINT_ARIGENTO); private static final GeoLocation CATANIA = new GeoLocation<>("catania", POINT_CATANIA); @@ -179,7 +181,34 @@ void shouldCreateClusterConnectionWithPooling() { } finally { factory.destroy(); } + } + + @Test // GH-2888 + void shouldPipelineAdvancedClusterApi() { + + LettuceConnectionFactory factory = createConnectionFactory(); + + ConnectionVerifier.create(factory) // + .execute(connection -> { + connection.set(KEY_1_BYTES, VALUE_1_BYTES); + connection.set(KEY_2_BYTES, VALUE_2_BYTES); + connection.set(KEY_4_BYTES, VALUE_4_BYTES); + + connection.openPipeline(); + connection.keyCommands().randomKey(); + connection.stringCommands().mGet(KEY_1_BYTES, KEY_2_BYTES); + + List objects = connection.closePipeline(); + + assertThat(objects).hasSize(2); + assertThat(objects).element(0).isInstanceOf(byte[].class); + assertThat(objects).element(1).isInstanceOf(List.class); + + List mget = (List) objects.get(1); + assertThat(mget).containsExactly(VALUE_1_BYTES, VALUE_2_BYTES); + + }).verifyAndClose(); } @Test // DATAREDIS-315 @@ -2821,13 +2850,13 @@ void bitFieldIncrByWithOverflowShouldWorkCorrectly() { assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1), create().incr(unsigned(2)).valueAt(BitFieldSubCommands.Offset.offset(102L)).overflow(FAIL).by(1L))) - .containsExactly(1L); + .containsExactly(1L); assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1), create().incr(unsigned(2)).valueAt(BitFieldSubCommands.Offset.offset(102L)).overflow(FAIL).by(1L))) - .containsExactly(2L); + .containsExactly(2L); assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1), create().incr(unsigned(2)).valueAt(BitFieldSubCommands.Offset.offset(102L)).overflow(FAIL).by(1L))) - .containsExactly(3L); + .containsExactly(3L); assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1), create().incr(unsigned(2)).valueAt(BitFieldSubCommands.Offset.offset(102L)).overflow(FAIL).by(1L))).isNotNull(); } @@ -2837,7 +2866,7 @@ void bitfieldShouldAllowMultipleSubcommands() { assertThat(clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1), create().incr(signed(5)).valueAt(BitFieldSubCommands.Offset.offset(100L)).by(1L).get(unsigned(4)).valueAt(0L))) - .containsExactly(1L, 0L); + .containsExactly(1L, 0L); } @Test // DATAREDIS-562 @@ -2847,13 +2876,13 @@ void bitfieldShouldWorkUsingNonZeroBasedOffset() { clusterConnection.stringCommands().bitField(LettuceConverters.toBytes(KEY_1), create().set(INT_8).valueAt(BitFieldSubCommands.Offset.offset(0L).multipliedByTypeLength()).to(100L) .set(INT_8).valueAt(BitFieldSubCommands.Offset.offset(1L).multipliedByTypeLength()).to(200L))) - .containsExactly(0L, 0L); + .containsExactly(0L, 0L); assertThat( clusterConnection.stringCommands() .bitField(LettuceConverters.toBytes(KEY_1), create().get(INT_8).valueAt(BitFieldSubCommands.Offset.offset(0L).multipliedByTypeLength()).get(INT_8) - .valueAt(BitFieldSubCommands.Offset.offset(1L).multipliedByTypeLength()))).containsExactly(100L, - -56L); + .valueAt(BitFieldSubCommands.Offset.offset(1L).multipliedByTypeLength()))) + .containsExactly(100L, -56L); } @Test // DATAREDIS-1103 @@ -2864,7 +2893,7 @@ void setKeepTTL() { assertThat( clusterConnection.stringCommands().set(KEY_1_BYTES, VALUE_2_BYTES, Expiration.keepTtl(), SetOption.upsert())) - .isTrue(); + .isTrue(); assertThat(nativeConnection.ttl(KEY_1)).isCloseTo(expireSeconds, Offset.offset(5L)); assertThat(nativeConnection.get(KEY_1)).isEqualTo(VALUE_2); From 0df9a90294037202daf6e011b9f22d1b3f62a559 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 4 Apr 2024 09:23:23 +0200 Subject: [PATCH 3/3] Use Lettuce functionality for Cluster commands where possible. We now remove our own code in favor of Lettuce's advanced cluster support to leverage asynchronous functionality in pipelining. Document pipelining restrictions regarding Redis Cluster. --- .../modules/ROOT/pages/redis/cluster.adoc | 3 + .../modules/ROOT/pages/redis/pipelining.adoc | 8 ++- .../lettuce/LettuceClusterKeyCommands.java | 44 ------------- .../lettuce/LettuceClusterServerCommands.java | 38 ------------ .../lettuce/LettuceClusterStringCommands.java | 22 ------- .../connection/lettuce/LettuceConnection.java | 62 +++++++++++-------- .../LettuceClusterConnectionUnitTests.java | 50 +-------------- .../core/RedisTemplateIntegrationTests.java | 23 ++++--- 8 files changed, 61 insertions(+), 189 deletions(-) diff --git a/src/main/antora/modules/ROOT/pages/redis/cluster.adoc b/src/main/antora/modules/ROOT/pages/redis/cluster.adoc index 0ad017e144..6b35ae5d3f 100644 --- a/src/main/antora/modules/ROOT/pages/redis/cluster.adoc +++ b/src/main/antora/modules/ROOT/pages/redis/cluster.adoc @@ -129,3 +129,6 @@ clusterOps.shutdown(NODE_7379); <1> <1> Shut down node at 7379 and cross fingers there is a replica in place that can take over. ==== + +NOTE: Redis Cluster pipelining is currently only supported throug the Lettuce driver except for the following commands when using cross-slot keys: `rename`, `renameNX`, `sort`, `bLPop`, `bRPop`, `rPopLPush`, `bRPopLPush`, `info`, `sMove`, `sInter`, `sInterStore`, `sUnion`, `sUnionStore`, `sDiff`, `sDiffStore`. +Same-slot keys are fully supported. diff --git a/src/main/antora/modules/ROOT/pages/redis/pipelining.adoc b/src/main/antora/modules/ROOT/pages/redis/pipelining.adoc index 8a4d70665a..7e5ddd0541 100644 --- a/src/main/antora/modules/ROOT/pages/redis/pipelining.adoc +++ b/src/main/antora/modules/ROOT/pages/redis/pipelining.adoc @@ -20,7 +20,9 @@ List results = stringRedisTemplate.executePipelined( }); ---- -The preceding example runs a bulk right pop of items from a queue in a pipeline. The `results` `List` contains all of the popped items. `RedisTemplate` uses its value, hash key, and hash value serializers to deserialize all results before returning, so the returned items in the preceding example are Strings. There are additional `executePipelined` methods that let you pass a custom serializer for pipelined results. +The preceding example runs a bulk right pop of items from a queue in a pipeline. +The `results` `List` contains all the popped items. `RedisTemplate` uses its value, hash key, and hash value serializers to deserialize all results before returning, so the returned items in the preceding example are Strings. +There are additional `executePipelined` methods that let you pass a custom serializer for pipelined results. Note that the value returned from the `RedisCallback` is required to be `null`, as this value is discarded in favor of returning the results of the pipelined commands. @@ -35,3 +37,7 @@ factory.setPipeliningFlushPolicy(PipeliningFlushPolicy.buffered(3)); <1> ---- <1> Buffer locally and flush after every 3rd command. ==== + +NOTE: Pipelining is limited to Redis Standalone. +Redis Cluster is currently only supported through the Lettuce driver except for the following commands when using cross-slot keys: `rename`, `renameNX`, `sort`, `bLPop`, `bRPop`, `rPopLPush`, `bRPopLPush`, `info`, `sMove`, `sInter`, `sInterStore`, `sUnion`, `sUnionStore`, `sDiff`, `sDiffStore`. +Same-slot keys are fully supported. diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyCommands.java index dc4fbc54da..ae3d3b2e49 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterKeyCommands.java @@ -18,11 +18,8 @@ import io.lettuce.core.KeyScanCursor; import io.lettuce.core.ScanArgs; -import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.ThreadLocalRandom; import org.springframework.dao.InvalidDataAccessApiUsageException; import org.springframework.data.redis.connection.ClusterSlotHashUtil; @@ -50,47 +47,6 @@ class LettuceClusterKeyCommands extends LettuceKeyCommands { this.connection = connection; } - @Override - public byte[] randomKey() { - - List nodes = connection.clusterGetNodes(); - Set inspectedNodes = new HashSet<>(nodes.size()); - - do { - - RedisClusterNode node = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())); - - while (inspectedNodes.contains(node)) { - node = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())); - } - inspectedNodes.add(node); - byte[] key = randomKey(node); - - if (key != null && key.length > 0) { - return key; - } - } while (nodes.size() != inspectedNodes.size()); - - return null; - } - - @Override - public Set keys(byte[] pattern) { - - Assert.notNull(pattern, "Pattern must not be null"); - - Collection> keysPerNode = connection.getClusterCommandExecutor() - .executeCommandOnAllNodes((LettuceClusterCommandCallback>) connection -> connection.keys(pattern)) - .resultsAsList(); - - Set keys = new HashSet<>(); - - for (List keySet : keysPerNode) { - keys.addAll(keySet); - } - return keys; - } - @Override public void rename(byte[] oldKey, byte[] newKey) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterServerCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterServerCommands.java index bab772c478..6d2e66cda0 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterServerCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterServerCommands.java @@ -18,7 +18,6 @@ import io.lettuce.core.api.sync.RedisServerCommands; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -34,7 +33,6 @@ import org.springframework.data.redis.connection.lettuce.LettuceClusterConnection.LettuceClusterCommandCallback; import org.springframework.data.redis.core.types.RedisClientInfo; import org.springframework.util.Assert; -import org.springframework.util.CollectionUtils; /** * @author Mark Paluch @@ -71,37 +69,11 @@ public void save(RedisClusterNode node) { executeCommandOnSingleNode(RedisServerCommands::save, node); } - @Override - public Long dbSize() { - - Collection dbSizes = executeCommandOnAllNodes(RedisServerCommands::dbsize).resultsAsList(); - - if (CollectionUtils.isEmpty(dbSizes)) { - return 0L; - } - - Long size = 0L; - for (Long value : dbSizes) { - size += value; - } - return size; - } - @Override public Long dbSize(RedisClusterNode node) { return executeCommandOnSingleNode(RedisServerCommands::dbsize, node).getValue(); } - @Override - public void flushDb() { - executeCommandOnAllNodes(RedisServerCommands::flushdb); - } - - @Override - public void flushDb(FlushOption option) { - executeCommandOnAllNodes(it -> it.flushdb(LettuceConverters.toFlushMode(option))); - } - @Override public void flushDb(RedisClusterNode node) { executeCommandOnSingleNode(RedisServerCommands::flushdb, node); @@ -112,16 +84,6 @@ public void flushDb(RedisClusterNode node, FlushOption option) { executeCommandOnSingleNode(it -> it.flushdb(LettuceConverters.toFlushMode(option)), node); } - @Override - public void flushAll() { - executeCommandOnAllNodes(RedisServerCommands::flushall); - } - - @Override - public void flushAll(FlushOption option) { - executeCommandOnAllNodes(it -> it.flushall(LettuceConverters.toFlushMode(option))); - } - @Override public void flushAll(RedisClusterNode node) { executeCommandOnSingleNode(RedisServerCommands::flushall, node); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterStringCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterStringCommands.java index c1be6421a5..4b6ae60f04 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterStringCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterStringCommands.java @@ -15,11 +15,6 @@ */ package org.springframework.data.redis.connection.lettuce; -import java.util.Map; - -import org.springframework.data.redis.connection.ClusterSlotHashUtil; -import org.springframework.util.Assert; - /** * @author Christoph Strobl * @author Mark Paluch @@ -31,21 +26,4 @@ class LettuceClusterStringCommands extends LettuceStringCommands { super(connection); } - @Override - public Boolean mSetNX(Map tuples) { - - Assert.notNull(tuples, "Tuples must not be null"); - - if (ClusterSlotHashUtil.isSameSlotForAllKeys(tuples.keySet().toArray(new byte[tuples.keySet().size()][]))) { - return super.mSetNX(tuples); - } - - boolean result = true; - for (Map.Entry entry : tuples.entrySet()) { - if (!setNX(entry.getKey(), entry.getValue()) && result) { - result = false; - } - } - return result; - } } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java index 93f78151b3..c3ecbde730 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java @@ -49,7 +49,9 @@ import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -102,8 +104,8 @@ */ public class LettuceConnection extends AbstractRedisConnection { - private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = - new FallbackExceptionTranslationStrategy(LettuceExceptionConverter.INSTANCE); + private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new FallbackExceptionTranslationStrategy( + LettuceExceptionConverter.INSTANCE); static final RedisCodec CODEC = ByteArrayCodec.INSTANCE; @@ -189,8 +191,8 @@ public LettuceConnection(@Nullable StatefulRedisConnection share /** * Creates a new {@link LettuceConnection}. * - * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. - * Should not be used for transactions or blocking operations. + * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be + * used for transactions or blocking operations. * @param timeout The connection timeout (in milliseconds) * @param client The {@link RedisClient} to use when making pub/sub connections. * @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection. @@ -209,8 +211,8 @@ public LettuceConnection(@Nullable StatefulRedisConnection share /** * Creates a new {@link LettuceConnection}. * - * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. - * Should not be used for transactions or blocking operations. + * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be + * used for transactions or blocking operations. * @param connectionProvider connection provider to obtain and release native connections. * @param timeout The connection timeout (in milliseconds) * @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection. @@ -225,8 +227,8 @@ public LettuceConnection(@Nullable StatefulRedisConnection share /** * Creates a new {@link LettuceConnection}. * - * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. - * Should not be used for transactions or blocking operations. + * @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be + * used for transactions or blocking operations. * @param connectionProvider connection provider to obtain and release native connections. * @param timeout The connection timeout (in milliseconds) * @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection. @@ -453,24 +455,19 @@ private LettuceInvoker doInvoke(RedisClusterAsyncCommands connec LettuceResult newLettuceResult(Future resultHolder, Converter converter) { - return LettuceResultBuilder.forResponse(resultHolder) - .mappedWith(converter) - .convertPipelineAndTxResults(this.convertPipelineAndTxResults) - .build(); + return LettuceResultBuilder. forResponse(resultHolder).mappedWith(converter) + .convertPipelineAndTxResults(this.convertPipelineAndTxResults).build(); } LettuceResult newLettuceResult(Future resultHolder, Converter converter, Supplier defaultValue) { - return LettuceResultBuilder.forResponse(resultHolder) - .mappedWith(converter) - .convertPipelineAndTxResults(this.convertPipelineAndTxResults) - .defaultNullTo(defaultValue) - .build(); + return LettuceResultBuilder. forResponse(resultHolder).mappedWith(converter) + .convertPipelineAndTxResults(this.convertPipelineAndTxResults).defaultNullTo(defaultValue).build(); } LettuceResult newLettuceStatusResult(Future resultHolder) { - return LettuceResultBuilder.forResponse(resultHolder).buildStatusResult(); + return LettuceResultBuilder. forResponse(resultHolder).buildStatusResult(); } void pipeline(LettuceResult result) { @@ -583,7 +580,7 @@ public List closePipeline() { pipeliningFlushState = null; isPipelined = false; - List> futures = new ArrayList<>(ppline.size()); + List> futures = new ArrayList<>(ppline.size()); for (LettuceResult result : ppline) { futures.add(result.getResultHolder()); @@ -600,10 +597,24 @@ public List closePipeline() { if (done) { for (LettuceResult result : ppline) { - if (result.getResultHolder().getOutput().hasError()) { + CompletableFuture resultHolder = result.getResultHolder(); + if (resultHolder.isCompletedExceptionally()) { + + String message; + if (resultHolder instanceof io.lettuce.core.protocol.RedisCommand rc) { + message = rc.getOutput().getError(); + } else { + try { + resultHolder.get(); + message = ""; + } catch (InterruptedException ignore) { + message = ""; + } catch (ExecutionException e) { + message = e.getCause().getMessage(); + } + } - Exception exception = new InvalidDataAccessApiUsageException(result.getResultHolder() - .getOutput().getError()); + Exception exception = new InvalidDataAccessApiUsageException(message); // remember only the first error if (problem == null) { @@ -684,8 +695,8 @@ public List exec() { LettuceTransactionResultConverter resultConverter = new LettuceTransactionResultConverter( new LinkedList<>(txResults), exceptionConverter); - pipeline(newLettuceResult(exec, source -> - resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source)))); + pipeline(newLettuceResult(exec, + source -> resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source)))); return null; } @@ -837,8 +848,7 @@ T failsafeReadScanValues(List source, @SuppressWarnings("rawtypes") @Null try { return (T) (converter != null ? converter.convert(source) : source); - } catch (IndexOutOfBoundsException ignore) { - } + } catch (IndexOutOfBoundsException ignore) {} return null; } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java index 8f1803d17d..25fd46d783 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnectionUnitTests.java @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.*; import static org.mockito.Mockito.*; import static org.springframework.data.redis.connection.ClusterTestVariables.*; -import static org.springframework.data.redis.test.util.MockitoUtils.*; import io.lettuce.core.RedisFuture; import io.lettuce.core.RedisURI; @@ -46,6 +45,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; + import org.springframework.data.redis.connection.ClusterCommandExecutor; import org.springframework.data.redis.connection.ClusterNodeResourceProvider; import org.springframework.data.redis.connection.ClusterTopologyProvider; @@ -193,22 +193,6 @@ void isClosedShouldReturnConnectionStateCorrectly() { assertThat(connection.isClosed()).isTrue(); } - @Test // DATAREDIS-315 - void keysShouldBeRunOnAllClusterNodes() { - - when(clusterConnection1Mock.keys(any(byte[].class))).thenReturn(Collections. emptyList()); - when(clusterConnection2Mock.keys(any(byte[].class))).thenReturn(Collections. emptyList()); - when(clusterConnection3Mock.keys(any(byte[].class))).thenReturn(Collections. emptyList()); - - byte[] pattern = LettuceConverters.toBytes("*"); - - connection.keys(pattern); - - verify(clusterConnection1Mock, times(1)).keys(pattern); - verify(clusterConnection2Mock, times(1)).keys(pattern); - verify(clusterConnection3Mock, times(1)).keys(pattern); - } - @Test // DATAREDIS-315 void keysShouldOnlyBeRunOnDedicatedNodeWhenPinned() { @@ -223,38 +207,6 @@ void keysShouldOnlyBeRunOnDedicatedNodeWhenPinned() { verify(clusterConnection3Mock, never()).keys(pattern); } - @Test // DATAREDIS-315 - void randomKeyShouldReturnAnyKeyFromRandomNode() { - - when(clusterConnection1Mock.randomkey()).thenReturn(KEY_1_BYTES); - when(clusterConnection2Mock.randomkey()).thenReturn(KEY_2_BYTES); - when(clusterConnection3Mock.randomkey()).thenReturn(KEY_3_BYTES); - - assertThat(connection.randomKey()).isIn(KEY_1_BYTES, KEY_2_BYTES, KEY_3_BYTES); - verifyInvocationsAcross("randomkey", times(1), clusterConnection1Mock, clusterConnection2Mock, - clusterConnection3Mock); - } - - @Test // DATAREDIS-315 - void randomKeyShouldReturnKeyWhenAvailableOnAnyNode() { - - when(clusterConnection3Mock.randomkey()).thenReturn(KEY_3_BYTES); - - for (int i = 0; i < 100; i++) { - assertThat(connection.randomKey()).isEqualTo(KEY_3_BYTES); - } - } - - @Test // DATAREDIS-315 - void randomKeyShouldReturnNullWhenNoKeysPresentOnAllNodes() { - - when(clusterConnection1Mock.randomkey()).thenReturn(null); - when(clusterConnection2Mock.randomkey()).thenReturn(null); - when(clusterConnection3Mock.randomkey()).thenReturn(null); - - assertThat(connection.randomKey()).isNull(); - } - @Test // DATAREDIS-315 void clusterSetSlotImportingShouldBeExecutedCorrectly() { diff --git a/src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java index 93fc2fff45..bbc18c6f0e 100644 --- a/src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/core/RedisTemplateIntegrationTests.java @@ -361,8 +361,7 @@ public Object execute(RedisOperations operations) throws DataAccessException { try { // Await EXEC completion as it's executed on a dedicated connection. Thread.sleep(100); - } catch (InterruptedException ignore) { - } + } catch (InterruptedException ignore) {} operations.opsForValue().set(key1, value1); operations.opsForValue().get(key1); @@ -673,7 +672,16 @@ void testRandomKey() { K key1 = keyFactory.instance(); V value1 = valueFactory.instance(); redisTemplate.opsForValue().set(key1, value1); - assertThat(redisTemplate.randomKey()).isEqualTo(key1); + + for (int i = 0; i < 20; i++) { + + K k = redisTemplate.randomKey(); + if (k == null) { + continue; + } + + assertThat(k).isEqualTo(key1); + } } @ParameterizedRedisTest @@ -723,8 +731,7 @@ public List execute(RedisOperations operations) throws DataAccessExcepti th.start(); try { th.join(); - } catch (InterruptedException ignore) { - } + } catch (InterruptedException ignore) {} operations.multi(); operations.opsForValue().set(key1, value3); @@ -756,8 +763,7 @@ public List execute(RedisOperations operations) throws DataAccessExcepti th.start(); try { th.join(); - } catch (InterruptedException ignore) { - } + } catch (InterruptedException ignore) {} operations.unwatch(); operations.multi(); @@ -794,8 +800,7 @@ public List execute(RedisOperations operations) throws DataAccessExcepti th.start(); try { th.join(); - } catch (InterruptedException ignore) { - } + } catch (InterruptedException ignore) {} operations.multi(); operations.opsForValue().set(key1, value3);