Skip to content

Commit 04ebc63

Browse files
cbismuthjavanna
authored andcommitted
RoutingMissingException in more like this (#33974)
More like this query allows to provide identifiers of documents to be retrieved as like/unlike items. It can happen that at retrieval time an error is thrown, for instance caused by missing routing value when `_routing` is set required in the mapping. Instead of ignoring such error and returning no documents for the query, the error should be re-thrown and returned to users. As part of this change also mget and mtermvectors are unified in the way they throw such exception like it happens in other places, so that a `RoutingMissingException` is raised. Closes #29678
1 parent 9bdbba2 commit 04ebc63

File tree

6 files changed

+529
-7
lines changed

6 files changed

+529
-7
lines changed

server/src/main/java/org/elasticsearch/action/get/TransportMultiGetAction.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.get;
2121

2222
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.RoutingMissingException;
2324
import org.elasticsearch.action.support.ActionFilters;
2425
import org.elasticsearch.action.support.HandledTransportAction;
2526
import org.elasticsearch.cluster.ClusterState;
@@ -69,8 +70,8 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL
6970

7071
item.routing(clusterState.metaData().resolveIndexRouting(item.routing(), item.index()));
7172
if ((item.routing() == null) && (clusterState.getMetaData().routingRequired(concreteSingleIndex, item.type()))) {
72-
String message = "routing is required for [" + concreteSingleIndex + "]/[" + item.type() + "]/[" + item.id() + "]";
73-
responses.set(i, newItemFailure(concreteSingleIndex, item.type(), item.id(), new IllegalArgumentException(message)));
73+
responses.set(i, newItemFailure(concreteSingleIndex, item.type(), item.id(),
74+
new RoutingMissingException(concreteSingleIndex, item.type(), item.id())));
7475
continue;
7576
}
7677
} catch (Exception e) {
@@ -95,6 +96,12 @@ protected void doExecute(Task task, final MultiGetRequest request, final ActionL
9596
listener.onResponse(new MultiGetResponse(responses.toArray(new MultiGetItemResponse[responses.length()])));
9697
}
9798

99+
executeShardAction(listener, responses, shardRequests);
100+
}
101+
102+
protected void executeShardAction(ActionListener<MultiGetResponse> listener,
103+
AtomicArray<MultiGetItemResponse> responses,
104+
Map<ShardId, MultiGetShardRequest> shardRequests) {
98105
final AtomicInteger counter = new AtomicInteger(shardRequests.size());
99106

100107
for (final MultiGetShardRequest shardRequest : shardRequests.values()) {

server/src/main/java/org/elasticsearch/action/termvectors/TransportMultiTermVectorsAction.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.termvectors;
2121

2222
import org.elasticsearch.action.ActionListener;
23+
import org.elasticsearch.action.RoutingMissingException;
2324
import org.elasticsearch.action.support.ActionFilters;
2425
import org.elasticsearch.action.support.HandledTransportAction;
2526
import org.elasticsearch.cluster.ClusterState;
@@ -67,17 +68,17 @@ protected void doExecute(Task task, final MultiTermVectorsRequest request, final
6768
termVectorsRequest.routing(clusterState.metaData().resolveIndexRouting(termVectorsRequest.routing(),
6869
termVectorsRequest.index()));
6970
if (!clusterState.metaData().hasConcreteIndex(termVectorsRequest.index())) {
70-
responses.set(i, new MultiTermVectorsItemResponse(null, new MultiTermVectorsResponse.Failure(termVectorsRequest.index(),
71-
termVectorsRequest.type(), termVectorsRequest.id(), new IndexNotFoundException(termVectorsRequest.index()))));
71+
responses.set(i, new MultiTermVectorsItemResponse(null,
72+
new MultiTermVectorsResponse.Failure(termVectorsRequest.index(), termVectorsRequest.type(), termVectorsRequest.id(),
73+
new IndexNotFoundException(termVectorsRequest.index()))));
7274
continue;
7375
}
7476
String concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, termVectorsRequest).getName();
7577
if (termVectorsRequest.routing() == null &&
76-
clusterState.getMetaData().routingRequired(concreteSingleIndex, termVectorsRequest.type())) {
78+
clusterState.getMetaData().routingRequired(concreteSingleIndex, termVectorsRequest.type())) {
7779
responses.set(i, new MultiTermVectorsItemResponse(null,
7880
new MultiTermVectorsResponse.Failure(concreteSingleIndex, termVectorsRequest.type(), termVectorsRequest.id(),
79-
new IllegalArgumentException("routing is required for [" + concreteSingleIndex + "]/[" +
80-
termVectorsRequest.type() + "]/[" + termVectorsRequest.id() + "]"))));
81+
new RoutingMissingException(concreteSingleIndex, termVectorsRequest.type(), termVectorsRequest.id()))));
8182
continue;
8283
}
8384
ShardId shardId = clusterService.operationRouting().shardId(clusterState, concreteSingleIndex,
@@ -96,7 +97,14 @@ protected void doExecute(Task task, final MultiTermVectorsRequest request, final
9697
listener.onResponse(new MultiTermVectorsResponse(responses.toArray(new MultiTermVectorsItemResponse[responses.length()])));
9798
}
9899

100+
executeShardAction(listener, responses, shardRequests);
101+
}
102+
103+
protected void executeShardAction(ActionListener<MultiTermVectorsResponse> listener,
104+
AtomicArray<MultiTermVectorsItemResponse> responses,
105+
Map<ShardId, MultiTermVectorsShardRequest> shardRequests) {
99106
final AtomicInteger counter = new AtomicInteger(shardRequests.size());
107+
100108
for (final MultiTermVectorsShardRequest shardRequest : shardRequests.values()) {
101109
shardAction.execute(shardRequest, new ActionListener<MultiTermVectorsShardResponse>() {
102110
@Override

server/src/main/java/org/elasticsearch/index/query/MoreLikeThisQueryBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.lucene.search.Query;
2727
import org.elasticsearch.ElasticsearchParseException;
2828
import org.elasticsearch.ExceptionsHelper;
29+
import org.elasticsearch.action.RoutingMissingException;
2930
import org.elasticsearch.action.termvectors.MultiTermVectorsItemResponse;
3031
import org.elasticsearch.action.termvectors.MultiTermVectorsRequest;
3132
import org.elasticsearch.action.termvectors.MultiTermVectorsResponse;
@@ -1110,6 +1111,7 @@ private static Fields[] getFieldsFor(MultiTermVectorsResponse responses) throws
11101111

11111112
for (MultiTermVectorsItemResponse response : responses) {
11121113
if (response.isFailed()) {
1114+
checkRoutingMissingException(response);
11131115
continue;
11141116
}
11151117
TermVectorsResponse getResponse = response.getResponse();
@@ -1121,6 +1123,13 @@ private static Fields[] getFieldsFor(MultiTermVectorsResponse responses) throws
11211123
return likeFields.toArray(Fields.EMPTY_ARRAY);
11221124
}
11231125

1126+
private static void checkRoutingMissingException(MultiTermVectorsItemResponse response) {
1127+
Throwable cause = ExceptionsHelper.unwrap(response.getFailure().getCause(), RoutingMissingException.class);
1128+
if (cause != null) {
1129+
throw ((RoutingMissingException) cause);
1130+
}
1131+
}
1132+
11241133
private static void handleExclude(BooleanQuery.Builder boolQuery, Item[] likeItems, QueryShardContext context) {
11251134
MappedFieldType idField = context.fieldMapper(IdFieldMapper.NAME);
11261135
if (idField == null) {
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.get;
21+
22+
import org.elasticsearch.Version;
23+
import org.elasticsearch.action.ActionListener;
24+
import org.elasticsearch.action.IndicesRequest;
25+
import org.elasticsearch.action.RoutingMissingException;
26+
import org.elasticsearch.action.support.ActionFilters;
27+
import org.elasticsearch.client.node.NodeClient;
28+
import org.elasticsearch.cluster.ClusterName;
29+
import org.elasticsearch.cluster.ClusterState;
30+
import org.elasticsearch.cluster.metadata.IndexMetaData;
31+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
32+
import org.elasticsearch.cluster.metadata.MetaData;
33+
import org.elasticsearch.cluster.node.DiscoveryNode;
34+
import org.elasticsearch.cluster.routing.OperationRouting;
35+
import org.elasticsearch.cluster.routing.ShardIterator;
36+
import org.elasticsearch.cluster.service.ClusterService;
37+
import org.elasticsearch.common.bytes.BytesReference;
38+
import org.elasticsearch.common.settings.Settings;
39+
import org.elasticsearch.common.util.concurrent.AtomicArray;
40+
import org.elasticsearch.common.xcontent.XContentFactory;
41+
import org.elasticsearch.common.xcontent.XContentHelper;
42+
import org.elasticsearch.common.xcontent.XContentType;
43+
import org.elasticsearch.index.Index;
44+
import org.elasticsearch.index.shard.ShardId;
45+
import org.elasticsearch.indices.IndicesService;
46+
import org.elasticsearch.tasks.Task;
47+
import org.elasticsearch.tasks.TaskId;
48+
import org.elasticsearch.tasks.TaskManager;
49+
import org.elasticsearch.test.ESTestCase;
50+
import org.elasticsearch.threadpool.TestThreadPool;
51+
import org.elasticsearch.threadpool.ThreadPool;
52+
import org.elasticsearch.transport.Transport;
53+
import org.elasticsearch.transport.TransportService;
54+
import org.junit.AfterClass;
55+
import org.junit.BeforeClass;
56+
57+
import java.util.Map;
58+
import java.util.concurrent.TimeUnit;
59+
import java.util.concurrent.atomic.AtomicBoolean;
60+
61+
import static java.util.Collections.emptyMap;
62+
import static java.util.Collections.emptySet;
63+
import static org.elasticsearch.common.UUIDs.randomBase64UUID;
64+
import static org.hamcrest.Matchers.equalTo;
65+
import static org.hamcrest.Matchers.instanceOf;
66+
import static org.mockito.Matchers.anyString;
67+
import static org.mockito.Matchers.eq;
68+
import static org.mockito.Mockito.mock;
69+
import static org.mockito.Mockito.when;
70+
71+
public class TransportMultiGetActionTests extends ESTestCase {
72+
73+
private static ThreadPool threadPool;
74+
private static TransportService transportService;
75+
private static ClusterService clusterService;
76+
private static TransportMultiGetAction transportAction;
77+
private static TransportShardMultiGetAction shardAction;
78+
79+
@BeforeClass
80+
public static void beforeClass() throws Exception {
81+
threadPool = new TestThreadPool(TransportMultiGetActionTests.class.getSimpleName());
82+
83+
transportService = new TransportService(Settings.EMPTY, mock(Transport.class), threadPool,
84+
TransportService.NOOP_TRANSPORT_INTERCEPTOR,
85+
boundAddress -> DiscoveryNode.createLocal(Settings.builder().put("node.name", "node1").build(),
86+
boundAddress.publishAddress(), randomBase64UUID()), null, emptySet()) {
87+
@Override
88+
public TaskManager getTaskManager() {
89+
return taskManager;
90+
}
91+
};
92+
93+
final Index index1 = new Index("index1", randomBase64UUID());
94+
final ClusterState clusterState = ClusterState.builder(new ClusterName(TransportMultiGetActionTests.class.getSimpleName()))
95+
.metaData(new MetaData.Builder()
96+
.put(new IndexMetaData.Builder(index1.getName())
97+
.settings(Settings.builder().put("index.version.created", Version.CURRENT)
98+
.put("index.number_of_shards", 1)
99+
.put("index.number_of_replicas", 1)
100+
.put(IndexMetaData.SETTING_INDEX_UUID, index1.getUUID()))
101+
.putMapping("type1",
102+
XContentHelper.convertToJson(BytesReference.bytes(XContentFactory.jsonBuilder()
103+
.startObject()
104+
.startObject("type1")
105+
.startObject("_routing")
106+
.field("required", false)
107+
.endObject()
108+
.endObject()
109+
.endObject()), true, XContentType.JSON))
110+
.putMapping("type2",
111+
XContentHelper.convertToJson(BytesReference.bytes(XContentFactory.jsonBuilder()
112+
.startObject()
113+
.startObject("type2")
114+
.startObject("_routing")
115+
.field("required", true)
116+
.endObject()
117+
.endObject()
118+
.endObject()), true, XContentType.JSON)))).build();
119+
120+
final ShardIterator shardIterator = mock(ShardIterator.class);
121+
when(shardIterator.shardId()).thenReturn(new ShardId(index1, randomInt()));
122+
123+
final OperationRouting operationRouting = mock(OperationRouting.class);
124+
when(operationRouting.getShards(eq(clusterState), eq(index1.getName()), anyString(), anyString(), anyString()))
125+
.thenReturn(shardIterator);
126+
when(operationRouting.shardId(eq(clusterState), eq(index1.getName()), anyString(), anyString()))
127+
.thenReturn(new ShardId(index1, randomInt()));
128+
129+
clusterService = mock(ClusterService.class);
130+
when(clusterService.localNode()).thenReturn(transportService.getLocalNode());
131+
when(clusterService.state()).thenReturn(clusterState);
132+
when(clusterService.operationRouting()).thenReturn(operationRouting);
133+
134+
shardAction = new TransportShardMultiGetAction(clusterService, transportService, mock(IndicesService.class), threadPool,
135+
new ActionFilters(emptySet()), new Resolver()) {
136+
@Override
137+
protected void doExecute(Task task, MultiGetShardRequest request, ActionListener<MultiGetShardResponse> listener) {
138+
}
139+
};
140+
}
141+
142+
@AfterClass
143+
public static void afterClass() {
144+
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
145+
threadPool = null;
146+
transportService = null;
147+
clusterService = null;
148+
transportAction = null;
149+
shardAction = null;
150+
}
151+
152+
public void testTransportMultiGetAction() {
153+
final Task task = createTask();
154+
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
155+
final MultiGetRequestBuilder request = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
156+
request.add(new MultiGetRequest.Item("index1", "type1", "1"));
157+
request.add(new MultiGetRequest.Item("index1", "type1", "2"));
158+
159+
final AtomicBoolean shardActionInvoked = new AtomicBoolean(false);
160+
transportAction = new TransportMultiGetAction(transportService, clusterService, shardAction,
161+
new ActionFilters(emptySet()), new Resolver()) {
162+
@Override
163+
protected void executeShardAction(final ActionListener<MultiGetResponse> listener,
164+
final AtomicArray<MultiGetItemResponse> responses,
165+
final Map<ShardId, MultiGetShardRequest> shardRequests) {
166+
shardActionInvoked.set(true);
167+
assertEquals(2, responses.length());
168+
assertNull(responses.get(0));
169+
assertNull(responses.get(1));
170+
}
171+
};
172+
173+
transportAction.execute(task, request.request(), new ActionListenerAdapter());
174+
assertTrue(shardActionInvoked.get());
175+
}
176+
177+
public void testTransportMultiGetAction_withMissingRouting() {
178+
final Task task = createTask();
179+
final NodeClient client = new NodeClient(Settings.EMPTY, threadPool);
180+
final MultiGetRequestBuilder request = new MultiGetRequestBuilder(client, MultiGetAction.INSTANCE);
181+
request.add(new MultiGetRequest.Item("index1", "type2", "1").routing("1"));
182+
request.add(new MultiGetRequest.Item("index1", "type2", "2"));
183+
184+
final AtomicBoolean shardActionInvoked = new AtomicBoolean(false);
185+
transportAction = new TransportMultiGetAction(transportService, clusterService, shardAction,
186+
new ActionFilters(emptySet()), new Resolver()) {
187+
@Override
188+
protected void executeShardAction(final ActionListener<MultiGetResponse> listener,
189+
final AtomicArray<MultiGetItemResponse> responses,
190+
final Map<ShardId, MultiGetShardRequest> shardRequests) {
191+
shardActionInvoked.set(true);
192+
assertEquals(2, responses.length());
193+
assertNull(responses.get(0));
194+
assertThat(responses.get(1).getFailure().getFailure(), instanceOf(RoutingMissingException.class));
195+
assertThat(responses.get(1).getFailure().getFailure().getMessage(),
196+
equalTo("routing is required for [index1]/[type2]/[2]"));
197+
}
198+
};
199+
200+
transportAction.execute(task, request.request(), new ActionListenerAdapter());
201+
assertTrue(shardActionInvoked.get());
202+
203+
}
204+
205+
private static Task createTask() {
206+
return new Task(randomLong(), "transport", MultiGetAction.NAME, "description",
207+
new TaskId(randomLong() + ":" + randomLong()), emptyMap());
208+
}
209+
210+
static class Resolver extends IndexNameExpressionResolver {
211+
212+
@Override
213+
public Index concreteSingleIndex(ClusterState state, IndicesRequest request) {
214+
return new Index("index1", randomBase64UUID());
215+
}
216+
}
217+
218+
static class ActionListenerAdapter implements ActionListener<MultiGetResponse> {
219+
220+
@Override
221+
public void onResponse(MultiGetResponse response) {
222+
}
223+
224+
@Override
225+
public void onFailure(Exception e) {
226+
}
227+
}
228+
}

0 commit comments

Comments
 (0)