Skip to content

Diamond operators in tests, small packages #6789

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 52 additions & 52 deletions src/test/java/io/reactivex/rxjava3/completable/CompletableTest.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void run() {
}));
}

final List<Thread> threads = new ArrayList<Thread>();
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() {
@Override
Expand Down Expand Up @@ -253,7 +253,7 @@ public void run() {

}));

final List<Thread> threads = new ArrayList<Thread>();
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void setOnceTwice() {
List<Throwable> errors = TestHelper.trackPluginErrors();
try {

AtomicReference<Disposable> target = new AtomicReference<Disposable>();
AtomicReference<Disposable> target = new AtomicReference<>();
Disposable d = Disposable.empty();

DisposableHelper.setOnce(target, d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public class FutureDisposableTest extends RxJavaTest {

@Test
public void normal() {
FutureTask<Object> ft = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
FutureTask<Object> ft = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
Disposable d = Disposable.fromFuture(ft);
assertFalse(d.isDisposed());

Expand All @@ -43,7 +43,7 @@ public void normal() {

@Test
public void interruptible() {
FutureTask<Object> ft = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
FutureTask<Object> ft = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
Disposable d = Disposable.fromFuture(ft, true);
assertFalse(d.isDisposed());

Expand All @@ -60,7 +60,7 @@ public void interruptible() {

@Test
public void normalDone() {
FutureTask<Object> ft = new FutureTask<Object>(Functions.EMPTY_RUNNABLE, null);
FutureTask<Object> ft = new FutureTask<>(Functions.EMPTY_RUNNABLE, null);
FutureDisposable d = new FutureDisposable(ft, false);
assertFalse(d.isDisposed());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcur
final int count = 10;
final CountDownLatch end = new CountDownLatch(count);

final List<Thread> threads = new ArrayList<Thread>();
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() {
@Override
Expand Down Expand Up @@ -164,12 +164,12 @@ public void run() {
public void concurrentSetDisposableShouldNotInterleave()
throws InterruptedException {
final int count = 10;
final List<Disposable> subscriptions = new ArrayList<Disposable>();
final List<Disposable> subscriptions = new ArrayList<>();

final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(count);

final List<Thread> threads = new ArrayList<Thread>();
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Disposable subscription = mock(Disposable.class);
subscriptions.add(subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void settingUnderlyingWhenUnsubscribedCausesImmediateUnsubscriptionConcur
final int count = 10;
final CountDownLatch end = new CountDownLatch(count);

final List<Thread> threads = new ArrayList<Thread>();
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Thread t = new Thread() {
@Override
Expand Down Expand Up @@ -164,12 +164,12 @@ public void run() {
public void concurrentSetDisposableShouldNotInterleave()
throws InterruptedException {
final int count = 10;
final List<Disposable> subscriptions = new ArrayList<Disposable>();
final List<Disposable> subscriptions = new ArrayList<>();

final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch end = new CountDownLatch(count);

final List<Thread> threads = new ArrayList<Thread>();
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < count; i++) {
final Disposable subscription = mock(Disposable.class);
subscriptions.add(subscription);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class CompositeExceptionTest extends RxJavaTest {
private final Throwable ex3 = new Throwable("Ex3", ex2);

private CompositeException getNewCompositeExceptionWithEx123() {
List<Throwable> throwables = new ArrayList<Throwable>();
List<Throwable> throwables = new ArrayList<>();
throwables.add(ex1);
throwables.add(ex2);
throwables.add(ex3);
Expand Down Expand Up @@ -65,7 +65,7 @@ public void emptyErrors() {
assertEquals("errors is empty", e.getMessage());
}
try {
new CompositeException(new ArrayList<Throwable>());
new CompositeException(new ArrayList<>());
fail("CompositeException should fail if errors is empty");
} catch (IllegalArgumentException e) {
assertEquals("errors is empty", e.getMessage());
Expand Down Expand Up @@ -134,7 +134,7 @@ public void compositeExceptionFromCompositeAndChild() {

@Test
public void compositeExceptionFromTwoDuplicateComposites() {
List<Throwable> exs = new ArrayList<Throwable>();
List<Throwable> exs = new ArrayList<>();
exs.add(getNewCompositeExceptionWithEx123());
exs.add(getNewCompositeExceptionWithEx123());
CompositeException cex = new CompositeException(exs);
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/io/reactivex/rxjava3/flowable/Burst.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ public static <T> Builder<T> item(T item) {

@SafeVarargs
public static <T> Builder<T> items(T... items) {
return new Builder<T>(Arrays.asList(items));
return new Builder<>(Arrays.asList(items));
}

final class BurstSubscription implements Subscription {
private final Subscriber<? super T> subscriber;
final Queue<T> q = new ConcurrentLinkedQueue<T>(items);
final Queue<T> q = new ConcurrentLinkedQueue<>(items);
final AtomicLong requested = new AtomicLong();
volatile boolean cancelled;

Expand Down Expand Up @@ -121,7 +121,7 @@ public Flowable<T> error(Throwable e) {
}

public Flowable<T> create() {
return new Burst<T>(error, items);
return new Burst<>(error, items);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void doAfterTest() {
public void observeOn() {
int num = (int) (Flowable.bufferSize() * 2.1);
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
incrementingIntegers(c).observeOn(Schedulers.computation()).take(num).subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
ts.assertNoErrors();
Expand All @@ -95,7 +95,7 @@ public void observeOn() {
public void observeOnWithSlowConsumer() {
int num = (int) (Flowable.bufferSize() * 0.2);
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
incrementingIntegers(c).observeOn(Schedulers.computation()).map(
new Function<Integer, Integer>() {
@Override
Expand All @@ -121,7 +121,7 @@ public void mergeSync() {
int num = (int) (Flowable.bufferSize() * 4.1);
AtomicInteger c1 = new AtomicInteger();
AtomicInteger c2 = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable<Integer> merged = Flowable.merge(incrementingIntegers(c1), incrementingIntegers(c2));

merged.take(num).subscribe(ts);
Expand All @@ -142,7 +142,7 @@ public void mergeAsync() {
int num = (int) (Flowable.bufferSize() * 4.1);
AtomicInteger c1 = new AtomicInteger();
AtomicInteger c2 = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable<Integer> merged = Flowable.merge(
incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
incrementingIntegers(c2).subscribeOn(Schedulers.computation()));
Expand Down Expand Up @@ -171,7 +171,7 @@ public void mergeAsyncThenObserveOnLoop() {
AtomicInteger c1 = new AtomicInteger();
AtomicInteger c2 = new AtomicInteger();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable<Integer> merged = Flowable.merge(
incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
incrementingIntegers(c2).subscribeOn(Schedulers.computation()));
Expand All @@ -194,7 +194,7 @@ public void mergeAsyncThenObserveOn() {
int num = (int) (Flowable.bufferSize() * 4.1);
AtomicInteger c1 = new AtomicInteger();
AtomicInteger c2 = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable<Integer> merged = Flowable.merge(
incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
incrementingIntegers(c2).subscribeOn(Schedulers.computation()));
Expand All @@ -216,7 +216,7 @@ public void mergeAsyncThenObserveOn() {
public void flatMapSync() {
int num = (int) (Flowable.bufferSize() * 2.1);
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();

incrementingIntegers(c)
.flatMap(new Function<Integer, Publisher<Integer>>() {
Expand All @@ -240,7 +240,7 @@ public void zipSync() {
int num = (int) (Flowable.bufferSize() * 4.1);
AtomicInteger c1 = new AtomicInteger();
AtomicInteger c2 = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();

Flowable<Integer> zipped = Flowable.zip(
incrementingIntegers(c1),
Expand Down Expand Up @@ -268,7 +268,7 @@ public void zipAsync() {
int num = (int) (Flowable.bufferSize() * 2.1);
AtomicInteger c1 = new AtomicInteger();
AtomicInteger c2 = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
Flowable<Integer> zipped = Flowable.zip(
incrementingIntegers(c1).subscribeOn(Schedulers.computation()),
incrementingIntegers(c2).subscribeOn(Schedulers.computation()),
Expand All @@ -295,8 +295,8 @@ public void subscribeOnScheduling() {
for (int i = 0; i < 100; i++) {
int num = (int) (Flowable.bufferSize() * 2.1);
AtomicInteger c = new AtomicInteger();
ConcurrentLinkedQueue<Thread> threads = new ConcurrentLinkedQueue<Thread>();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
ConcurrentLinkedQueue<Thread> threads = new ConcurrentLinkedQueue<>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
// observeOn is there to make it async and need backpressure
incrementingIntegers(c, threads).subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).take(num).subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -325,7 +325,7 @@ public void subscribeOnScheduling() {
public void takeFilterSkipChainAsync() {
int num = (int) (Flowable.bufferSize() * 2.1);
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
incrementingIntegers(c).observeOn(Schedulers.computation())
.skip(10000)
.filter(new Predicate<Integer>() {
Expand Down Expand Up @@ -452,7 +452,7 @@ public void onNext(Integer t) {
@Test
public void firehoseFailsAsExpected() {
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();

firehose(c).observeOn(Schedulers.computation())
.map(new Function<Integer, Integer>() {
Expand Down Expand Up @@ -496,7 +496,7 @@ public void onBackpressureDrop() {
}
int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
firehose(c).onBackpressureDrop()
.observeOn(Schedulers.computation())
.map(SLOW_PASS_THRU).take(num).subscribe(ts);
Expand All @@ -521,7 +521,7 @@ public void onBackpressureDropWithAction() {
final AtomicInteger dropCount = new AtomicInteger();
final AtomicInteger passCount = new AtomicInteger();
final int num = Flowable.bufferSize() * 3; // > 1 so that take doesn't prevent buffer overflow
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();

firehose(emitCount)
.onBackpressureDrop(new Consumer<Integer>() {
Expand Down Expand Up @@ -561,7 +561,7 @@ public void onBackpressureDropSynchronous() {
for (int i = 0; i < 100; i++) {
int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
firehose(c).onBackpressureDrop()
.map(SLOW_PASS_THRU).take(num).subscribe(ts);
ts.awaitDone(5, TimeUnit.SECONDS);
Expand All @@ -584,7 +584,7 @@ public void onBackpressureDropSynchronousWithAction() {
final AtomicInteger dropCount = new AtomicInteger();
int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();
firehose(c).onBackpressureDrop(new Consumer<Integer>() {
@Override
public void accept(Integer j) {
Expand Down Expand Up @@ -613,7 +613,7 @@ public void accept(Integer j) {
public void onBackpressureBuffer() {
int num = (int) (Flowable.bufferSize() * 1.1); // > 1 so that take doesn't prevent buffer overflow
AtomicInteger c = new AtomicInteger();
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
TestSubscriber<Integer> ts = new TestSubscriber<>();

firehose(c).takeWhile(new Predicate<Integer>() {
@Override
Expand Down
Loading