Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restrict failure stores from replicating via CCR #126355

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
client.threadPool().executor(Ccr.CCR_THREAD_POOL_NAME),
RemoteClusterService.DisconnectedStrategy.RECONNECT_IF_DISCONNECTED
);
checkRemoteClusterLicenseAndFetchClusterState(
doCheckRemoteClusterLicenseAndFetchClusterState(
client,
clusterAlias,
remoteClient,
Expand Down Expand Up @@ -168,6 +168,12 @@ public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
final DataStream remoteDataStream = indexAbstraction.getParentDataStream() != null
? indexAbstraction.getParentDataStream()
: null;
// Ensure that this leader index is not a failure store index, because they are not yet supported in CCR
if (remoteDataStream != null && remoteDataStream.isFailureStoreIndex(leaderIndex)) {
String message = String.format(Locale.ROOT, "cannot follow [%s], because it is a failure store index", leaderIndex);
onFailure.accept(new IllegalArgumentException(message));
return;
}
hasPrivilegesToFollowIndices(client.threadPool().getThreadContext(), remoteClient, new String[] { leaderIndex }, e -> {
if (e == null) {
fetchLeaderHistoryUUIDs(
Expand Down Expand Up @@ -231,6 +237,29 @@ public static void checkRemoteClusterLicenseAndFetchClusterState(
}
}

// overridable for testing
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
final Client client,
final String clusterAlias,
final RemoteClusterClient remoteClient,
final ClusterStateRequest request,
final Consumer<Exception> onFailure,
final Consumer<ClusterStateResponse> leaderClusterStateConsumer,
final Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
final Function<Exception, ElasticsearchStatusException> unknownLicense
) {
checkRemoteClusterLicenseAndFetchClusterState(
client,
clusterAlias,
remoteClient,
request,
onFailure,
leaderClusterStateConsumer,
nonCompliantLicense,
unknownLicense
);
}

/**
* Fetches the leader cluster state from the remote cluster by the specified cluster state request. Before fetching the cluster state,
* the remote cluster is checked for license compliance with CCR. If the remote cluster is not licensed for CCR,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,50 @@

package org.elasticsearch.xpack.ccr;

import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.DataStreamTestHelper;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.security.user.User;
import org.mockito.ArgumentCaptor;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasToString;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

public class CcrLicenseCheckerTests extends ESTestCase {

Expand Down Expand Up @@ -46,4 +78,164 @@ User getUser(final ThreadContext threadContext) {
assertTrue(invoked.get());
}

/**
* Tests all validation logic after obtaining the remote cluster state and before executing the check for follower privileges.
*/
public void testRemoteIndexValidation() {
// A cluster state with
// - a data stream, containing a backing index and a failure index
// - an alias that points to said data stream
// - a standalone index
// - an alias that points to said standalone index
// - a closed index
String indexName = "random-index";
String closedIndexName = "closed-index";
String dataStreamName = "logs-test-data";
String aliasName = "foo-alias";
String dsAliasName = "ds-alias";
IndexMetadata indexMetadata = IndexMetadata.builder(indexName)
.putAlias(AliasMetadata.builder(aliasName))
.settings(settings(IndexVersion.current()))
.numberOfShards(5)
.numberOfReplicas(1)
.build();
IndexMetadata closedIndexMetadata = IndexMetadata.builder(closedIndexName)
.settings(settings(IndexVersion.current()))
.numberOfShards(5)
.numberOfReplicas(1)
.state(IndexMetadata.State.CLOSE)
.build();
IndexMetadata firstBackingIndex = DataStreamTestHelper.createFirstBackingIndex(dataStreamName).build();
IndexMetadata firstFailureStore = DataStreamTestHelper.createFirstFailureStore(dataStreamName).build();
DataStream dataStream = DataStreamTestHelper.newInstance(
dataStreamName,
List.of(firstBackingIndex.getIndex()),
List.of(firstFailureStore.getIndex())
);
ClusterState remoteClusterState = ClusterState.builder(new ClusterName(randomIdentifier()))
.metadata(
Metadata.builder()
.put(indexMetadata, false)
.put(closedIndexMetadata, false)
.put(firstBackingIndex, false)
.put(firstFailureStore, false)
.dataStreams(
Map.of(dataStreamName, dataStream),
Map.of(dsAliasName, new DataStreamAlias(dsAliasName, List.of(dataStreamName), dataStreamName, Map.of()))
)
)
.build();

final boolean isCcrAllowed = randomBoolean();
final CcrLicenseChecker checker = new CcrLicenseChecker(() -> isCcrAllowed, () -> true) {
@Override
User getUser(ThreadContext threadContext) {
return null;
}

@Override
protected void doCheckRemoteClusterLicenseAndFetchClusterState(
Client client,
String clusterAlias,
RemoteClusterClient remoteClient,
ClusterStateRequest request,
Consumer<Exception> onFailure,
Consumer<ClusterStateResponse> leaderClusterStateConsumer,
Function<RemoteClusterLicenseChecker.LicenseCheck, ElasticsearchStatusException> nonCompliantLicense,
Function<Exception, ElasticsearchStatusException> unknownLicense
) {
leaderClusterStateConsumer.accept(new ClusterStateResponse(remoteClusterState.getClusterName(), remoteClusterState, false));
}

@Override
public void hasPrivilegesToFollowIndices(
ThreadContext threadContext,
RemoteClusterClient remoteClient,
String[] indices,
Consumer<Exception> handler
) {
fail("Test case should fail before this code is called");
}
};

String clusterAlias = randomIdentifier();

ExecutorService mockExecutor = mock(ExecutorService.class);
ThreadPool mockThreadPool = mock(ThreadPool.class);
when(mockThreadPool.executor(eq(Ccr.CCR_THREAD_POOL_NAME))).thenReturn(mockExecutor);
RemoteClusterClient mockRemoteClient = mock(RemoteClusterClient.class);

Client mockClient = mock(Client.class);
when(mockClient.threadPool()).thenReturn(mockThreadPool);
when(mockClient.getRemoteClusterClient(eq(clusterAlias), eq(mockExecutor), any())).thenReturn(mockRemoteClient);

// When following an index that does not exist, throw IndexNotFoundException
{
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, "non-existent-index");
assertThat(exception, instanceOf(IndexNotFoundException.class));
assertThat(exception.getMessage(), equalTo("no such index [non-existent-index]"));
}

// When following an alias, throw IllegalArgumentException
{
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, aliasName);
assertThat(exception, instanceOf(IllegalArgumentException.class));
assertThat(exception.getMessage(), equalTo("cannot follow [" + aliasName + "], because it is a ALIAS"));
}

// When following a data stream, throw IllegalArgumentException
{
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dataStreamName);
assertThat(exception, instanceOf(IllegalArgumentException.class));
assertThat(exception.getMessage(), equalTo("cannot follow [" + dataStreamName + "], because it is a DATA_STREAM"));
}

// When following a data stream alias, throw IllegalArgumentException
{
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, dsAliasName);
assertThat(exception, instanceOf(IllegalArgumentException.class));
assertThat(exception.getMessage(), equalTo("cannot follow [" + dsAliasName + "], because it is a ALIAS"));
}

// When following a closed index, throw IndexClosedException
{
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, closedIndexName);
assertThat(exception, instanceOf(IndexClosedException.class));
assertThat(exception.getMessage(), equalTo("closed"));
}

