-
Notifications
You must be signed in to change notification settings - Fork 25.2k
[CCR] Added history uuid validation #33546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 11 commits
61c8ad9
624b416
0ed85c5
0685ee4
8293265
96edbde
b9ae5ce
ee475a0
9df2ac6
eb5f7cd
e7e0f14
43fbbee
9620d7b
8fee5ba
c2952b6
620505e
600626e
8beaa0a
89829fa
7fa9070
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,9 +10,18 @@ | |
import org.elasticsearch.action.ActionListener; | ||
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; | ||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; | ||
import org.elasticsearch.action.admin.indices.stats.IndexShardStats; | ||
import org.elasticsearch.action.admin.indices.stats.IndexStats; | ||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; | ||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; | ||
import org.elasticsearch.action.admin.indices.stats.ShardStats; | ||
import org.elasticsearch.client.Client; | ||
import org.elasticsearch.cluster.ClusterState; | ||
import org.elasticsearch.cluster.metadata.IndexMetaData; | ||
import org.elasticsearch.common.CheckedConsumer; | ||
import org.elasticsearch.index.engine.CommitStats; | ||
import org.elasticsearch.index.engine.Engine; | ||
import org.elasticsearch.index.shard.ShardId; | ||
import org.elasticsearch.license.RemoteClusterLicenseChecker; | ||
import org.elasticsearch.license.XPackLicenseState; | ||
import org.elasticsearch.rest.RestStatus; | ||
|
@@ -21,6 +30,7 @@ | |
import java.util.Collections; | ||
import java.util.Locale; | ||
import java.util.Objects; | ||
import java.util.function.BiConsumer; | ||
import java.util.function.BooleanSupplier; | ||
import java.util.function.Consumer; | ||
import java.util.function.Function; | ||
|
@@ -58,23 +68,24 @@ public boolean isCcrAllowed() { | |
} | ||
|
||
/** | ||
* Fetches the leader index metadata from the remote cluster. Before fetching the index metadata, the remote cluster is checked for | ||
* license compatibility with CCR. If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is invoked. | ||
* Otherwise, the specified consumer is invoked with the leader index metadata fetched from the remote cluster. | ||
* Fetches the leader index metadata and history UUIDs for leader index shards from the remote cluster. | ||
* Before fetching the index metadata, the remote cluster is checked for license compatibility with CCR. | ||
* If the remote cluster is not licensed for CCR, the {@code onFailure} consumer is invoked. Otherwise, | ||
* the specified consumer is invoked with the leader index metadata fetched from the remote cluster. | ||
* | ||
* @param client the client | ||
* @param clusterAlias the remote cluster alias | ||
* @param leaderIndex the name of the leader index | ||
* @param onFailure the failure consumer | ||
* @param leaderIndexMetadataConsumer the leader index metadata consumer | ||
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards | ||
* @param <T> the type of response the listener is waiting for | ||
*/ | ||
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( | ||
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( | ||
final Client client, | ||
final String clusterAlias, | ||
final String leaderIndex, | ||
final Consumer<Exception> onFailure, | ||
final Consumer<IndexMetaData> leaderIndexMetadataConsumer) { | ||
final BiConsumer<String[], IndexMetaData> consumer) { | ||
|
||
final ClusterStateRequest request = new ClusterStateRequest(); | ||
request.clear(); | ||
|
@@ -85,7 +96,13 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( | |
clusterAlias, | ||
request, | ||
onFailure, | ||
leaderClusterState -> leaderIndexMetadataConsumer.accept(leaderClusterState.getMetaData().index(leaderIndex)), | ||
leaderClusterState -> { | ||
IndexMetaData leaderIndexMetaData = leaderClusterState.getMetaData().index(leaderIndex); | ||
final Client leaderClient = client.getRemoteClusterClient(clusterAlias); | ||
fetchLeaderHistoryUUIDs(leaderClient, leaderIndexMetaData, onFailure, historyUUIDs -> { | ||
consumer.accept(historyUUIDs, leaderIndexMetaData); | ||
}); | ||
}, | ||
licenseCheck -> indexMetadataNonCompliantRemoteLicense(leaderIndex, licenseCheck), | ||
e -> indexMetadataUnknownRemoteLicense(leaderIndex, clusterAlias, e)); | ||
} | ||
|
@@ -168,6 +185,41 @@ public void onFailure(final Exception e) { | |
}); | ||
} | ||
|
||
/** | ||
* Fetches the history UUIDs for leader index on per shard basis using the specified leaderClient. | ||
* | ||
* @param leaderClient the leader client | ||
* @param leaderIndexMetaData the leader index metadata | ||
* @param onFailure the failure consumer | ||
* @param historyUUIDConsumer the leader index history uuid and consumer | ||
*/ | ||
// NOTE: Placed this method here; in order to avoid duplication of logic for fetching history UUIDs | ||
// in case of following a local or a remote cluster. | ||
public void fetchLeaderHistoryUUIDs( | ||
final Client leaderClient, | ||
final IndexMetaData leaderIndexMetaData, | ||
final Consumer<Exception> onFailure, | ||
final Consumer<String[]> historyUUIDConsumer) { | ||
|
||
String leaderIndex = leaderIndexMetaData.getIndex().getName(); | ||
CheckedConsumer<IndicesStatsResponse, Exception> indicesStatsHandler = indicesStatsResponse -> { | ||
IndexStats indexStats = indicesStatsResponse.getIndices().get(leaderIndex); | ||
String[] historyUUIDs = new String[leaderIndexMetaData.getNumberOfShards()]; | ||
for (IndexShardStats indexShardStats : indexStats) { | ||
for (ShardStats shardStats : indexShardStats) { | ||
CommitStats commitStats = shardStats.getCommitStats(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
String historyUUID = commitStats.getUserData().get(Engine.HISTORY_UUID_KEY); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Martijn, I am sorry. I should have been clearer here:
Please note the assertion There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Agreed
Good point. I will check each entry individually. |
||
ShardId shardId = shardStats.getShardRouting().shardId(); | ||
historyUUIDs[shardId.id()] = historyUUID; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You might have missed Boaz's comment:
Moreover, not every shard is allocated or associated with a historyUUID. Should we fail if there is no historyUUID for a shardId? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. oops, I will add the checks.
In what cases does a shard does not have a historyUUID?
In what cases are history uuids not unique between shards? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I see, not yet started shards have no history uuid, which is more likely for replica shards. |
||
} | ||
} | ||
historyUUIDConsumer.accept(historyUUIDs); | ||
}; | ||
IndicesStatsRequest request = new IndicesStatsRequest(); | ||
request.indices(leaderIndex); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can "clear" all flags to reduce this stat request. |
||
leaderClient.admin().indices().stats(request, ActionListener.wrap(indicesStatsHandler, onFailure)); | ||
} | ||
|
||
private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense( | ||
final String leaderIndex, final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) { | ||
final String clusterAlias = licenseCheck.remoteClusterLicenseInfo().clusterAlias(); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
import org.elasticsearch.common.xcontent.ToXContentObject; | ||
import org.elasticsearch.common.xcontent.XContentBuilder; | ||
import org.elasticsearch.common.xcontent.XContentParser; | ||
import org.elasticsearch.index.IndexNotFoundException; | ||
import org.elasticsearch.index.IndexSettings; | ||
import org.elasticsearch.index.IndexingSlowLog; | ||
import org.elasticsearch.index.SearchSlowLog; | ||
|
@@ -47,6 +48,7 @@ | |
import org.elasticsearch.transport.RemoteClusterAware; | ||
import org.elasticsearch.transport.RemoteClusterService; | ||
import org.elasticsearch.transport.TransportService; | ||
import org.elasticsearch.xpack.ccr.Ccr; | ||
import org.elasticsearch.xpack.ccr.CcrLicenseChecker; | ||
import org.elasticsearch.xpack.ccr.CcrSettings; | ||
|
||
|
@@ -352,11 +354,17 @@ private void followLocalIndex(final Request request, | |
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); | ||
// following an index in local cluster, so use local cluster state to fetch leader index metadata | ||
final IndexMetaData leaderIndexMetadata = state.getMetaData().index(request.getLeaderIndex()); | ||
try { | ||
start(request, null, leaderIndexMetadata, followerIndexMetadata, listener); | ||
} catch (final IOException e) { | ||
listener.onFailure(e); | ||
if (leaderIndexMetadata == null) { | ||
throw new IndexNotFoundException(request.getFollowerIndex()); | ||
} | ||
|
||
ccrLicenseChecker.fetchLeaderHistoryUUIDs(client, leaderIndexMetadata, listener::onFailure, historyUUIDs -> { | ||
try { | ||
start(request, null, leaderIndexMetadata, followerIndexMetadata, historyUUIDs, listener); | ||
} catch (final IOException e) { | ||
listener.onFailure(e); | ||
} | ||
}); | ||
} | ||
|
||
private void followRemoteIndex( | ||
|
@@ -366,14 +374,14 @@ private void followRemoteIndex( | |
final ActionListener<AcknowledgedResponse> listener) { | ||
final ClusterState state = clusterService.state(); | ||
final IndexMetaData followerIndexMetadata = state.getMetaData().index(request.getFollowerIndex()); | ||
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadata( | ||
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs( | ||
client, | ||
clusterAlias, | ||
leaderIndex, | ||
listener::onFailure, | ||
leaderIndexMetadata -> { | ||
(leaderHistoryUUID, leaderIndexMetadata) -> { | ||
try { | ||
start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, listener); | ||
start(request, clusterAlias, leaderIndexMetadata, followerIndexMetadata, leaderHistoryUUID, listener); | ||
} catch (final IOException e) { | ||
listener.onFailure(e); | ||
} | ||
|
@@ -395,25 +403,37 @@ void start( | |
String clusterNameAlias, | ||
IndexMetaData leaderIndexMetadata, | ||
IndexMetaData followIndexMetadata, | ||
String[] leaderIndexHistoryUUIDs, | ||
ActionListener<AcknowledgedResponse> handler) throws IOException { | ||
|
||
MapperService mapperService = followIndexMetadata != null ? indicesService.createIndexMapperService(followIndexMetadata) : null; | ||
validate(request, leaderIndexMetadata, followIndexMetadata, mapperService); | ||
validate(request, leaderIndexMetadata, followIndexMetadata, leaderIndexHistoryUUIDs, mapperService); | ||
final int numShards = followIndexMetadata.getNumberOfShards(); | ||
final AtomicInteger counter = new AtomicInteger(numShards); | ||
final AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(followIndexMetadata.getNumberOfShards()); | ||
Map<String, String> filteredHeaders = threadPool.getThreadContext().getHeaders().entrySet().stream() | ||
.filter(e -> ShardFollowTask.HEADER_FILTERS.contains(e.getKey())) | ||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));for (int i = 0; i < numShards; i++) { | ||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); | ||
|
||
for (int i = 0; i < numShards; i++) { | ||
final int shardId = i; | ||
String taskId = followIndexMetadata.getIndexUUID() + "-" + shardId; | ||
String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(followIndexMetadata); | ||
String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId]; | ||
|
||
ShardFollowTask shardFollowTask = new ShardFollowTask(clusterNameAlias, | ||
new ShardId(followIndexMetadata.getIndex(), shardId), | ||
new ShardId(leaderIndexMetadata.getIndex(), shardId), | ||
request.maxBatchOperationCount, request.maxConcurrentReadBatches, request.maxOperationSizeInBytes, | ||
request.maxConcurrentWriteBatches, request.maxWriteBufferSize, request.retryTimeout, | ||
request.idleShardRetryDelay, filteredHeaders); | ||
new ShardId(followIndexMetadata.getIndex(), shardId), | ||
new ShardId(leaderIndexMetadata.getIndex(), shardId), | ||
request.maxBatchOperationCount, | ||
request.maxConcurrentReadBatches, | ||
request.maxOperationSizeInBytes, | ||
request.maxConcurrentWriteBatches, | ||
request.maxWriteBufferSize, | ||
request.retryTimeout, | ||
request.idleShardRetryDelay, | ||
recordedLeaderShardHistoryUUID, | ||
filteredHeaders | ||
); | ||
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, | ||
new ActionListener<PersistentTasksCustomMetaData.PersistentTask<ShardFollowTask>>() { | ||
@Override | ||
|
@@ -510,13 +530,28 @@ void finalizeResponse() { | |
|
||
static void validate(Request request, | ||
IndexMetaData leaderIndex, | ||
IndexMetaData followIndex, MapperService followerMapperService) { | ||
IndexMetaData followIndex, | ||
String[] leaderIndexHistoryUUID, | ||
MapperService followerMapperService) { | ||
if (leaderIndex == null) { | ||
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not exist"); | ||
} | ||
if (followIndex == null) { | ||
throw new IllegalArgumentException("follow index [" + request.followerIndex + "] does not exist"); | ||
} | ||
|
||
String[] recordedHistoryUUIDs = extractIndexShardHistoryUUIDs(followIndex); | ||
assert recordedHistoryUUIDs.length == leaderIndexHistoryUUID.length; | ||
for (int i = 0; i < leaderIndexHistoryUUID.length; i++) { | ||
dnhatn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
String recordedLeaderIndexHistoryUUID = recordedHistoryUUIDs[i]; | ||
String actualLeaderIndexHistoryUUID = leaderIndexHistoryUUID[i]; | ||
if (recordedLeaderIndexHistoryUUID.equals(actualLeaderIndexHistoryUUID) == false) { | ||
throw new IllegalArgumentException("follow index [" + request.followerIndex + "] should reference [" + | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Maybe add some info as to how this can happen? (Restore from snapshot is a likely cause, I think.) |
||
recordedLeaderIndexHistoryUUID + "] as history uuid but instead reference [" + | ||
actualLeaderIndexHistoryUUID + "] as history uuid"); | ||
} | ||
} | ||
|
||
if (leaderIndex.getSettings().getAsBoolean(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), false) == false) { | ||
throw new IllegalArgumentException("leader index [" + request.leaderIndex + "] does not have soft deletes enabled"); | ||
} | ||
|
@@ -568,4 +603,10 @@ private static Settings filter(Settings originalSettings) { | |
return settings.build(); | ||
} | ||
|
||
private static String[] extractIndexShardHistoryUUIDs(IndexMetaData followIndexMetadata) { | ||
String historyUUIDs = followIndexMetadata.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY) | ||
.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS); | ||
return historyUUIDs.split(","); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can re-indent the javadocs.