Skip to content

Commit 0d189cd

Browse files
Adding onDropped callback to throttleLast as a part of #7458
1 parent 506413f commit 0d189cd

File tree

7 files changed

+645
-29
lines changed

7 files changed

+645
-29
lines changed

src/main/java/io/reactivex/rxjava3/core/Flowable.java

Lines changed: 228 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14605,6 +14605,38 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit) {
1460514605
return sample(period, unit, Schedulers.computation());
1460614606
}
1460714607

14608+
/**
14609+
* Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
14610+
* within periodic time intervals.
14611+
* <p>
14612+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.v3.png" alt="">
14613+
* <dl>
14614+
* <dt><b>Backpressure:</b></dt>
14615+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
14616+
* <dt><b>Scheduler:</b></dt>
14617+
* <dd>{@code sample} operates by default on the {@code computation} {@link Scheduler}.</dd>
14618+
* </dl>
14619+
*
14620+
* @param period
14621+
* the sampling rate
14622+
* @param unit
14623+
* the {@link TimeUnit} in which {@code period} is defined
14624+
* @param onDropped
14625+
* called with the current entry when it has been replaced by a new one
14626+
* @return the new {@code Flowable} instance
14627+
* @throws NullPointerException if {@code unit} is {@code null} or {@code onDropped} is {@code null}
14628+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
14629+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
14630+
* @see #throttleLast(long, TimeUnit)
14631+
*/
14632+
@CheckReturnValue
14633+
@BackpressureSupport(BackpressureKind.ERROR)
14634+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
14635+
@NonNull
14636+
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Consumer<T> onDropped) {
14637+
return sample(period, unit, Schedulers.computation(), onDropped);
14638+
}
14639+
1460814640
/**
1460914641
* Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
1461014642
* within periodic time intervals and optionally emit the very last upstream item when the upstream completes.
@@ -14641,6 +14673,44 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, boolean emi
1464114673
return sample(period, unit, Schedulers.computation(), emitLast);
1464214674
}
1464314675

14676+
/**
14677+
* Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
14678+
* within periodic time intervals and optionally emit the very last upstream item when the upstream completes.
14679+
* <p>
14680+
* <img width="640" height="277" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.emitlast.png" alt="">
14681+
* <dl>
14682+
* <dt><b>Backpressure:</b></dt>
14683+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
14684+
* <dt><b>Scheduler:</b></dt>
14685+
* <dd>{@code sample} operates by default on the {@code computation} {@link Scheduler}.</dd>
14686+
* </dl>
14687+
*
14688+
* <p>History: 2.0.5 - experimental
14689+
* @param period
14690+
* the sampling rate
14691+
* @param unit
14692+
* the {@link TimeUnit} in which {@code period} is defined
14693+
* @param emitLast
14694+
* if {@code true}, and the upstream completes while there is still an unsampled item available,
14695+
* that item is emitted to downstream before completion
14696+
* if {@code false}, an unsampled last item is ignored.
14697+
* @param onDropped
14698+
* called with the current entry when it has been replaced by a new one
14699+
* @return the new {@code Flowable} instance
14700+
* @throws NullPointerException if {@code unit} is {@code null} {@code onDropped} is {@code null}
14701+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
14702+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
14703+
* @see #throttleLast(long, TimeUnit)
14704+
* @since 2.1
14705+
*/
14706+
@CheckReturnValue
14707+
@BackpressureSupport(BackpressureKind.ERROR)
14708+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
14709+
@NonNull
14710+
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, boolean emitLast, @NonNull Consumer<T> onDropped) {
14711+
return sample(period, unit, Schedulers.computation(), emitLast, onDropped);
14712+
}
14713+
1464414714
/**
1464514715
* Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
1464614716
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}.
@@ -14672,7 +14742,44 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, boolean emi
1467214742
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler) {
1467314743
Objects.requireNonNull(unit, "unit is null");
1467414744
Objects.requireNonNull(scheduler, "scheduler is null");
14675-
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false));
14745+
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false, null));
14746+
}
14747+
14748+
/**
14749+
* Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
14750+
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}.
14751+
* <p>
14752+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.v3.png" alt="">
14753+
* <dl>
14754+
* <dt><b>Backpressure:</b></dt>
14755+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
14756+
* <dt><b>Scheduler:</b></dt>
14757+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
14758+
* </dl>
14759+
*
14760+
* @param period
14761+
* the sampling rate
14762+
* @param unit
14763+
* the {@link TimeUnit} in which {@code period} is defined
14764+
* @param scheduler
14765+
* the {@code Scheduler} to use when sampling
14766+
* @param onDropped
14767+
* called with the current entry when it has been replaced by a new one
14768+
* @return the new {@code Flowable} instance
14769+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
14770+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
14771+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
14772+
* @see #throttleLast(long, TimeUnit, Scheduler)
14773+
*/
14774+
@CheckReturnValue
14775+
@NonNull
14776+
@BackpressureSupport(BackpressureKind.ERROR)
14777+
@SchedulerSupport(SchedulerSupport.CUSTOM)
14778+
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
14779+
Objects.requireNonNull(unit, "unit is null");
14780+
Objects.requireNonNull(scheduler, "scheduler is null");
14781+
Objects.requireNonNull(onDropped, "onDropped is null");
14782+
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, false, onDropped));
1467614783
}
1467714784

