Skip to content

Commit 01cae89

Browse files
authored
3.x: Fix Observable amb, combineLatest & zip ArrayStoreException (#6756)
1 parent ec81e8c commit 01cae89

14 files changed

+196
-3
lines changed

Diff for: src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmb.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public void subscribeActual(Observer<? super T> observer) {
3636
ObservableSource<? extends T>[] sources = this.sources;
3737
int count = 0;
3838
if (sources == null) {
39-
sources = new Observable[8];
39+
sources = new ObservableSource[8];
4040
try {
4141
for (ObservableSource<? extends T> p : sourcesIterable) {
4242
if (p == null) {

Diff for: src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public void subscribeActual(Observer<? super R> observer) {
4848
ObservableSource<? extends T>[] sources = this.sources;
4949
int count = 0;
5050
if (sources == null) {
51-
sources = new Observable[8];
51+
sources = new ObservableSource[8];
5252
for (ObservableSource<? extends T> p : sourcesIterable) {
5353
if (count == sources.length) {
5454
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];

Diff for: src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZip.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void subscribeActual(Observer<? super R> observer) {
5050
ObservableSource<? extends T>[] sources = this.sources;
5151
int count = 0;
5252
if (sources == null) {
53-
sources = new Observable[8];
53+
sources = new ObservableSource[8];
5454
for (ObservableSource<? extends T> p : sourcesIterable) {
5555
if (count == sources.length) {
5656
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableAmbTest.java

+14
Original file line numberDiff line numberDiff line change
@@ -315,4 +315,18 @@ public void run() throws Exception {
315315
assertFalse("Interrupted!", interrupted.get());
316316
}
317317
}
318+
319+
@Test
320+
public void completableSourcesInIterable() {
321+
CompletableSource source = new CompletableSource() {
322+
@Override
323+
public void subscribe(CompletableObserver observer) {
324+
Completable.complete().subscribe(observer);
325+
}
326+
};
327+
328+
Completable.amb(Arrays.asList(source, source))
329+
.test()
330+
.assertResult();
331+
}
318332
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableAmbTest.java

+15
Original file line numberDiff line numberDiff line change
@@ -667,4 +667,19 @@ public void run() throws Exception {
667667
assertFalse("Interrupted!", interrupted.get());
668668
}
669669
}
670+
671+
@SuppressWarnings("unchecked")
672+
@Test
673+
public void publishersInIterable() {
674+
Publisher<Integer> source = new Publisher<Integer>() {
675+
@Override
676+
public void subscribe(Subscriber<? super Integer> subscriber) {
677+
Flowable.just(1).subscribe(subscriber);
678+
}
679+
};
680+
681+
Flowable.amb(Arrays.asList(source, source))
682+
.test()
683+
.assertResult(1);
684+
}
670685
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCombineLatestTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -1551,4 +1551,24 @@ public Object apply(Object[] a) throws Exception {
15511551
.awaitDone(5, TimeUnit.SECONDS)
15521552
.assertFailure(TestException.class, 42);
15531553
}
1554+
1555+
@SuppressWarnings("unchecked")
1556+
@Test
1557+
public void publishersInIterable() {
1558+
Publisher<Integer> source = new Publisher<Integer>() {
1559+
@Override
1560+
public void subscribe(Subscriber<? super Integer> subscriber) {
1561+
Flowable.just(1).subscribe(subscriber);
1562+
}
1563+
};
1564+
1565+
Flowable.combineLatest(Arrays.asList(source, source), new Function<Object[], Integer>() {
1566+
@Override
1567+
public Integer apply(Object[] t) throws Throwable {
1568+
return 2;
1569+
}
1570+
})
1571+
.test()
1572+
.assertResult(2);
1573+
}
15541574
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -1896,4 +1896,24 @@ public Object apply(Object[] a) throws Exception {
18961896

18971897
assertEquals(0, counter.get());
18981898
}
1899+
1900+
@SuppressWarnings("unchecked")
1901+
@Test
1902+
public void publishersInIterable() {
1903+
Publisher<Integer> source = new Publisher<Integer>() {
1904+
@Override
1905+
public void subscribe(Subscriber<? super Integer> subscriber) {
1906+
Flowable.just(1).subscribe(subscriber);
1907+
}
1908+
};
1909+
1910+
Flowable.zip(Arrays.asList(source, source), new Function<Object[], Integer>() {
1911+
@Override
1912+
public Integer apply(Object[] t) throws Throwable {
1913+
return 2;
1914+
}
1915+
})
1916+
.test()
1917+
.assertResult(2);
1918+
}
18991919
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeAmbTest.java

+15
Original file line numberDiff line numberDiff line change
@@ -254,4 +254,19 @@ public void run() {
254254
}
255255
}
256256
}
257+
258+
@SuppressWarnings("unchecked")
259+
@Test
260+
public void maybeSourcesInIterable() {
261+
MaybeSource<Integer> source = new MaybeSource<Integer>() {
262+
@Override
263+
public void subscribe(MaybeObserver<? super Integer> observer) {
264+
Maybe.just(1).subscribe(observer);
265+
}
266+
};
267+
268+
Maybe.amb(Arrays.asList(source, source))
269+
.test()
270+
.assertResult(1);
271+
}
257272
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeZipIterableTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -221,4 +221,24 @@ public void singleSourceZipperReturnsNull() {
221221
.to(TestHelper.<Object>testConsumer())
222222
.assertFailureAndMessage(NullPointerException.class, "The zipper returned a null value");
223223
}
224+
225+
@SuppressWarnings("unchecked")
226+
@Test
227+
public void maybeSourcesInIterable() {
228+
MaybeSource<Integer> source = new MaybeSource<Integer>() {
229+
@Override
230+
public void subscribe(MaybeObserver<? super Integer> observer) {
231+
Maybe.just(1).subscribe(observer);
232+
}
233+
};
234+
235+
Maybe.zip(Arrays.asList(source, source), new Function<Object[], Integer>() {
236+
@Override
237+
public Integer apply(Object[] t) throws Throwable {
238+
return 2;
239+
}
240+
})
241+
.test()
242+
.assertResult(2);
243+
}
224244
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableAmbTest.java

+14
Original file line numberDiff line numberDiff line change
@@ -467,4 +467,18 @@ public void run() throws Exception {
467467
}
468468
}
469469

470+
@SuppressWarnings("unchecked")
471+
@Test
472+
public void observableSourcesInIterable() {
473+
ObservableSource<Integer> source = new ObservableSource<Integer>() {
474+
@Override
475+
public void subscribe(Observer<? super Integer> observer) {
476+
Observable.just(1).subscribe(observer);
477+
}
478+
};
479+
480+
Observable.amb(Arrays.asList(source, source))
481+
.test()
482+
.assertResult(1);
483+
}
470484
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCombineLatestTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -1212,4 +1212,24 @@ public Object apply(Object[] a) throws Exception {
12121212
.awaitDone(5, TimeUnit.SECONDS)
12131213
.assertFailure(TestException.class, 42);
12141214
}
1215+
1216+
@SuppressWarnings("unchecked")
1217+
@Test
1218+
public void observableSourcesInIterable() {
1219+
ObservableSource<Integer> source = new ObservableSource<Integer>() {
1220+
@Override
1221+
public void subscribe(Observer<? super Integer> observer) {
1222+
Observable.just(1).subscribe(observer);
1223+
}
1224+
};
1225+
1226+
Observable.combineLatest(Arrays.asList(source, source), new Function<Object[], Integer>() {
1227+
@Override
1228+
public Integer apply(Object[] t) throws Throwable {
1229+
return 2;
1230+
}
1231+
})
1232+
.test()
1233+
.assertResult(2);
1234+
}
12151235
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableZipTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -1429,4 +1429,24 @@ public Object apply(Object[] a) throws Exception {
14291429

14301430
assertEquals(0, counter.get());
14311431
}
1432+
1433+
@SuppressWarnings("unchecked")
1434+
@Test
1435+
public void observableSourcesInIterable() {
1436+
ObservableSource<Integer> source = new ObservableSource<Integer>() {
1437+
@Override
1438+
public void subscribe(Observer<? super Integer> observer) {
1439+
Observable.just(1).subscribe(observer);
1440+
}
1441+
};
1442+
1443+
Observable.zip(Arrays.asList(source, source), new Function<Object[], Integer>() {
1444+
@Override
1445+
public Integer apply(Object[] t) throws Throwable {
1446+
return 2;
1447+
}
1448+
})
1449+
.test()
1450+
.assertResult(2);
1451+
}
14321452
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleAmbTest.java

+15
Original file line numberDiff line numberDiff line change
@@ -342,4 +342,19 @@ public void accept(Object v, Throwable e) throws Exception {
342342
assertFalse("Interrupted!", interrupted.get());
343343
}
344344
}
345+
346+
@SuppressWarnings("unchecked")
347+
@Test
348+
public void singleSourcesInIterable() {
349+
SingleSource<Integer> source = new SingleSource<Integer>() {
350+
@Override
351+
public void subscribe(SingleObserver<? super Integer> observer) {
352+
Single.just(1).subscribe(observer);
353+
}
354+
};
355+
356+
Single.amb(Arrays.asList(source, source))
357+
.test()
358+
.assertResult(1);
359+
}
345360
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleZipIterableTest.java

+20
Original file line numberDiff line numberDiff line change
@@ -245,4 +245,24 @@ public void singleSourceZipperReturnsNull() {
245245
.to(TestHelper.<Object>testConsumer())
246246
.assertFailureAndMessage(NullPointerException.class, "The zipper returned a null value");
247247
}
248+
249+
@SuppressWarnings("unchecked")
250+
@Test
251+
public void singleSourcesInIterable() {
252+
SingleSource<Integer> source = new SingleSource<Integer>() {
253+
@Override
254+
public void subscribe(SingleObserver<? super Integer> observer) {
255+
Single.just(1).subscribe(observer);
256+
}
257+
};
258+
259+
Single.zip(Arrays.asList(source, source), new Function<Object[], Integer>() {
260+
@Override
261+
public Integer apply(Object[] t) throws Throwable {
262+
return 2;
263+
}
264+
})
265+
.test()
266+
.assertResult(2);
267+
}
248268
}

0 commit comments

Comments
 (0)