diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java index ea00d4df2c..74413e85d3 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapMaybe.java @@ -191,14 +191,18 @@ void innerSuccess(InnerObserver inner, R value) { } } else { SpscLinkedArrayQueue q = getOrCreateQueue(); - q.offer(value); + synchronized (q) { + q.offer(value); + } } if (decrementAndGet() == 0) { return; } } else { SpscLinkedArrayQueue q = getOrCreateQueue(); - q.offer(value); + synchronized (q) { + q.offer(value); + } active.decrementAndGet(); if (getAndIncrement() != 0) { return; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java index 9b9dbc8d89..434d36d01f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapSingle.java @@ -191,14 +191,18 @@ void innerSuccess(InnerObserver inner, R value) { } } else { SpscLinkedArrayQueue q = getOrCreateQueue(); - q.offer(value); + synchronized (q) { + q.offer(value); + } } if (decrementAndGet() == 0) { return; } } else { SpscLinkedArrayQueue q = getOrCreateQueue(); - q.offer(value); + synchronized (q) { + q.offer(value); + } active.decrementAndGet(); if (getAndIncrement() != 0) { return; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java index 0de37223ac..28a8547670 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapMaybe.java @@ -167,7 +167,9 @@ void innerSuccess(InnerObserver inner, R value) { } } else { SpscLinkedArrayQueue q = getOrCreateQueue(); - q.offer(value); + synchronized (q) { + q.offer(value); + } active.decrementAndGet(); if (getAndIncrement() != 0) { return; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java index 3b8c2cc2cd..8fb7e63813 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableFlatMapSingle.java @@ -167,7 +167,9 @@ void innerSuccess(InnerObserver inner, R value) { } } else { SpscLinkedArrayQueue q = getOrCreateQueue(); - q.offer(value); + synchronized (q) { + q.offer(value); + } active.decrementAndGet(); if (getAndIncrement() != 0) { return;