File tree 2 files changed +50
-1
lines changed
2 files changed +50
-1
lines changed Original file line number Diff line number Diff line change @@ -20,7 +20,7 @@ import class Workflow.Lifetime
20
20
21
21
extension Observable : AnyWorkflowConvertible {
22
22
public func asAnyWorkflow( ) -> AnyWorkflow < Void , Element > {
23
- return ObservableWorkflow ( observable: self ) . asAnyWorkflow ( )
23
+ ObservableWorkflow ( observable: self ) . asAnyWorkflow ( )
24
24
}
25
25
}
26
26
@@ -41,6 +41,7 @@ struct ObservableWorkflow<Value>: Workflow {
41
41
let disposable = observable
42
42
. map { AnyWorkflowAction ( sendingOutput: $0) }
43
43
. subscribe ( on: MainScheduler . asyncInstance)
44
+ . observe ( on: MainScheduler . asyncInstance)
44
45
. subscribe ( onNext: { value in
45
46
sink. send ( value)
46
47
} )
Original file line number Diff line number Diff line change @@ -38,6 +38,54 @@ class Rx_ReactiveWorkersTests: XCTestCase {
38
38
39
39
disposable? . dispose ( )
40
40
}
41
+
42
+ func test_observes_on_main_queue( ) {
43
+ struct TestWorkflow : Workflow {
44
+ enum Action : WorkflowAction {
45
+ typealias WorkflowType = TestWorkflow
46
+ case complete
47
+
48
+ func apply( toState state: inout State ) -> Output ? {
49
+ switch self {
50
+ case . complete:
51
+ return . finished
52
+ }
53
+ }
54
+ }
55
+
56
+ enum Output {
57
+ case finished
58
+ }
59
+
60
+ func render( state: Void , context: RenderContext < Self > ) {
61
+ Single < Void > . create { observer in
62
+ DispatchQueue . global ( ) . async {
63
+ observer ( . success( ( ) ) )
64
+ }
65
+ return Disposables . create ( )
66
+ }
67
+ . asObservable ( )
68
+ . running ( in: context) { _ in
69
+ XCTAssert ( Thread . isMainThread)
70
+ return Action . complete
71
+ }
72
+ }
73
+ }
74
+
75
+ let host = WorkflowHost (
76
+ workflow: TestWorkflow ( )
77
+ )
78
+
79
+ let expectation = XCTestExpectation ( )
80
+ let disposable = host. output. signal. observeValues { output in
81
+ if output == . finished {
82
+ expectation. fulfill ( )
83
+ }
84
+ }
85
+
86
+ wait ( for: [ expectation] , timeout: 1.0 )
87
+ disposable? . dispose ( )
88
+ }
41
89
}
42
90
43
91
struct CombinedWorkflow : Workflow {
You can’t perform that action at this time.
0 commit comments