2.0 Design: Resource Cleanup #2780

Closed
benjchristensen opened this issue Feb 28, 2015 · 4 comments

@benjchristensen
Member

As part of 2.0 (#2450) we need to figure out what to do with the ReactiveX contract that is enforced by SafeSubscriber: https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/observers/SafeSubscriber.java

There are 2 important things it does:

These come from Rx Design Guidelines 4.3 and 6.4, 6.6 and various discussions and bug fixes over the last couple years.

A third thing it does, which isn't required, is filter out any events after the first terminal event. This is just a safety net for incorrect Observable sources.
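
To make the "filters out any events after the first terminal event" behavior concrete, here is a minimal sketch. It is not the SafeSubscriber source: `TerminalEventFilter` and `EventRecorder` are hypothetical names, the JDK's `java.util.concurrent.Flow` interfaces stand in for RxJava's types, and serialized access is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical wrapper: forwards events until the first terminal event, then drops the rest. */
final class TerminalEventFilter<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> downstream;
    private boolean done; // plain field: this sketch assumes serialized (non-concurrent) calls

    TerminalEventFilter(Flow.Subscriber<? super T> downstream) {
        this.downstream = downstream;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        downstream.onSubscribe(s);
    }

    @Override public void onNext(T item) {
        if (!done) {
            downstream.onNext(item);
        }
    }

    @Override public void onError(Throwable t) {
        if (done) {
            return; // a second terminal event from a misbehaving source is dropped
        }
        done = true;
        downstream.onError(t);
    }

    @Override public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        downstream.onComplete();
    }
}

/** Hypothetical helper that records every event it receives so the filtering can be observed. */
final class EventRecorder<T> implements Flow.Subscriber<T> {
    final List<String> events = new ArrayList<>();

    @Override public void onSubscribe(Flow.Subscription s) { events.add("subscribe"); }
    @Override public void onNext(T item) { events.add("next:" + item); }
    @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
    @Override public void onComplete() { events.add("complete"); }
}

final class TerminalEventFilterDemo {
    public static void main(String[] args) {
        EventRecorder<Integer> recorder = new EventRecorder<>();
        TerminalEventFilter<Integer> filter = new TerminalEventFilter<>(recorder);
        filter.onNext(1);
        filter.onComplete();
        filter.onNext(2);                             // dropped: arrives after the terminal event
        filter.onError(new RuntimeException("late")); // dropped as well
        System.out.println(recorder.events);
    }
}
```

The cost being debated in this issue is visible here: every event pays for the `done` check, even when the source is well behaved.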

benjchristensen added this to the 2.0 milestone on Feb 28, 2015
@akarnokd
Member

In my reimplementation for JDK 9 Flow API, I had to split RxJava's Subscriber into several classes providing the different required behaviors:

  • SafeSubscriber ensures the reactive-streams contract is kept and assumes serialized access, but does not allow adding resources to it
  • SerializedSubscriber serializes access and ensures the reactive-streams contract, but does not allow adding resources
  • CancellableSubscriber expects serialized access and checks the reactive-streams contract, allows adding resources, and allows asynchronous cancellation (even before onSubscribe() is called). It cleans up after a terminal event.
  • AbstractSubscriber stores the Subscription from upstream and ensures the reactive-streams contract in onSubscribe only.
  • CheckedSubscriber allows wrapping subscribers and adds checks for the reactive-streams contract.

The reactive-streams spec allows throwing NullPointerException but no other exceptions. Therefore, if I can't route an exception to onError without violating other rules, I send it to the Thread's uncaughtExceptionHandler.
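
As a rough illustration of that last step, handing an undeliverable exception to the current thread's handler could look like the following. `UndeliverableErrors.route` is a hypothetical helper name; only the `Thread` API calls are real JDK methods.

```java
/** Hypothetical helper for errors that cannot be signalled through onError. */
final class UndeliverableErrors {
    private UndeliverableErrors() { }

    static void route(Throwable error) {
        Thread current = Thread.currentThread();
        // getUncaughtExceptionHandler() falls back to the thread's ThreadGroup when no
        // handler was set explicitly, so it is non-null for a live thread.
        current.getUncaughtExceptionHandler().uncaughtException(current, error);
    }
}

final class UndeliverableErrorsDemo {
    public static void main(String[] args) {
        Thread.currentThread().setUncaughtExceptionHandler(
                (thread, error) -> System.err.println("undeliverable: " + error));
        UndeliverableErrors.route(new IllegalStateException("onError already called"));
    }
}
```

Unlike actually throwing, this does not unwind the caller's stack, so the calling code keeps running after the handler returns.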

@benjchristensen
Member Author

I think I've convinced myself that we don't need the SafeSubscriber. The experience of the past few years has left me thinking it is overly conservative, and solving things in the wrong place.

  1. Cleanup

There is no need to ensure cancel/unsubscribe is invoked after a terminal event at the bottom of the chain. Any operator or Observable that needs to do cleanup can register its cleanup hooks with the cancel or terminal events. Every Observable that is not infinite will receive either an onError, an onComplete, or a cancel. If it receives an onError or onComplete, it does not also need a cancel.
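
A minimal sketch of an operator that hooks its own cleanup to both the terminal events and cancel, so that nothing downstream has to send a cancel after termination. `CleanupStage` is a hypothetical name, and the JDK `Flow` interfaces are used here in place of RxJava's types.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical operator stage that owns a resource and releases it exactly once,
 * on whichever comes first: onError, onComplete, or cancel.
 */
final class CleanupStage<T> implements Flow.Subscriber<T>, Flow.Subscription {
    private final Flow.Subscriber<? super T> downstream;
    private final Runnable cleanup;
    private final AtomicBoolean released = new AtomicBoolean();
    private Flow.Subscription upstream; // assumes onSubscribe happens before any other call

    CleanupStage(Flow.Subscriber<? super T> downstream, Runnable cleanup) {
        this.downstream = downstream;
        this.cleanup = cleanup;
    }

    /** Runs the cleanup on the first call only; returns true if this call ran it. */
    private boolean releaseOnce() {
        if (released.compareAndSet(false, true)) {
            cleanup.run();
            return true;
        }
        return false;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this);
    }

    @Override public void onNext(T item) { downstream.onNext(item); }

    @Override public void onError(Throwable t) {
        releaseOnce();
        downstream.onError(t);
    }

    @Override public void onComplete() {
        releaseOnce();
        downstream.onComplete();
    }

    @Override public void request(long n) { upstream.request(n); }

    @Override public void cancel() {
        // After a terminal event this is a no-op: the resource is already released
        // and the upstream needs no extra cancel.
        if (releaseOnce()) {
            upstream.cancel();
        }
    }
}

final class CleanupStageDemo {
    public static void main(String[] args) {
        Flow.Subscriber<String> sink = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) { System.out.println("item " + item); }
            @Override public void onError(Throwable t) { System.out.println("error " + t); }
            @Override public void onComplete() { System.out.println("complete"); }
        };
        CleanupStage<String> stage =
                new CleanupStage<String>(sink, () -> System.out.println("resource released"));
        stage.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { System.out.println("upstream cancelled"); }
        });
        stage.onNext("a");
        stage.onComplete(); // releases the resource; no cancel travels upstream
    }
}
```

In this sketch the release runs before the downstream's terminal callback; running it after the call is equally possible and is a design choice for each operator.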

  2. Filtering Events

Let's do this in the Observable.create generators (as discussed in #2785 (comment)) so things behave correctly at the source, rather than burdening the consumer. And let's have proper unit tests for each operator to verify that it behaves correctly.

If someone creates their own operator, or does an unsafeCreate and breaks the contract, they break the contract. Let's not pay a perf penalty forever to protect against that.

  3. OnErrorNotImplemented

This is a somewhat valid concern, and one we can address when Observer is being used, or the subscribe overloads that take lambdas.

We can't do anything about it though if someone passes in a Reactive Streams Subscriber that swallows errors.


Anyone disagree with these viewpoints? Can we proceed without SafeSubscriber and, most importantly, without the cancel event propagating after every terminal event (a very expensive thing)?

@akarnokd
Member

I think a SafeSubscriber class on its own could help to tame a badly misbehaving Publisher/Subscriber pair to some extent, so I think the default subscribe() should be plain and we should provide a subscribeSafe or safeSubscribe convenience method to do the wrapping.

  1. Cleanup

If an onError or onComplete event reaches a Subscriber, the associated Subscription should be considered cancelled before the call, at least according to the RS spec. Therefore, there is no need to call cancel.

One does need to call cancel(), though, if an error condition occurs in onNext or if onNext wishes to terminate the sequence.

Now the trouble is with the observeOn operator. It has to release the Worker after the call to the child's onError or onComplete method; otherwise, those calls could be interrupted by Future.cancel while they are still running.
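
The point about calling cancel() when onNext itself fails can be sketched as follows. `CancelOnFailureSubscriber` is a hypothetical name built on the JDK `Flow` interfaces, not an RxJava class, and it assumes serialized calls with onSubscribe arriving first.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/** Hypothetical subscriber that cancels upstream only when its own onNext logic fails. */
final class CancelOnFailureSubscriber<T> implements Flow.Subscriber<T> {
    private final Consumer<? super T> onNextAction;
    private final Consumer<? super Throwable> onErrorAction;
    private Flow.Subscription upstream;
    private boolean done;

    CancelOnFailureSubscriber(Consumer<? super T> onNextAction,
                              Consumer<? super Throwable> onErrorAction) {
        this.onNextAction = onNextAction;
        this.onErrorAction = onErrorAction;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        s.request(Long.MAX_VALUE); // unbounded demand keeps the sketch short
    }

    @Override public void onNext(T item) {
        if (done) {
            return;
        }
        try {
            onNextAction.accept(item);
        } catch (RuntimeException ex) {
            // No terminal event has arrived from upstream, so it must be told to stop.
            upstream.cancel();
            onError(ex);
        }
    }

    @Override public void onError(Throwable t) {
        if (done) {
            return;
        }
        done = true;
        // Reached either from upstream (already terminated, no cancel needed)
        // or from onNext above (already cancelled).
        onErrorAction.accept(t);
    }

    @Override public void onComplete() {
        done = true; // upstream terminated on its own, so no cancel is needed
    }
}

final class CancelOnFailureDemo {
    public static void main(String[] args) {
        CancelOnFailureSubscriber<Integer> subscriber = new CancelOnFailureSubscriber<Integer>(
                value -> {
                    if (value < 0) {
                        throw new IllegalArgumentException("negative value " + value);
                    }
                },
                error -> System.out.println("failed: " + error.getMessage()));
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { System.out.println("upstream cancelled"); }
        });
        subscriber.onNext(1);
        subscriber.onNext(-1); // throws inside onNext, so upstream is cancelled
    }
}
```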

  2. Filtering events

I don't understand the paragraph.

  3. OnErrorNotImplemented

Subscriber methods are not allowed to throw, so all that's left is to call RxJavaPlugins.onError and return to the caller.

@akarnokd
Member

Closing with the following resolution:

  • Subscribers and Observers are considered safe by default, so they must not throw from their onXXX methods.
  • subscribe(X) by default won't wrap consumers into a safe consumer.
  • In case a consumer is expected to throw, there is safeSubscribe, which adds the necessary safeguards.
  • Non-fatal exceptions thrown but not deliverable due to protocol restrictions are all routed to RxJavaPlugins.onError and can be hooked globally.
  • Cleaning up is now the responsibility of the operators; as there are no safe wrappers by default, when a sequence terminates, its upstream connection and all associated resources have to be cleaned up by the upstream before or after the call to onError/onComplete of the downstream.
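
To illustrate the routing of undeliverable exceptions to a global hook, here is a sketch under stated assumptions. `GlobalErrorHook` is a hypothetical stand-in for a hook such as RxJavaPlugins.onError, and `SafeTerminalWrapper` is a hypothetical opt-in wrapper that covers only the terminal callbacks; neither reproduces RxJava's actual implementation.

```java
import java.util.concurrent.Flow;
import java.util.function.Consumer;

/** Hypothetical global hook; a stand-in for something like RxJavaPlugins.onError. */
final class GlobalErrorHook {
    private static volatile Consumer<Throwable> handler = Throwable::printStackTrace;

    private GlobalErrorHook() { }

    static void setHandler(Consumer<Throwable> newHandler) { handler = newHandler; }

    static void onError(Throwable t) { handler.accept(t); }
}

/**
 * Hypothetical opt-in wrapper: exceptions that can no longer be delivered through
 * the protocol go to the global hook instead of propagating to the caller.
 * Guarding onNext is left out to keep the sketch focused on the routing.
 */
final class SafeTerminalWrapper<T> implements Flow.Subscriber<T> {
    private final Flow.Subscriber<? super T> actual;
    private boolean done; // assumes serialized calls

    SafeTerminalWrapper(Flow.Subscriber<? super T> actual) {
        this.actual = actual;
    }

    @Override public void onSubscribe(Flow.Subscription s) { actual.onSubscribe(s); }

    @Override public void onNext(T item) {
        if (!done) {
            actual.onNext(item);
        }
    }

    @Override public void onError(Throwable t) {
        if (done) {
            GlobalErrorHook.onError(t); // arrived after termination: undeliverable
            return;
        }
        done = true;
        try {
            actual.onError(t);
        } catch (RuntimeException ex) {
            GlobalErrorHook.onError(ex); // the consumer's onError itself threw
        }
    }

    @Override public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        try {
            actual.onComplete();
        } catch (RuntimeException ex) {
            GlobalErrorHook.onError(ex); // no onError may follow onComplete
        }
    }
}

final class SafeTerminalWrapperDemo {
    public static void main(String[] args) {
        GlobalErrorHook.setHandler(error -> System.out.println("hooked: " + error));
        SafeTerminalWrapper<String> wrapper = new SafeTerminalWrapper<String>(
                new Flow.Subscriber<String>() {
                    @Override public void onSubscribe(Flow.Subscription s) { }
                    @Override public void onNext(String item) { }
                    @Override public void onError(Throwable t) { }
                    @Override public void onComplete() {
                        throw new IllegalStateException("consumer failed in onComplete");
                    }
                });
        wrapper.onComplete(); // the thrown exception reaches the hook, not the caller
    }
}
```

A plain subscribe() would skip this wrapper entirely, which is where the performance saving discussed in this issue comes from.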
