Skip to content

Commit 59f94b3

Browse files
authored
Handle missing logstash index exceptions (#63698)
This commit updates the APIs in the logstash plugin to handle IndexNotFoundExceptions that are returned by client calls. Until we have the creation of this index in place, we need to handle this case and not let the exception propagate out of the API.
1 parent 9b8b20a commit 59f94b3

File tree

4 files changed

+119
-4
lines changed

4 files changed

+119
-4
lines changed

x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportDeletePipelineAction.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.elasticsearch.xpack.logstash.action;
88

9+
import org.elasticsearch.ExceptionsHelper;
910
import org.elasticsearch.action.ActionListener;
1011
import org.elasticsearch.action.DocWriteResponse.Result;
1112
import org.elasticsearch.action.support.ActionFilters;
@@ -14,6 +15,7 @@
1415
import org.elasticsearch.client.Client;
1516
import org.elasticsearch.client.OriginSettingClient;
1617
import org.elasticsearch.common.inject.Inject;
18+
import org.elasticsearch.index.IndexNotFoundException;
1719
import org.elasticsearch.tasks.Task;
1820
import org.elasticsearch.transport.TransportService;
1921
import org.elasticsearch.xpack.logstash.Logstash;
@@ -37,8 +39,17 @@ protected void doExecute(Task task, DeletePipelineRequest request, ActionListene
3739
.execute(
3840
ActionListener.wrap(
3941
deleteResponse -> listener.onResponse(new DeletePipelineResponse(deleteResponse.getResult() == Result.DELETED)),
40-
listener::onFailure
42+
e -> handleFailure(e, listener)
4143
)
4244
);
4345
}
46+
47+
private void handleFailure(Exception e, ActionListener<DeletePipelineResponse> listener) {
48+
Throwable cause = ExceptionsHelper.unwrapCause(e);
49+
if (cause instanceof IndexNotFoundException) {
50+
listener.onResponse(new DeletePipelineResponse(false));
51+
} else {
52+
listener.onFailure(e);
53+
}
54+
}
4455
}

x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/action/TransportGetPipelineAction.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.apache.logging.log4j.LogManager;
1010
import org.apache.logging.log4j.Logger;
1111
import org.apache.logging.log4j.message.ParameterizedMessage;
12+
import org.elasticsearch.ExceptionsHelper;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.get.GetResponse;
1415
import org.elasticsearch.action.get.MultiGetItemResponse;
@@ -22,6 +23,7 @@
2223
import org.elasticsearch.common.bytes.BytesReference;
2324
import org.elasticsearch.common.inject.Inject;
2425
import org.elasticsearch.common.unit.TimeValue;
26+
import org.elasticsearch.index.IndexNotFoundException;
2527
import org.elasticsearch.index.query.QueryBuilders;
2628
import org.elasticsearch.search.SearchHit;
2729
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -81,7 +83,7 @@ protected void doExecute(Task task, GetPipelineRequest request, ActionListener<G
8183
}
8284
};
8385
handleSearchResponse(searchResponse, pipelineSources, clearScroll, listener);
84-
}, listener::onFailure));
86+
}, e -> handleFailure(e, listener)));
8587
} else if (request.ids().size() == 1) {
8688
client.prepareGet(Logstash.LOGSTASH_CONCRETE_INDEX_NAME, request.ids().get(0))
8789
.setFetchSource(true)
@@ -91,7 +93,7 @@ protected void doExecute(Task task, GetPipelineRequest request, ActionListener<G
9193
} else {
9294
listener.onResponse(new GetPipelineResponse(Map.of()));
9395
}
94-
}, listener::onFailure));
96+
}, e -> handleFailure(e, listener)));
9597
} else {
9698
client.prepareMultiGet()
9799
.addIds(Logstash.LOGSTASH_CONCRETE_INDEX_NAME, request.ids())
@@ -106,7 +108,16 @@ protected void doExecute(Task task, GetPipelineRequest request, ActionListener<G
106108
.collect(Collectors.toMap(GetResponse::getId, GetResponse::getSourceAsBytesRef))
107109
)
108110
);
109-
}, listener::onFailure));
111+
}, e -> handleFailure(e, listener)));
112+
}
113+
}
114+
115+
private void handleFailure(Exception e, ActionListener<GetPipelineResponse> listener) {
116+
Throwable cause = ExceptionsHelper.unwrapCause(e);
117+
if (cause instanceof IndexNotFoundException) {
118+
listener.onResponse(new GetPipelineResponse(Map.of()));
119+
} else {
120+
listener.onFailure(e);
110121
}
111122
}
112123

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.logstash.action;
8+
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.ActionRequest;
11+
import org.elasticsearch.action.ActionResponse;
12+
import org.elasticsearch.action.ActionType;
13+
import org.elasticsearch.action.support.ActionFilters;
14+
import org.elasticsearch.action.support.PlainActionFuture;
15+
import org.elasticsearch.client.Client;
16+
import org.elasticsearch.index.IndexNotFoundException;
17+
import org.elasticsearch.test.ESTestCase;
18+
import org.elasticsearch.test.client.NoOpClient;
19+
import org.elasticsearch.transport.RemoteTransportException;
20+
import org.elasticsearch.transport.TransportService;
21+
22+
import static org.hamcrest.Matchers.is;
23+
import static org.mockito.Mockito.mock;
24+
25+
public class TransportDeletePipelineActionTests extends ESTestCase {
26+
27+
public void testDeletePipelineWithMissingIndex() throws Exception {
28+
try (Client client = getFailureClient(new IndexNotFoundException("missing .logstash"))) {
29+
final TransportDeletePipelineAction action = new TransportDeletePipelineAction(
30+
mock(TransportService.class),
31+
mock(ActionFilters.class),
32+
client
33+
);
34+
final DeletePipelineRequest request = new DeletePipelineRequest(randomAlphaOfLength(4));
35+
final PlainActionFuture<DeletePipelineResponse> future = new PlainActionFuture<>();
36+
action.doExecute(null, request, future);
37+
assertThat(future.get().isDeleted(), is(false));
38+
}
39+
}
40+
41+
private Client getFailureClient(Exception e) {
42+
return new NoOpClient(getTestName()) {
43+
@Override
44+
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
45+
ActionType<Response> action,
46+
Request request,
47+
ActionListener<Response> listener
48+
) {
49+
if (randomBoolean()) {
50+
listener.onFailure(new RemoteTransportException("failed on other node", e));
51+
} else {
52+
listener.onFailure(e);
53+
}
54+
}
55+
};
56+
}
57+
}

