Simplify BulkProcessor handling and retry logic #24051


Merged: 6 commits into elastic:master on Apr 13, 2017

Conversation

Tim-Brooks (Contributor)

This commit collapses the SyncBulkRequestHandler and
AsyncBulkRequestHandler into a single BulkRequestHandler. The new
handler executes a bulk request and awaits its completion if the
BulkProcessor was configured with a concurrentRequests setting of 0.
Otherwise the execution happens asynchronously.

As part of this change the Retry class has been refactored.
withSyncBackoff and withAsyncBackoff have been replaced with two
versions of withBackoff. One method takes a listener that will be
called on completion. The other method returns a future that will be
completed when the request completes.
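For illustration, the two withBackoff shapes described above can be sketched with a simplified, self-contained retry helper. This is a hedged sketch, not the actual Elasticsearch Retry class: the real class retries only the failed items of a bulk request and uses ActionListener, whereas this stand-in (RetrySketch, a hypothetical name) retries a whole Callable and uses a plain BiConsumer as the "listener".

```java
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical stand-in for the two withBackoff variants:
// one returns a future, the other takes a completion callback.
public class RetrySketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // Future-based variant: completed when the request finally succeeds
    // or the backoff policy (an iterator of delays) is exhausted.
    public <T> CompletableFuture<T> withBackoff(Callable<T> request,
                                                Iterator<Long> backoffMillis) {
        CompletableFuture<T> future = new CompletableFuture<>();
        attempt(request, backoffMillis, future);
        return future;
    }

    // Listener-based variant: adapts the future onto a completion callback.
    public <T> void withBackoff(Callable<T> request,
                                Iterator<Long> backoffMillis,
                                BiConsumer<T, Exception> listener) {
        withBackoff(request, backoffMillis).whenComplete((result, error) ->
                listener.accept(result, error == null ? null : (Exception) error));
    }

    private <T> void attempt(Callable<T> request, Iterator<Long> backoff,
                             CompletableFuture<T> future) {
        try {
            future.complete(request.call());
        } catch (Exception e) {
            if (backoff.hasNext()) {
                // Retry after the next backoff delay.
                scheduler.schedule(() -> attempt(request, backoff, future),
                        backoff.next(), TimeUnit.MILLISECONDS);
            } else {
                future.completeExceptionally(e);  // retries exhausted
            }
        }
    }

    public void shutdown() {
        scheduler.shutdown();
    }
}
```

The key point mirrored from the PR description is that both variants share one retry loop; only the completion surface (future vs. callback) differs.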

Tim-Brooks (Contributor, Author) commented Apr 11, 2017

The PR maintains the existing BulkProcessor features, including the concurrentRequests setting, which controls the number of concurrent requests allowed by the BulkRequestHandler. If it is set to 0, an "execute" call blocks until the bulk request completes.

This setting requires the handler to remain somewhat complicated. But I assume we want all the current features of the BulkProcessor to stay the same for backwards compatibility?
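The behavior described above, where concurrentRequests bounds in-flight requests and 0 means "block the caller", can be sketched with a semaphore plus an optional latch. This is a hypothetical simplification (HandlerSketch is an invented name; the real BulkRequestHandler drives an async client rather than a thread pool), but it shows why one code path can serve both modes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

// Hypothetical sketch of the unified handler: concurrentRequests == 0
// means "one in-flight request and the caller blocks until it completes".
class HandlerSketch {
    private final Semaphore semaphore;
    private final boolean blockOnExecute;
    private final ExecutorService pool = Executors.newCachedThreadPool();

    HandlerSketch(int concurrentRequests) {
        this.blockOnExecute = concurrentRequests == 0;
        // At least one permit, so the 0 case still admits one request.
        this.semaphore = new Semaphore(Math.max(1, concurrentRequests));
    }

    void execute(Runnable bulkRequest) throws InterruptedException {
        semaphore.acquire();                 // bound in-flight requests
        CountDownLatch latch = new CountDownLatch(1);
        pool.execute(() -> {
            try {
                bulkRequest.run();
            } finally {
                semaphore.release();
                latch.countDown();
            }
        });
        if (blockOnExecute) {
            latch.await();                   // concurrentRequests == 0: wait here
        }
    }

    void close() {
        pool.shutdown();
    }
}
```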

s1monw (Contributor) commented Apr 12, 2017

This setting requires the handler to remain somewhat complicated. But I assume we want all the current features of the BulkProcessor to stay the same for backwards compatibility?

My take on this: if we can fail hard when it is used, we can drop it. What would be the way for the user to block instead?

s1monw (Contributor) left a review comment


I like this a lot! I left some suggestions that can also be follow-ups (the listener).

listener.afterBulk(executionId, bulkRequest, e);
public void execute(BulkRequest bulkRequest, long executionId) {
boolean bulkRequestSetupSuccessful = false;
boolean acquired = false;

That's just a suggestion; I tend to do this the following way:

Runnable toRelease = () -> {};
// ...
semaphore.acquire();
toRelease = semaphore::release;

That way you don't need to check any boolean logic and can just call the runnable.
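Spelled out, the suggested pattern might look like the sketch below (not the PR's code; ReleaseSketch is a hypothetical wrapper). The no-op initial value means the finally block can always run toRelease, so no acquired flag is needed.

```java
import java.util.concurrent.Semaphore;

class ReleaseSketch {
    private final Semaphore semaphore = new Semaphore(1);

    void execute(Runnable work) {
        // Start with a no-op so the finally block can always run it.
        Runnable toRelease = () -> {};
        try {
            semaphore.acquire();
            toRelease = semaphore::release;  // only set once acquired
            work.run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            toRelease.run();                 // no boolean bookkeeping needed
        }
    }
}
```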

semaphore.acquire();
acquired = true;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {

We do have a LatchedActionListener, but I see that you also need to call the release method. I wonder if we can generalize LatchedActionListener into something that only takes a Runnable, so that users who want to use it with a latch can just pass a method handle like latch::countDown. We could overload ActionListener::wrap to take a runnable as a third parameter and be done with it?
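One way the suggested generalization could look, sketched against a minimal listener interface (the real ActionListener lives in Elasticsearch and this three-argument wrap overload is only the reviewer's proposal, not an existing API):

```java
import java.util.function.Consumer;

interface Listener<T> {
    void onResponse(T response);
    void onFailure(Exception e);

    // Hypothetical overload of wrap: run the given Runnable after either
    // outcome, e.g. latch::countDown or semaphore::release.
    static <T> Listener<T> wrap(Consumer<T> responseHandler,
                                Consumer<Exception> failureHandler,
                                Runnable onFinal) {
        return new Listener<T>() {
            @Override public void onResponse(T response) {
                try {
                    responseHandler.accept(response);
                } finally {
                    onFinal.run();   // runs even if the handler throws
                }
            }
            @Override public void onFailure(Exception e) {
                try {
                    failureHandler.accept(e);
                } finally {
                    onFinal.run();
                }
            }
        };
    }
}
```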

return true;
}
return false;
public boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {

Does this need to be public? And does this class need to be subclassable?

Tim-Brooks (Contributor, Author)

What would be the way for the user to block instead?

I just feel a bit weird that the "block until completion" behavior is determined by the concurrentRequests setting being 0. Obviously the API could return a future for the user to block on, or there could be a blockingFlush() API. But since a flush can happen implicitly (an add() that crosses a threshold) or explicitly (flush()), this is not easy to do. So I'm not sure there is an obvious change.
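For illustration, the future-returning alternative mentioned above could look roughly like the sketch below. This is entirely hypothetical (ProcessorSketch, the String "documents", and the completed-synchronously futures are all invented for the example; the real BulkProcessor sends requests asynchronously), but it shows how both the implicit and explicit flush paths could hand the caller something to block on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical: flush() hands back a future so callers choose whether to block.
class ProcessorSketch {
    private final List<String> buffer = new ArrayList<>();
    private final int flushThreshold;

    ProcessorSketch(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    // Implicit flush when the threshold is crossed, mirroring BulkProcessor.add().
    CompletableFuture<Integer> add(String doc) {
        buffer.add(doc);
        return buffer.size() >= flushThreshold
                ? flush()
                : CompletableFuture.completedFuture(0);
    }

    // Explicit flush; a real implementation would send the bulk request
    // asynchronously and complete the future from its response listener.
    CompletableFuture<Integer> flush() {
        int flushed = buffer.size();
        buffer.clear();
        return CompletableFuture.completedFuture(flushed);
    }
}
```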

@Tim-Brooks Tim-Brooks merged commit ffaac5a into elastic:master Apr 13, 2017
@Tim-Brooks Tim-Brooks deleted the refactor_bulk branch November 14, 2018 14:48
Labels
:Core/Infra/Core Core issues without another label >non-issue v6.0.0-alpha1