Skip to content

Fix LocalIndexFollowingIT#testRemoveRemoteConnection() test #38709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ public final class RemoteInfoResponse extends ActionResponse implements ToXConte
this.infos = Collections.unmodifiableList(new ArrayList<>(infos));
}

public List<RemoteConnectionInfo> getInfos() {
return infos;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,30 @@ public RemoteConnectionInfo(StreamInput input) throws IOException {
skipUnavailable = input.readBoolean();
}

public List<String> getSeedNodes() {
return seedNodes;
}

public int getConnectionsPerCluster() {
return connectionsPerCluster;
}

public TimeValue getInitialConnectionTimeout() {
return initialConnectionTimeout;
}

public int getNumNodesConnected() {
return numNodesConnected;
}

public String getClusterAlias() {
return clusterAlias;
}

public boolean isSkipUnavailable() {
return skipUnavailable;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
Expand Down Expand Up @@ -113,7 +114,16 @@ protected void innerUpdateMapping(long minRequiredMappingVersion, LongConsumer h
final Index followerIndex = params.getFollowShardId().getIndex();
final Index leaderIndex = params.getLeaderShardId().getIndex();
final Supplier<TimeValue> timeout = () -> isStopped() ? TimeValue.MINUS_ONE : waitForMetadataTimeOut;
CcrRequests.getIndexMetadata(remoteClient(params), leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(

final Client remoteClient;
try {
remoteClient = remoteClient(params);
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
return;
}

CcrRequests.getIndexMetadata(remoteClient, leaderIndex, minRequiredMappingVersion, 0L, timeout, ActionListener.wrap(
indexMetaData -> {
if (indexMetaData.getMappings().isEmpty()) {
assert indexMetaData.getMappingVersion() == 1;
Expand Down Expand Up @@ -172,7 +182,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
};
try {
remoteClient(params).admin().cluster().state(clusterStateRequest, ActionListener.wrap(onResponse, errorHandler));
} catch (Exception e) {
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
}
Expand Down Expand Up @@ -230,7 +240,7 @@ protected void innerSendShardChangesRequest(long from, int maxOperationCount, Co
request.setPollTimeout(params.getReadPollTimeout());
try {
remoteClient(params).execute(ShardChangesAction.INSTANCE, request, ActionListener.wrap(handler::accept, errorHandler));
} catch (Exception e) {
} catch (NoSuchRemoteClusterException e) {
errorHandler.accept(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

package org.elasticsearch.xpack;

import org.elasticsearch.action.admin.cluster.remote.RemoteInfoAction;
import org.elasticsearch.action.admin.cluster.remote.RemoteInfoRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.cluster.service.ClusterService;
Expand All @@ -15,6 +17,7 @@
import org.elasticsearch.license.LicensesMetaData;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.ccr.LocalStateCcr;
Expand All @@ -30,6 +33,7 @@

import java.util.Collection;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.xpack.CcrIntegTestCase.removeCCRRelatedMetadataFromClusterState;
Expand Down Expand Up @@ -57,11 +61,17 @@ protected Collection<Class<? extends Plugin>> getPlugins() {
}

@Before
public void setupLocalRemote() {
public void setupLocalRemote() throws Exception {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
String address = getInstanceFromNode(TransportService.class).boundAddress().publishAddress().toString();
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", address));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

assertBusy(() -> {
List<RemoteConnectionInfo> infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
assertThat(infos.size(), equalTo(1));
assertThat(infos.get(0).getNumNodesConnected(), equalTo(1));
});
}

@Before
Expand All @@ -76,10 +86,15 @@ public void purgeCCRMetadata() throws Exception {
}

@After
public void removeLocalRemote() {
public void removeLocalRemote() throws Exception {
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.transientSettings(Settings.builder().put("cluster.remote.local.seeds", (String) null));
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

assertBusy(() -> {
List<RemoteConnectionInfo> infos = client().execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest()).get().getInfos();
assertThat(infos.size(), equalTo(0));
});
}

protected AutoFollowStats getAutoFollowStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ public void testDoNotCreateFollowerIfLeaderDoesNotHaveSoftDeletes() throws Excep
assertThat(client().admin().indices().prepareExists("follower-index").get().isExists(), equalTo(false));
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/38695")
public void testRemoveRemoteConnection() throws Exception {
PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request();
request.setName("my_pattern");
Expand Down