Skip to content

Commit 989a3b3

Browse files
authored
Support round-robining for timeseries commands (#3359)
1 parent 014182d commit 989a3b3

File tree

4 files changed

+190
-6
lines changed

4 files changed

+190
-6
lines changed

src/main/java/redis/clients/jedis/UnifiedJedis.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4207,6 +4207,10 @@ public List<TSKeyedElements> tsMRange(TSMRangeParams multiRangeParams) {
42074207
return executeCommand(commandObjects.tsMRange(multiRangeParams));
42084208
}
42094209

4210+
public TsMRangeRoundRobin tsMRangeRoundRobin(TSMRangeParams multiRangeParams) {
4211+
return new TsMRangeRoundRobin(provider, false, multiRangeParams);
4212+
}
4213+
42104214
@Override
42114215
public List<TSKeyedElements> tsMRevRange(long fromTimestamp, long toTimestamp, String... filters) {
42124216
return executeCommand(commandObjects.tsMRevRange(fromTimestamp, toTimestamp, filters));
@@ -4217,6 +4221,10 @@ public List<TSKeyedElements> tsMRevRange(TSMRangeParams multiRangeParams) {
42174221
return executeCommand(commandObjects.tsMRevRange(multiRangeParams));
42184222
}
42194223

4224+
public TsMRangeRoundRobin tsMRevRangeRoundRobin(TSMRangeParams multiRangeParams) {
4225+
return new TsMRangeRoundRobin(provider, true, multiRangeParams);
4226+
}
4227+
42204228
@Override
42214229
public TSElement tsGet(String key) {
42224230
return executeCommand(commandObjects.tsGet(key));
@@ -4232,6 +4240,10 @@ public List<TSKeyValue<TSElement>> tsMGet(TSMGetParams multiGetParams, String...
42324240
return executeCommand(commandObjects.tsMGet(multiGetParams, filters));
42334241
}
42344242

4243+
public TsMGetRoundRobin tsMGetRoundRobin(TSMGetParams multiGetParams, String... filters) {
4244+
return new TsMGetRoundRobin(provider, multiGetParams, filters);
4245+
}
4246+
42354247
@Override
42364248
public String tsCreateRule(String sourceKey, String destKey, AggregationType aggregationType, long timeBucket) {
42374249
return executeCommand(commandObjects.tsCreateRule(sourceKey, destKey, aggregationType, timeBucket));
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package redis.clients.jedis.timeseries;
2+
3+
import java.util.List;
4+
5+
import redis.clients.jedis.CommandArguments;
6+
import redis.clients.jedis.providers.ConnectionProvider;
7+
import redis.clients.jedis.util.JedisRoundRobinBase;
8+
9+
public class TsMGetRoundRobin extends JedisRoundRobinBase<List<TSKeyValue<TSElement>>> {
10+
11+
private final CommandArguments args;
12+
13+
public TsMGetRoundRobin(ConnectionProvider connectionProvider, TSMGetParams multiGetParams, String... filters) {
14+
super(connectionProvider, TimeSeriesBuilderFactory.TIMESERIES_MGET_RESPONSE);
15+
this.args = new CommandArguments(TimeSeriesProtocol.TimeSeriesCommand.MGET).addParams(multiGetParams)
16+
.add(TimeSeriesProtocol.TimeSeriesKeyword.FILTER).addObjects((Object[]) filters);
17+
}
18+
19+
@Override
20+
protected boolean isIterationCompleted(List<TSKeyValue<TSElement>> reply) {
21+
return reply != null;
22+
}
23+
24+
@Override
25+
protected CommandArguments initCommandArguments() {
26+
return args;
27+
}
28+
29+
@Override
30+
protected CommandArguments nextCommandArguments(List<TSKeyValue<TSElement>> lastReply) {
31+
throw new IllegalStateException();
32+
}
33+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package redis.clients.jedis.timeseries;
2+
3+
import java.util.List;
4+
5+
import redis.clients.jedis.CommandArguments;
6+
import redis.clients.jedis.providers.ConnectionProvider;
7+
import redis.clients.jedis.timeseries.TimeSeriesProtocol.TimeSeriesCommand;
8+
import redis.clients.jedis.util.JedisRoundRobinBase;
9+
10+
public class TsMRangeRoundRobin extends JedisRoundRobinBase<List<TSKeyedElements>> {
11+
12+
private final CommandArguments args;
13+
14+
/**
15+
* @param connectionProvider connection provider
16+
* @param reverse {@code false} means TS.MRANGE command; {@code true} means TS.MREVRANGE command
17+
* @param multiRangeParams optional arguments and parameters
18+
*/
19+
public TsMRangeRoundRobin(ConnectionProvider connectionProvider, boolean reverse, TSMRangeParams multiRangeParams) {
20+
super(connectionProvider, TimeSeriesBuilderFactory.TIMESERIES_MRANGE_RESPONSE);
21+
this.args = new CommandArguments(!reverse ? TimeSeriesCommand.MRANGE : TimeSeriesCommand.MREVRANGE).addParams(multiRangeParams);
22+
}
23+
24+
@Override
25+
protected boolean isIterationCompleted(List<TSKeyedElements> reply) {
26+
return reply != null;
27+
}
28+
29+
@Override
30+
protected CommandArguments initCommandArguments() {
31+
return args;
32+
}
33+
34+
@Override
35+
protected CommandArguments nextCommandArguments(List<TSKeyedElements> lastReply) {
36+
throw new IllegalStateException();
37+
}
38+
}

src/test/java/redis/clients/jedis/modules/timeseries/TimeSeriesTest.java

Lines changed: 107 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,6 @@
11
package redis.clients.jedis.modules.timeseries;
22

3-
import static org.junit.Assert.assertEquals;
4-
import static org.junit.Assert.assertNotEquals;
5-
import static org.junit.Assert.assertNotNull;
6-
import static org.junit.Assert.assertNull;
7-
import static org.junit.Assert.assertTrue;
8-
import static org.junit.Assert.fail;
3+
import static org.junit.Assert.*;
94

105
import java.util.*;
116
import org.junit.BeforeClass;
@@ -533,6 +528,71 @@ public void mrangeFilterBy() {
533528
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
534529
}
535530

531+
@Test
532+
public void mrangeFilterByRoundRobin() {
533+
TsMRangeRoundRobin rr;
534+
List<TSKeyedElements> range;
535+
536+
Map<String, String> labels = Collections.singletonMap("label", "multi");
537+
client.tsCreate("ts1", TSCreateParams.createParams().labels(labels));
538+
client.tsCreate("ts2", TSCreateParams.createParams().labels(labels));
539+
String filter = "label=multi";
540+
541+
TSElement[] rawValues = new TSElement[]{
542+
new TSElement(1000L, 1.0),
543+
new TSElement(2000L, 0.9),
544+
new TSElement(3200L, 1.1),
545+
new TSElement(4500L, -1.1)
546+
};
547+
548+
client.tsAdd("ts1", rawValues[0].getTimestamp(), rawValues[0].getValue());
549+
client.tsAdd("ts2", rawValues[1].getTimestamp(), rawValues[1].getValue());
550+
client.tsAdd("ts2", rawValues[2].getTimestamp(), rawValues[2].getValue());
551+
client.tsAdd("ts1", rawValues[3].getTimestamp(), rawValues[3].getValue());
552+
553+
// MRANGE
554+
rr = client.tsMRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L)
555+
.filterByTS(1000L, 2000L).filter(filter));
556+
assertFalse(rr.isRoundRobinCompleted());
557+
range = rr.get();
558+
assertEquals("ts1", range.get(0).getKey());
559+
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
560+
assertEquals("ts2", range.get(1).getKey());
561+
assertEquals(Arrays.asList(rawValues[1]), range.get(1).getValue());
562+
assertTrue(rr.isRoundRobinCompleted());
563+
564+
rr = client.tsMRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L)
565+
.filterByValues(1.0, 1.2).filter(filter));
566+
assertFalse(rr.isRoundRobinCompleted());
567+
range = rr.get();
568+
assertEquals("ts1", range.get(0).getKey());
569+
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
570+
assertEquals("ts2", range.get(1).getKey());
571+
assertEquals(Arrays.asList(rawValues[2]), range.get(1).getValue());
572+
assertTrue(rr.isRoundRobinCompleted());
573+
574+
// MREVRANGE
575+
rr = client.tsMRevRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L)
576+
.filterByTS(1000L, 2000L).filter(filter));
577+
assertFalse(rr.isRoundRobinCompleted());
578+
range = rr.get();
579+
assertEquals("ts1", range.get(0).getKey());
580+
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
581+
assertEquals("ts2", range.get(1).getKey());
582+
assertEquals(Arrays.asList(rawValues[1]), range.get(1).getValue());
583+
assertTrue(rr.isRoundRobinCompleted());
584+
585+
rr = client.tsMRevRangeRoundRobin(TSMRangeParams.multiRangeParams(0L, 5000L)
586+
.filterByValues(1.0, 1.2).filter(filter));
587+
assertFalse(rr.isRoundRobinCompleted());
588+
range = rr.get();
589+
assertEquals("ts1", range.get(0).getKey());
590+
assertEquals(Arrays.asList(rawValues[0]), range.get(0).getValue());
591+
assertEquals("ts2", range.get(1).getKey());
592+
assertEquals(Arrays.asList(rawValues[2]), range.get(1).getValue());
593+
assertTrue(rr.isRoundRobinCompleted());
594+
}
595+
536596
@Test
537597
public void groupByReduce() {
538598
client.tsCreate("ts1", TSCreateParams.createParams().labels(convertMap("metric", "cpu", "metric_name", "system")));
@@ -624,6 +684,47 @@ public void testMGet() {
624684
assertNull(ranges3.get(1).getValue());
625685
}
626686

687+
@Test
688+
public void mgetRoundRobin() {
689+
TsMGetRoundRobin rr;
690+
Map<String, String> labels = new HashMap<>();
691+
labels.put("l1", "v1");
692+
labels.put("l2", "v2");
693+
assertEquals("OK", client.tsCreate("seriesMGet1", TSCreateParams.createParams()
694+
.retention(100 * 1000 /*100sec retentionTime*/).labels(labels)));
695+
assertEquals("OK", client.tsCreate("seriesMGet2", TSCreateParams.createParams()
696+
.retention(100 * 1000 /*100sec retentionTime*/).labels(labels)));
697+
698+
// Test for empty result
699+
rr = client.tsMGetRoundRobin(TSMGetParams.multiGetParams().withLabels(false), "l1=v2");
700+
assertFalse(rr.isRoundRobinCompleted());
701+
List<TSKeyValue<TSElement>> ranges1 = rr.get();
702+
assertEquals(0, ranges1.size());
703+
assertTrue(rr.isRoundRobinCompleted());
704+
705+
// Test for empty ranges
706+
rr = client.tsMGetRoundRobin(TSMGetParams.multiGetParams().withLabels(true), "l1=v1");
707+
assertFalse(rr.isRoundRobinCompleted());
708+
List<TSKeyValue<TSElement>> ranges2 = rr.get();
709+
assertEquals(2, ranges2.size());
710+
assertEquals(labels, ranges2.get(0).getLabels());
711+
assertEquals(labels, ranges2.get(1).getLabels());
712+
assertNull(ranges2.get(0).getValue());
713+
assertTrue(rr.isRoundRobinCompleted());
714+
715+
// Test for returned result on MGet
716+
client.tsAdd("seriesMGet1", 1500, 1.3);
717+
rr = client.tsMGetRoundRobin(TSMGetParams.multiGetParams().withLabels(false), "l1=v1");
718+
assertFalse(rr.isRoundRobinCompleted());
719+
List<TSKeyValue<TSElement>> ranges3 = rr.get();
720+
assertEquals(2, ranges3.size());
721+
assertEquals(Collections.emptyMap(), ranges3.get(0).getLabels());
722+
assertEquals(Collections.emptyMap(), ranges3.get(1).getLabels());
723+
assertEquals(new TSElement(1500, 1.3), ranges3.get(0).getValue());
724+
assertNull(ranges3.get(1).getValue());
725+
assertTrue(rr.isRoundRobinCompleted());
726+
}
727+
627728
@Test
628729
public void testQueryIndex() {
629730

0 commit comments

Comments
 (0)