1467814785
/**
@@ -14713,7 +14820,51 @@ public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Sc
1471314820
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast) {
1471414821
Objects.requireNonNull(unit, "unit is null");
1471514822
Objects.requireNonNull(scheduler, "scheduler is null");
14716-
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast));
14823+
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, null));
14824+
}
14825+
14826+
/**
14827+
* Returns a {@code Flowable} that emits the most recently emitted item (if any) emitted by the current {@code Flowable}
14828+
* within periodic time intervals, where the intervals are defined on a particular {@link Scheduler}
14829+
* and optionally emit the very last upstream item when the upstream completes.
14830+
* <p>
14831+
* <img width="640" height="277" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/sample.s.emitlast.png" alt="">
14832+
* <dl>
14833+
* <dt><b>Backpressure:</b></dt>
14834+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
14835+
* <dt><b>Scheduler:</b></dt>
14836+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
14837+
* </dl>
14838+
*
14839+
* <p>History: 2.0.5 - experimental
14840+
* @param period
14841+
* the sampling rate
14842+
* @param unit
14843+
* the {@link TimeUnit} in which {@code period} is defined
14844+
* @param scheduler
14845+
* the {@code Scheduler} to use when sampling
14846+
* @param emitLast
14847+
* if {@code true} and the upstream completes while there is still an unsampled item available,
14848+
* that item is emitted to downstream before completion
14849+
* if {@code false}, an unsampled last item is ignored.
14850+
* @param onDropped
14851+
* called with the current entry when it has been replaced by a new one
14852+
* @return the new {@code Flowable} instance
14853+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
14854+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
14855+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
14856+
* @see #throttleLast(long, TimeUnit, Scheduler)
14857+
* @since 2.1
14858+
*/
14859+
@CheckReturnValue
14860+
@NonNull
14861+
@BackpressureSupport(BackpressureKind.ERROR)
14862+
@SchedulerSupport(SchedulerSupport.CUSTOM)
14863+
public final Flowable<T> sample(long period, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, boolean emitLast, @NonNull Consumer<T> onDropped) {
14864+
Objects.requireNonNull(unit, "unit is null");
14865+
Objects.requireNonNull(scheduler, "scheduler is null");
14866+
Objects.requireNonNull(onDropped, "onDropped is null");
14867+
return RxJavaPlugins.onAssembly(new FlowableSampleTimed<>(this, period, unit, scheduler, emitLast, onDropped));
1471714868
}
1471814869

