Skip to content

Commit 699b806

Browse files
cbismuthjavanna
authored andcommitted
RoutingMissingException in more like this (#33974)
More like this query allows to provide identifiers of documents to be retrieved as like/unlike items. It can happen that at retrieval time an error is thrown, for instance caused by missing routing value when `_routing` is set required in the mapping. Instead of ignoring such error and returning no documents for the query, the error should be re-thrown and returned to users. As part of this change also mget and mtermvectors are unified in the way they throw such exception like it happens in other places, so that a `RoutingMissingException` is raised. Closes #29678
1 parent c673ad6 commit 699b806

File tree

6 files changed

+540
-8
lines changed

6 files changed

+540
-8
lines changed

server/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.get;
2121

2222
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.RoutingMissingException;
2324
import org.elasticsearch.action.support.ActionFilters;
2425
import org.elasticsearch.action.support.HandledTransportAction;
2526
import org.elasticsearch.cluster.ClusterState;
@@ -69,8 +70,8 @@ protected void doExecute(final MultiGetRequest request, final ActionListener<Mul
6970

7071
item.routing(clusterState.metaData().resolveIndexRouting(item.parent(), item.routing(), item.index()));
7172
if ((item.routing() == null) && (clusterState.getMetaData().routingRequired(concreteSingleIndex, item.type()))) {
72-
String message = "routing is required for [" + concreteSingleIndex + "]/[" + item.type() + "]/[" + item.id() + "]";
73-
responses.set(i, newItemFailure(concreteSingleIndex, item.type(), item.id(), new IllegalArgumentException(message)));
73+
responses.set(i, newItemFailure(concreteSingleIndex, item.type(), item.id(),
74+
new RoutingMissingException(concreteSingleIndex, item.type(), item.id())));
7475
continue;
7576
}
7677
} catch (Exception e) {
@@ -95,6 +96,12 @@ protected void doExecute(final MultiGetRequest request, final ActionListener<Mul
9596
listener.onResponse(new MultiGetResponse(responses.toArray(new MultiGetItemResponse[responses.length()])));
9697
}
9798

99+
executeShardAction(listener, responses, shardRequests);
100+
}
101+
102+
protected void executeShardAction(ActionListener<MultiGetResponse> listener,
103+
AtomicArray<MultiGetItemResponse> responses,
104+
Map<ShardId, MultiGetShardRequest> shardRequests) {
98105
final AtomicInteger counter = new AtomicInteger(shardRequests.size());
99106

100107
for (final MultiGetShardRequest shardRequest : shardRequests.values()) {

server/src/main/java/org/elasticsearch/action/termvectors/TransportMultiTermVectorsAction.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.termvectors;
2121

2222
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.RoutingMissingException;
2324
import org.elasticsearch.action.support.ActionFilters;
2425
import org.elasticsearch.action.support.HandledTransportAction;
2526
import org.elasticsearch.cluster.ClusterState;
@@ -68,17 +69,17 @@ protected void doExecute(final MultiTermVectorsRequest request, final ActionList
6869
termVectorsRequest.routing(clusterState.metaData()
6970
.resolveIndexRouting(termVectorsRequest.parent(), termVectorsRequest.routing(), termVectorsRequest.index()));
7071
if (!clusterState.metaData().hasConcreteIndex(termVectorsRequest.index())) {
71-
responses.set(i, new MultiTermVectorsItemResponse(null, new MultiTermVectorsResponse.Failure(termVectorsRequest.index(),
72-
termVectorsRequest.type(), termVectorsRequest.id(), new IndexNotFoundException(termVectorsRequest.index()))));
72+
responses.set(i, new MultiTermVectorsItemResponse(null,
73+
new MultiTermVectorsResponse.Failure(termVectorsRequest.index(), termVectorsRequest.type(), termVectorsRequest.id(),
74+
new IndexNotFoundException(termVectorsRequest.index()))));
7375
continue;
7476
}
7577
String concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, termVectorsRequest).getName();
76-
if (termVectorsRequest.routing() == null && clusterState.getMetaData()
77-
.routingRequired(concreteSingleIndex, termVectorsRequest.type())) {
78+
if (termVectorsRequest.routing() == null &&
79+
clusterState.getMetaData().routingRequired(concreteSingleIndex, termVectorsRequest.type())) {
7880
responses.set(i, new MultiTermVectorsItemResponse(null,
7981
new MultiTermVectorsResponse.Failure(concreteSingleIndex, termVectorsRequest.type(), termVectorsRequest.id(),
80-
new IllegalArgumentException("routing is required for [" + concreteSingleIndex + "]/[" +
81-
termVectorsRequest.type() + "]/[" + termVectorsRequest.id() + "]"))));
82+
new RoutingMissingException(concreteSingleIndex, termVectorsRequest.type(), termVectorsRequest.id()))));
8283
continue;
8384
}
8485
ShardId shardId = clusterService.operationRouting().shardId(clusterState, concreteSingleIndex,
@@ -97,7 +98,14 @@ protected void doExecute(final MultiTermVectorsRequest request, final ActionList
9798
listener.onResponse(new MultiTermVectorsResponse(responses.toArray(new MultiTermVectorsItemResponse[responses.length()])));
9899
}
99100

101+
executeShardAction(listener, responses, shardRequests);
102+
}
103+
104+
protected void executeShardAction(ActionListener<MultiTermVectorsResponse> listener,
105+
AtomicArray<MultiTermVectorsItemResponse> responses,
106+
Map<ShardId, MultiTermVectorsShardRequest> shardRequests) {
100107
final AtomicInteger counter = new AtomicInteger(shardRequests.size());
108+
101109
for (final MultiTermVectorsShardRequest shardRequest : shardRequests.values()) {
102110
shardAction.execute(shardRequest, new ActionListener<MultiTermVectorsShardResponse>() {
103111
@Override

server/src/main/java/org/elasticsearch/index/query/MoreLikeThisQueryBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.ElasticsearchParseException;
2929
import org.elasticsearch.ExceptionsHelper;
3030
import org.elasticsearch.Version;
31+
import org.elasticsearch.action.RoutingMissingException;
3132
import org.elasticsearch.action.termvectors.MultiTermVectorsItemResponse;
3233
import org.elasticsearch.action.termvectors.MultiTermVectorsRequest;
3334
import org.elasticsearch.action.termvectors.MultiTermVectorsResponse;
@@ -1121,6 +1122,7 @@ private static Fields[] getFieldsFor(MultiTermVectorsResponse responses) throws
11211122

11221123
for (MultiTermVectorsItemResponse response : responses) {
11231124
if (response.isFailed()) {
1125+
checkRoutingMissingException(response);
11241126
continue;
11251127
}
11261128
TermVectorsResponse getResponse = response.getResponse();
@@ -1132,6 +1134,13 @@ private static Fields[] getFieldsFor(MultiTermVectorsResponse responses) throws
11321134
return likeFields.toArray(Fields.EMPTY_ARRAY);
11331135
}
11341136

1137+
private static void checkRoutingMissingException(MultiTermVectorsItemResponse response) {
1138+
Throwable cause = ExceptionsHelper.unwrap(response.getFailure().getCause(), RoutingMissingException.class);
1139+
if (cause != null) {
1140+
throw ((RoutingMissingException) cause);
1141+
}
1142+
}
1143+
11351144
private static void handleExclude(BooleanQuery.Builder boolQuery, Item[] likeItems, QueryShardContext context) {
11361145
MappedFieldType uidField = context.fieldMapper(UidFieldMapper.NAME);
11371146
if (uidField == null) {
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.get;
21+
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.IndicesRequest;
25+
import org.elasticsearch.action.RoutingMissingException;
26+
import org.elasticsearch.action.support.ActionFilters;
27+
import org.elasticsearch.client.node.NodeClient;
28+
import org.elasticsearch.cluster.ClusterName;
29+
import org.elasticsearch.cluster.ClusterState;
30+
import org.elasticsearch.cluster.metadata.IndexMetaData;
31+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
32+
import org.elasticsearch.cluster.metadata.MetaData;
33+
import org.elasticsearch.cluster.node.DiscoveryNode;
34+
import org.elasticsearch.cluster.routing.OperationRouting;
35+
import org.elasticsearch.cluster.routing.ShardIterator;
36+
import org.elasticsearch.cluster.service.ClusterService;
37+
import org.elasticsearch.common.bytes.BytesReference;
38+
import org.elasticsearch.common.settings.Settings;
39+
import org.elasticsearch.common.util.concurrent.AtomicArray;
40+
import org.elasticsearch.common.xcontent.XContentFactory;
41+
import org.elasticsearch.common.xcontent.XContentHelper;
42+
import org.elasticsearch.common.xcontent.XContentType;
43+
import org.elasticsearch.index.Index;
44+
import org.elasticsearch.index.shard.ShardId;
45+
import org.elasticsearch.indices.IndicesService;
46+
import org.elasticsearch.tasks.Task;
47+
import org.elasticsearch.tasks.TaskId;
48+
import org.elasticsearch.tasks.TaskManager;
49+
import org.elasticsearch.test.ESTestCase;
50+
import org.elasticsearch.threadpool.TestThreadPool;
51+
import org.elasticsearch.threadpool.ThreadPool;
52+
import org.elasticsearch.transport.Transport;
53+
import org.elasticsearch.transport.TransportService;
54+
import org.junit.AfterClass;
55+
import org.junit.BeforeClass;
56+
57+
import java.util.Map;
58+
import java.util.concurrent.TimeUnit;
59+
import java.util.concurrent.atomic.AtomicBoolean;
60+
61+
import static java.util.Collections.emptyMap;
62+
import static java.util.Collections.emptySet;
63+
import static org.elasticsearch.common.UUIDs.randomBase64UUID;
64+
import static org.hamcrest.Matchers.equalTo;
65+
import static org.hamcrest.Matchers.instanceOf;
66+
import static org.mockito.Matchers.anyString;
67+
import static org.mockito.Matchers.eq;
68+
import static org.mockito.Mockito.mock;
69+
import static org.mockito.Mockito.when;
70+
71+
public class TransportMultiGetActionTests extends ESTestCase {
72+
73+
private static ThreadPool threadPool;
74+
private static TransportService transportService;
75+
private static ClusterService clusterService;
76+
private static TransportMultiGetAction transportAction;
77+
private static TransportShardMultiGetAction shardAction;
78+
79+
@BeforeClass
80+
public static void beforeClass() throws Exception {
81+
threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName());
82+
83+
transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool,
84+
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
85+
boundAddress -> DiscoveryNode.createLocal(Settings.builder().put("node.name", "node1").build(),
86+
boundAddress.publishAddress(), randomBase64UUID()), null, emptySet()) {
87+
@Override
88+
public TaskManager getTaskManager() {
89+
return taskManager;
90+
}
91+
};
92+
93+
final Index index1 = new Index("index1", randomBase64UUID());
94+
final ClusterState clusterState = ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName()))
95+
.metaData(new MetaData.Builder()
96+
.put(new IndexMetaData.Builder(index1.getName())
97+
.settings(Settings.builder().put("index.version.created", Version.CURRENT)
98+
.put("index.number_of_shards", 1)
99+
.put("index.number_of_replicas", 1)
100+
.put(IndexMetaData.SETTING_INDEX_UUID, index1.getUUID()))
101+
.putMapping("type1",
102+
XContentHelper.convertToJson(BytesReference.bytes(XContentFactory.jsonBuilder()
103+
.startObject()
104+
.startObject("type1")
105+
.startObject("_routing")
106+
.field("required", false)
107+
.endObject()
108+
.endObject()
109+
.endObject()), true, XContentType.JSON))
110+
.putMapping("type2",
111+
XContentHelper.convertToJson(BytesReference.bytes(XContentFactory.jsonBuilder()
112+
.startObject()
113+
.startObject("type2")
114+
.startObject("_routing")
115+
.field("required", true)
116+
.endObject()
117+
.endObject()
118+
.endObject()), true, XContentType.JSON)))).build();
119+
120+
final ShardIterator shardIterator = mock(ShardIterator.class);
121+
when(shardIterator.shardId()).thenReturn(new ShardId(index1, randomInt()));
122+
123+
final OperationRouting operationRouting = mock(OperationRouting.class);
124+
when(operationRouting.getShards(eq(clusterState), eq(index1.getName()), anyString(), anyString(), anyString()))
125+
.thenReturn(shardIterator);
126+
when(operationRouting.shardId(eq(clusterState), eq(index1.getName()), anyString(), anyString()))
127+
.thenReturn(new ShardId(index1, randomInt()));
128+
129+
clusterService = mock(ClusterService.class);
130+
when(clusterService.localNode()).thenReturn(transportService.getLocalNode());
131+
when(clusterService.state()).thenReturn(clusterState);
132+
when(clusterService.operationRouting()).thenReturn(operationRouting);
133+
134+
shardAction = new TransportShardMultiGetAction(Settings.EMPTY, clusterService, transportService, mock(IndicesService.class),
135+
threadPool,
136+
new ActionFilters(emptySet()), new Resolver()) {
137+
@Override
138+
protected void doExecute(Task task, MultiGetShardRequest request, ActionListener<MultiGetShardResponse> listener) {
139+
}
140+
};
141+
}
142+
143+
@AfterClass
144+
public static void afterClass() {
145+
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
146+
threadPool = null;
147+
transportService = null;
148+
clusterService = null;
149+
transportAction = null;
150+
shardAction = null;
151+
}
152+
153+
public void testTransportMultiGetAction() {
154+
final Task task = createTask();
155+
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
156+
final MultiGetRequestBuilder request = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
157+
request.add(new MultiGetRequest.Item("index1", "type1", "1"));
158+
request.add(new MultiGetRequest.Item("index1", "type1", "2"));
159+
160+
final AtomicBoolean shardActionInvoked = new AtomicBoolean(false);
161+
transportAction = new TransportMultiGetAction(Settings.EMPTY, threadPool, transportService, clusterService, shardAction,
162+
new ActionFilters(emptySet()), new Resolver()) {
163+
@Override
164+
protected void executeShardAction(final ActionListener<MultiGetResponse> listener,
165+
final AtomicArray<MultiGetItemResponse> responses,
166+
final Map<ShardId, MultiGetShardRequest> shardRequests) {
167+
shardActionInvoked.set(true);
168+
assertEquals(2, responses.length());
169+
assertNull(responses.get(0));
170+
assertNull(responses.get(1));
171+
}
172+
};
173+
174+
transportAction.execute(task, request.request(), new ActionListenerAdapter());
175+
assertTrue(shardActionInvoked.get());
176+
}
177+
178+
public void testTransportMultiGetAction_withMissingRouting() {
179+
final Task task = createTask();
180+
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
181+
final MultiGetRequestBuilder request = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
182+
request.add(new MultiGetRequest.Item("index1", "type2", "1").routing("1"));
183+
request.add(new MultiGetRequest.Item("index1", "type2", "2"));
184+
185+
final AtomicBoolean shardActionInvoked = new AtomicBoolean(false);
186+
transportAction = new TransportMultiGetAction(Settings.EMPTY, threadPool, transportService, clusterService, shardAction,
187+
new ActionFilters(emptySet()), new Resolver()) {
188+
@Override
189+
protected void executeShardAction(final ActionListener<MultiGetResponse> listener,
190+
final AtomicArray<MultiGetItemResponse> responses,
191+
final Map<ShardId, MultiGetShardRequest> shardRequests) {
192+
shardActionInvoked.set(true);
193+
assertEquals(2, responses.length());
194+
assertNull(responses.get(0));
195+
assertThat(responses.get(1).getFailure().getFailure(), instanceOf(RoutingMissingException.class));
196+
assertThat(responses.get(1).getFailure().getFailure().getMessage(),
197+
equalTo("routing is required for [index1]/[type2]/[2]"));
198+
}
199+
};
200+
201+
transportAction.execute(task, request.request(), new ActionListenerAdapter());
202+
assertTrue(shardActionInvoked.get());
203+
204+
}
205+
206+
private static Task createTask() {
207+
return new Task(randomLong(), "transport", MultiGetAction.NAME, "description",
208+
new TaskId(randomLong() + ":" + randomLong()), emptyMap());
209+
}
210+
211+
static class Resolver extends IndexNameExpressionResolver {
212+
213+
Resolver() {
214+
super(Settings.EMPTY);
215+
}
216+
217+
@Override
218+
public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {
219+
return new Index("index1", randomBase64UUID());
220+
}
221+
}
222+
223+
static class ActionListenerAdapter implements ActionListener<MultiGetResponse> {
224+
225+
@Override
226+
public void onResponse(MultiGetResponse response) {
227+
}
228+
229+
@Override
230+
public void onFailure(Exception e) {
231+
}
232+
}
233+
}

0 commit comments

Comments
 (0)