-
Notifications
You must be signed in to change notification settings - Fork 25.2k
Introduce long polling for changes #33683
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
jasontedor
merged 11 commits into
elastic:master
from
jasontedor:global-checkpoint-polling
Sep 16, 2018
Merged
Changes from 4 commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
669bd24
Introduce long polling for changes
jasontedor d4f8a98
Fork it over
jasontedor 2890693
Ensure we do not lose exceptions
jasontedor 426b359
Merge remote-tracking branch 'elastic/master' into global-checkpoint-…
jasontedor 828b61c
Merge branch 'master' into global-checkpoint-polling
jasontedor 15d1af9
Iteration
jasontedor 9e32f93
Add some trace logging
jasontedor 7d1f975
Add shard ID
jasontedor cdc0e90
Merge branch 'master' into global-checkpoint-polling
jasontedor e072468
Merge branch 'master' into global-checkpoint-polling
jasontedor 086b701
Merge branch 'master' into global-checkpoint-polling
jasontedor File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
package org.elasticsearch.xpack.ccr.action; | ||
|
||
import org.elasticsearch.action.Action; | ||
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.ActionRequestValidationException; | ||
import org.elasticsearch.action.ActionResponse; | ||
import org.elasticsearch.action.support.ActionFilters; | ||
|
@@ -19,6 +20,7 @@ | |
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
import org.elasticsearch.common.settings.Settings; | ||
import org.elasticsearch.common.unit.TimeValue; | ||
import org.elasticsearch.index.IndexService; | ||
import org.elasticsearch.index.seqno.SeqNoStats; | ||
import org.elasticsearch.index.shard.IndexShard; | ||
|
@@ -36,8 +38,10 @@ | |
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Objects; | ||
import java.util.concurrent.TimeoutException; | ||
|
||
import static org.elasticsearch.action.ValidateActions.addValidationError; | ||
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; | ||
|
||
public class ShardChangesAction extends Action<ShardChangesAction.Response> { | ||
|
||
|
@@ -59,6 +63,7 @@ public static class Request extends SingleShardRequest<Request> { | |
private int maxOperationCount; | ||
private ShardId shardId; | ||
private String expectedHistoryUUID; | ||
private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT; | ||
private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; | ||
|
||
public Request(ShardId shardId, String expectedHistoryUUID) { | ||
|
@@ -102,6 +107,14 @@ public String getExpectedHistoryUUID() { | |
return expectedHistoryUUID; | ||
} | ||
|
||
public TimeValue getPollTimeout() { | ||
return pollTimeout; | ||
} | ||
|
||
public void setPollTimeout(final TimeValue pollTimeout) { | ||
this.pollTimeout = Objects.requireNonNull(pollTimeout, "pollTimeout"); | ||
} | ||
|
||
@Override | ||
public ActionRequestValidationException validate() { | ||
ActionRequestValidationException validationException = null; | ||
|
@@ -126,6 +139,7 @@ public void readFrom(StreamInput in) throws IOException { | |
maxOperationCount = in.readVInt(); | ||
shardId = ShardId.readShardId(in); | ||
expectedHistoryUUID = in.readString(); | ||
pollTimeout = in.readTimeValue(); | ||
maxOperationSizeInBytes = in.readVLong(); | ||
} | ||
|
||
|
@@ -136,6 +150,7 @@ public void writeTo(StreamOutput out) throws IOException { | |
out.writeVInt(maxOperationCount); | ||
shardId.writeTo(out); | ||
out.writeString(expectedHistoryUUID); | ||
out.writeTimeValue(pollTimeout); | ||
out.writeVLong(maxOperationSizeInBytes); | ||
} | ||
|
||
|
@@ -149,12 +164,13 @@ public boolean equals(final Object o) { | |
maxOperationCount == request.maxOperationCount && | ||
Objects.equals(shardId, request.shardId) && | ||
Objects.equals(expectedHistoryUUID, request.expectedHistoryUUID) && | ||
Objects.equals(pollTimeout, request.pollTimeout) && | ||
maxOperationSizeInBytes == request.maxOperationSizeInBytes; | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, maxOperationSizeInBytes); | ||
return Objects.hash(fromSeqNo, maxOperationCount, shardId, expectedHistoryUUID, pollTimeout, maxOperationSizeInBytes); | ||
} | ||
|
||
@Override | ||
|
@@ -164,6 +180,7 @@ public String toString() { | |
", maxOperationCount=" + maxOperationCount + | ||
", shardId=" + shardId + | ||
", expectedHistoryUUID=" + expectedHistoryUUID + | ||
", pollTimeout=" + pollTimeout + | ||
", maxOperationSizeInBytes=" + maxOperationSizeInBytes + | ||
'}'; | ||
} | ||
|
@@ -265,19 +282,61 @@ public TransportAction(Settings settings, | |
|
||
@Override | ||
protected Response shardOperation(Request request, ShardId shardId) throws IOException { | ||
IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); | ||
IndexShard indexShard = indexService.getShard(request.getShard().id()); | ||
final SeqNoStats seqNoStats = indexShard.seqNoStats(); | ||
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); | ||
final IndexShard indexShard = indexService.getShard(request.getShard().id()); | ||
final SeqNoStats seqNoStats = indexShard.seqNoStats(); | ||
final long mappingVersion = clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); | ||
|
||
final Translog.Operation[] operations = getOperations( | ||
indexShard, | ||
seqNoStats.getGlobalCheckpoint(), | ||
request.fromSeqNo, | ||
request.maxOperationCount, | ||
request.expectedHistoryUUID, | ||
request.maxOperationSizeInBytes); | ||
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); | ||
request.getFromSeqNo(), | ||
request.getMaxOperationCount(), | ||
request.getExpectedHistoryUUID(), | ||
request.getMaxOperationSizeInBytes()); | ||
return getResponse(mappingVersion, seqNoStats, operations); | ||
} | ||
|
||
@Override | ||
protected void asyncShardOperation( | ||
final Request request, | ||
final ShardId shardId, | ||
final ActionListener<Response> listener) throws IOException { | ||
final IndexService indexService = indicesService.indexServiceSafe(request.getShard().getIndex()); | ||
final IndexShard indexShard = indexService.getShard(request.getShard().id()); | ||
final SeqNoStats seqNoStats = indexShard.seqNoStats(); | ||
|
||
if (request.getFromSeqNo() > seqNoStats.getGlobalCheckpoint()) { | ||
assert request.getFromSeqNo() == 1 + seqNoStats.getGlobalCheckpoint(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure if this assertion always holds. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
indexShard.addGlobalCheckpointListener( | ||
request.getFromSeqNo(), | ||
(g, e) -> { | ||
if (g == UNASSIGNED_SEQ_NO) { | ||
assert e != null; | ||
if (e instanceof TimeoutException) { | ||
try { | ||
final long mappingVersion = | ||
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion(); | ||
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats(); | ||
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY)); | ||
} catch (final Exception caught) { | ||
listener.onFailure(caught); | ||
} | ||
} else { | ||
listener.onFailure(e); | ||
} | ||
} else { | ||
try { | ||
super.asyncShardOperation(request, shardId, listener); | ||
} catch (final IOException e1) { | ||
listener.onFailure(e1); | ||
} | ||
} | ||
}, | ||
request.getPollTimeout()); | ||
} else { | ||
super.asyncShardOperation(request, shardId, listener); | ||
} | ||
} | ||
|
||
@Override | ||
|
@@ -300,7 +359,7 @@ protected Response newResponse() { | |
|
||
} | ||
|
||
private static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; | ||
static final Translog.Operation[] EMPTY_OPERATIONS_ARRAY = new Translog.Operation[0]; | ||
|
||
/** | ||
* Returns at most maxOperationCount operations from the specified from sequence number. | ||
|
@@ -324,7 +383,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard, | |
historyUUID + "]"); | ||
} | ||
if (fromSeqNo > globalCheckpoint) { | ||
return EMPTY_OPERATIONS_ARRAY; | ||
throw new IllegalStateException( | ||
"not exposing operations from [" + fromSeqNo + "] greater than the global checkpoint [" + globalCheckpoint + "]"); | ||
} | ||
int seenBytes = 0; | ||
// - 1 is needed, because toSeqNo is inclusive | ||
|
@@ -344,4 +404,8 @@ static Translog.Operation[] getOperations(IndexShard indexShard, | |
return operations.toArray(EMPTY_OPERATIONS_ARRAY); | ||
} | ||
|
||
static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) { | ||
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations); | ||
} | ||
|
||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.