diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java index 3f0b6baddc..320f7f8338 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisConnection.java @@ -189,7 +189,7 @@ public RedisKeyCommands keyCommands() { */ @Override public RedisStreamCommands streamCommands() { - throw new UnsupportedOperationException("Streams not supported using Jedis!"); + return new JedisStreamCommands(this); } /* diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java new file mode 100644 index 0000000000..3122f834b0 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java @@ -0,0 +1,453 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.jedis; + +import static org.springframework.data.redis.connection.jedis.StreamConverters.convertToByteRecord; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.springframework.data.domain.Range; +import org.springframework.data.redis.connection.RedisStreamCommands; +import org.springframework.data.redis.connection.RedisZSetCommands; +import org.springframework.data.redis.connection.stream.ByteRecord; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.PendingMessages; +import org.springframework.data.redis.connection.stream.PendingMessagesSummary; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.connection.stream.StreamInfo; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.connection.stream.StreamReadOptions; +import org.springframework.util.Assert; + +import redis.clients.jedis.BinaryJedis; +import redis.clients.jedis.BuilderFactory; +import redis.clients.jedis.MultiKeyPipelineBase; +import redis.clients.jedis.StreamConsumersInfo; +import redis.clients.jedis.StreamGroupInfo; + +/** + * @author Dengliming + * @since 2.3 + */ +class JedisStreamCommands implements RedisStreamCommands { + + private final JedisConnection connection; + + JedisStreamCommands(JedisConnection connection) { + this.connection = connection; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xAck(byte[], String, org.springframework.data.redis.connection.stream.RecordId[]) + */ + @Override + public Long xAck(byte[] key, String group, RecordId... recordIds) { + Assert.notNull(key, "Key must not be null!"); + Assert.hasText(group, "Group name must not be null or empty!"); + Assert.notNull(recordIds, "recordIds must not be null!"); + + return connection.invoke().just(BinaryJedis::xack, MultiKeyPipelineBase::xack, key, JedisConverters.toBytes(group), + entryIdsToBytes(recordIds)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xAdd(MapRecord, XAddOptions) + */ + @Override + public RecordId xAdd(MapRecord record, XAddOptions options) { + Assert.notNull(record, "Record must not be null!"); + Assert.notNull(record.getStream(), "Stream must not be null!"); + + byte[] id = JedisConverters.toBytes(record.getId().getValue()); + Long maxLength = Long.MAX_VALUE; + if (options.hasMaxlen()) { + maxLength = options.getMaxlen(); + } + + return connection.invoke().from(BinaryJedis::xadd, MultiKeyPipelineBase::xadd, record.getStream(), id, + record.getValue(), maxLength, false).get(it -> RecordId.of(JedisConverters.toString(it))); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xClaimJustId(byte[], java.lang.String, java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions) + */ + @Override + public List xClaimJustId(byte[] key, String group, String newOwner, XClaimOptions options) { + throw new UnsupportedOperationException("Jedis does not support xClaimJustId."); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xClaim(byte[], java.lang.String, java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions) + */ + @Override + public List xClaim(byte[] key, String group, String newOwner, XClaimOptions options) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(group, "Group must not be null!"); + Assert.notNull(newOwner, "NewOwner must not be null!"); + + final long minIdleTime = options.getMinIdleTime() == null ? -1L : options.getMinIdleTime().toMillis(); + final int retryCount = options.getRetryCount() == null ? -1 : options.getRetryCount().intValue(); + final long unixTime = options.getUnixTime() == null ? -1L : options.getUnixTime().toEpochMilli(); + + return connection.invoke() + .from( + it -> it.xclaim(key, JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), minIdleTime, + unixTime, retryCount, options.isForce(), entryIdsToBytes(options.getIds())), + it -> it.xclaim(key, JedisConverters.toBytes(group), JedisConverters.toBytes(newOwner), minIdleTime, + unixTime, retryCount, options.isForce(), entryIdsToBytes(options.getIds()))) + .get(r -> convertToByteRecord(key, r)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xDel(byte[], java.lang.String[]) + */ + @Override + public Long xDel(byte[] key, RecordId... recordIds) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(recordIds, "recordIds must not be null!"); + + return connection.invoke().just(BinaryJedis::xdel, MultiKeyPipelineBase::xdel, key, entryIdsToBytes(recordIds)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupCreate(byte[], org.springframework.data.redis.connection.RedisStreamCommands.ReadOffset, java.lang.String) + */ + @Override + public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset) { + return xGroupCreate(key, groupName, readOffset, false); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupCreate(byte[], org.springframework.data.redis.connection.RedisStreamCommands.ReadOffset, java.lang.String, boolean) + */ + @Override + public String xGroupCreate(byte[] key, String groupName, ReadOffset readOffset, boolean mkStream) { + Assert.notNull(key, "Key must not be null!"); + Assert.hasText(groupName, "Group name must not be null or empty!"); + Assert.notNull(readOffset, "ReadOffset must not be null!"); + + return connection.invoke().just(BinaryJedis::xgroupCreate, MultiKeyPipelineBase::xgroupCreate, key, + JedisConverters.toBytes(groupName), JedisConverters.toBytes(readOffset.getOffset()), mkStream); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupDelConsumer(byte[], org.springframework.data.redis.connection.RedisStreamCommands.Consumer) + */ + @Override + public Boolean xGroupDelConsumer(byte[] key, Consumer consumer) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(consumer, "Consumer must not be null!"); + + return connection.invoke().from(BinaryJedis::xgroupDelConsumer, MultiKeyPipelineBase::xgroupDelConsumer, key, + JedisConverters.toBytes(consumer.getGroup()), JedisConverters.toBytes(consumer.getName())).get(r -> r > 0); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xGroupDestroy(byte[], java.lang.String) + */ + @Override + public Boolean xGroupDestroy(byte[] key, String groupName) { + Assert.notNull(key, "Key must not be null!"); + Assert.hasText(groupName, "Group name must not be null or empty!"); + + return connection.invoke() + .from(BinaryJedis::xgroupDestroy, MultiKeyPipelineBase::xgroupDestroy, key, JedisConverters.toBytes(groupName)) + .get(r -> r > 0); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xInfo(byte[]) + */ + @Override + public StreamInfo.XInfoStream xInfo(byte[] key) { + Assert.notNull(key, "Key must not be null!"); + + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'XINFO' cannot be called in pipeline / transaction mode."); + } + + return connection.invoke().just(it -> { + redis.clients.jedis.StreamInfo streamInfo = it.xinfoStream(key); + return StreamInfo.XInfoStream.fromList(mapToList(streamInfo.getStreamInfo())); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xInfoGroups(byte[]) + */ + @Override + public StreamInfo.XInfoGroups xInfoGroups(byte[] key) { + Assert.notNull(key, "Key must not be null!"); + + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'XINFO GROUPS' cannot be called in pipeline / transaction mode."); + } + + return connection.invoke().just(it -> { + List streamGroupInfos = it.xinfoGroup(key); + List sources = new ArrayList<>(); + streamGroupInfos.forEach(streamGroupInfo -> sources.add(mapToList(streamGroupInfo.getGroupInfo()))); + return StreamInfo.XInfoGroups.fromList(sources); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xInfoConsumers(byte[], java.lang.String) + */ + @Override + public StreamInfo.XInfoConsumers xInfoConsumers(byte[] key, String groupName) { + Assert.notNull(key, "Key must not be null!"); + Assert.hasText(groupName, "Group name must not be null or empty!"); + + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'XINFO CONSUMERS' cannot be called in pipeline / transaction mode."); + } + + return connection.invoke().just(it -> { + List streamConsumersInfos = it.xinfoConsumers(key, JedisConverters.toBytes(groupName)); + List sources = new ArrayList<>(); + streamConsumersInfos + .forEach(streamConsumersInfo -> sources.add(mapToList(streamConsumersInfo.getConsumerInfo()))); + return StreamInfo.XInfoConsumers.fromList(groupName, sources); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xLen(byte[]) + */ + @Override + public Long xLen(byte[] key) { + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().just(BinaryJedis::xlen, MultiKeyPipelineBase::xlen, key); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xPending(byte[], java.lang.String) + */ + @Override + public PendingMessagesSummary xPending(byte[] key, String groupName) { + throw new UnsupportedOperationException("Jedis does not support returning PendingMessagesSummary."); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xPending(byte[], java.lang.String, org.springframework.data.redis.connection.RedisStreamCommands.XPendingOptions) + */ + @Override + public PendingMessages xPending(byte[] key, String groupName, XPendingOptions options) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(groupName, "GroupName must not be null!"); + + Range range = (Range) options.getRange(); + byte[] group = JedisConverters.toBytes(groupName); + + return connection.invoke().from((it, t1, t2, t3, t4, t5, t6) -> { + Object r = it.xpending(t1, t2, t3, t4, t5, t6); + return BuilderFactory.STREAM_PENDING_ENTRY_LIST.build(r); + }, MultiKeyPipelineBase::xpending, key, group, JedisConverters.toBytes(getLowerValue(range)), + JedisConverters.toBytes(getUpperValue(range)), options.getCount().intValue(), + JedisConverters.toBytes(options.getConsumerName())) + .get(r -> StreamConverters.toPendingMessages(groupName, range, r)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xRange(byte[], org.springframework.data.domain.Range, org.springframework.data.redis.connection.RedisZSetCommands.Limit) + */ + @Override + public List xRange(byte[] key, Range range, RedisZSetCommands.Limit limit) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(range, "Range must not be null!"); + Assert.notNull(limit, "Limit must not be null!"); + + int count = limit.isUnlimited() ? Integer.MAX_VALUE : limit.getCount(); + + return connection.invoke() + .from( + it -> it.xrange(key, JedisConverters.toBytes(getLowerValue(range)), + JedisConverters.toBytes(getUpperValue(range)), count), + it -> it.xrange(key, JedisConverters.toBytes(getLowerValue(range)), + JedisConverters.toBytes(getUpperValue(range)), count)) + .get(r -> convertToByteRecord(key, r)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xRead(org.springframework.data.redis.connection.RedisStreamCommands.StreamReadOptions, org.springframework.data.redis.connection.RedisStreamCommands.StreamOffset[]) + */ + @Override + public List xRead(StreamReadOptions readOptions, StreamOffset... streams) { + Assert.notNull(readOptions, "StreamReadOptions must not be null!"); + Assert.notNull(streams, "StreamOffsets must not be null!"); + + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'XREAD' cannot be called in pipeline / transaction mode."); + } + + final long block = readOptions.getBlock() == null ? -1L : readOptions.getBlock(); + final int count = readOptions.getCount() != null ? readOptions.getCount().intValue() : Integer.MAX_VALUE; + + return connection.invoke().just(it -> { + List streamsEntries = it.xread(count, block, toStreamOffsets(streams)); + return convertToByteRecord(streamsEntries); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xReadGroup(org.springframework.data.redis.connection.RedisStreamCommands.Consumer, org.springframework.data.redis.connection.RedisStreamCommands.StreamReadOptions, org.springframework.data.redis.connection.RedisStreamCommands.StreamOffset[]) + */ + @Override + public List xReadGroup(Consumer consumer, StreamReadOptions readOptions, + StreamOffset... streams) { + Assert.notNull(consumer, "Consumer must not be null!"); + Assert.notNull(readOptions, "StreamReadOptions must not be null!"); + Assert.notNull(streams, "StreamOffsets must not be null!"); + + if (isQueueing() || isPipelined()) { + throw new UnsupportedOperationException("'XREADGROUP' cannot be called in pipeline / transaction mode."); + } + + final long block = readOptions.getBlock() == null ? -1L : readOptions.getBlock(); + final int count = readOptions.getCount() == null ? -1 : readOptions.getCount().intValue(); + + return connection.invoke().just(it -> { + List streamsEntries = it.xreadGroup(JedisConverters.toBytes(consumer.getGroup()), + JedisConverters.toBytes(consumer.getName()), count, block, readOptions.isNoack(), toStreamOffsets(streams)); + return convertToByteRecord(streamsEntries); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xRevRange(byte[], org.springframework.data.domain.Range, org.springframework.data.redis.connection.RedisZSetCommands.Limit) + */ + @Override + public List xRevRange(byte[] key, Range range, RedisZSetCommands.Limit limit) { + Assert.notNull(key, "Key must not be null!"); + Assert.notNull(range, "Range must not be null!"); + Assert.notNull(limit, "Limit must not be null!"); + + int count = limit.isUnlimited() ? Integer.MAX_VALUE : limit.getCount(); + return connection.invoke() + .from(BinaryJedis::xrevrange, MultiKeyPipelineBase::xrevrange, key, + JedisConverters.toBytes(getUpperValue(range)), JedisConverters.toBytes(getLowerValue(range)), count) + .get(it -> convertToByteRecord(key, it)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xTrim(byte[], long) + */ + @Override + public Long xTrim(byte[] key, long count) { + return xTrim(key, count, false); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.redis.connection.RedisStreamCommands#xTrim(byte[], long, boolean) + */ + @Override + public Long xTrim(byte[] key, long count, boolean approximateTrimming) { + Assert.notNull(key, "Key must not be null!"); + + return connection.invoke().just(BinaryJedis::xtrim, MultiKeyPipelineBase::xtrim, key, count, approximateTrimming); + } + + private boolean isPipelined() { + return connection.isPipelined(); + } + + private boolean isQueueing() { + return connection.isQueueing(); + } + + private byte[][] entryIdsToBytes(RecordId[] recordIds) { + + final byte[][] bids = new byte[recordIds.length][]; + for (int i = 0; i < recordIds.length; ++i) { + RecordId id = recordIds[i]; + bids[i] = JedisConverters.toBytes(id.getValue()); + } + + return bids; + } + + private byte[][] entryIdsToBytes(List recordIds) { + + final byte[][] bids = new byte[recordIds.size()][]; + for (int i = 0; i < recordIds.size(); ++i) { + RecordId id = recordIds.get(i); + bids[i] = JedisConverters.toBytes(id.getValue()); + } + + return bids; + } + + private String getLowerValue(Range range) { + + if (range.getLowerBound().equals(Range.Bound.unbounded())) { + return "-"; + } + + return range.getLowerBound().getValue().orElse("-"); + } + + private String getUpperValue(Range range) { + + if (range.getUpperBound().equals(Range.Bound.unbounded())) { + return "+"; + } + + return range.getUpperBound().getValue().orElse("+"); + } + + private List mapToList(Map map) { + List sources = new ArrayList<>(map.size() * 2); + map.forEach((k, v) -> { + sources.add(k); + sources.add(v); + }); + return sources; + } + + private Map toStreamOffsets(StreamOffset... streams) { + return Arrays.stream(streams) + .collect(Collectors.toMap(k -> k.getKey(), v -> JedisConverters.toBytes(v.getOffset().getOffset()))); + } +} diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java new file mode 100644 index 0000000000..1c259c1b28 --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -0,0 +1,113 @@ +/* + * Copyright 2018-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.connection.jedis; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.springframework.data.redis.connection.stream.ByteRecord; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.PendingMessage; +import org.springframework.data.redis.connection.stream.PendingMessages; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.connection.stream.StreamRecords; + +import redis.clients.jedis.StreamPendingEntry; +import redis.clients.jedis.util.SafeEncoder; + +/** + * Converters for Redis Stream-specific types. + *

+ * Converters typically convert between value objects/argument objects retaining the actual types of values (i.e. no + * serialization/deserialization happens here). + * + * @author dengliming + * @since 2.3 + */ +@SuppressWarnings({ "rawtypes" }) +class StreamConverters { + + static final List convertToByteRecord(byte[] key, Object source) { + if (null == source) { + return Collections.emptyList(); + } + List> objectList = (List>) source; + List result = new ArrayList<>(objectList.size() / 2); + if (objectList.isEmpty()) { + return result; + } + + for (List res : objectList) { + if (res == null) { + result.add(null); + continue; + } + String entryIdString = SafeEncoder.encode((byte[]) res.get(0)); + List hash = (List) res.get(1); + + Iterator hashIterator = hash.iterator(); + Map fields = new HashMap<>(hash.size() / 2); + while (hashIterator.hasNext()) { + fields.put(hashIterator.next(), hashIterator.next()); + } + + result.add(StreamRecords.newRecord().in(key).withId(entryIdString).ofBytes(fields)); + } + return result; + } + + static final List convertToByteRecord(List sources) { + if (sources == null) { + return Collections.emptyList(); + } + List result = new ArrayList<>(); + for (Object streamObj : sources) { + List stream = (List) streamObj; + result.addAll(convertToByteRecord((byte[]) stream.get(0), stream.get(1))); + } + return result; + } + + /** + * Convert the raw Jedis xpending result to {@link PendingMessages}. + * + * @param groupName the group name + * @param range the range of messages requested + * @param response the raw jedis response. + * @return + */ + static org.springframework.data.redis.connection.stream.PendingMessages toPendingMessages(String groupName, + org.springframework.data.domain.Range range, List response) { + + if (null == response) { + return null; + } + + List messages = response.stream() + .map(streamPendingEntry -> new PendingMessage(RecordId.of(streamPendingEntry.getID().toString()), + Consumer.from(groupName, streamPendingEntry.getConsumerName()), + Duration.ofMillis(streamPendingEntry.getIdleTime()), streamPendingEntry.getDeliveredTimes())) + .collect(Collectors.toList()); + + return new PendingMessages(groupName, messages).withinRange(range); + } +} diff --git a/src/test/java/org/springframework/data/redis/core/JedisStreamOperationsIntegrationTests.java b/src/test/java/org/springframework/data/redis/core/JedisStreamOperationsIntegrationTests.java new file mode 100644 index 0000000000..31e7619873 --- /dev/null +++ b/src/test/java/org/springframework/data/redis/core/JedisStreamOperationsIntegrationTests.java @@ -0,0 +1,216 @@ +/* + * Copyright 2018-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.redis.core; + +import org.junit.jupiter.api.BeforeEach; +import org.springframework.data.domain.Range; +import org.springframework.data.redis.ObjectFactory; +import org.springframework.data.redis.RawObjectFactory; +import org.springframework.data.redis.StringObjectFactory; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisZSetCommands; +import org.springframework.data.redis.connection.jedis.JedisConnectionFactory; +import org.springframework.data.redis.connection.jedis.extension.JedisConnectionFactoryExtension; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.PendingMessages; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.test.condition.EnabledOnCommand; +import org.springframework.data.redis.test.extension.RedisStanalone; +import org.springframework.data.redis.test.extension.parametrized.MethodSource; +import org.springframework.data.redis.test.extension.parametrized.ParameterizedRedisTest; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Integration tests for JedisStreamOperations. + * + * @author dengliming + */ +@MethodSource("testParams") +@EnabledOnCommand("XREAD") +public class JedisStreamOperationsIntegrationTests { + + private final RedisConnectionFactory connectionFactory; + private final RedisTemplate redisTemplate; + private final ObjectFactory keyFactory; + private final ObjectFactory hashKeyFactory; + private final ObjectFactory hashValueFactory; + private final StreamOperations streamOps; + + public JedisStreamOperationsIntegrationTests(RedisTemplate redisTemplate, ObjectFactory keyFactory, + ObjectFactory objectFactory) { + this.redisTemplate = redisTemplate; + this.connectionFactory = redisTemplate.getRequiredConnectionFactory(); + this.keyFactory = keyFactory; + this.hashKeyFactory = (ObjectFactory) keyFactory; + this.hashValueFactory = (ObjectFactory) objectFactory; + this.streamOps = redisTemplate.opsForStream(); + } + + public static Collection testParams() { + ObjectFactory stringFactory = new StringObjectFactory(); + ObjectFactory rawFactory = new RawObjectFactory(); + + JedisConnectionFactory jedisConnectionFactory = JedisConnectionFactoryExtension + .getConnectionFactory(RedisStanalone.class); + + RedisTemplate stringTemplate = new StringRedisTemplate(); + stringTemplate.setConnectionFactory(jedisConnectionFactory); + stringTemplate.afterPropertiesSet(); + + RedisTemplate rawTemplate = new RedisTemplate<>(); + rawTemplate.setConnectionFactory(jedisConnectionFactory); + rawTemplate.setEnableDefaultSerializer(false); + rawTemplate.afterPropertiesSet(); + + return Arrays.asList( + new Object[][] { { stringTemplate, stringFactory, stringFactory }, { rawTemplate, rawFactory, rawFactory } }); + } + + @BeforeEach + void before() { + redisTemplate.execute((RedisCallback) connection -> { + connection.flushDb(); + return null; + }); + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void add() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + List> messages = streamOps.range(key, Range.unbounded()); + + assertThat(messages).hasSize(1); + + MapRecord message = messages.get(0); + + assertThat(message.getId()).isEqualTo(messageId); + assertThat(message.getStream()).isEqualTo(key); + + if (!(key instanceof byte[] || value instanceof byte[])) { + assertThat(message.getValue()).containsEntry(hashKey, value); + } + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void range() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + List> messages = streamOps.range(key, + Range.from(Range.Bound.inclusive(messageId1.getValue())).to(Range.Bound.inclusive(messageId2.getValue())), + RedisZSetCommands.Limit.limit().count(1)); + + assertThat(messages).hasSize(1); + + MapRecord message = messages.get(0); + + assertThat(message.getId()).isEqualTo(messageId1); + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void reverseRange() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId1 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + RecordId messageId2 = streamOps.add(key, Collections.singletonMap(hashKey, value)); + + List> messages = streamOps.reverseRange(key, Range.unbounded()); + + assertThat(messages).hasSize(2).extracting("id").containsSequence(messageId2, messageId1); + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void read() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value)); + streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group"); + + List> messages = streamOps.read(Consumer.from("my-group", "my-consumer"), + StreamOffset.create(key, ReadOffset.lastConsumed())); + + assertThat(messages).hasSize(1); + + MapRecord message = messages.get(0); + + assertThat(message.getId()).isEqualTo(messageId); + assertThat(message.getStream()).isEqualTo(key); + + if (!(key instanceof byte[] || value instanceof byte[])) { + assertThat(message.getValue()).containsEntry(hashKey, value); + } + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void size() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + streamOps.add(key, Collections.singletonMap(hashKey, value)); + assertThat(streamOps.size(key)).isEqualTo(1); + + streamOps.add(key, Collections.singletonMap(hashKey, value)); + assertThat(streamOps.size(key)).isEqualTo(2); + } + + @ParameterizedRedisTest // DATAREDIS-1140 + void pending() { + + K key = keyFactory.instance(); + HK hashKey = hashKeyFactory.instance(); + HV value = hashValueFactory.instance(); + + RecordId messageId = streamOps.add(key, Collections.singletonMap(hashKey, value)); + streamOps.createGroup(key, ReadOffset.from("0-0"), "my-group"); + + streamOps.read(Consumer.from("my-group", "my-consumer"), StreamOffset.create(key, ReadOffset.lastConsumed())); + + PendingMessages pending = streamOps.pending(key, "my-group", Range.unbounded(), 10L); + + assertThat(pending).hasSize(1); + assertThat(pending.get(0).getGroupName()).isEqualTo("my-group"); + assertThat(pending.get(0).getConsumerName()).isEqualTo("my-consumer"); + assertThat(pending.get(0).getTotalDeliveryCount()).isOne(); + } +}