|
23 | 23 | import org.elasticsearch.rest.RestRequest;
|
24 | 24 | import org.elasticsearch.rest.RestResponse;
|
25 | 25 | import org.elasticsearch.rest.action.RestActionListener;
|
| 26 | +import org.elasticsearch.rest.action.RestCancellableNodeClient; |
26 | 27 | import org.elasticsearch.rest.action.RestResponseListener;
|
| 28 | +import org.elasticsearch.tasks.TaskCancelledException; |
27 | 29 |
|
28 | 30 | import java.util.List;
|
29 | 31 | import java.util.Map;
|
@@ -60,14 +62,19 @@ protected RestChannelConsumer doCatRequest(final RestRequest request, final Node
|
60 | 62 | clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));
|
61 | 63 | clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices);
|
62 | 64 |
|
63 |
| - return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) { |
| 65 | + final RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel()); |
| 66 | + |
| 67 | + return channel -> cancelClient.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) { |
64 | 68 | @Override
|
65 | 69 | public void processResponse(final ClusterStateResponse clusterStateResponse) {
|
66 | 70 | final IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest();
|
67 | 71 | indicesSegmentsRequest.indices(indices);
|
68 |
| - client.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener<IndicesSegmentResponse>(channel) { |
| 72 | + cancelClient.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener<IndicesSegmentResponse>(channel) { |
69 | 73 | @Override
|
70 | 74 | public RestResponse buildResponse(final IndicesSegmentResponse indicesSegmentResponse) throws Exception {
|
| 75 | + if (request.getHttpChannel().isOpen() == false) { |
| 76 | + throw new TaskCancelledException("response channel [" + request.getHttpChannel() + "] closed"); |
| 77 | + } |
71 | 78 | final Map<String, IndexSegments> indicesSegments = indicesSegmentResponse.getIndices();
|
72 | 79 | Table tab = buildTable(request, clusterStateResponse, indicesSegments);
|
73 | 80 | return RestTable.buildResponse(tab, channel);
|
|
0 commit comments