Skip to content

Commit 7197172

Browse files
committed
[reindex] Properly register status
Without this commit fetching the status of a reindex from a node that isn't coordinating the reindex will fail. This commit properly registers reindex's status so this doesn't happen. To do so it moves all task status registration into NetworkModule and creates a method to register other statuses which the reindex plugin calls.
1 parent d83e120 commit 7197172

File tree

13 files changed

+107
-34
lines changed

13 files changed

+107
-34
lines changed

core/src/main/java/org/elasticsearch/common/network/NetworkModule.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Arrays;
2323
import java.util.List;
2424

25+
import org.elasticsearch.action.support.replication.ReplicationTask;
2526
import org.elasticsearch.client.transport.TransportClientNodesService;
2627
import org.elasticsearch.client.transport.support.TransportProxyClient;
2728
import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -139,6 +140,7 @@
139140
import org.elasticsearch.rest.action.termvectors.RestMultiTermVectorsAction;
140141
import org.elasticsearch.rest.action.termvectors.RestTermVectorsAction;
141142
import org.elasticsearch.rest.action.update.RestUpdateAction;
143+
import org.elasticsearch.tasks.Task;
142144
import org.elasticsearch.transport.Transport;
143145
import org.elasticsearch.transport.TransportService;
144146
import org.elasticsearch.transport.local.LocalTransport;
@@ -326,6 +328,7 @@ public NetworkModule(NetworkService networkService, Settings settings, boolean t
326328
registerTransportService(NETTY_TRANSPORT, TransportService.class);
327329
registerTransport(LOCAL_TRANSPORT, LocalTransport.class);
328330
registerTransport(NETTY_TRANSPORT, NettyTransport.class);
331+
registerTaskStatus(ReplicationTask.Status.PROTOTYPE);
329332

330333
if (transportClient == false) {
331334
registerHttpTransport(NETTY_TRANSPORT, NettyHttpServerTransport.class);
@@ -371,6 +374,10 @@ public void registerRestHandler(Class<? extends RestHandler> clazz) {
371374
}
372375
}
373376

377+
public void registerTaskStatus(Task.Status prototype) {
378+
namedWriteableRegistry.registerPrototype(Task.Status.class, prototype);
379+
}
380+
374381
@Override
375382
protected void configure() {
376383
bind(NetworkService.class).toInstance(networkService);

core/src/main/java/org/elasticsearch/transport/TransportService.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,11 @@
2020
package org.elasticsearch.transport;
2121

2222
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
23-
import org.elasticsearch.action.support.replication.ReplicationTask;
2423
import org.elasticsearch.cluster.node.DiscoveryNode;
2524
import org.elasticsearch.common.Strings;
2625
import org.elasticsearch.common.collect.MapBuilder;
2726
import org.elasticsearch.common.component.AbstractLifecycleComponent;
2827
import org.elasticsearch.common.inject.Inject;
29-
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
3028
import org.elasticsearch.common.logging.ESLogger;
3129
import org.elasticsearch.common.logging.Loggers;
3230
import org.elasticsearch.common.metrics.MeanMetric;
@@ -43,7 +41,6 @@
4341
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
4442
import org.elasticsearch.common.util.concurrent.FutureUtils;
4543
import org.elasticsearch.common.util.concurrent.ThreadContext;
46-
import org.elasticsearch.tasks.Task;
4744
import org.elasticsearch.tasks.TaskManager;
4845
import org.elasticsearch.threadpool.ThreadPool;
4946

@@ -113,11 +110,11 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
113110
volatile DiscoveryNode localNode = null;
114111

115112
public TransportService(Transport transport, ThreadPool threadPool) {
116-
this(EMPTY_SETTINGS, transport, threadPool, new NamedWriteableRegistry());
113+
this(EMPTY_SETTINGS, transport, threadPool);
117114
}
118115

119116
@Inject
120-
public TransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
117+
public TransportService(Settings settings, Transport transport, ThreadPool threadPool) {
121118
super(settings);
122119
this.transport = transport;
123120
this.threadPool = threadPool;
@@ -126,7 +123,6 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
126123
tracerLog = Loggers.getLogger(logger, ".tracer");
127124
adapter = createAdapter();
128125
taskManager = createTaskManager();
129-
namedWriteableRegistry.registerPrototype(Task.Status.class, ReplicationTask.Status.PROTOTYPE);
130126
}
131127

132128
/**

core/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public static class TestNode implements Releasable {
183183
public TestNode(String name, ThreadPool threadPool, Settings settings) {
184184
transportService = new TransportService(settings,
185185
new LocalTransport(settings, threadPool, Version.CURRENT, new NamedWriteableRegistry()),
186-
threadPool, new NamedWriteableRegistry()) {
186+
threadPool) {
187187
@Override
188188
protected TaskManager createTaskManager() {
189189
if (MockTaskManager.USE_MOCK_TASK_MANAGER_SETTING.get(settings)) {

core/src/test/java/org/elasticsearch/client/transport/TransportClientHeadersTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,8 @@ public Settings additionalSettings() {
128128
CountDownLatch clusterStateLatch = new CountDownLatch(1);
129129

130130
@Inject
131-
public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
132-
super(settings, transport, threadPool, namedWriteableRegistry);
131+
public InternalTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
132+
super(settings, transport, threadPool);
133133
}
134134

135135
@Override @SuppressWarnings("unchecked")

core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ protected TestResponse newResponse() {
7171
return new TestResponse();
7272
}
7373
};
74-
transportService = new TransportService(Settings.EMPTY, transport, threadPool, new NamedWriteableRegistry());
74+
transportService = new TransportService(Settings.EMPTY, transport, threadPool);
7575
transportService.start();
7676
transportService.acceptIncomingRequests();
7777
transportClientNodesService = new TransportClientNodesService(Settings.EMPTY, ClusterName.DEFAULT, transportService, threadPool, Version.CURRENT);

core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java

Lines changed: 57 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,18 @@
1919

2020
package org.elasticsearch.common.network;
2121

22+
import org.elasticsearch.action.support.replication.ReplicationTask;
2223
import org.elasticsearch.client.Client;
2324
import org.elasticsearch.common.Table;
2425
import org.elasticsearch.common.component.AbstractLifecycleComponent;
2526
import org.elasticsearch.common.inject.ModuleTestCase;
27+
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
28+
import org.elasticsearch.common.io.stream.StreamInput;
29+
import org.elasticsearch.common.io.stream.StreamOutput;
2630
import org.elasticsearch.common.settings.Settings;
2731
import org.elasticsearch.common.transport.BoundTransportAddress;
32+
import org.elasticsearch.common.xcontent.XContentBuilder;
33+
import org.elasticsearch.common.xcontent.ToXContent.Params;
2834
import org.elasticsearch.http.HttpInfo;
2935
import org.elasticsearch.http.HttpServerAdapter;
3036
import org.elasticsearch.http.HttpServerTransport;
@@ -36,10 +42,16 @@
3642
import org.elasticsearch.rest.action.cat.AbstractCatAction;
3743
import org.elasticsearch.rest.action.cat.RestNodesAction;
3844
import org.elasticsearch.rest.action.main.RestMainAction;
45+
import org.elasticsearch.tasks.Task;
46+
import org.elasticsearch.tasks.Task.Status;
3947
import org.elasticsearch.test.transport.AssertingLocalTransport;
4048
import org.elasticsearch.transport.Transport;
4149
import org.elasticsearch.transport.TransportService;
4250

51+
import java.io.IOException;
52+
53+
import static org.hamcrest.Matchers.sameInstance;
54+
4355
public class NetworkModuleTests extends ModuleTestCase {
4456

4557
static class FakeTransportService extends TransportService {
@@ -104,36 +116,36 @@ protected Table getTableWithHeader(RestRequest request) {
104116

105117
public void testRegisterTransportService() {
106118
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, "custom").build();
107-
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
119+
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
108120
module.registerTransportService("custom", FakeTransportService.class);
109121
assertBinding(module, TransportService.class, FakeTransportService.class);
110122

111123
// check it works with transport only as well
112-
module = new NetworkModule(new NetworkService(settings), settings, true, null);
124+
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
113125
module.registerTransportService("custom", FakeTransportService.class);
114126
assertBinding(module, TransportService.class, FakeTransportService.class);
115127
}
116128

117129
public void testRegisterTransport() {
118130
Settings settings = Settings.builder().put(NetworkModule.TRANSPORT_TYPE_KEY, "custom").build();
119-
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
131+
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
120132
module.registerTransport("custom", FakeTransport.class);
121133
assertBinding(module, Transport.class, FakeTransport.class);
122134

123135
// check it works with transport only as well
124-
module = new NetworkModule(new NetworkService(settings), settings, true, null);
136+
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
125137
module.registerTransport("custom", FakeTransport.class);
126138
assertBinding(module, Transport.class, FakeTransport.class);
127139
}
128140

129141
public void testRegisterHttpTransport() {
130142
Settings settings = Settings.builder().put(NetworkModule.HTTP_TYPE_SETTING.getKey(), "custom").build();
131-
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
143+
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
132144
module.registerHttpTransport("custom", FakeHttpTransport.class);
133145
assertBinding(module, HttpServerTransport.class, FakeHttpTransport.class);
134146

135147
// check registration not allowed for transport only
136-
module = new NetworkModule(new NetworkService(settings), settings, true, null);
148+
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
137149
try {
138150
module.registerHttpTransport("custom", FakeHttpTransport.class);
139151
fail();
@@ -144,19 +156,19 @@ public void testRegisterHttpTransport() {
144156

145157
// not added if http is disabled
146158
settings = Settings.builder().put(NetworkModule.HTTP_ENABLED.getKey(), false).build();
147-
module = new NetworkModule(new NetworkService(settings), settings, false, null);
159+
module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
148160
assertNotBound(module, HttpServerTransport.class);
149161
}
150162

151163
public void testRegisterRestHandler() {
152164
Settings settings = Settings.EMPTY;
153-
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
165+
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
154166
module.registerRestHandler(FakeRestHandler.class);
155167
// also check a builtin is bound
156168
assertSetMultiBinding(module, RestHandler.class, FakeRestHandler.class, RestMainAction.class);
157169

158170
// check registration not allowed for transport only
159-
module = new NetworkModule(new NetworkService(settings), settings, true, null);
171+
module = new NetworkModule(new NetworkService(settings), settings, true, new NamedWriteableRegistry());
160172
try {
161173
module.registerRestHandler(FakeRestHandler.class);
162174
fail();
@@ -168,9 +180,44 @@ public void testRegisterRestHandler() {
168180

169181
public void testRegisterCatRestHandler() {
170182
Settings settings = Settings.EMPTY;
171-
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, null);
183+
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, new NamedWriteableRegistry());
172184
module.registerRestHandler(FakeCatRestHandler.class);
173185
// also check a builtin is bound
174186
assertSetMultiBinding(module, AbstractCatAction.class, FakeCatRestHandler.class, RestNodesAction.class);
175187
}
188+
189+
public void testRegisterTaskStatus() {
190+
NamedWriteableRegistry registry = new NamedWriteableRegistry();
191+
Settings settings = Settings.EMPTY;
192+
NetworkModule module = new NetworkModule(new NetworkService(settings), settings, false, registry);
193+
194+
// Builtin prototype comes back
195+
assertNotNull(registry.getPrototype(Task.Status.class, ReplicationTask.Status.PROTOTYPE.getWriteableName()));
196+
197+
Task.Status dummy = new DummyTaskStatus();
198+
module.registerTaskStatus(dummy);
199+
assertThat(registry.getPrototype(Task.Status.class, "dummy"), sameInstance(dummy));
200+
}
201+
202+
private class DummyTaskStatus implements Task.Status {
203+
@Override
204+
public String getWriteableName() {
205+
return "dummy";
206+
}
207+
208+
@Override
209+
public Status readFrom(StreamInput in) throws IOException {
210+
throw new UnsupportedOperationException();
211+
}
212+
213+
@Override
214+
public void writeTo(StreamOutput out) throws IOException {
215+
throw new UnsupportedOperationException();
216+
}
217+
218+
@Override
219+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
220+
throw new UnsupportedOperationException();
221+
}
222+
}
176223
}

core/src/test/java/org/elasticsearch/discovery/ZenFaultDetectionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ public void tearDown() throws Exception {
106106
protected MockTransportService build(Settings settings, Version version) {
107107
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
108108
MockTransportService transportService = new MockTransportService(Settings.EMPTY,
109-
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool, namedWriteableRegistry);
109+
new LocalTransport(settings, threadPool, version, namedWriteableRegistry), threadPool);
110110
transportService.start();
111111
transportService.acceptIncomingRequests();
112112
return transportService;

core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,8 @@ public FakeTransport(Settings settings, ThreadPool threadPool, Version version,
4141

4242
static class FakeTransportService extends TransportService {
4343
@Inject
44-
public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
45-
super(settings, transport, threadPool, namedWriteableRegistry);
44+
public FakeTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
45+
super(settings, transport, threadPool);
4646
}
4747
}
4848
}

core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ public void testScheduledPing() throws Exception {
5454

5555
NamedWriteableRegistry registryA = new NamedWriteableRegistry();
5656
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA);
57-
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool, registryA);
57+
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
5858
serviceA.start();
5959
serviceA.acceptIncomingRequests();
6060

6161
NamedWriteableRegistry registryB = new NamedWriteableRegistry();
6262
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB);
63-
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool, registryB);
63+
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
6464
serviceB.start();
6565
serviceB.acceptIncomingRequests();
6666

modules/lang-groovy/src/test/java/org/elasticsearch/messy/tests/IndicesRequestTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -785,8 +785,8 @@ public void onModule(NetworkModule module) {
785785
private final Map<String, List<TransportRequest>> requests = new HashMap<>();
786786

787787
@Inject
788-
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool, NamedWriteableRegistry namedWriteableRegistry) {
789-
super(settings, transport, threadPool, namedWriteableRegistry);
788+
public InterceptingTransportService(Settings settings, Transport transport, ThreadPool threadPool) {
789+
super(settings, transport, threadPool);
790790
}
791791

792792
synchronized List<TransportRequest> consumeRequests(String action) {

modules/reindex/src/main/java/org/elasticsearch/index/reindex/ReindexPlugin.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,9 @@ public void onModule(ActionModule actionModule) {
4141
actionModule.registerAction(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class);
4242
}
4343

44-
public void onModule(NetworkModule restModule) {
45-
restModule.registerRestHandler(RestReindexAction.class);
46-
restModule.registerRestHandler(RestUpdateByQueryAction.class);
44+
public void onModule(NetworkModule networkModule) {
45+
networkModule.registerRestHandler(RestReindexAction.class);
46+
networkModule.registerRestHandler(RestUpdateByQueryAction.class);
47+
networkModule.registerTaskStatus(BulkByScrollTask.Status.PROTOTYPE);
4748
}
4849
}

modules/reindex/src/test/java/org/elasticsearch/index/reindex/CancelTestUtils.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,10 @@
2121

2222
import org.elasticsearch.action.ActionResponse;
2323
import org.elasticsearch.action.ListenableActionFuture;
24+
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
25+
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
2426
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskInfo;
27+
import org.elasticsearch.index.reindex.BulkByScrollTask.Status;
2528
import org.elasticsearch.plugins.Plugin;
2629
import org.elasticsearch.script.ExecutableScript;
2730
import org.elasticsearch.script.NativeScriptFactory;
@@ -41,7 +44,10 @@
4144

4245
import static java.util.Collections.emptyMap;
4346
import static org.elasticsearch.test.ESIntegTestCase.client;
47+
import static org.hamcrest.Matchers.empty;
4448
import static org.hamcrest.Matchers.hasSize;
49+
import static org.junit.Assert.assertEquals;
50+
import static org.junit.Assert.assertNull;
4551
import static org.junit.Assert.assertThat;
4652

4753
/**
@@ -76,10 +82,26 @@ Response testCancel(ESIntegTestCase test, Builder request, String actionToCancel
7682
// Wait until the script is on the second document.
7783
barrier.await(30, TimeUnit.SECONDS);
7884

85+
// Status should show running
86+
ListTasksResponse tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get();
87+
assertThat(tasksList.getNodeFailures(), empty());
88+
assertThat(tasksList.getTaskFailures(), empty());
89+
assertThat(tasksList.getTasks(), hasSize(1));
90+
BulkByScrollTask.Status status = (Status) tasksList.getTasks().get(0).getStatus();
91+
assertNull(status.getReasonCancelled());
92+
7993
// Cancel the request while the script is running. This will prevent the request from being sent at all.
8094
List<TaskInfo> cancelledTasks = client().admin().cluster().prepareCancelTasks().setActions(actionToCancel).get().getTasks();
8195
assertThat(cancelledTasks, hasSize(1));
8296

97+
// The status should now show canceled. The request will still be in the list because the script is still blocked.
98+
tasksList = client().admin().cluster().prepareListTasks().setActions(actionToCancel).setDetailed(true).get();
99+
assertThat(tasksList.getNodeFailures(), empty());
100+
assertThat(tasksList.getTaskFailures(), empty());
101+
assertThat(tasksList.getTasks(), hasSize(1));
102+
status = (Status) tasksList.getTasks().get(0).getStatus();
103+
assertEquals(CancelTasksRequest.DEFAULT_REASON, status.getReasonCancelled());
104+
83105
// Now let the next document through. It won't be sent because the request is cancelled but we need to unblock the script.
84106
barrier.await();
85107

0 commit comments

Comments
 (0)