// When following a failure store index, throw IllegalArgumentException
{
Exception exception = executeExpectingException(checker, mockClient, clusterAlias, firstFailureStore.getIndex().getName());
assertThat(exception, instanceOf(IllegalArgumentException.class));
assertThat(
exception.getMessage(),
equalTo("cannot follow [" + firstFailureStore.getIndex().getName() + "], because it is a failure store index")
);
}
}

private static Exception executeExpectingException(
CcrLicenseChecker checker,
Client mockClient,
String clusterAlias,
String leaderIndex
) {
@SuppressWarnings("unchecked")
Consumer<Exception> onFailure = mock(Consumer.class);
@SuppressWarnings("unchecked")
BiConsumer<String[], Tuple<IndexMetadata, DataStream>> consumer = mock(BiConsumer.class);
checker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
mockClient,
clusterAlias,
leaderIndex,
onFailure,
consumer
);
ArgumentCaptor<Exception> captor = ArgumentCaptor.forClass(Exception.class);
verify(onFailure, times(1)).accept(captor.capture());
verifyNoInteractions(consumer);
return captor.getValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public void testAutoFollower_dataStream() {
Client client = mock(Client.class);
when(client.getRemoteClusterClient(anyString(), any(), any())).thenReturn(new RedirectToLocalClusterRemoteClusterClient(client));

ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar");
ClusterState remoteState = createRemoteClusterStateWithDataStream("logs-foobar", false, true);

AutoFollowPattern autoFollowPattern = createAutoFollowPattern("remote", "logs-*");
Map<String, AutoFollowPattern> patterns = new HashMap<>();
Expand Down Expand Up @@ -2562,23 +2562,53 @@ private ClusterService mockClusterService() {
}

private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName) {
return createRemoteClusterStateWithDataStream(dataStreamName, false);
return createRemoteClusterStateWithDataStream(dataStreamName, false, false);
}

