-
Notifications
You must be signed in to change notification settings - Fork 7.6k
/
Copy pathBaseTestConsumerEx.java
303 lines (273 loc) · 9.43 KB
/
BaseTestConsumerEx.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava3.testsupport;
import java.util.List;
import io.reactivex.rxjava3.functions.Predicate;
import io.reactivex.rxjava3.internal.fuseable.QueueFuseable;
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
import io.reactivex.rxjava3.observers.BaseTestConsumer;
import java.util.Objects;
/**
* Base class with shared infrastructure to support TestSubscriber and TestObserver.
* @param <T> the value type consumed
* @param <U> the subclass of this BaseTestConsumer
*/
public abstract class BaseTestConsumerEx<T, U extends BaseTestConsumerEx<T, U>>
extends BaseTestConsumer<T, U> {
protected int initialFusionMode;
protected int establishedFusionMode;
/**
* The optional tag associated with this test consumer.
* @since 2.0.7
*/
protected CharSequence tag;
/**
* Indicates that one of the awaitX method has timed out.
* @since 2.0.7
*/
protected boolean timeout;
public BaseTestConsumerEx() {
super();
}
/**
* Returns the last thread which called the onXXX methods of this TestObserver/TestSubscriber.
* @return the last thread which called the onXXX methods
*/
public final Thread lastThread() {
return lastThread;
}
// assertion methods
/**
* Assert that this TestObserver/TestSubscriber did not receive an onNext value which is equal to
* the given value with respect to null-safe Object.equals.
*
* <p>History: 2.0.5 - experimental
* @param value the value to expect not being received
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U assertNever(T value) {
int s = values.size();
for (int i = 0; i < s; i++) {
T v = this.values.get(i);
if (Objects.equals(v, value)) {
throw fail("Value at position " + i + " is equal to " + valueAndClass(value) + "; Expected them to be different");
}
}
return (U) this;
}
/**
* Asserts that this TestObserver/TestSubscriber did not receive any onNext value for which
* the provided predicate returns true.
*
* <p>History: 2.0.5 - experimental
* @param valuePredicate the predicate that receives the onNext value
* and should return true for the expected value.
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U assertNever(Predicate<? super T> valuePredicate) {
int s = values.size();
for (int i = 0; i < s; i++) {
T v = this.values.get(i);
try {
if (valuePredicate.test(v)) {
throw fail("Value at position " + i + " matches predicate " + valuePredicate.toString() + ", which was not expected.");
}
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
return (U)this;
}
/**
* Assert that the TestObserver/TestSubscriber terminated (i.e., the terminal latch reached zero).
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertTerminated() {
if (done.getCount() != 0) {
throw fail("Subscriber still running!");
}
long c = completions;
if (c > 1) {
throw fail("Terminated with multiple completions: " + c);
}
int s = errors.size();
if (s > 1) {
throw fail("Terminated with multiple errors: " + s);
}
if (c != 0 && s != 0) {
throw fail("Terminated with multiple completions and errors: " + c);
}
return (U)this;
}
/**
* Assert that the TestObserver/TestSubscriber has not terminated (i.e., the terminal latch is still non-zero).
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertNotTerminated() {
if (done.getCount() == 0) {
throw fail("Subscriber terminated!");
}
return (U)this;
}
/**
* Assert that there is a single error and it has the given message.
* @param message the message expected
* @return this
*/
@SuppressWarnings("unchecked")
public final U assertErrorMessage(String message) {
int s = errors.size();
if (s == 0) {
throw fail("No errors");
} else
if (s == 1) {
Throwable e = errors.get(0);
String errorMessage = e.getMessage();
if (!Objects.equals(message, errorMessage)) {
throw fail("Error message differs; exptected: " + message + " but was: " + errorMessage);
}
} else {
throw fail("Multiple errors");
}
return (U)this;
}
/**
* Assert that the upstream signalled the specified values in order and then failed
* with a Throwable for which the provided predicate returns true.
* @param errorPredicate
* the predicate that receives the error Throwable
* and should return true for expected errors.
* @param values the expected values, asserted in order
* @return this
*/
public final U assertFailure(Predicate<Throwable> errorPredicate, T... values) {
return assertSubscribed()
.assertValues(values)
.assertError(errorPredicate)
.assertNotComplete();
}
/**
* Assert that the upstream signalled the specified values in order,
* then failed with a specific class or subclass of Throwable
* and with the given exact error message.
* @param error the expected exception (parent) class
* @param message the expected failure message
* @param values the expected values, asserted in order
* @return this
*/
public final U assertFailureAndMessage(Class<? extends Throwable> error,
String message, T... values) {
return assertSubscribed()
.assertValues(values)
.assertError(error)
.assertErrorMessage(message)
.assertNotComplete();
}
/**
* Returns true if an await timed out.
* @return true if one of the timeout-based await methods has timed out.
* <p>History: 2.0.7 - experimental
* @see #clearTimeout()
* @see #assertTimeout()
* @see #assertNoTimeout()
* @since 2.1
*/
public final boolean isTimeout() {
return timeout;
}
/**
* Clears the timeout flag set by the await methods when they timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.1
* @see #isTimeout()
*/
@SuppressWarnings("unchecked")
public final U clearTimeout() {
timeout = false;
return (U)this;
}
/**
* Asserts that some awaitX method has timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U assertTimeout() {
if (!timeout) {
throw fail("No timeout?!");
}
return (U)this;
}
/**
* Asserts that some awaitX method has not timed out.
* <p>History: 2.0.7 - experimental
* @return this
* @since 2.1
*/
@SuppressWarnings("unchecked")
public final U assertNoTimeout() {
if (timeout) {
throw fail("Timeout?!");
}
return (U)this;
}
/**
* Returns the internal shared list of errors.
* @return Returns the internal shared list of errors.
*/
public final List<Throwable> errors() {
return errors;
}
/**
* Returns true if this test consumer has terminated in any fashion.
* @return true if this test consumer has terminated in any fashion
*/
public final boolean isTerminated() {
return done.getCount() == 0;
}
/**
* Returns the number of times onComplete() was called.
* @return the number of times onComplete() was called
*/
public final long completions() {
return completions;
}
/**
* Fail with the given message and add the sequence of errors as suppressed ones.
* <p>Note this is deliberately the only fail method. Most of the times an assertion
* would fail but it is possible it was due to an exception somewhere. This construct
* will capture those potential errors and report it along with the original failure.
*
* @param message the message to use
* @return AssertionError the prepared AssertionError instance
*/
public final AssertionError failWith(String message) {
return fail(message);
}
static String fusionModeToString(int mode) {
switch (mode) {
case QueueFuseable.NONE : return "NONE";
case QueueFuseable.SYNC : return "SYNC";
case QueueFuseable.ASYNC : return "ASYNC";
default: return "Unknown(" + mode + ")";
}
}
}