Skip to content

Commit 8b60f29

Browse files
authored
Support iteration for SCAN, FT.SEARCH, FT.AGGREGATE commands (#3378)
* Support iteration for SCAN, FT.SEARCH, FT.AGGREGATE commands * format * Support fluent collect()
1 parent 959cf6b commit 8b60f29

File tree

10 files changed

+191
-64
lines changed

10 files changed

+191
-64
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,35 @@
11
package redis.clients.jedis;
22

3+
import java.util.Collection;
34
import java.util.function.Function;
5+
46
import redis.clients.jedis.params.ScanParams;
57
import redis.clients.jedis.Protocol.Keyword;
68
import redis.clients.jedis.providers.ConnectionProvider;
79
import redis.clients.jedis.resps.ScanResult;
8-
import redis.clients.jedis.util.JedisRoundRobinBase;
10+
import redis.clients.jedis.util.JedisCommandIterationBase;
911

10-
public class ScanRoundRobin extends JedisRoundRobinBase<ScanResult<String>> {
12+
public class ScanIteration extends JedisCommandIterationBase<ScanResult<String>, String> {
1113

1214
private final int count;
1315
private final Function<String, CommandArguments> args;
1416

15-
public ScanRoundRobin(ConnectionProvider connectionProvider, int batchCount, String match) {
17+
public ScanIteration(ConnectionProvider connectionProvider, int batchCount, String match) {
1618
super(connectionProvider, BuilderFactory.SCAN_RESPONSE);
1719
this.count = batchCount;
1820
this.args = (cursor) -> new CommandArguments(Protocol.Command.SCAN).add(cursor)
1921
.add(Keyword.MATCH).add(match).add(Keyword.COUNT).add(count);
2022
}
2123

22-
public ScanRoundRobin(ConnectionProvider connectionProvider, int batchCount, String match, String type) {
24+
public ScanIteration(ConnectionProvider connectionProvider, int batchCount, String match, String type) {
2325
super(connectionProvider, BuilderFactory.SCAN_RESPONSE);
2426
this.count = batchCount;
2527
this.args = (cursor) -> new CommandArguments(Protocol.Command.SCAN).add(cursor)
2628
.add(Keyword.MATCH).add(match).add(Keyword.COUNT).add(count).add(Keyword.TYPE).add(type);
2729
}
2830

2931
@Override
30-
protected boolean isIterationCompleted(ScanResult<String> reply) {
32+
protected boolean isNodeCompleted(ScanResult<String> reply) {
3133
return reply.isCompleteIteration();
3234
}
3335

@@ -40,4 +42,9 @@ protected CommandArguments initCommandArguments() {
4042
protected CommandArguments nextCommandArguments(ScanResult<String> lastReply) {
4143
return args.apply(lastReply.getCursor());
4244
}
45+
46+
@Override
47+
protected Collection<String> convertBatchToData(ScanResult<String> batch) {
48+
return batch.getResult();
49+
}
4350
}

src/main/java/redis/clients/jedis/UnifiedJedis.java

+35-18
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import redis.clients.jedis.search.*;
3131
import redis.clients.jedis.search.aggr.AggregationBuilder;
3232
import redis.clients.jedis.search.aggr.AggregationResult;
33-
import redis.clients.jedis.search.aggr.FtAggregateRoundRobin;
33+
import redis.clients.jedis.search.aggr.FtAggregateIteration;
3434
import redis.clients.jedis.search.schemafields.SchemaField;
3535
import redis.clients.jedis.timeseries.*;
3636
import redis.clients.jedis.util.IOUtils;
@@ -595,12 +595,23 @@ public ScanResult<String> scan(String cursor, ScanParams params, String type) {
595595
return executeCommand(commandObjects.scan(cursor, params, type));
596596
}
597597

598-
public ScanRoundRobin scan(int batchCount, String match) {
599-
return new ScanRoundRobin(provider, batchCount, match);
598+
/**
599+
* @param batchCount COUNT for each batch execution
600+
* @param match pattern
601+
* @return scan iteration
602+
*/
603+
public ScanIteration scanIteration(int batchCount, String match) {
604+
return new ScanIteration(provider, batchCount, match);
600605
}
601606

602-
public ScanRoundRobin scan(int batchCount, String match, String type) {
603-
return new ScanRoundRobin(provider, batchCount, match, type);
607+
/**
608+
* @param batchCount COUNT for each batch execution
609+
* @param match pattern
610+
* @param type key type
611+
* @return scan iteration
612+
*/
613+
public ScanIteration scanIteration(int batchCount, String match, String type) {
614+
return new ScanIteration(provider, batchCount, match, type);
604615
}
605616

606617
@Override
@@ -3559,14 +3570,14 @@ public SearchResult ftSearch(String indexName, String query, FTSearchParams para
35593570

35603571
/**
35613572
* {@link FTSearchParams#limit(int, int)} will be ignored.
3562-
* @param batchSize
3563-
* @param indexName
3564-
* @param query
3573+
* @param batchSize batch size
3574+
* @param indexName index name
3575+
* @param query query
35653576
* @param params limit will be ignored
3566-
* @return search
3577+
* @return search iteration
35673578
*/
3568-
public FtSearchRoundRobin ftSearch(int batchSize, String indexName, String query, FTSearchParams params) {
3569-
return new FtSearchRoundRobin(provider, batchSize, indexName, query, params);
3579+
public FtSearchIteration ftSearchIteration(int batchSize, String indexName, String query, FTSearchParams params) {
3580+
return new FtSearchIteration(provider, batchSize, indexName, query, params);
35703581
}
35713582

35723583
@Override
@@ -3576,13 +3587,13 @@ public SearchResult ftSearch(String indexName, Query query) {
35763587

35773588
/**
35783589
* {@link Query#limit(java.lang.Integer, java.lang.Integer)} will be ignored.
3579-
* @param batchSize
3580-
* @param indexName
3590+
* @param batchSize batch size
3591+
* @param indexName index name
35813592
* @param query limit will be ignored
3582-
* @return search
3593+
* @return search iteration
35833594
*/
3584-
public FtSearchRoundRobin ftSearch(int batchSize, String indexName, Query query) {
3585-
return new FtSearchRoundRobin(provider, batchSize, indexName, query);
3595+
public FtSearchIteration ftSearchIteration(int batchSize, String indexName, Query query) {
3596+
return new FtSearchIteration(provider, batchSize, indexName, query);
35863597
}
35873598

35883599
@Override
@@ -3615,8 +3626,14 @@ public String ftCursorDel(String indexName, long cursorId) {
36153626
return executeCommand(commandObjects.ftCursorDel(indexName, cursorId));
36163627
}
36173628

3618-
public FtAggregateRoundRobin ftAggregateRoundRobin(String indexName, AggregationBuilder aggr) {
3619-
return new FtAggregateRoundRobin(provider, indexName, aggr);
3629+
/**
3630+
* {@link AggregationBuilder#cursor(int, long) CURSOR} must be set.
3631+
* @param indexName index name
3632+
* @param aggr cursor must be set
3633+
* @return aggregate iteration
3634+
*/
3635+
public FtAggregateIteration ftAggregateIteration(String indexName, AggregationBuilder aggr) {
3636+
return new FtAggregateIteration(provider, indexName, aggr);
36203637
}
36213638

36223639
@Override

src/main/java/redis/clients/jedis/search/FtSearchRoundRobin.java renamed to src/main/java/redis/clients/jedis/search/FtSearchIteration.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package redis.clients.jedis.search;
22

3+
import java.util.Collection;
34
import java.util.function.IntFunction;
45

56
import redis.clients.jedis.CommandArguments;
67
import redis.clients.jedis.providers.ConnectionProvider;
78
import redis.clients.jedis.search.SearchResult.SearchResultBuilder;
8-
import redis.clients.jedis.util.JedisRoundRobinBase;
9+
import redis.clients.jedis.util.JedisCommandIterationBase;
910

10-
public class FtSearchRoundRobin extends JedisRoundRobinBase<SearchResult> {
11+
public class FtSearchIteration extends JedisCommandIterationBase<SearchResult, Document> {
1112

1213
private int batchStart;
1314
private final int batchSize;
@@ -16,7 +17,7 @@ public class FtSearchRoundRobin extends JedisRoundRobinBase<SearchResult> {
1617
/**
1718
* {@link FTSearchParams#limit(int, int)} will be ignored.
1819
*/
19-
public FtSearchRoundRobin(ConnectionProvider connectionProvider, int batchSize, String indexName, String query, FTSearchParams params) {
20+
public FtSearchIteration(ConnectionProvider connectionProvider, int batchSize, String indexName, String query, FTSearchParams params) {
2021
super(connectionProvider, new SearchResultBuilder(!params.getNoContent(), params.getWithScores(), false, true));
2122
this.batchSize = batchSize;
2223
this.args = (limitFirst) -> new CommandArguments(SearchProtocol.SearchCommand.SEARCH)
@@ -26,15 +27,15 @@ public FtSearchRoundRobin(ConnectionProvider connectionProvider, int batchSize,
2627
/**
2728
* {@link Query#limit(java.lang.Integer, java.lang.Integer)} will be ignored.
2829
*/
29-
public FtSearchRoundRobin(ConnectionProvider connectionProvider, int batchSize, String indexName, Query query) {
30+
public FtSearchIteration(ConnectionProvider connectionProvider, int batchSize, String indexName, Query query) {
3031
super(connectionProvider, new SearchResultBuilder(!query.getNoContent(), query.getWithScores(), query.getWithPayloads(), true));
3132
this.batchSize = batchSize;
3233
this.args = (limitFirst) -> new CommandArguments(SearchProtocol.SearchCommand.SEARCH)
3334
.add(indexName).addParams(query.limit(limitFirst, this.batchSize));
3435
}
3536

3637
@Override
37-
protected boolean isIterationCompleted(SearchResult reply) {
38+
protected boolean isNodeCompleted(SearchResult reply) {
3839
return batchStart >= reply.getTotalResults() - batchSize;
3940
}
4041

@@ -49,4 +50,9 @@ protected CommandArguments nextCommandArguments(SearchResult lastReply) {
4950
batchStart += batchSize;
5051
return args.apply(batchStart);
5152
}
53+
54+
@Override
55+
protected Collection<Document> convertBatchToData(SearchResult batch) {
56+
return batch.getDocuments();
57+
}
5258
}

src/main/java/redis/clients/jedis/search/aggr/FtAggregateRoundRobin.java renamed to src/main/java/redis/clients/jedis/search/aggr/FtAggregateIteration.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,33 @@
11
package redis.clients.jedis.search.aggr;
22

3+
import java.util.Collection;
4+
35
import redis.clients.jedis.CommandArguments;
46
import redis.clients.jedis.providers.ConnectionProvider;
57
import redis.clients.jedis.search.SearchBuilderFactory;
68
import redis.clients.jedis.search.SearchProtocol;
7-
import redis.clients.jedis.util.JedisRoundRobinBase;
9+
import redis.clients.jedis.util.JedisCommandIterationBase;
810

9-
public class FtAggregateRoundRobin extends JedisRoundRobinBase<AggregationResult> {
11+
public class FtAggregateIteration extends JedisCommandIterationBase<AggregationResult, Row> {
1012

1113
private final String indexName;
1214
private final CommandArguments args;
1315

14-
public FtAggregateRoundRobin(ConnectionProvider connectionProvider, String indexName, AggregationBuilder aggr) {
16+
/**
17+
* {@link AggregationBuilder#cursor(int, long) CURSOR} must be set.
18+
* @param connectionProvider connection provider
19+
* @param indexName index name
20+
* @param aggr cursor must be set
21+
*/
22+
public FtAggregateIteration(ConnectionProvider connectionProvider, String indexName, AggregationBuilder aggr) {
1523
super(connectionProvider, SearchBuilderFactory.SEARCH_AGGREGATION_RESULT_WITH_CURSOR);
1624
if (!aggr.isWithCursor()) throw new IllegalArgumentException("cursor must be set");
1725
this.indexName = indexName;
1826
this.args = new CommandArguments(SearchProtocol.SearchCommand.AGGREGATE).add(this.indexName).addObjects(aggr.getArgs());
1927
}
2028

2129
@Override
22-
protected boolean isIterationCompleted(AggregationResult reply) {
30+
protected boolean isNodeCompleted(AggregationResult reply) {
2331
return reply.getCursorId() == 0L;
2432
}
2533

@@ -33,4 +41,9 @@ protected CommandArguments nextCommandArguments(AggregationResult lastReply) {
3341
return new CommandArguments(SearchProtocol.SearchCommand.CURSOR).add(SearchProtocol.SearchKeyword.READ)
3442
.add(indexName).add(lastReply.getCursorId());
3543
}
44+
45+
@Override
46+
protected Collection<Row> convertBatchToData(AggregationResult batch) {
47+
return batch.getRows();
48+
}
3649
}

src/main/java/redis/clients/jedis/util/JedisRoundRobinBase.java renamed to src/main/java/redis/clients/jedis/util/JedisCommandIterationBase.java

+28-9
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,37 @@
11
package redis.clients.jedis.util;
22

33
import java.util.ArrayList;
4+
import java.util.Collection;
45
import java.util.Collections;
56
import java.util.LinkedList;
67
import java.util.Map;
78
import java.util.NoSuchElementException;
89
import java.util.Queue;
10+
import java.util.function.Supplier;
911

1012
import redis.clients.jedis.Builder;
1113
import redis.clients.jedis.CommandArguments;
1214
import redis.clients.jedis.Connection;
1315
import redis.clients.jedis.providers.ConnectionProvider;
1416

15-
public abstract class JedisRoundRobinBase<R> {
17+
/**
18+
* @param <B> Type of each batch reply
19+
* @param <D> Type of each data
20+
*/
21+
public abstract class JedisCommandIterationBase<B, D> {
1622

17-
private final Builder<R> builder;
23+
private final Builder<B> builder;
1824

1925
private final Queue<Map.Entry> connections;
2026

2127
private Map.Entry connection;
2228

23-
private R lastReply;
29+
private B lastReply;
2430

2531
private boolean roundRobinCompleted;
2632
private boolean iterationCompleted;
2733

28-
protected JedisRoundRobinBase(ConnectionProvider connectionProvider, Builder<R> responseBuilder) {
34+
protected JedisCommandIterationBase(ConnectionProvider connectionProvider, Builder<B> responseBuilder) {
2935
Map connectionMap = connectionProvider.getConnectionMap();
3036
ArrayList<Map.Entry> connectionList = new ArrayList<>(connectionMap.entrySet());
3137
Collections.shuffle(connectionList);
@@ -35,17 +41,17 @@ protected JedisRoundRobinBase(ConnectionProvider connectionProvider, Builder<R>
3541
this.roundRobinCompleted = this.connections.isEmpty();
3642
}
3743

38-
public final boolean isRoundRobinCompleted() {
44+
public final boolean isIterationCompleted() {
3945
return roundRobinCompleted;
4046
}
4147

42-
protected abstract boolean isIterationCompleted(R reply);
48+
protected abstract boolean isNodeCompleted(B reply);
4349

4450
protected abstract CommandArguments initCommandArguments();
4551

46-
protected abstract CommandArguments nextCommandArguments(R lastReply);
52+
protected abstract CommandArguments nextCommandArguments(B lastReply);
4753

48-
public final R get() {
54+
public final B nextBatch() {
4955
if (roundRobinCompleted) {
5056
throw new NoSuchElementException();
5157
}
@@ -70,12 +76,25 @@ public final R get() {
7076
}
7177

7278
lastReply = builder.build(rawReply);
73-
iterationCompleted = isIterationCompleted(lastReply);
79+
iterationCompleted = isNodeCompleted(lastReply);
7480
if (iterationCompleted) {
7581
if (connections.isEmpty()) {
7682
roundRobinCompleted = true;
7783
}
7884
}
7985
return lastReply;
8086
}
87+
88+
protected abstract Collection<D> convertBatchToData(B batch);
89+
90+
public final Collection<D> nextBatchList() {
91+
return convertBatchToData(nextBatch());
92+
}
93+
94+
public final Collection<D> collect(Collection<D> c) {
95+
while (!isIterationCompleted()) {
96+
c.addAll(nextBatchList());
97+
}
98+
return c;
99+
}
81100
}

src/test/java/redis/clients/jedis/commands/jedis/ClusterValuesCommandsTest.java

+30-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import redis.clients.jedis.GeoCoordinate;
1919
import redis.clients.jedis.JedisPubSub;
2020
import redis.clients.jedis.Protocol;
21-
import redis.clients.jedis.ScanRoundRobin;
21+
import redis.clients.jedis.ScanIteration;
2222
import redis.clients.jedis.args.GeoUnit;
2323
import redis.clients.jedis.params.GeoRadiusParam;
2424
import redis.clients.jedis.params.GeoRadiusStoreParam;
@@ -138,7 +138,7 @@ public void flushAllBroadcast() {
138138
}
139139

140140
@Test
141-
public void scanRoundRobin() {
141+
public void scanIteration() {
142142
Set<String> allIn = new HashSet<>(26 * 26);
143143
char[] arr = new char[2];
144144
for (int i = 0; i < 26; i++) {
@@ -152,11 +152,36 @@ public void scanRoundRobin() {
152152
}
153153

154154
Set<String> allScan = new HashSet<>();
155-
ScanRoundRobin scan = cluster.scan(10, "*");
156-
while (!scan.isRoundRobinCompleted()) {
157-
ScanResult<String> batch = scan.get();
155+
ScanIteration scan = cluster.scanIteration(10, "*");
156+
while (!scan.isIterationCompleted()) {
157+
ScanResult<String> batch = scan.nextBatch();
158158
allScan.addAll(batch.getResult());
159159
}
160160
assertEquals(allIn, allScan);
161+
162+
Set<String> allTypeScan = new HashSet<>();
163+
ScanIteration typeScan = cluster.scanIteration(10, "*", "string");
164+
while (!typeScan.isIterationCompleted()) {
165+
ScanResult<String> batch = typeScan.nextBatch();
166+
allTypeScan.addAll(batch.getResult());
167+
}
168+
assertEquals(allIn, allTypeScan);
169+
}
170+
171+
@Test
172+
public void scanIterationCollect() {
173+
Set<String> allIn = new HashSet<>(26 * 26);
174+
char[] arr = new char[2];
175+
for (int i = 0; i < 26; i++) {
176+
arr[0] = (char) ('a' + i);
177+
for (int j = 0; j < 26; j++) {
178+
arr[1] = (char) ('a' + j);
179+
String str = new String(arr);
180+
cluster.incr(str);
181+
allIn.add(str);
182+
}
183+
}
184+
185+
assertEquals(allIn, cluster.scanIteration(100, "*").collect(new HashSet<>(26 * 26)));
161186
}
162187
}

0 commit comments

Comments
 (0)