private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system) {
private static ClusterState createRemoteClusterStateWithDataStream(String dataStreamName, boolean system, boolean withFailures) {
long currentTimeMillis = System.currentTimeMillis();

Settings.Builder indexSettings = settings(IndexVersion.current());
indexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
indexSettings.put("index.hidden", true);

IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1))
IndexMetadata indexMetadata = IndexMetadata.builder(DataStream.getDefaultBackingIndexName(dataStreamName, 1, currentTimeMillis))
.settings(indexSettings)
.numberOfShards(1)
.numberOfReplicas(0)
.system(system)
.build();
DataStream dataStream = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system).build();
ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote"))
.metadata(Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L));

IndexMetadata failureIndexMetadata = null;
DataStream.DataStreamIndices failureStore = null;
if (withFailures) {
Settings.Builder failureIndexSettings = settings(IndexVersion.current());
failureIndexSettings.put(IndexMetadata.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()));
failureIndexSettings.put("index.hidden", true);

String defaultFailureStoreName = DataStream.getDefaultFailureStoreName(dataStreamName, 1, currentTimeMillis);
failureIndexMetadata = IndexMetadata.builder(defaultFailureStoreName)
.settings(failureIndexSettings)
.numberOfShards(1)
.numberOfReplicas(0)
.system(system)
.build();

failureStore = DataStream.DataStreamIndices.failureIndicesBuilder(List.of(failureIndexMetadata.getIndex())).build();
}

var dataStreamBuilder = DataStream.builder(dataStreamName, List.of(indexMetadata.getIndex())).setSystem(system);
if (withFailures) {
dataStreamBuilder.setFailureIndices(failureStore);
}
DataStream dataStream = dataStreamBuilder.build();

var mdBuilder = Metadata.builder().put(indexMetadata, true).put(dataStream).version(0L);
if (withFailures) {
mdBuilder.put(failureIndexMetadata, true);
}

var routingTableBuilder = RoutingTable.builder();

ShardRouting shardRouting = TestShardRouting.newShardRouting(
new ShardId(indexMetadata.getIndex(), 0),
Expand All @@ -2587,7 +2617,22 @@ private static ClusterState createRemoteClusterStateWithDataStream(String dataSt
ShardRoutingState.INITIALIZING
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetadata.getIndex()).addShard(shardRouting).build();
return csBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build();
routingTableBuilder.add(indexRoutingTable);

if (withFailures) {
ShardRouting failureShardRouting = TestShardRouting.newShardRouting(
new ShardId(failureIndexMetadata.getIndex(), 0),
"1",
true,
ShardRoutingState.INITIALIZING
).moveToStarted(ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
IndexRoutingTable failureIndexRoutingTable = IndexRoutingTable.builder(failureIndexMetadata.getIndex())
.addShard(failureShardRouting)
.build();
routingTableBuilder.add(failureIndexRoutingTable);
}

return ClusterState.builder(new ClusterName("remote")).metadata(mdBuilder).routingTable(routingTableBuilder.build()).build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public static boolean match(
final DataStream parentDataStream = indexAbstraction.getParentDataStream();
return parentDataStream != null
&& parentDataStream.isSystem() == false
&& parentDataStream.isFailureStoreIndex(indexAbstraction.getName()) == false
&& Regex.simpleMatch(leaderIndexExclusionPatterns, indexAbstraction.getParentDataStream().getName()) == false
&& Regex.simpleMatch(leaderIndexPatterns, indexAbstraction.getParentDataStream().getName());
}
Expand Down