From cedf6998c430b9fb10ede33fc006323f46ed17ed Mon Sep 17 00:00:00 2001 From: dengliming Date: Thu, 18 Feb 2021 18:30:21 +0800 Subject: [PATCH 1/4] Add support to Redis Streams using Jedis Client --- .../connection/jedis/JedisConnection.java | 2 +- .../connection/jedis/JedisStreamCommands.java | 496 ++++++++++++++++++ .../connection/jedis/StreamConverters.java | 126 +++++ ...JedisStreamOperationsIntegrationTests.java | 216 ++++++++ 4 files changed, 839 insertions(+), 1 deletion(-) create mode 100644 src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java create mode 100644 src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java create mode 100644 src/test/java/org/springframework/data/redis/core/JedisStreamOperationsIntegrationTests.java diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index 3f0b6baddc..320f7f8338 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -189,7 +189,7 @@ public RedisKeyCommands keyCommands() { */ @Override public RedisStreamCommands streamCommands() { - throw new UnsupportedOperationException("Streams not supported using Jedis!"); + return new JedisStreamCommands(this); } /* diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java new file mode 100644 index 0000000000..4e83d5b112 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java @@ -0,0 +1,496 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.jedis; + +import org.springframework.data.domain.Range; +import org.springframework.data.redis.connection.RedisStreamCommands; +import org.springframework.data.redis.connection.RedisZSetCommands; +import org.springframework.data.redis.connection.lettuce.LettuceConverters; +import org.springframework.data.redis.connection.stream.ByteRecord; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.PendingMessages; +import org.springframework.data.redis.connection.stream.PendingMessagesSummary; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.connection.stream.StreamInfo; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.connection.stream.StreamReadOptions; +import org.springframework.util.Assert; +import redis.clients.jedis.BinaryJedis; +import redis.clients.jedis.MultiKeyPipelineBase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.springframework.data.redis.connection.jedis.StreamConverters.convertToByteRecord; + +/** + * @author Dengliming + * @since 2.3 + */ +class JedisStreamCommands implements RedisStreamCommands { + + private final JedisConnection connection; + + JedisStreamCommands(JedisConnection connection) { + this.connection = connection; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xAck(byte[], String, org.springframework.data.redis.connection.stream.RecordId[]) + */ + @Override + public Long xAck(byte[] key, String group, RecordId... recordIds) { + Assert.notNull(key, "Key must not be null!"); + Assert.hasText(group, "Group name must not be null or empty!"); + Assert.notNull(recordIds, "recordIds must not be null!"); + + return connection.invoke().just(BinaryJedis::xack, MultiKeyPipelineBase::xack, key, JedisConverters.toBytes(group), + entryIdsToBytes(recordIds)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xAdd(MapRecord, XAddOptions) + */ + @Override + public RecordId xAdd(MapRecord record, XAddOptions options) { + Assert.notNull(record, "Record must not be null!"); + Assert.notNull(record.getStream(), "Stream must not be null!"); + + byte[] id = JedisConverters.toBytes(record.getId().getValue()); + Long maxLength = Long.MAX_VALUE; + if (options.hasMaxlen()) { + maxLength = options.getMaxlen(); + } + + return connection.invoke().from(BinaryJedis::xadd, MultiKeyPipelineBase::xadd, record.getStream(), id, + record.getValue(), maxLength, false).get(it -> RecordId.of(JedisConverters.toString(it))); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xClaimJustId(byte[], java.lang.String, java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions) + */ + @Override + public List xClaimJustId(byte[] key, String group, String newOwner, XClaimOptions options) { + throw new UnsupportedOperationException("Jedis does not support xClaimJustId."); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xClaim(byte[], java.lang.String, java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions) + */ + @Override + public List xClaim(byte[] key, String group, String newOwner, XClaimOptions options) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(group, "Group must not be null!"); + Assert.notNull(newOwner, "NewOwner must not be null!"); + + long minIdleTime = -1L; + if (options.getMinIdleTime() != null) { + minIdleTime = options.getMinIdleTime().toMillis(); + } + int retryCount = -1; + if (options.getRetryCount() != null) { + retryCount = options.getRetryCount().intValue(); + } + long unixTime = -1L; + if (options.getUnixTime() != null) { + unixTime = options.getUnixTime().toEpochMilli(); + } + try { + if (isPipelined()) { + pipeline(connection.newJedisResult(connection.getRequiredPipeline().xclaim(key, JedisConverters.toBytes(group), + JedisConverters.toBytes(newOwner), minIdleTime, unixTime, retryCount, options.isForce(), + entryIdsToBytes(options.getIds())))); + return null; + } + if (isQueueing()) { + transaction(connection.newJedisResult(connection.getRequiredTransaction().xclaim(key, + JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), minIdleTime, unixTime, retryCount, + options.isForce(), entryIdsToBytes(options.getIds())))); + return null; + } + + return convertToByteRecord(key, + connection.getJedis().xclaim(key, JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), + minIdleTime, unixTime, retryCount, options.isForce(), entryIdsToBytes(options.getIds()))); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xDel(byte[], java.lang.String[]) + */ + @Override + public Long xDel(byte[] key, RecordId... recordIds) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(recordIds, "recordIds must not be null!"); + + return connection.invoke().just(BinaryJedis::xdel, MultiKeyPipelineBase::xdel, key, entryIdsToBytes(recordIds)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupCreate(byte[], org.springframework.data.redis.connection.RedisStreamCommands.ReadOffset, java.lang.String) + */ + @Override + public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset) { + return xGroupCreate(key, groupName, readOffset, false); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupCreate(byte[], org.springframework.data.redis.connection.RedisStreamCommands.ReadOffset, java.lang.String, boolean) + */ + @Override + public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, boolean mkStream) { + Assert.notNull(key, "Key must not be null!"); + Assert.hasText(groupName, "Group name must not be null or empty!"); + Assert.notNull(readOffset, "ReadOffset must not be null!"); + + return connection.invoke().just(BinaryJedis::xgroupCreate, MultiKeyPipelineBase::xgroupCreate, key, + JedisConverters.toBytes(groupName), JedisConverters.toBytes(readOffset.getOffset()), mkStream); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupDelConsumer(byte[], org.springframework.data.redis.connection.RedisStreamCommands.Consumer) + */ + @Override + public Boolean xGroupDelConsumer(byte[] key, Consumer consumer) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(consumer, "Consumer must not be null!"); + + return connection.invoke().from(BinaryJedis::xgroupDelConsumer, MultiKeyPipelineBase::xgroupDelConsumer, key, + JedisConverters.toBytes(consumer.getGroup()), JedisConverters.toBytes(consumer.getName())).get(r -> r > 0); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupDestroy(byte[], java.lang.String) + */ + @Override + public Boolean xGroupDestroy(byte[] key, String groupName) { + Assert.notNull(key, "Key must not be null!"); + Assert.hasText(groupName, "Group name must not be null or empty!"); + + return connection.invoke() + .from(BinaryJedis::xgroupDestroy, MultiKeyPipelineBase::xgroupDestroy, key, JedisConverters.toBytes(groupName)) + .get(r -> r > 0); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xInfo(byte[]) + */ + @Override + public StreamInfo.XInfoStream xInfo(byte[] key) { + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().from(BinaryJedis::xinfoStream, (k, r) -> { + throw new UnsupportedOperationException("'XINFO' cannot be called in pipeline / transaction mode."); + }, key).get(streamInfo -> StreamInfo.XInfoStream.fromList(mapToList(streamInfo.getStreamInfo()))); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xInfoGroups(byte[]) + */ + @Override + public StreamInfo.XInfoGroups xInfoGroups(byte[] key) { + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().from(BinaryJedis::xinfoGroup, (k, r) -> { + throw new UnsupportedOperationException("'XINFO GROUPS' cannot be called in pipeline / transaction mode."); + }, key).get(streamGroupInfos -> { + List sources = new ArrayList<>(); + streamGroupInfos.forEach(streamGroupInfo -> sources.add(mapToList(streamGroupInfo.getGroupInfo()))); + return StreamInfo.XInfoGroups.fromList(sources); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xInfoConsumers(byte[], java.lang.String) + */ + @Override + public StreamInfo.XInfoConsumers xInfoConsumers(byte[] key, String groupName) { + Assert.notNull(key, "Key must not be null!"); + Assert.hasText(groupName, "Group name must not be null or empty!"); + + return connection.invoke().from(BinaryJedis::xinfoConsumers, (k, v, r) -> { + throw new UnsupportedOperationException("'XINFO CONSUMERS' cannot be called in pipeline / transaction mode."); + }, key, JedisConverters.toBytes(groupName)).get(streamConsumersInfos -> { + List sources = new ArrayList<>(); + streamConsumersInfos + .forEach(streamConsumersInfo -> sources.add(mapToList(streamConsumersInfo.getConsumerInfo()))); + return StreamInfo.XInfoConsumers.fromList(groupName, sources); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xLen(byte[]) + */ + @Override + public Long xLen(byte[] key) { + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().just(BinaryJedis::xlen, MultiKeyPipelineBase::xlen, key); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xPending(byte[], java.lang.String) + */ + @Override + public PendingMessagesSummary xPending(byte[] key, String groupName) { + throw new UnsupportedOperationException("Jedis does not support returning PendingMessagesSummary."); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xPending(byte[], java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions) + */ + @Override + public PendingMessages xPending(byte[] key, String groupName, XPendingOptions options) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(groupName, "GroupName must not be null!"); + + Range range = (Range) options.getRange(); + byte[] group = LettuceConverters.toBytes(groupName); + try { + if (isPipelined()) { + pipeline(connection.newJedisResult(connection.getRequiredPipeline().xpending(key, group, + JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)), + options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName())))); + return null; + } + if (isQueueing()) { + transaction(connection.newJedisResult(connection.getRequiredTransaction().xpending(key, group, + JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)), + options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName())))); + return null; + } + + return StreamConverters.toPendingMessages(groupName, range, + connection.getJedis().xpending(key, group, JedisConverters.toBytes(getLowerValue(range)), + JedisConverters.toBytes(getUpperValue(range)), options.getCount().intValue(), + JedisConverters.toBytes(options.getConsumerName()))); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xRange(byte[], org.springframework.data.domain.Range, org.springframework.data.redis.connection.RedisZSetCommands.Limit) + */ + @Override + public List xRange(byte[] key, Range range, RedisZSetCommands.Limit limit) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(range, "Range must not be null!"); + Assert.notNull(limit, "Limit must not be null!"); + + int count = limit.isUnlimited() ? Integer.MAX_VALUE : limit.getCount(); + try { + if (isPipelined()) { + pipeline(connection.newJedisResult(connection.getRequiredPipeline().xrange(key, + JedisConverters.toBytes(range.getLowerBound().getValue().get()), + JedisConverters.toBytes(range.getUpperBound().getValue().get()), count))); + return null; + } + if (isQueueing()) { + transaction(connection.newJedisResult(connection.getRequiredTransaction().xrange(key, + JedisConverters.toBytes(range.getLowerBound().getValue().get()), + JedisConverters.toBytes(range.getUpperBound().getValue().get()), count))); + return null; + } + + return convertToByteRecord(key, connection.getJedis().xrange(key, JedisConverters.toBytes(getLowerValue(range)), + JedisConverters.toBytes(getUpperValue(range)), count)); + } catch (Exception ex) { + throw convertJedisAccessException(ex); + } + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xRead(org.springframework.data.redis.connection.RedisStreamCommands.StreamReadOptions, org.springframework.data.redis.connection.RedisStreamCommands.StreamOffset[]) + */ + @Override + public List xRead(StreamReadOptions readOptions, StreamOffset... streams) { + Assert.notNull(readOptions, "StreamReadOptions must not be null!"); + Assert.notNull(streams, "StreamOffsets must not be null!"); + + Long block = readOptions.getBlock(); + if (block == null) { + block = -1L; + } + int count = Integer.MAX_VALUE; + if (readOptions.getCount() != null) { + count = readOptions.getCount().intValue(); + } + return connection.invoke().from(BinaryJedis::xread, (t1, t2, t3, r) -> { + throw new UnsupportedOperationException("Operation not supported in pipelining/transaction mode"); + }, count, block, toStreamOffsets(streams)).get(streamsEntries -> convertToByteRecord(streamsEntries)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xReadGroup(org.springframework.data.redis.connection.RedisStreamCommands.Consumer, org.springframework.data.redis.connection.RedisStreamCommands.StreamReadOptions, org.springframework.data.redis.connection.RedisStreamCommands.StreamOffset[]) + */ + @Override + public List xReadGroup(Consumer consumer, StreamReadOptions readOptions, + StreamOffset... streams) { + Assert.notNull(consumer, "Consumer must not be null!"); + Assert.notNull(readOptions, "StreamReadOptions must not be null!"); + Assert.notNull(streams, "StreamOffsets must not be null!"); + + long block = -1L; + if (readOptions.getBlock() != null) { + block = readOptions.getBlock(); + } + int count = -1; + if (readOptions.getCount() != null) { + count = readOptions.getCount().intValue(); + } + return connection.invoke().from(BinaryJedis::xreadGroup, (t1, t2, t3, t4, t5, t6, r) -> { + throw new UnsupportedOperationException("Operation not supported in pipelining/transaction mode"); + }, JedisConverters.toBytes(consumer.getGroup()), JedisConverters.toBytes(consumer.getName()), count, block, + readOptions.isNoack(), toStreamOffsets(streams)).get(streamsEntries -> convertToByteRecord(streamsEntries)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xRevRange(byte[], org.springframework.data.domain.Range, org.springframework.data.redis.connection.RedisZSetCommands.Limit) + */ + @Override + public List xRevRange(byte[] key, Range range, RedisZSetCommands.Limit limit) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(range, "Range must not be null!"); + Assert.notNull(limit, "Limit must not be null!"); + + int count = limit.isUnlimited() ? Integer.MAX_VALUE : limit.getCount(); + return connection.invoke() + .from(BinaryJedis::xrevrange, MultiKeyPipelineBase::xrevrange, key, + JedisConverters.toBytes(getUpperValue(range)), JedisConverters.toBytes(getLowerValue(range)), count) + .get(it -> convertToByteRecord(key, it)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xTrim(byte[], long) + */ + @Override + public Long xTrim(byte[] key, long count) { + return xTrim(key, count, false); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xTrim(byte[], long, boolean) + */ + @Override + public Long xTrim(byte[] key, long count, boolean approximateTrimming) { + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().just(BinaryJedis::xtrim, MultiKeyPipelineBase::xtrim, key, count, approximateTrimming); + } + + private boolean isPipelined() { + return connection.isPipelined(); + } + + private void pipeline(JedisResult result) { + connection.pipeline(result); + } + + private boolean isQueueing() { + return connection.isQueueing(); + } + + private void transaction(JedisResult result) { + connection.transaction(result); + } + + private RuntimeException convertJedisAccessException(Exception ex) { + return connection.convertJedisAccessException(ex); + } + + private byte[][] entryIdsToBytes(RecordId[] recordIds) { + + final byte[][] bids = new byte[recordIds.length][]; + for (int i = 0; i < recordIds.length; ++i) { + RecordId id = recordIds[i]; + bids[i] = JedisConverters.toBytes(id.getValue()); + } + + return bids; + } + + private byte[][] entryIdsToBytes(List recordIds) { + + final byte[][] bids = new byte[recordIds.size()][]; + for (int i = 0; i < recordIds.size(); ++i) { + RecordId id = recordIds.get(i); + bids[i] = JedisConverters.toBytes(id.getValue()); + } + + return bids; + } + + private String getLowerValue(Range range) { + + if (range.getLowerBound().equals(Range.Bound.unbounded())) { + return "-"; + } + + return range.getLowerBound().getValue().orElse("-"); + } + + private String getUpperValue(Range range) { + + if (range.getUpperBound().equals(Range.Bound.unbounded())) { + return "+"; + } + + return range.getUpperBound().getValue().orElse("+"); + } + + private List mapToList(Map map) { + List sources = new ArrayList<>(map.size() * 2); + map.forEach((k, v) -> { + sources.add(k); + sources.add(v); + }); + return sources; + } + + private Map toStreamOffsets(StreamOffset... streams) { + return Arrays.stream(streams) + .collect(Collectors.toMap(k -> k.getKey(), v -> JedisConverters.toBytes(v.getOffset().getOffset()))); + } +} diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java new file mode 100644 index 0000000000..7dfca0d39e --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -0,0 +1,126 @@ +/* + * Copyright 2018-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.jedis; + +import org.springframework.data.redis.connection.stream.ByteRecord; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.PendingMessage; +import org.springframework.data.redis.connection.stream.PendingMessages; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.connection.stream.StreamRecords; +import redis.clients.jedis.BuilderFactory; +import redis.clients.jedis.util.SafeEncoder; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * Converters for Redis Stream-specific types. + *

+ * Converters typically convert between value objects/argument objects retaining the actual types of values (i.e. no + * serialization/deserialization happens here). + * + * @author dengliming + * @since 2.3 + */ +@SuppressWarnings({ "rawtypes" }) +class StreamConverters { + + private static final BiFunction PENDING_MESSAGES_CONVERTER = ( + source, groupName) -> { + + if (null == source) { + return null; + } + + List streamsEntries = (List) source; + List messages = new ArrayList<>(streamsEntries.size()); + for (Object streamObj : streamsEntries) { + List stream = (List) streamObj; + String id = SafeEncoder.encode((byte[]) stream.get(0)); + String consumerName = SafeEncoder.encode((byte[]) stream.get(1)); + long idleTime = BuilderFactory.LONG.build(stream.get(2)); + long deliveredTimes = BuilderFactory.LONG.build(stream.get(3)); + messages.add(new PendingMessage(RecordId.of(id), Consumer.from(groupName, consumerName), + Duration.ofMillis(idleTime), deliveredTimes)); + } + return new PendingMessages(groupName, messages); + }; + + private static final BiFunction> BYTE_RECORD_CONVERTER = (source, key) -> { + if (null == source) { + return Collections.emptyList(); + } + List> objectList = (List>) source; + List result = new ArrayList<>(objectList.size() / 2); + if (objectList.isEmpty()) { + return result; + } + + for (List res : objectList) { + if (res == null) { + result.add(null); + continue; + } + String entryIdString = SafeEncoder.encode((byte[]) res.get(0)); + List hash = (List) res.get(1); + + Iterator hashIterator = hash.iterator(); + Map fields = new HashMap<>(hash.size() / 2); + while (hashIterator.hasNext()) { + fields.put(hashIterator.next(), hashIterator.next()); + } + + result.add(StreamRecords.newRecord().in(key).withId(entryIdString).ofBytes(fields)); + } + return result; + }; + + static final List convertToByteRecord(byte[] key, Object source) { + return BYTE_RECORD_CONVERTER.apply(source, key); + } + + static final List convertToByteRecord(List sources) { + if (sources == null) { + return Collections.emptyList(); + } + List result = new ArrayList<>(); + for (Object streamObj : sources) { + List stream = (List) streamObj; + result.addAll(convertToByteRecord((byte[]) stream.get(0), stream.get(1))); + } + return result; + } + + /** + * Convert the raw Jedis xpending result to {@link PendingMessages}. + * + * @param groupName the group name + * @param range the range of messages requested + * @param source the raw jedis response. + * @return + */ + static org.springframework.data.redis.connection.stream.PendingMessages toPendingMessages(String groupName, + org.springframework.data.domain.Range range, Object source) { + return PENDING_MESSAGES_CONVERTER.apply(source, groupName).withinRange(range); + } +} diff --git a/src/test/java/org/springframework/data/redis/core/JedisStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/JedisStreamOperationsIntegrationTests.java new file mode 100644 index 0000000000..31e7619873 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/core/JedisStreamOperationsIntegrationTests.java @@ -0,0 +1,216 @@ +/* + * Copyright 2018-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.core; + +import org.junit.jupiter.api.BeforeEach; +import org.springframework.data.domain.Range; +import org.springframework.data.redis.ObjectFactory; +import org.springframework.data.redis.RawObjectFactory; +import org.springframework.data.redis.StringObjectFactory; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisZSetCommands; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.PendingMessages; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.test.condition.EnabledOnCommand; +import org.springframework.data.redis.test.extension.RedisStanalone; +import org.springframework.data.redis.test.extension.parametrized.MethodSource; +import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for JedisStreamOperations. + * + * @author dengliming + */ +@MethodSource("testParams") +@EnabledOnCommand("XREAD") +public class JedisStreamOperationsIntegrationTests { + + private final RedisConnectionFactory connectionFactory; + private final RedisTemplate redisTemplate; + private final ObjectFactory keyFactory; + private final ObjectFactory hashKeyFactory; + private final ObjectFactory hashValueFactory; + private final StreamOperations streamOps; + + public JedisStreamOperationsIntegrationTests(RedisTemplate redisTemplate, ObjectFactory keyFactory, + ObjectFactory objectFactory) { + this.redisTemplate = redisTemplate; + this.connectionFactory = redisTemplate.getRequiredConnectionFactory(); + this.keyFactory = keyFactory; + this.hashKeyFactory = (ObjectFactory) keyFactory; + this.hashValueFactory = (ObjectFactory) objectFactory; + this.streamOps = redisTemplate.opsForStream(); + } + + public static Collection testParams() { + ObjectFactory stringFactory = new StringObjectFactory(); + ObjectFactory rawFactory = new RawObjectFactory(); + + JedisConnectionFactory jedisConnectionFactory = JedisConnectionFactoryExtension + .getConnectionFactory(RedisStanalone.class); + + RedisTemplate stringTemplate = new StringRedisTemplate(); + stringTemplate.setConnectionFactory(jedisConnectionFactory); + stringTemplate.afterPropertiesSet(); + + RedisTemplate rawTemplate = new RedisTemplate<>(); + rawTemplate.setConnectionFactory(jedisConnectionFactory); + rawTemplate.setEnableDefaultSerializer(false); + rawTemplate.afterPropertiesSet(); + + return Arrays.asList( + new Object[][] { { stringTemplate, stringFactory, stringFactory }, { rawTemplate, rawFactory, rawFactory } }); + } + + @BeforeEach + void before() { + redisTemplate.execute((RedisCallback) connection -> { + connection.flushDb(); + return null; + }); + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void add() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + List> messages = streamOps.range(key, Range.unbounded()); + + assertThat(messages).hasSize(1); + + MapRecord message = messages.get(0); + + assertThat(message.getId()).isEqualTo(messageId); + assertThat(message.getStream()).isEqualTo(key); + + if (!(key instanceof byte[] || value instanceof byte[])) { + assertThat(message.getValue()).containsEntry(hashKey, value); + } + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void range() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + List> messages = streamOps.range(key, + Range.from(Range.Bound.inclusive(messageId1.getValue())).to(Range.Bound.inclusive(messageId2.getValue())), + RedisZSetCommands.Limit.limit().count(1)); + + assertThat(messages).hasSize(1); + + MapRecord message = messages.get(0); + + assertThat(message.getId()).isEqualTo(messageId1); + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void reverseRange() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + List> messages = streamOps.reverseRange(key, Range.unbounded()); + + assertThat(messages).hasSize(2).extracting("id").containsSequence(messageId2, messageId1); + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void read() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value)); + streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group"); + + List> messages = streamOps.read(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(key, ReadOffset.lastConsumed())); + + assertThat(messages).hasSize(1); + + MapRecord message = messages.get(0); + + assertThat(message.getId()).isEqualTo(messageId); + assertThat(message.getStream()).isEqualTo(key); + + if (!(key instanceof byte[] || value instanceof byte[])) { + assertThat(message.getValue()).containsEntry(hashKey, value); + } + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void size() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + streamOps.add(key, Collections.singletonMap(hashKey, value)); + assertThat(streamOps.size(key)).isEqualTo(1); + + streamOps.add(key, Collections.singletonMap(hashKey, value)); + assertThat(streamOps.size(key)).isEqualTo(2); + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void pending() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value)); + streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group"); + + streamOps.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed())); + + PendingMessages pending = streamOps.pending(key, "my-group", Range.unbounded(), 10L); + + assertThat(pending).hasSize(1); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + } +} From dd8c8c1839dc50af4261aac39c92da046b2ec181 Mon Sep 17 00:00:00 2001 From: dengliming Date: Thu, 18 Feb 2021 19:16:30 +0800 Subject: [PATCH 2/4] Check isPipelined beforehand --- .../connection/jedis/JedisStreamCommands.java | 70 +++++++++++-------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java index 4e83d5b112..4a7f64915a 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java @@ -18,7 +18,6 @@ import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.RedisStreamCommands; import org.springframework.data.redis.connection.RedisZSetCommands; -import org.springframework.data.redis.connection.lettuce.LettuceConverters; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; @@ -32,6 +31,8 @@ import org.springframework.util.Assert; import redis.clients.jedis.BinaryJedis; import redis.clients.jedis.MultiKeyPipelineBase; +import redis.clients.jedis.StreamConsumersInfo; +import redis.clients.jedis.StreamGroupInfo; import java.util.ArrayList; import java.util.Arrays; @@ -209,9 +210,14 @@ public Boolean xGroupDestroy(byte[] key, String groupName) { public StreamInfo.XInfoStream xInfo(byte[] key) { Assert.notNull(key, "Key must not be null!"); - return connection.invoke().from(BinaryJedis::xinfoStream, (k, r) -> { + if (isQueueing() || isPipelined()) { throw new UnsupportedOperationException("'XINFO' cannot be called in pipeline / transaction mode."); - }, key).get(streamInfo -> StreamInfo.XInfoStream.fromList(mapToList(streamInfo.getStreamInfo()))); + } + + return connection.invoke().just(it -> { + redis.clients.jedis.StreamInfo streamInfo = it.xinfoStream(key); + return StreamInfo.XInfoStream.fromList(mapToList(streamInfo.getStreamInfo())); + }); } /* @@ -222,9 +228,12 @@ public StreamInfo.XInfoStream xInfo(byte[] key) { public StreamInfo.XInfoGroups xInfoGroups(byte[] key) { Assert.notNull(key, "Key must not be null!"); - return connection.invoke().from(BinaryJedis::xinfoGroup, (k, r) -> { + if (isQueueing() || isPipelined()) { throw new UnsupportedOperationException("'XINFO GROUPS' cannot be called in pipeline / transaction mode."); - }, key).get(streamGroupInfos -> { + } + + return connection.invoke().just(it -> { + List streamGroupInfos = it.xinfoGroup(key); List sources = new ArrayList<>(); streamGroupInfos.forEach(streamGroupInfo -> sources.add(mapToList(streamGroupInfo.getGroupInfo()))); return StreamInfo.XInfoGroups.fromList(sources); @@ -240,9 +249,12 @@ public StreamInfo.XInfoConsumers xInfoConsumers(byte[] key, String groupName) { Assert.notNull(key, "Key must not be null!"); Assert.hasText(groupName, "Group name must not be null or empty!"); - return connection.invoke().from(BinaryJedis::xinfoConsumers, (k, v, r) -> { + if (isQueueing() || isPipelined()) { throw new UnsupportedOperationException("'XINFO CONSUMERS' cannot be called in pipeline / transaction mode."); - }, key, JedisConverters.toBytes(groupName)).get(streamConsumersInfos -> { + } + + return connection.invoke().just(it -> { + List streamConsumersInfos = it.xinfoConsumers(key, JedisConverters.toBytes(groupName)); List sources = new ArrayList<>(); streamConsumersInfos .forEach(streamConsumersInfo -> sources.add(mapToList(streamConsumersInfo.getConsumerInfo()))); @@ -280,7 +292,7 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op Assert.notNull(groupName, "GroupName must not be null!"); Range range = (Range) options.getRange(); - byte[] group = LettuceConverters.toBytes(groupName); + byte[] group = JedisConverters.toBytes(groupName); try { if (isPipelined()) { pipeline(connection.newJedisResult(connection.getRequiredPipeline().xpending(key, group, @@ -345,17 +357,17 @@ public List xRead(StreamReadOptions readOptions, StreamOffset { - throw new UnsupportedOperationException("Operation not supported in pipelining/transaction mode"); - }, count, block, toStreamOffsets(streams)).get(streamsEntries -> convertToByteRecord(streamsEntries)); + + final long block = readOptions.getBlock() == null ? -1L : readOptions.getBlock(); + final int count = readOptions.getCount() != null ? readOptions.getCount().intValue() : Integer.MAX_VALUE; + + return connection.invoke().just(it -> { + List streamsEntries = it.xread(count, block, toStreamOffsets(streams)); + return convertToByteRecord(streamsEntries); + }); } /* @@ -369,18 +381,18 @@ public List xReadGroup(Consumer consumer, StreamReadOptions readOpti Assert.notNull(readOptions, "StreamReadOptions must not be null!"); Assert.notNull(streams, "StreamOffsets must not be null!"); - long block = -1L; - if (readOptions.getBlock() != null) { - block = readOptions.getBlock(); - } - int count = -1; - if (readOptions.getCount() != null) { - count = readOptions.getCount().intValue(); + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'XREADGROUP' cannot be called in pipeline / transaction mode."); } - return connection.invoke().from(BinaryJedis::xreadGroup, (t1, t2, t3, t4, t5, t6, r) -> { - throw new UnsupportedOperationException("Operation not supported in pipelining/transaction mode"); - }, JedisConverters.toBytes(consumer.getGroup()), JedisConverters.toBytes(consumer.getName()), count, block, - readOptions.isNoack(), toStreamOffsets(streams)).get(streamsEntries -> convertToByteRecord(streamsEntries)); + + final long block = readOptions.getBlock() == null ? -1L : readOptions.getBlock(); + final int count = readOptions.getCount() == null ? -1 : readOptions.getCount().intValue(); + + return connection.invoke().just(it -> { + List streamsEntries = it.xreadGroup(JedisConverters.toBytes(consumer.getGroup()), + JedisConverters.toBytes(consumer.getName()), count, block, readOptions.isNoack(), toStreamOffsets(streams)); + return convertToByteRecord(streamsEntries); + }); } /* From fa899f8b547a7dee7f21daa550f6b51e8b49c78a Mon Sep 17 00:00:00 2001 From: dengliming Date: Thu, 18 Feb 2021 19:25:00 +0800 Subject: [PATCH 3/4] Having the conversion code in the actual method --- .../connection/jedis/StreamConverters.java | 63 ++++++++----------- 1 file changed, 27 insertions(+), 36 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java index 7dfca0d39e..1a5a98ea5d 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -15,24 +15,24 @@ */ package org.springframework.data.redis.connection.jedis; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.PendingMessage; import org.springframework.data.redis.connection.stream.PendingMessages; import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamRecords; + import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.util.SafeEncoder; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.function.BiFunction; - /** * Converters for Redis Stream-specific types. *

@@ -45,28 +45,7 @@ @SuppressWarnings({ "rawtypes" }) class StreamConverters { - private static final BiFunction PENDING_MESSAGES_CONVERTER = ( - source, groupName) -> { - - if (null == source) { - return null; - } - - List streamsEntries = (List) source; - List messages = new ArrayList<>(streamsEntries.size()); - for (Object streamObj : streamsEntries) { - List stream = (List) streamObj; - String id = SafeEncoder.encode((byte[]) stream.get(0)); - String consumerName = SafeEncoder.encode((byte[]) stream.get(1)); - long idleTime = BuilderFactory.LONG.build(stream.get(2)); - long deliveredTimes = BuilderFactory.LONG.build(stream.get(3)); - messages.add(new PendingMessage(RecordId.of(id), Consumer.from(groupName, consumerName), - Duration.ofMillis(idleTime), deliveredTimes)); - } - return new PendingMessages(groupName, messages); - }; - - private static final BiFunction> BYTE_RECORD_CONVERTER = (source, key) -> { + static final List convertToByteRecord(byte[] key, Object source) { if (null == source) { return Collections.emptyList(); } @@ -93,10 +72,6 @@ class StreamConverters { result.add(StreamRecords.newRecord().in(key).withId(entryIdString).ofBytes(fields)); } return result; - }; - - static final List convertToByteRecord(byte[] key, Object source) { - return BYTE_RECORD_CONVERTER.apply(source, key); } static final List convertToByteRecord(List sources) { @@ -121,6 +96,22 @@ static final List convertToByteRecord(List sources) { */ static org.springframework.data.redis.connection.stream.PendingMessages toPendingMessages(String groupName, org.springframework.data.domain.Range range, Object source) { - return PENDING_MESSAGES_CONVERTER.apply(source, groupName).withinRange(range); + + if (null == source) { + return null; + } + + List streamsEntries = (List) source; + List messages = new ArrayList<>(streamsEntries.size()); + for (Object streamObj : streamsEntries) { + List stream = (List) streamObj; + String id = SafeEncoder.encode((byte[]) stream.get(0)); + String consumerName = SafeEncoder.encode((byte[]) stream.get(1)); + long idleTime = BuilderFactory.LONG.build(stream.get(2)); + long deliveredTimes = BuilderFactory.LONG.build(stream.get(3)); + messages.add(new PendingMessage(RecordId.of(id), Consumer.from(groupName, consumerName), + Duration.ofMillis(idleTime), deliveredTimes)); + } + return new PendingMessages(groupName, messages).withinRange(range); } } From 0dd16acbb5a753179d70d54fa687b88de5e0edd4 Mon Sep 17 00:00:00 2001 From: dengliming Date: Thu, 18 Feb 2021 23:14:04 +0800 Subject: [PATCH 4/4] use JedisInvoker --- .../connection/jedis/JedisStreamCommands.java | 129 +++++------------- .../connection/jedis/StreamConverters.java | 26 ++-- 2 files changed, 48 insertions(+), 107 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java index 4a7f64915a..3122f834b0 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java @@ -15,6 +15,14 @@ */ package org.springframework.data.redis.connection.jedis; +import static org.springframework.data.redis.connection.jedis.StreamConverters.convertToByteRecord; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + import org.springframework.data.domain.Range; import org.springframework.data.redis.connection.RedisStreamCommands; import org.springframework.data.redis.connection.RedisZSetCommands; @@ -29,19 +37,13 @@ import org.springframework.data.redis.connection.stream.StreamOffset; import org.springframework.data.redis.connection.stream.StreamReadOptions; import org.springframework.util.Assert; + import redis.clients.jedis.BinaryJedis; +import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.MultiKeyPipelineBase; import redis.clients.jedis.StreamConsumersInfo; import redis.clients.jedis.StreamGroupInfo; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static org.springframework.data.redis.connection.jedis.StreamConverters.convertToByteRecord; - /** * @author Dengliming * @since 2.3 @@ -106,38 +108,17 @@ public List xClaim(byte[] key, String group, String newOwner, XClaim Assert.notNull(group, "Group must not be null!"); Assert.notNull(newOwner, "NewOwner must not be null!"); - long minIdleTime = -1L; - if (options.getMinIdleTime() != null) { - minIdleTime = options.getMinIdleTime().toMillis(); - } - int retryCount = -1; - if (options.getRetryCount() != null) { - retryCount = options.getRetryCount().intValue(); - } - long unixTime = -1L; - if (options.getUnixTime() != null) { - unixTime = options.getUnixTime().toEpochMilli(); - } - try { - if (isPipelined()) { - pipeline(connection.newJedisResult(connection.getRequiredPipeline().xclaim(key, JedisConverters.toBytes(group), - JedisConverters.toBytes(newOwner), minIdleTime, unixTime, retryCount, options.isForce(), - entryIdsToBytes(options.getIds())))); - return null; - } - if (isQueueing()) { - transaction(connection.newJedisResult(connection.getRequiredTransaction().xclaim(key, - JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), minIdleTime, unixTime, retryCount, - options.isForce(), entryIdsToBytes(options.getIds())))); - return null; - } - - return convertToByteRecord(key, - connection.getJedis().xclaim(key, JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), - minIdleTime, unixTime, retryCount, options.isForce(), entryIdsToBytes(options.getIds()))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); - } + final long minIdleTime = options.getMinIdleTime() == null ? -1L : options.getMinIdleTime().toMillis(); + final int retryCount = options.getRetryCount() == null ? -1 : options.getRetryCount().intValue(); + final long unixTime = options.getUnixTime() == null ? -1L : options.getUnixTime().toEpochMilli(); + + return connection.invoke() + .from( + it -> it.xclaim(key, JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), minIdleTime, + unixTime, retryCount, options.isForce(), entryIdsToBytes(options.getIds())), + it -> it.xclaim(key, JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), minIdleTime, + unixTime, retryCount, options.isForce(), entryIdsToBytes(options.getIds()))) + .get(r -> convertToByteRecord(key, r)); } /* @@ -293,27 +274,14 @@ public PendingMessages xPending(byte[] key, String groupName, XPendingOptions op Range range = (Range) options.getRange(); byte[] group = JedisConverters.toBytes(groupName); - try { - if (isPipelined()) { - pipeline(connection.newJedisResult(connection.getRequiredPipeline().xpending(key, group, - JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)), - options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName())))); - return null; - } - if (isQueueing()) { - transaction(connection.newJedisResult(connection.getRequiredTransaction().xpending(key, group, - JedisConverters.toBytes(getLowerValue(range)), JedisConverters.toBytes(getUpperValue(range)), - options.getCount().intValue(), JedisConverters.toBytes(options.getConsumerName())))); - return null; - } - - return StreamConverters.toPendingMessages(groupName, range, - connection.getJedis().xpending(key, group, JedisConverters.toBytes(getLowerValue(range)), - JedisConverters.toBytes(getUpperValue(range)), options.getCount().intValue(), - JedisConverters.toBytes(options.getConsumerName()))); - } catch (Exception ex) { - throw convertJedisAccessException(ex); - } + + return connection.invoke().from((it, t1, t2, t3, t4, t5, t6) -> { + Object r = it.xpending(t1, t2, t3, t4, t5, t6); + return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(r); + }, MultiKeyPipelineBase::xpending, key, group, JedisConverters.toBytes(getLowerValue(range)), + JedisConverters.toBytes(getUpperValue(range)), options.getCount().intValue(), + JedisConverters.toBytes(options.getConsumerName())) + .get(r -> StreamConverters.toPendingMessages(groupName, range, r)); } /* @@ -327,25 +295,14 @@ public List xRange(byte[] key, Range range, RedisZSetCommand Assert.notNull(limit, "Limit must not be null!"); int count = limit.isUnlimited() ? Integer.MAX_VALUE : limit.getCount(); - try { - if (isPipelined()) { - pipeline(connection.newJedisResult(connection.getRequiredPipeline().xrange(key, - JedisConverters.toBytes(range.getLowerBound().getValue().get()), - JedisConverters.toBytes(range.getUpperBound().getValue().get()), count))); - return null; - } - if (isQueueing()) { - transaction(connection.newJedisResult(connection.getRequiredTransaction().xrange(key, - JedisConverters.toBytes(range.getLowerBound().getValue().get()), - JedisConverters.toBytes(range.getUpperBound().getValue().get()), count))); - return null; - } - - return convertToByteRecord(key, connection.getJedis().xrange(key, JedisConverters.toBytes(getLowerValue(range)), - JedisConverters.toBytes(getUpperValue(range)), count)); - } catch (Exception ex) { - throw convertJedisAccessException(ex); - } + + return connection.invoke() + .from( + it -> it.xrange(key, JedisConverters.toBytes(getLowerValue(range)), + JedisConverters.toBytes(getUpperValue(range)), count), + it -> it.xrange(key, JedisConverters.toBytes(getLowerValue(range)), + JedisConverters.toBytes(getUpperValue(range)), count)) + .get(r -> convertToByteRecord(key, r)); } /* @@ -436,22 +393,10 @@ private boolean isPipelined() { return connection.isPipelined(); } - private void pipeline(JedisResult result) { - connection.pipeline(result); - } - private boolean isQueueing() { return connection.isQueueing(); } - private void transaction(JedisResult result) { - connection.transaction(result); - } - - private RuntimeException convertJedisAccessException(Exception ex) { - return connection.convertJedisAccessException(ex); - } - private byte[][] entryIdsToBytes(RecordId[] recordIds) { final byte[][] bids = new byte[recordIds.length][]; diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java index 1a5a98ea5d..1c259c1b28 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -22,6 +22,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.springframework.data.redis.connection.stream.ByteRecord; import org.springframework.data.redis.connection.stream.Consumer; @@ -30,7 +31,7 @@ import org.springframework.data.redis.connection.stream.RecordId; import org.springframework.data.redis.connection.stream.StreamRecords; -import redis.clients.jedis.BuilderFactory; +import redis.clients.jedis.StreamPendingEntry; import redis.clients.jedis.util.SafeEncoder; /** @@ -91,27 +92,22 @@ static final List convertToByteRecord(List sources) { * * @param groupName the group name * @param range the range of messages requested - * @param source the raw jedis response. + * @param response the raw jedis response. * @return */ static org.springframework.data.redis.connection.stream.PendingMessages toPendingMessages(String groupName, - org.springframework.data.domain.Range range, Object source) { + org.springframework.data.domain.Range range, List response) { - if (null == source) { + if (null == response) { return null; } - List streamsEntries = (List) source; - List messages = new ArrayList<>(streamsEntries.size()); - for (Object streamObj : streamsEntries) { - List stream = (List) streamObj; - String id = SafeEncoder.encode((byte[]) stream.get(0)); - String consumerName = SafeEncoder.encode((byte[]) stream.get(1)); - long idleTime = BuilderFactory.LONG.build(stream.get(2)); - long deliveredTimes = BuilderFactory.LONG.build(stream.get(3)); - messages.add(new PendingMessage(RecordId.of(id), Consumer.from(groupName, consumerName), - Duration.ofMillis(idleTime), deliveredTimes)); - } + List messages = response.stream() + .map(streamPendingEntry -> new PendingMessage(RecordId.of(streamPendingEntry.getID().toString()), + Consumer.from(groupName, streamPendingEntry.getConsumerName()), + Duration.ofMillis(streamPendingEntry.getIdleTime()), streamPendingEntry.getDeliveredTimes())) + .collect(Collectors.toList()); + return new PendingMessages(groupName, messages).withinRange(range); } }