1471914870
/**
@@ -17174,6 +17325,42 @@ public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit u
1717417325
return sample(intervalDuration, unit);
1717517326
}
1717617327

17328+
/**
17329+
* Returns a {@code Flowable} that emits only the last item emitted by the current {@code Flowable} during sequential
17330+
* time windows of a specified duration.
17331+
* <p>
17332+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas
17333+
* {@link #throttleFirst} does not tick, it just tracks the passage of time.
17334+
* <p>
17335+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.v3.png" alt="">
17336+
* <dl>
17337+
* <dt><b>Backpressure:</b></dt>
17338+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
17339+
* <dt><b>Scheduler:</b></dt>
17340+
* <dd>{@code throttleLast} operates by default on the {@code computation} {@link Scheduler}.</dd>
17341+
* </dl>
17342+
*
17343+
* @param intervalDuration
17344+
* duration of windows within which the last item emitted by the current {@code Flowable} will be
17345+
* emitted
17346+
* @param unit
17347+
* the unit of time of {@code intervalDuration}
17348+
* @param onDropped
17349+
* called with the current entry when it has been replaced by a new one
17350+
* @return the new {@code Flowable} instance
17351+
* @throws NullPointerException if {@code unit} is {@code null} or {@code onDropped} is {@code null}
17352+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
17353+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
17354+
* @see #sample(long, TimeUnit)
17355+
*/
17356+
@CheckReturnValue
17357+
@BackpressureSupport(BackpressureKind.ERROR)
17358+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
17359+
@NonNull
17360+
public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Consumer<T> onDropped) {
17361+
return sample(intervalDuration, unit, onDropped);
17362+
}
17363+
1717717364
/**
1717817365
* Returns a {@code Flowable} that emits only the last item emitted by the current {@code Flowable} during sequential
1717917366
* time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
@@ -17211,6 +17398,45 @@ public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit u
1721117398
return sample(intervalDuration, unit, scheduler);
1721217399
}
1721317400

17401+
/**
17402+
* Returns a {@code Flowable} that emits only the last item emitted by the current {@code Flowable} during sequential
17403+
* time windows of a specified duration, where the duration is governed by a specified {@link Scheduler}.
17404+
* <p>
17405+
* This differs from {@link #throttleFirst(long, TimeUnit, Scheduler)} in that this ticks along at a scheduled interval whereas
17406+
* {@code throttleFirst} does not tick, it just tracks the passage of time.
17407+
* <p>
17408+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleLast.s.v3.png" alt="">
17409+
* <dl>
17410+
* <dt><b>Backpressure:</b></dt>
17411+
* <dd>This operator does not support backpressure as it uses time to control data flow.</dd>
17412+
* <dt><b>Scheduler:</b></dt>
17413+
* <dd>You specify which {@code Scheduler} this operator will use.</dd>
17414+
* </dl>
17415+
*
17416+
* @param intervalDuration
17417+
* duration of windows within which the last item emitted by the current {@code Flowable} will be
17418+
* emitted
17419+
* @param unit
17420+
* the unit of time of {@code intervalDuration}
17421+
* @param scheduler
17422+
* the {@code Scheduler} to use internally to manage the timers that handle timeout for each
17423+
* event
17424+
* @param onDropped
17425+
* called with the current entry when it has been replaced by a new one
17426+
* @return the new {@code Flowable} instance
17427+
* @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null} or {@code onDropped} is {@code null}
17428+
* @see <a href="http://reactivex.io/documentation/operators/sample.html">ReactiveX operators documentation: Sample</a>
17429+
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Backpressure">RxJava wiki: Backpressure</a>
17430+
* @see #sample(long, TimeUnit, Scheduler)
17431+
*/
17432+
@CheckReturnValue
17433+
@BackpressureSupport(BackpressureKind.ERROR)
17434+
@SchedulerSupport(SchedulerSupport.CUSTOM)
17435+
@NonNull
17436+
public final Flowable<T> throttleLast(long intervalDuration, @NonNull TimeUnit unit, @NonNull Scheduler scheduler, @NonNull Consumer<T> onDropped) {
17437+
return sample(intervalDuration, unit, scheduler, onDropped);
17438+
}
17439+
1721417440
/**
1721517441
* Throttles items from the upstream {@code Flowable} by first emitting the next
1721617442
* item from upstream, then periodically emitting the latest item (if any) when

0 commit comments

Comments
 (0)