x-pack/plugin/logstash/src/test/java/org/elasticsearch/xpack/logstash/action/TransportGetPipelineActionTests.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,20 @@
1717
import org.elasticsearch.action.get.MultiGetItemResponse;
1818
import org.elasticsearch.action.get.MultiGetResponse;
1919
import org.elasticsearch.action.support.ActionFilters;
20+
import org.elasticsearch.action.support.PlainActionFuture;
2021
import org.elasticsearch.client.Client;
2122
import org.elasticsearch.common.bytes.BytesReference;
2223
import org.elasticsearch.common.logging.Loggers;
24+
import org.elasticsearch.index.IndexNotFoundException;
2325
import org.elasticsearch.test.ESTestCase;
2426
import org.elasticsearch.test.MockLogAppender;
2527
import org.elasticsearch.test.client.NoOpClient;
28+
import org.elasticsearch.transport.RemoteTransportException;
2629
import org.elasticsearch.transport.TransportService;
2730

2831
import java.util.List;
2932

33+
import static org.hamcrest.Matchers.anEmptyMap;
3034
import static org.hamcrest.Matchers.equalTo;
3135
import static org.hamcrest.Matchers.is;
3236
import static org.hamcrest.Matchers.notNullValue;
@@ -98,6 +102,21 @@ public void onFailure(Exception e) {
98102
}
99103
}
100104

105+
public void testMissingIndexHandling() throws Exception {
106+
try (Client failureClient = getFailureClient(new IndexNotFoundException("foo"))) {
107+
final TransportGetPipelineAction action = new TransportGetPipelineAction(
108+
mock(TransportService.class),
109+
mock(ActionFilters.class),
110+
failureClient
111+
);
112+
final List<String> pipelines = randomList(0, 10, () -> randomAlphaOfLengthBetween(1, 8));
113+
final GetPipelineRequest request = new GetPipelineRequest(pipelines);
114+
PlainActionFuture<GetPipelineResponse> future = new PlainActionFuture<>();
115+
action.doExecute(null, request, future);
116+
assertThat(future.get().pipelines(), anEmptyMap());
117+
}
118+
}
119+
101120
private Client getMockClient(ActionResponse response) {
102121
return new NoOpClient(getTestName()) {
103122
@Override
@@ -111,4 +130,21 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
111130
}
112131
};
113132
}
133+
134+
private Client getFailureClient(Exception e) {
135+
return new NoOpClient(getTestName()) {
136+
@Override
137+
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
138+
ActionType<Response> action,
139+
Request request,
140+
ActionListener<Response> listener
141+
) {
142+
if (randomBoolean()) {
143+
listener.onFailure(new RemoteTransportException("failed on other node", e));
144+
} else {
145+
listener.onFailure(e);
146+
}
147+
}
148+
};
149+
}
114150
}

0 commit comments

Comments
 (0)