Skip to content

[CCR] Record follower index historyUUIDs #34078

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
public static final String CCR_THREAD_POOL_NAME = "ccr";
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
public static final String CCR_CUSTOM_METADATA_FOLLOWER_INDEX_SHARD_HISTORY_UUIDS = "follower_index_shard_history_uuids";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not used any more?

public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";

private final boolean enabled;
Expand Down Expand Up @@ -148,7 +149,7 @@ public Collection<Object> createComponents(
@Override
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService,
ThreadPool threadPool, Client client) {
return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool));
return Collections.singletonList(new ShardFollowTasksExecutor(settings, client, threadPool, clusterService));
}

public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,9 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
final Client leaderClient = client.getRemoteClusterClient(clusterAlias);
hasPrivilegesToFollowIndices(leaderClient, new String[] {leaderIndex}, e -> {
if (e == null) {
fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs ->
consumer.accept(historyUUIDs, leaderIndexMetaData));
fetchHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change is unneeded now?

consumer.accept(historyUUIDs, leaderIndexMetaData);
});
} else {
onFailure.accept(e);
}
Expand Down Expand Up @@ -226,25 +227,25 @@ public void onFailure(final Exception e) {
}

/**
* Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient.
* Fetches the history UUIDs for specified index on per shard basis using the specified client.
*
* @param leaderClient the leader client
* @param leaderIndexMetaData the leader index metadata
* @param client the specified client
* @param indexMetaData the index metadata of the specified index
* @param onFailure the failure consumer
* @param historyUUIDConsumer the leader index history uuid and consumer
* @param historyUUIDConsumer the index history uuid consumer
*/
// NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs
// in case of following a local or a remote cluster.
public void fetchLeaderHistoryUUIDs(
final Client leaderClient,
final IndexMetaData leaderIndexMetaData,
public void fetchHistoryUUIDs(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert all of this?

final Client client,
final IndexMetaData indexMetaData,
final Consumer<Exception> onFailure,
final Consumer<String[]> historyUUIDConsumer) {

String leaderIndex = leaderIndexMetaData.getIndex().getName();
String leaderIndex = indexMetaData.getIndex().getName();
CheckedConsumer<IndicesStatsResponse, Exception> indicesStatsHandler = indicesStatsResponse -> {
IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex);
String[] historyUUIDs = new String[leaderIndexMetaData.getNumberOfShards()];
String[] historyUUIDs = new String[indexMetaData.getNumberOfShards()];
for (IndexShardStats indexShardStats : indexStats) {
for (ShardStats shardStats : indexShardStats) {
// Ignore replica shards as they may not have yet started and
Expand Down Expand Up @@ -274,7 +275,7 @@ public void fetchLeaderHistoryUUIDs(
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
request.indices(leaderIndex);
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
client.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
private final BiConsumer<TimeValue, Runnable> scheduler;
private final LongSupplier relativeTimeProvider;

private String followerHistoryUUID;
private long leaderGlobalCheckpoint;
private long leaderMaxSeqNo;
private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
Expand Down Expand Up @@ -110,15 +111,17 @@ protected boolean removeEldestEntry(final Map.Entry<Long, Tuple<AtomicInteger, E
}

void start(
final long leaderGlobalCheckpoint,
final long leaderMaxSeqNo,
final long followerGlobalCheckpoint,
final long followerMaxSeqNo) {
final String followerHistoryUUID,
final long leaderGlobalCheckpoint,
final long leaderMaxSeqNo,
final long followerGlobalCheckpoint,
final long followerMaxSeqNo) {
/*
* While this should only ever be called once and before any other threads can touch these fields, we use synchronization here to
* avoid the need to declare these fields as volatile. That is, we are ensuring thesefields are always accessed under the same lock.
*/
synchronized (this) {
this.followerHistoryUUID = followerHistoryUUID;
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
this.leaderMaxSeqNo = leaderMaxSeqNo;
this.followerGlobalCheckpoint = followerGlobalCheckpoint;
Expand Down Expand Up @@ -305,7 +308,7 @@ private void sendBulkShardOperationsRequest(List<Translog.Operation> operations,
AtomicInteger retryCounter) {
assert leaderMaxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "mus is not replicated";
final long startTime = relativeTimeProvider.getAsLong();
innerSendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes,
innerSendBulkShardOperationsRequest(followerHistoryUUID, operations, leaderMaxSeqNoOfUpdatesOrDeletes,
response -> {
synchronized (ShardFollowNodeTask.this) {
totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
Expand Down Expand Up @@ -404,8 +407,11 @@ static boolean shouldRetry(Exception e) {
// These methods are protected for testing purposes:
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);

protected abstract void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,
Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler);
protected abstract void innerSendBulkShardOperationsRequest(String followerHistoryUUID,
List<Translog.Operation> operations,
long leaderMaxSeqNoOfUpdatesOrDeletes,
Consumer<BulkShardOperationsResponse> handler,
Consumer<Exception> errorHandler);

protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
Consumer<Exception> errorHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,12 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
public static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout");
public static final ParseField RECORDED_HISTORY_UUID = new ParseField("recorded_history_uuid");

@SuppressWarnings("unchecked")
private static ConstructingObjectParser<ShardFollowTask, Void> PARSER = new ConstructingObjectParser<>(NAME,
(a) -> new ShardFollowTask((String) a[0], new ShardId((String) a[1], (String) a[2], (int) a[3]),
new ShardId((String) a[4], (String) a[5], (int) a[6]), (int) a[7], (int) a[8], (ByteSizeValue) a[9],
(int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (String) a[14], (Map<String, String>) a[15]));
(int) a[10], (int) a[11], (TimeValue) a[12], (TimeValue) a[13], (Map<String, String>) a[14]));

static {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_CLUSTER_ALIAS_FIELD);
Expand All @@ -82,7 +81,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
PARSER.declareField(ConstructingObjectParser.constructorArg(),
(p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()),
POLL_TIMEOUT, ObjectParser.ValueType.STRING);
PARSER.declareString(ConstructingObjectParser.constructorArg(), RECORDED_HISTORY_UUID);
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> p.mapStrings(), HEADERS);
}

Expand All @@ -96,7 +94,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
private final int maxWriteBufferSize;
private final TimeValue maxRetryDelay;
private final TimeValue pollTimeout;
private final String recordedLeaderIndexHistoryUUID;
private final Map<String, String> headers;

ShardFollowTask(
Expand All @@ -110,7 +107,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
final int maxWriteBufferSize,
final TimeValue maxRetryDelay,
final TimeValue pollTimeout,
final String recordedLeaderIndexHistoryUUID,
final Map<String, String> headers) {
this.leaderClusterAlias = leaderClusterAlias;
this.followShardId = followShardId;
Expand All @@ -122,7 +118,6 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
this.maxWriteBufferSize = maxWriteBufferSize;
this.maxRetryDelay = maxRetryDelay;
this.pollTimeout = pollTimeout;
this.recordedLeaderIndexHistoryUUID = recordedLeaderIndexHistoryUUID;
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
}

Expand All @@ -137,7 +132,6 @@ public ShardFollowTask(StreamInput in) throws IOException {
this.maxWriteBufferSize = in.readVInt();
this.maxRetryDelay = in.readTimeValue();
this.pollTimeout = in.readTimeValue();
this.recordedLeaderIndexHistoryUUID = in.readString();
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
}

Expand Down Expand Up @@ -185,10 +179,6 @@ public String getTaskId() {
return followShardId.getIndex().getUUID() + "-" + followShardId.getId();
}

public String getRecordedLeaderIndexHistoryUUID() {
return recordedLeaderIndexHistoryUUID;
}

public Map<String, String> getHeaders() {
return headers;
}
Expand All @@ -210,7 +200,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(maxWriteBufferSize);
out.writeTimeValue(maxRetryDelay);
out.writeTimeValue(pollTimeout);
out.writeString(recordedLeaderIndexHistoryUUID);
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
}

Expand All @@ -237,7 +226,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep());
builder.field(RECORDED_HISTORY_UUID.getPreferredName(), recordedLeaderIndexHistoryUUID);
builder.field(HEADERS.getPreferredName(), headers);
return builder.endObject();
}
Expand All @@ -257,7 +245,6 @@ public boolean equals(Object o) {
maxWriteBufferSize == that.maxWriteBufferSize &&
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
Objects.equals(pollTimeout, that.pollTimeout) &&
Objects.equals(recordedLeaderIndexHistoryUUID, that.recordedLeaderIndexHistoryUUID) &&
Objects.equals(headers, that.headers);
}

Expand All @@ -274,8 +261,8 @@ public int hashCode() {
maxWriteBufferSize,
maxRetryDelay,
pollTimeout,
recordedLeaderIndexHistoryUUID,
headers);
headers
);
}

public String toString() {
Expand Down
Loading