Skip to content

Commit f8f6ab7

Browse files
author
teivah
committed
JustItem refinement
1 parent d8026a5 commit f8f6ab7

11 files changed

+63
-19
lines changed

Diff for: README.md

+1
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,7 @@ In this example, we create a pool of 32 goroutines that consume items concurrent
339339
* [SumFloat32/SumFloat64/SumInt64](http://reactivex.io/documentation/operators/sum.html) — calculate the sum of numbers emitted by an Observable and emit this sum
340340

341341
### Operators to Convert Observables
342+
* Error/Errors - convert an observable into an eventual error or list of errors
342343
* [ToMap/ToMapWithValueSelector/ToSlice](http://reactivex.io/documentation/operators/to.html) — convert an Observable into another object or data structure
343344

344345
## Contributions

Diff for: factory.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,9 @@ func Just(items interface{}, opts ...Option) Observable {
233233
}
234234

235235
// JustItem creates a single from one item.
236-
func JustItem(item Item, opts ...Option) Single {
236+
func JustItem(item interface{}, opts ...Option) Single {
237237
return &SingleImpl{
238-
iterable: newSliceIterable([]Item{item}),
238+
iterable: newJustIterable(item, opts...),
239239
}
240240
}
241241

Diff for: factory_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,7 @@ func Test_FromChannel_ComposedCapacity(t *testing.T) {
223223
}
224224

225225
func Test_FromItem(t *testing.T) {
226-
single := JustItem(Of(1))
226+
single := JustItem(1)
227227
Assert(context.Background(), t, single, HasItem(1), HasNotRaisedError())
228228
Assert(context.Background(), t, single, HasItem(1), HasNotRaisedError())
229229
}

Diff for: iterable_factory.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,7 @@ type factoryIterable struct {
44
factory func(opts ...Option) <-chan Item
55
}
66

7-
// TODO Replace by create
8-
func newColdIterable(factory func(opts ...Option) <-chan Item) Iterable {
7+
func newFactoryIterable(factory func(opts ...Option) <-chan Item) Iterable {
98
return &factoryIterable{factory: factory}
109
}
1110

Diff for: observable.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ type Observable interface {
3434
DoOnNext(nextFunc NextFunc, opts ...Option) Disposed
3535
ElementAt(index uint, opts ...Option) Single
3636
Error() error
37+
Errors() []error
3738
Filter(apply Predicate, opts ...Option) Observable
3839
First(opts ...Option) OptionalSingle
3940
FirstOrDefault(defaultValue interface{}, opts ...Option) Single
@@ -109,7 +110,7 @@ func operator(iterable Iterable, nextFunc, errFunc operatorItem, endFunc operato
109110
}
110111

111112
return &ObservableImpl{
112-
iterable: newColdIterable(func(propagatedOptions ...Option) <-chan Item {
113+
iterable: newFactoryIterable(func(propagatedOptions ...Option) <-chan Item {
113114
mergedOptions := append(opts, propagatedOptions...)
114115
option = parseOptions(mergedOptions...)
115116

@@ -131,7 +132,7 @@ func seq(ctx context.Context, next chan Item, iterable Iterable, nextFunc, errFu
131132
observe := iterable.Observe(opts...)
132133
operator := operatorOptions{
133134
stop: func() {
134-
if option.getErrorStrategy() == StopOnError {
135+
if option.getErrorStrategy() == OnErrorStop {
135136
stopped = true
136137
}
137138
},
@@ -166,7 +167,7 @@ func parallel(ctx context.Context, pool int, next chan Item, iterable Iterable,
166167
observe := iterable.Observe(opts...)
167168
operator := operatorOptions{
168169
stop: func() {
169-
if option.getErrorStrategy() == StopOnError {
170+
if option.getErrorStrategy() == OnErrorStop {
170171
stopped.Set()
171172
}
172173
},

Diff for: observable_operator.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ func (o *ObservableImpl) ElementAt(index uint, opts ...Option) Single {
596596
}
597597

598598
// Error returns the eventual Observable error.
599-
// Note that Error() is blocking.
599+
// This method is blocking.
600600
func (o *ObservableImpl) Error() error {
601601
for item := range o.iterable.Observe() {
602602
if item.Error() {
@@ -606,6 +606,18 @@ func (o *ObservableImpl) Error() error {
606606
return nil
607607
}
608608

609+
// Errors returns an eventual list of Observable errors.
610+
// This method is blocking
611+
func (o *ObservableImpl) Errors() []error {
612+
errs := make([]error, 0)
613+
for item := range o.iterable.Observe() {
614+
if item.Error() {
615+
errs = append(errs, item.E)
616+
}
617+
}
618+
return errs
619+
}
620+
609621
// Filter emits only those items from an Observable that pass a predicate test.
610622
func (o *ObservableImpl) Filter(apply Predicate, opts ...Option) Observable {
611623
return newObservableFromOperator(o, func(_ context.Context, item Item, dst chan<- Item, operator operatorOptions) {

Diff for: observable_operator_test.go

+27-2
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,31 @@ func Test_Observable_Error_Error(t *testing.T) {
357357
assert.Equal(t, errFoo, testObservable(1, errFoo, 3).Error())
358358
}
359359

360+
func Test_Observable_Errors_NoError(t *testing.T) {
361+
assert.Equal(t, 0, len(testObservable(1, 2, 3).Errors()))
362+
}
363+
364+
func Test_Observable_Errors_OneError(t *testing.T) {
365+
assert.Equal(t, 1, len(testObservable(1, errFoo, 3).Errors()))
366+
}
367+
368+
func Test_Observable_Errors_MultipleError(t *testing.T) {
369+
assert.Equal(t, 2, len(testObservable(1, errFoo, errBar).Errors()))
370+
}
371+
372+
func Test_Observable_Errors_MultipleErrorFromMap(t *testing.T) {
373+
errs := testObservable(1, 2, 3, 4).Map(func(i interface{}) (interface{}, error) {
374+
if i == 2 {
375+
return nil, errFoo
376+
}
377+
if i == 3 {
378+
return nil, errBar
379+
}
380+
return i, nil
381+
}, WithErrorStrategy(OnErrorContinue)).Errors()
382+
assert.Equal(t, 2, len(errs))
383+
}
384+
360385
func Test_Observable_Filter(t *testing.T) {
361386
obs := testObservable(1, 2, 3, 4).Filter(
362387
func(i interface{}) bool {
@@ -1133,7 +1158,7 @@ func Test_Observable_Option_WithOnErrorStrategy_Single(t *testing.T) {
11331158
return nil, errFoo
11341159
}
11351160
return i, nil
1136-
}, WithErrorStrategy(ContinueOnError))
1161+
}, WithErrorStrategy(OnErrorContinue))
11371162
Assert(context.Background(), t, obs, HasItems(1, 3), HasRaisedError(errFoo))
11381163
}
11391164

@@ -1150,7 +1175,7 @@ func Test_Observable_Option_WithOnErrorStrategy_Propagate(t *testing.T) {
11501175
return nil, errBar
11511176
}
11521177
return i, nil
1153-
}, WithErrorStrategy(ContinueOnError))
1178+
}, WithErrorStrategy(OnErrorContinue))
11541179
Assert(context.Background(), t, obs, HasItems(3), HasRaisedErrors(errFoo, errBar))
11551180
}
11561181

Diff for: optionalsingle_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
)
77

88
func Test_OptionalSingle_Observe(t *testing.T) {
9-
os := JustItem(Of(1)).Filter(func(i interface{}) bool {
9+
os := JustItem(1).Filter(func(i interface{}) bool {
1010
return i == 1
1111
})
1212
Assert(context.Background(), t, os, HasItem(1), HasNotRaisedError())

Diff for: options.go

+2
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ func WithBackPressureStrategy(strategy BackpressureStrategy) Option {
124124
})
125125
}
126126

127+
// WithErrorStrategy defines how an observable should deal with error.
128+
// This strategy is propagated to the parent observable.
127129
func WithErrorStrategy(strategy OnErrorStrategy) Option {
128130
return newFuncOption(func(options *funcOption) {
129131
options.onErrorStrategy = strategy

Diff for: single_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,21 @@ import (
66
)
77

88
func Test_Single_Filter_True(t *testing.T) {
9-
os := JustItem(Of(1)).Filter(func(i interface{}) bool {
9+
os := JustItem(1).Filter(func(i interface{}) bool {
1010
return i == 1
1111
})
1212
Assert(context.Background(), t, os, HasItem(1), HasNotRaisedError())
1313
}
1414

1515
func Test_Single_Filter_False(t *testing.T) {
16-
os := JustItem(Of(1)).Filter(func(i interface{}) bool {
16+
os := JustItem(1).Filter(func(i interface{}) bool {
1717
return i == 0
1818
})
1919
Assert(context.Background(), t, os, HasNoItem(), HasNotRaisedError())
2020
}
2121

2222
func Test_Single_Map(t *testing.T) {
23-
single := JustItem(Of(1)).Map(func(i interface{}) (interface{}, error) {
23+
single := JustItem(1).Map(func(i interface{}) (interface{}, error) {
2424
return i.(int) + 1, nil
2525
})
2626
Assert(context.Background(), t, single, HasItem(2), HasNotRaisedError())

Diff for: types.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ package rxgo
33
import "context"
44

55
type (
6-
// BackpressureStrategy is the backpressure strategy type
6+
// BackpressureStrategy is the backpressure strategy type.
77
BackpressureStrategy uint32
8-
OnErrorStrategy uint32
8+
// OnErrorStrategy is the Observable error strategy.
9+
OnErrorStrategy uint32
910

1011
operatorOptions struct {
1112
stop func()
@@ -61,6 +62,9 @@ const (
6162
)
6263

6364
const (
64-
StopOnError OnErrorStrategy = iota
65-
ContinueOnError
65+
// OnErrorStop is the default error strategy.
66+
// An operator will stop processing items on error.
67+
OnErrorStop OnErrorStrategy = iota
68+
// OnErrorContinue means an operator will continue processing items after an error.
69+
OnErrorContinue
6670
)

0 commit comments

Comments
 (0)