Skip to content

Commit 8270c80

Browse files
authored
Refactor TransportSingleShardAction to serialize Writeable responses (elastic#41985)
Previously, TransportSingleShardAction required constructing a new empty response object. This response object's Streamable readFrom was used. As part of the migration to Writeable, the interface here was updated to leverage Writeable.Reader. relates to elastic#34389.
1 parent 0531987 commit 8270c80

31 files changed

+276
-179
lines changed

modules/lang-painless/src/main/java/org/elasticsearch/painless/action/PainlessExecuteAction.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ private PainlessExecuteAction() {
101101

102102
@Override
103103
public Response newResponse() {
104-
return new Response();
104+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
105105
}
106106

107107
public static class Request extends SingleShardRequest<Request> implements ToXContentObject {
@@ -381,20 +381,22 @@ public static class Response extends ActionResponse implements ToXContentObject
381381

382382
private Object result;
383383

384-
Response() {}
385-
386384
Response(Object result) {
387385
this.result = result;
388386
}
389387

388+
Response(StreamInput in) throws IOException {
389+
super(in);
390+
result = in.readGenericValue();
391+
}
392+
390393
public Object getResult() {
391394
return result;
392395
}
393396

394397
@Override
395398
public void readFrom(StreamInput in) throws IOException {
396-
super.readFrom(in);
397-
result = in.readGenericValue();
399+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
398400
}
399401

400402
@Override
@@ -469,8 +471,8 @@ public TransportAction(ThreadPool threadPool, TransportService transportService,
469471
}
470472

471473
@Override
472-
protected Response newResponse() {
473-
return new Response();
474+
protected Writeable.Reader<Response> getResponseReader() {
475+
return Response::new;
474476
}
475477

476478
@Override

modules/lang-painless/src/test/java/org/elasticsearch/painless/action/PainlessExecuteResponseTests.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,57 @@
1818
*/
1919
package org.elasticsearch.painless.action;
2020

21-
import org.elasticsearch.test.AbstractStreamableTestCase;
21+
import org.elasticsearch.common.io.stream.Writeable;
22+
import org.elasticsearch.common.xcontent.XContentParser;
23+
import org.elasticsearch.test.AbstractSerializingTestCase;
2224

23-
public class PainlessExecuteResponseTests extends AbstractStreamableTestCase<PainlessExecuteAction.Response> {
25+
import java.io.IOException;
26+
27+
public class PainlessExecuteResponseTests extends AbstractSerializingTestCase<PainlessExecuteAction.Response> {
2428

2529
@Override
26-
protected PainlessExecuteAction.Response createBlankInstance() {
27-
return new PainlessExecuteAction.Response();
30+
protected Writeable.Reader<PainlessExecuteAction.Response> instanceReader() {
31+
return PainlessExecuteAction.Response::new;
2832
}
2933

3034
@Override
3135
protected PainlessExecuteAction.Response createTestInstance() {
32-
return new PainlessExecuteAction.Response(randomAlphaOfLength(10));
36+
Object result;
37+
switch (randomIntBetween(0, 2)) {
38+
case 0:
39+
result = randomAlphaOfLength(10);
40+
break;
41+
case 1:
42+
result = randomBoolean();
43+
break;
44+
case 2:
45+
result = randomDoubleBetween(-10, 10, true);
46+
break;
47+
default:
48+
throw new IllegalStateException("invalid branch");
49+
}
50+
return new PainlessExecuteAction.Response(result);
51+
}
52+
53+
@Override
54+
protected PainlessExecuteAction.Response doParseInstance(XContentParser parser) throws IOException {
55+
parser.nextToken(); // START-OBJECT
56+
parser.nextToken(); // FIELD-NAME
57+
XContentParser.Token token = parser.nextToken(); // result value
58+
Object result;
59+
switch (token) {
60+
case VALUE_STRING:
61+
result = parser.text();
62+
break;
63+
case VALUE_BOOLEAN:
64+
result = parser.booleanValue();
65+
break;
66+
case VALUE_NUMBER:
67+
result = parser.doubleValue();
68+
break;
69+
default:
70+
throw new IOException("invalid response");
71+
}
72+
return new PainlessExecuteAction.Response(result);
3373
}
3474
}

server/src/main/java/org/elasticsearch/action/admin/indices/analyze/AnalyzeResponse.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public int hashCode() {
7171
return Objects.hash(term, startOffset, endOffset, position, positionLength, attributes, type);
7272
}
7373

74-
public AnalyzeToken(String term, int position, int startOffset, int endOffset, int positionLength,
74+
AnalyzeToken(String term, int position, int startOffset, int endOffset, int positionLength,
7575
String type, Map<String, Object> attributes) {
7676
this.term = term;
7777
this.position = position;
@@ -82,7 +82,7 @@ public AnalyzeToken(String term, int position, int startOffset, int endOffset, i
8282
this.attributes = attributes;
8383
}
8484

85-
public AnalyzeToken(StreamInput in) throws IOException {
85+
AnalyzeToken(StreamInput in) throws IOException {
8686
term = in.readString();
8787
startOffset = in.readInt();
8888
endOffset = in.readInt();
@@ -203,7 +203,6 @@ public void writeTo(StreamOutput out) throws IOException {
203203
}
204204

205205
private final DetailAnalyzeResponse detail;
206-
207206
private final List<AnalyzeToken> tokens;
208207

209208
public AnalyzeResponse(List<AnalyzeToken> tokens, DetailAnalyzeResponse detail) {

server/src/main/java/org/elasticsearch/action/admin/indices/analyze/TransportAnalyzeAction.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,6 @@ public TransportAnalyzeAction(Settings settings, ThreadPool threadPool, ClusterS
9696
this.environment = environment;
9797
}
9898

99-
@Override
100-
protected AnalyzeResponse newResponse() {
101-
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
102-
}
103-
10499
@Override
105100
protected Writeable.Reader<AnalyzeResponse> getResponseReader() {
106101
return AnalyzeResponse::new;

server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.admin.indices.mapping.get;
2121

2222
import org.elasticsearch.action.Action;
23+
import org.elasticsearch.common.io.stream.Writeable;
2324

2425
public class GetFieldMappingsAction extends Action<GetFieldMappingsResponse> {
2526

@@ -32,6 +33,11 @@ private GetFieldMappingsAction() {
3233

3334
@Override
3435
public GetFieldMappingsResponse newResponse() {
35-
return new GetFieldMappingsResponse();
36+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
37+
}
38+
39+
@Override
40+
public Writeable.Reader<GetFieldMappingsResponse> getResponseReader() {
41+
return GetFieldMappingsResponse::new;
3642
}
3743
}

server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetFieldMappingsResponse.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,33 @@ public class GetFieldMappingsResponse extends ActionResponse implements ToXConte
9292
this.mappings = mappings;
9393
}
9494

95+
9596
GetFieldMappingsResponse() {
9697
}
9798

99+
GetFieldMappingsResponse(StreamInput in) throws IOException {
100+
super(in);
101+
int size = in.readVInt();
102+
Map<String, Map<String, Map<String, FieldMappingMetaData>>> indexMapBuilder = new HashMap<>(size);
103+
for (int i = 0; i < size; i++) {
104+
String index = in.readString();
105+
int typesSize = in.readVInt();
106+
Map<String, Map<String, FieldMappingMetaData>> typeMapBuilder = new HashMap<>(typesSize);
107+
for (int j = 0; j < typesSize; j++) {
108+
String type = in.readString();
109+
int fieldSize = in.readVInt();
110+
Map<String, FieldMappingMetaData> fieldMapBuilder = new HashMap<>(fieldSize);
111+
for (int k = 0; k < fieldSize; k++) {
112+
fieldMapBuilder.put(in.readString(), new FieldMappingMetaData(in.readString(), in.readBytesReference()));
113+
}
114+
typeMapBuilder.put(type, unmodifiableMap(fieldMapBuilder));
115+
}
116+
indexMapBuilder.put(index, unmodifiableMap(typeMapBuilder));
117+
}
118+
mappings = unmodifiableMap(indexMapBuilder);
119+
120+
}
121+
98122
/** returns the retrieved field mapping. The return map keys are index, type, field (as specified in the request). */
99123
public Map<String, Map<String, Map<String, FieldMappingMetaData>>> mappings() {
100124
return mappings;
@@ -269,25 +293,7 @@ public int hashCode() {
269293

270294
@Override
271295
public void readFrom(StreamInput in) throws IOException {
272-
super.readFrom(in);
273-
int size = in.readVInt();
274-
Map<String, Map<String, Map<String, FieldMappingMetaData>>> indexMapBuilder = new HashMap<>(size);
275-
for (int i = 0; i < size; i++) {
276-
String index = in.readString();
277-
int typesSize = in.readVInt();
278-
Map<String, Map<String, FieldMappingMetaData>> typeMapBuilder = new HashMap<>(typesSize);
279-
for (int j = 0; j < typesSize; j++) {
280-
String type = in.readString();
281-
int fieldSize = in.readVInt();
282-
Map<String, FieldMappingMetaData> fieldMapBuilder = new HashMap<>(fieldSize);
283-
for (int k = 0; k < fieldSize; k++) {
284-
fieldMapBuilder.put(in.readString(), new FieldMappingMetaData(in.readString(), in.readBytesReference()));
285-
}
286-
typeMapBuilder.put(type, unmodifiableMap(fieldMapBuilder));
287-
}
288-
indexMapBuilder.put(index, unmodifiableMap(typeMapBuilder));
289-
}
290-
mappings = unmodifiableMap(indexMapBuilder);
296+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
291297
}
292298

293299
@Override

server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/TransportGetFieldMappingsIndexAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.cluster.service.ClusterService;
3333
import org.elasticsearch.common.bytes.BytesReference;
3434
import org.elasticsearch.common.inject.Inject;
35+
import org.elasticsearch.common.io.stream.Writeable;
3536
import org.elasticsearch.common.regex.Regex;
3637
import org.elasticsearch.common.xcontent.ToXContent;
3738
import org.elasticsearch.common.xcontent.XContentHelper;
@@ -123,8 +124,8 @@ protected GetFieldMappingsResponse shardOperation(final GetFieldMappingsIndexReq
123124
}
124125

125126
@Override
126-
protected GetFieldMappingsResponse newResponse() {
127-
return new GetFieldMappingsResponse();
127+
protected Writeable.Reader<GetFieldMappingsResponse> getResponseReader() {
128+
return GetFieldMappingsResponse::new;
128129
}
129130

130131
@Override

server/src/main/java/org/elasticsearch/action/explain/ExplainAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.explain;
2121

2222
import org.elasticsearch.action.Action;
23+
import org.elasticsearch.common.io.stream.Writeable;
2324

2425
/**
2526
* Entry point for the explain feature.
@@ -35,6 +36,11 @@ private ExplainAction() {
3536

3637
@Override
3738
public ExplainResponse newResponse() {
38-
return new ExplainResponse();
39+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
40+
}
41+
42+
@Override
43+
public Writeable.Reader<ExplainResponse> getResponseReader() {
44+
return ExplainResponse::new;
3945
}
4046
}

server/src/main/java/org/elasticsearch/action/explain/ExplainResponse.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public class ExplainResponse extends ActionResponse implements StatusToXContentO
6060
private Explanation explanation;
6161
private GetResult getResult;
6262

63+
// TODO(talevy): remove dependency on empty constructor from ExplainResponseTests
6364
ExplainResponse() {
6465
}
6566

@@ -80,6 +81,20 @@ public ExplainResponse(String index, String type, String id, boolean exists, Exp
8081
this.getResult = getResult;
8182
}
8283

84+
public ExplainResponse(StreamInput in) throws IOException {
85+
super(in);
86+
index = in.readString();
87+
type = in.readString();
88+
id = in.readString();
89+
exists = in.readBoolean();
90+
if (in.readBoolean()) {
91+
explanation = readExplanation(in);
92+
}
93+
if (in.readBoolean()) {
94+
getResult = GetResult.readGetResult(in);
95+
}
96+
}
97+
8398
public String getIndex() {
8499
return index;
85100
}
@@ -123,17 +138,7 @@ public RestStatus status() {
123138

124139
@Override
125140
public void readFrom(StreamInput in) throws IOException {
126-
super.readFrom(in);
127-
index = in.readString();
128-
type = in.readString();
129-
id = in.readString();
130-
exists = in.readBoolean();
131-
if (in.readBoolean()) {
132-
explanation = readExplanation(in);
133-
}
134-
if (in.readBoolean()) {
135-
getResult = GetResult.readGetResult(in);
136-
}
141+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
137142
}
138143

139144
@Override

server/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.cluster.service.ClusterService;
3333
import org.elasticsearch.common.Strings;
3434
import org.elasticsearch.common.inject.Inject;
35+
import org.elasticsearch.common.io.stream.Writeable;
3536
import org.elasticsearch.common.lease.Releasables;
3637
import org.elasticsearch.index.IndexService;
3738
import org.elasticsearch.index.engine.Engine;
@@ -152,8 +153,8 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId
152153
}
153154

154155
@Override
155-
protected ExplainResponse newResponse() {
156-
return new ExplainResponse();
156+
protected Writeable.Reader<ExplainResponse> getResponseReader() {
157+
return ExplainResponse::new;
157158
}
158159

159160
@Override

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.elasticsearch.cluster.routing.ShardsIterator;
2929
import org.elasticsearch.cluster.service.ClusterService;
3030
import org.elasticsearch.common.inject.Inject;
31+
import org.elasticsearch.common.io.stream.Writeable;
3132
import org.elasticsearch.index.mapper.MappedFieldType;
3233
import org.elasticsearch.index.mapper.MapperService;
3334
import org.elasticsearch.index.mapper.ObjectMapper;
@@ -114,8 +115,8 @@ protected FieldCapabilitiesIndexResponse shardOperation(final FieldCapabilitiesI
114115
}
115116

116117
@Override
117-
protected FieldCapabilitiesIndexResponse newResponse() {
118-
return new FieldCapabilitiesIndexResponse();
118+
protected Writeable.Reader<FieldCapabilitiesIndexResponse> getResponseReader() {
119+
return FieldCapabilitiesIndexResponse::new;
119120
}
120121

121122
@Override

server/src/main/java/org/elasticsearch/action/get/GetAction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.action.get;
2121

2222
import org.elasticsearch.action.Action;
23+
import org.elasticsearch.common.io.stream.Writeable;
2324

2425
public class GetAction extends Action<GetResponse> {
2526

@@ -32,6 +33,11 @@ private GetAction() {
3233

3334
@Override
3435
public GetResponse newResponse() {
35-
return new GetResponse();
36+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
37+
}
38+
39+
@Override
40+
public Writeable.Reader<GetResponse> getResponseReader() {
41+
return GetResponse::new;
3642
}
3743
}

server/src/main/java/org/elasticsearch/action/get/GetResponse.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ public class GetResponse extends ActionResponse implements Iterable<DocumentFiel
4848

4949
GetResult getResult;
5050

51-
GetResponse() {
51+
GetResponse(StreamInput in) throws IOException {
52+
super(in);
53+
getResult = GetResult.readGetResult(in);
5254
}
5355

5456
public GetResponse(GetResult getResult) {
@@ -203,8 +205,7 @@ public static GetResponse fromXContent(XContentParser parser) throws IOException
203205

204206
@Override
205207
public void readFrom(StreamInput in) throws IOException {
206-
super.readFrom(in);
207-
getResult = GetResult.readGetResult(in);
208+
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
208209
}
209210

210211
@Override

0 commit comments

Comments
 (0)