@@ -82,7 +82,7 @@ public void doAfterTest() {
82
82
public void observeOn () {
83
83
int num = (int ) (Flowable .bufferSize () * 2.1 );
84
84
AtomicInteger c = new AtomicInteger ();
85
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
85
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
86
86
incrementingIntegers (c ).observeOn (Schedulers .computation ()).take (num ).subscribe (ts );
87
87
ts .awaitDone (5 , TimeUnit .SECONDS );
88
88
ts .assertNoErrors ();
@@ -95,7 +95,7 @@ public void observeOn() {
95
95
public void observeOnWithSlowConsumer () {
96
96
int num = (int ) (Flowable .bufferSize () * 0.2 );
97
97
AtomicInteger c = new AtomicInteger ();
98
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
98
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
99
99
incrementingIntegers (c ).observeOn (Schedulers .computation ()).map (
100
100
new Function <Integer , Integer >() {
101
101
@ Override
@@ -121,7 +121,7 @@ public void mergeSync() {
121
121
int num = (int ) (Flowable .bufferSize () * 4.1 );
122
122
AtomicInteger c1 = new AtomicInteger ();
123
123
AtomicInteger c2 = new AtomicInteger ();
124
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
124
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
125
125
Flowable <Integer > merged = Flowable .merge (incrementingIntegers (c1 ), incrementingIntegers (c2 ));
126
126
127
127
merged .take (num ).subscribe (ts );
@@ -142,7 +142,7 @@ public void mergeAsync() {
142
142
int num = (int ) (Flowable .bufferSize () * 4.1 );
143
143
AtomicInteger c1 = new AtomicInteger ();
144
144
AtomicInteger c2 = new AtomicInteger ();
145
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
145
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
146
146
Flowable <Integer > merged = Flowable .merge (
147
147
incrementingIntegers (c1 ).subscribeOn (Schedulers .computation ()),
148
148
incrementingIntegers (c2 ).subscribeOn (Schedulers .computation ()));
@@ -171,7 +171,7 @@ public void mergeAsyncThenObserveOnLoop() {
171
171
AtomicInteger c1 = new AtomicInteger ();
172
172
AtomicInteger c2 = new AtomicInteger ();
173
173
174
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
174
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
175
175
Flowable <Integer > merged = Flowable .merge (
176
176
incrementingIntegers (c1 ).subscribeOn (Schedulers .computation ()),
177
177
incrementingIntegers (c2 ).subscribeOn (Schedulers .computation ()));
@@ -194,7 +194,7 @@ public void mergeAsyncThenObserveOn() {
194
194
int num = (int ) (Flowable .bufferSize () * 4.1 );
195
195
AtomicInteger c1 = new AtomicInteger ();
196
196
AtomicInteger c2 = new AtomicInteger ();
197
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
197
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
198
198
Flowable <Integer > merged = Flowable .merge (
199
199
incrementingIntegers (c1 ).subscribeOn (Schedulers .computation ()),
200
200
incrementingIntegers (c2 ).subscribeOn (Schedulers .computation ()));
@@ -216,7 +216,7 @@ public void mergeAsyncThenObserveOn() {
216
216
public void flatMapSync () {
217
217
int num = (int ) (Flowable .bufferSize () * 2.1 );
218
218
AtomicInteger c = new AtomicInteger ();
219
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
219
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
220
220
221
221
incrementingIntegers (c )
222
222
.flatMap (new Function <Integer , Publisher <Integer >>() {
@@ -240,7 +240,7 @@ public void zipSync() {
240
240
int num = (int ) (Flowable .bufferSize () * 4.1 );
241
241
AtomicInteger c1 = new AtomicInteger ();
242
242
AtomicInteger c2 = new AtomicInteger ();
243
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
243
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
244
244
245
245
Flowable <Integer > zipped = Flowable .zip (
246
246
incrementingIntegers (c1 ),
@@ -268,7 +268,7 @@ public void zipAsync() {
268
268
int num = (int ) (Flowable .bufferSize () * 2.1 );
269
269
AtomicInteger c1 = new AtomicInteger ();
270
270
AtomicInteger c2 = new AtomicInteger ();
271
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
271
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
272
272
Flowable <Integer > zipped = Flowable .zip (
273
273
incrementingIntegers (c1 ).subscribeOn (Schedulers .computation ()),
274
274
incrementingIntegers (c2 ).subscribeOn (Schedulers .computation ()),
@@ -295,8 +295,8 @@ public void subscribeOnScheduling() {
295
295
for (int i = 0 ; i < 100 ; i ++) {
296
296
int num = (int ) (Flowable .bufferSize () * 2.1 );
297
297
AtomicInteger c = new AtomicInteger ();
298
- ConcurrentLinkedQueue <Thread > threads = new ConcurrentLinkedQueue <Thread >();
299
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
298
+ ConcurrentLinkedQueue <Thread > threads = new ConcurrentLinkedQueue <>();
299
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
300
300
// observeOn is there to make it async and need backpressure
301
301
incrementingIntegers (c , threads ).subscribeOn (Schedulers .computation ()).observeOn (Schedulers .computation ()).take (num ).subscribe (ts );
302
302
ts .awaitDone (5 , TimeUnit .SECONDS );
@@ -325,7 +325,7 @@ public void subscribeOnScheduling() {
325
325
public void takeFilterSkipChainAsync () {
326
326
int num = (int ) (Flowable .bufferSize () * 2.1 );
327
327
AtomicInteger c = new AtomicInteger ();
328
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
328
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
329
329
incrementingIntegers (c ).observeOn (Schedulers .computation ())
330
330
.skip (10000 )
331
331
.filter (new Predicate <Integer >() {
@@ -452,7 +452,7 @@ public void onNext(Integer t) {
452
452
@ Test
453
453
public void firehoseFailsAsExpected () {
454
454
AtomicInteger c = new AtomicInteger ();
455
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
455
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
456
456
457
457
firehose (c ).observeOn (Schedulers .computation ())
458
458
.map (new Function <Integer , Integer >() {
@@ -496,7 +496,7 @@ public void onBackpressureDrop() {
496
496
}
497
497
int num = (int ) (Flowable .bufferSize () * 1.1 ); // > 1 so that take doesn't prevent buffer overflow
498
498
AtomicInteger c = new AtomicInteger ();
499
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
499
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
500
500
firehose (c ).onBackpressureDrop ()
501
501
.observeOn (Schedulers .computation ())
502
502
.map (SLOW_PASS_THRU ).take (num ).subscribe (ts );
@@ -521,7 +521,7 @@ public void onBackpressureDropWithAction() {
521
521
final AtomicInteger dropCount = new AtomicInteger ();
522
522
final AtomicInteger passCount = new AtomicInteger ();
523
523
final int num = Flowable .bufferSize () * 3 ; // > 1 so that take doesn't prevent buffer overflow
524
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
524
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
525
525
526
526
firehose (emitCount )
527
527
.onBackpressureDrop (new Consumer <Integer >() {
@@ -561,7 +561,7 @@ public void onBackpressureDropSynchronous() {
561
561
for (int i = 0 ; i < 100 ; i ++) {
562
562
int num = (int ) (Flowable .bufferSize () * 1.1 ); // > 1 so that take doesn't prevent buffer overflow
563
563
AtomicInteger c = new AtomicInteger ();
564
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
564
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
565
565
firehose (c ).onBackpressureDrop ()
566
566
.map (SLOW_PASS_THRU ).take (num ).subscribe (ts );
567
567
ts .awaitDone (5 , TimeUnit .SECONDS );
@@ -584,7 +584,7 @@ public void onBackpressureDropSynchronousWithAction() {
584
584
final AtomicInteger dropCount = new AtomicInteger ();
585
585
int num = (int ) (Flowable .bufferSize () * 1.1 ); // > 1 so that take doesn't prevent buffer overflow
586
586
AtomicInteger c = new AtomicInteger ();
587
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
587
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
588
588
firehose (c ).onBackpressureDrop (new Consumer <Integer >() {
589
589
@ Override
590
590
public void accept (Integer j ) {
@@ -613,7 +613,7 @@ public void accept(Integer j) {
613
613
public void onBackpressureBuffer () {
614
614
int num = (int ) (Flowable .bufferSize () * 1.1 ); // > 1 so that take doesn't prevent buffer overflow
615
615
AtomicInteger c = new AtomicInteger ();
616
- TestSubscriber <Integer > ts = new TestSubscriber <Integer >();
616
+ TestSubscriber <Integer > ts = new TestSubscriber <>();
617
617
618
618
firehose (c ).takeWhile (new Predicate <Integer >() {
619
619
@ Override
0 commit comments