Skip to content

Commit 93b3b6c

Browse files
authored
Add cross cluster support to _terms_enum (elastic#73478)
This commit adds the support to search cross cluster indices (with the cross cluster syntax) in the _terms_enum API. Relates elastic#71550
1 parent 95e7f3f commit 93b3b6c

File tree

11 files changed

+633
-159
lines changed

11 files changed

+633
-159
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
package org.elasticsearch.xpack.core.termsenum;
8+
9+
import org.elasticsearch.client.Client;
10+
import org.elasticsearch.common.settings.Settings;
11+
import org.elasticsearch.plugins.Plugin;
12+
import org.elasticsearch.test.AbstractMultiClustersTestCase;
13+
import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
14+
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumAction;
15+
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumRequest;
16+
import org.elasticsearch.xpack.core.termsenum.action.TermsEnumResponse;
17+
18+
import java.util.ArrayList;
19+
import java.util.Collection;
20+
import java.util.List;
21+
22+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
23+
import static org.hamcrest.Matchers.equalTo;
24+
25+
public class CCSTermsEnumIT extends AbstractMultiClustersTestCase {
26+
27+
@Override
28+
protected Collection<String> remoteClusterAlias() {
29+
return List.of("remote_cluster");
30+
}
31+
32+
@Override
33+
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
34+
final List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
35+
plugins.add(LocalStateCompositeXPackPlugin.class);
36+
return plugins;
37+
}
38+
39+
public void testBasic() {
40+
Settings indexSettings = Settings.builder().put("index.number_of_replicas", 0).build();
41+
final Client localClient = client(LOCAL_CLUSTER);
42+
final Client remoteClient = client("remote_cluster");
43+
String localIndex = "local_test";
44+
assertAcked(localClient.admin().indices().prepareCreate(localIndex).setSettings(indexSettings));
45+
localClient.prepareIndex(localIndex).setSource("foo", "foo").get();
46+
localClient.prepareIndex(localIndex).setSource("foo", "foobar").get();
47+
localClient.admin().indices().prepareRefresh(localIndex).get();
48+
49+
String remoteIndex = "remote_test";
50+
assertAcked(remoteClient.admin().indices().prepareCreate(remoteIndex).setSettings(indexSettings));
51+
remoteClient.prepareIndex(remoteIndex).setSource("foo", "bar").get();
52+
remoteClient.prepareIndex(remoteIndex).setSource("foo", "foobar").get();
53+
remoteClient.prepareIndex(remoteIndex).setSource("foo", "zar").get();
54+
remoteClient.admin().indices().prepareRefresh(remoteIndex).get();
55+
56+
// _terms_enum on a remote cluster
57+
TermsEnumRequest req = new TermsEnumRequest("remote_cluster:remote_test")
58+
.field("foo.keyword");
59+
TermsEnumResponse response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
60+
assertTrue(response.isComplete());
61+
assertThat(response.getTerms().size(), equalTo(3));
62+
assertThat(response.getTerms().get(0), equalTo("bar"));
63+
assertThat(response.getTerms().get(1), equalTo("foobar"));
64+
assertThat(response.getTerms().get(2), equalTo("zar"));
65+
66+
// _terms_enum on mixed clusters (local + remote)
67+
req = new TermsEnumRequest("remote_cluster:remote_test", "local_test")
68+
.field("foo.keyword");
69+
response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
70+
assertTrue(response.isComplete());
71+
assertThat(response.getTerms().size(), equalTo(4));
72+
assertThat(response.getTerms().get(0), equalTo("bar"));
73+
assertThat(response.getTerms().get(1), equalTo("foo"));
74+
assertThat(response.getTerms().get(2), equalTo("foobar"));
75+
assertThat(response.getTerms().get(3), equalTo("zar"));
76+
77+
req = new TermsEnumRequest("remote_cluster:remote_test", "local_test")
78+
.field("foo.keyword")
79+
.searchAfter("foobar");
80+
response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
81+
assertTrue(response.isComplete());
82+
assertThat(response.getTerms().size(), equalTo(1));
83+
assertThat(response.getTerms().get(0), equalTo("zar"));
84+
85+
req = new TermsEnumRequest("remote_cluster:remote_test", "local_test")
86+
.field("foo.keyword")
87+
.searchAfter("bar");
88+
response = client().execute(TermsEnumAction.INSTANCE, req).actionGet();
89+
assertTrue(response.isComplete());
90+
assertThat(response.getTerms().size(), equalTo(3));
91+
assertThat(response.getTerms().get(0), equalTo("foo"));
92+
assertThat(response.getTerms().get(1), equalTo("foobar"));
93+
assertThat(response.getTerms().get(2), equalTo("zar"));
94+
}
95+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumRequest.java

+46-42
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.support.IndicesOptions;
1111
import org.elasticsearch.common.io.stream.StreamInput;
1212
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.core.Nullable;
1314
import org.elasticsearch.index.query.QueryBuilder;
1415
import org.elasticsearch.index.shard.ShardId;
1516
import org.elasticsearch.transport.TransportRequest;
@@ -20,7 +21,7 @@
2021
import java.util.Set;
2122

2223
/**
23-
* Internal terms enum request executed directly against a specific node, querying potentially many
24+
* Internal terms enum request executed directly against a specific node, querying potentially many
2425
* shards in one request
2526
*/
2627
public class NodeTermsEnumRequest extends TransportRequest implements IndicesRequest {
@@ -36,12 +37,27 @@ public class NodeTermsEnumRequest extends TransportRequest implements IndicesReq
3637
private final QueryBuilder indexFilter;
3738
private Set<ShardId> shardIds;
3839
private String nodeId;
39-
40+
41+
public NodeTermsEnumRequest(final String nodeId,
42+
final Set<ShardId> shardIds,
43+
TermsEnumRequest request,
44+
long taskStartTimeMillis) {
45+
this.field = request.field();
46+
this.string = request.string();
47+
this.searchAfter = request.searchAfter();
48+
this.caseInsensitive = request.caseInsensitive();
49+
this.size = request.size();
50+
this.timeout = request.timeout().getMillis();
51+
this.taskStartedTimeMillis = taskStartTimeMillis;
52+
this.indexFilter = request.indexFilter();
53+
this.nodeId = nodeId;
54+
this.shardIds = shardIds;
55+
}
4056

4157
public NodeTermsEnumRequest(StreamInput in) throws IOException {
4258
super(in);
4359
field = in.readString();
44-
string = in.readString();
60+
string = in.readOptionalString();
4561
searchAfter = in.readOptionalString();
4662
caseInsensitive = in.readBoolean();
4763
size = in.readVInt();
@@ -56,36 +72,47 @@ public NodeTermsEnumRequest(StreamInput in) throws IOException {
5672
}
5773
}
5874

59-
public NodeTermsEnumRequest(final String nodeId, final Set<ShardId> shardIds, TermsEnumRequest request) {
60-
this.field = request.field();
61-
this.string = request.string();
62-
this.searchAfter = request.searchAfter();
63-
this.caseInsensitive = request.caseInsensitive();
64-
this.size = request.size();
65-
this.timeout = request.timeout().getMillis();
66-
this.taskStartedTimeMillis = request.taskStartTimeMillis;
67-
this.indexFilter = request.indexFilter();
68-
this.nodeId = nodeId;
69-
this.shardIds = shardIds;
75+
@Override
76+
public void writeTo(StreamOutput out) throws IOException {
77+
super.writeTo(out);
78+
out.writeString(field);
79+
out.writeOptionalString(string);
80+
out.writeOptionalString(searchAfter);
81+
out.writeBoolean(caseInsensitive);
82+
out.writeVInt(size);
83+
// Adjust the amount of permitted time the shard has remaining to gather terms.
84+
long timeSpentSoFarInCoordinatingNode = System.currentTimeMillis() - taskStartedTimeMillis;
85+
long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode);
86+
// TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0?
87+
out.writeVLong(remainingTimeForShardToUse);
88+
out.writeVLong(taskStartedTimeMillis);
89+
out.writeOptionalNamedWriteable(indexFilter);
90+
out.writeString(nodeId);
91+
out.writeVInt(shardIds.size());
92+
for (ShardId shardId : shardIds) {
93+
shardId.writeTo(out);
94+
}
7095
}
7196

7297
public String field() {
7398
return field;
7499
}
75100

101+
@Nullable
76102
public String string() {
77103
return string;
78104
}
79105

106+
@Nullable
80107
public String searchAfter() {
81108
return searchAfter;
82109
}
83110

84111
public long taskStartedTimeMillis() {
85112
return this.taskStartedTimeMillis;
86113
}
87-
88-
/**
114+
115+
/**
89116
* The time this request was materialized on a node
90117
*/
91118
long nodeStartedTimeMillis() {
@@ -94,12 +121,12 @@ long nodeStartedTimeMillis() {
94121
nodeStartedTimeMillis = System.currentTimeMillis();
95122
}
96123
return this.nodeStartedTimeMillis;
97-
}
98-
124+
}
125+
99126
public void startTimerOnDataNode() {
100127
nodeStartedTimeMillis = System.currentTimeMillis();
101128
}
102-
129+
103130
public Set<ShardId> shardIds() {
104131
return Collections.unmodifiableSet(shardIds);
105132
}
@@ -119,28 +146,6 @@ public String nodeId() {
119146
return nodeId;
120147
}
121148

122-
@Override
123-
public void writeTo(StreamOutput out) throws IOException {
124-
super.writeTo(out);
125-
out.writeString(field);
126-
out.writeString(string);
127-
out.writeOptionalString(searchAfter);
128-
out.writeBoolean(caseInsensitive);
129-
out.writeVInt(size);
130-
// Adjust the amount of permitted time the shard has remaining to gather terms.
131-
long timeSpentSoFarInCoordinatingNode = System.currentTimeMillis() - taskStartedTimeMillis;
132-
long remainingTimeForShardToUse = (timeout - timeSpentSoFarInCoordinatingNode);
133-
// TODO - if already timed out can we shortcut the trip somehow? Throw exception if remaining time < 0?
134-
out.writeVLong(remainingTimeForShardToUse);
135-
out.writeVLong(taskStartedTimeMillis);
136-
out.writeOptionalNamedWriteable(indexFilter);
137-
out.writeString(nodeId);
138-
out.writeVInt(shardIds.size());
139-
for (ShardId shardId : shardIds) {
140-
shardId.writeTo(out);
141-
}
142-
}
143-
144149
public QueryBuilder indexFilter() {
145150
return indexFilter;
146151
}
@@ -162,5 +167,4 @@ public IndicesOptions indicesOptions() {
162167
public boolean remove(ShardId shardId) {
163168
return shardIds.remove(shardId);
164169
}
165-
166170
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/termsenum/action/NodeTermsEnumResponse.java

+10-11
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ class NodeTermsEnumResponse extends TransportResponse {
4141
this.complete = complete;
4242
}
4343

44+
@Override
45+
public void writeTo(StreamOutput out) throws IOException {
46+
out.writeCollection(terms);
47+
out.writeOptionalString(error);
48+
out.writeBoolean(complete);
49+
out.writeString(nodeId);
50+
}
51+
4452
public List<TermCount> terms() {
4553
return this.terms;
4654
}
@@ -52,17 +60,8 @@ public String getError() {
5260
public String getNodeId() {
5361
return nodeId;
5462
}
55-
56-
public boolean getComplete() {
57-
return complete;
58-
}
59-
6063

61-
@Override
62-
public void writeTo(StreamOutput out) throws IOException {
63-
out.writeCollection(terms);
64-
out.writeOptionalString(error);
65-
out.writeBoolean(complete);
66-
out.writeString(nodeId);
64+
public boolean isComplete() {
65+
return complete;
6766
}
6867
}

0 commit comments

Comments
 (0)