diff --git a/core/src/main/java/org/elasticsearch/ElasticsearchException.java b/core/src/main/java/org/elasticsearch/ElasticsearchException.java index 93576c790c44f..7830d521ff51d 100644 --- a/core/src/main/java/org/elasticsearch/ElasticsearchException.java +++ b/core/src/main/java/org/elasticsearch/ElasticsearchException.java @@ -101,7 +101,7 @@ public ElasticsearchException(String msg, Throwable cause, Object... args) { public ElasticsearchException(StreamInput in) throws IOException { super(in.readOptionalString(), in.readException()); readStackTrace(this, in); - headers.putAll(in.readMapOfLists()); + headers.putAll(in.readMapOfLists(StreamInput::readString, StreamInput::readString)); } /** @@ -196,7 +196,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(this.getMessage()); out.writeException(this.getCause()); writeStackTraces(this, out); - out.writeMapOfLists(headers); + out.writeMapOfLists(headers, StreamOutput::writeString, StreamOutput::writeString); } public static ElasticsearchException readException(StreamInput input, int id) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index 4c5f9757caa5d..794ed6f36fac7 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -431,27 +431,35 @@ public Map readMap(Writeable.Reader keyReader, Writeable.Reader< return map; } - @Nullable - @SuppressWarnings("unchecked") - public Map readMap() throws IOException { - return (Map) readGenericValue(); - } - /** - * Read a map of strings to string lists. + * Read a {@link Map} of {@code K}-type keys to {@code V}-type {@link List}s. + *

+     * Map<String, List<String>> map = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
+     * 
+ * + * @param keyReader The key reader + * @param valueReader The value reader + * @return Never {@code null}. */ - public Map> readMapOfLists() throws IOException { - int size = readVInt(); + public Map> readMapOfLists(final Writeable.Reader keyReader, final Writeable.Reader valueReader) + throws IOException { + final int size = readVInt(); if (size == 0) { return Collections.emptyMap(); } - Map> map = new HashMap<>(size); + final Map> map = new HashMap<>(size); for (int i = 0; i < size; ++i) { - map.put(readString(), readList(StreamInput::readString)); + map.put(keyReader.read(this), readList(valueReader)); } return map; } + @Nullable + @SuppressWarnings("unchecked") + public Map readMap() throws IOException { + return (Map) readGenericValue(); + } + @SuppressWarnings({"unchecked"}) @Nullable public Object readGenericValue() throws IOException { diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java index 24350936fa2cf..1176f54e88acc 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.geo.GeoPoint; +import org.elasticsearch.common.io.stream.Writeable.Writer; import org.elasticsearch.common.text.Text; import org.joda.time.DateTimeZone; import org.joda.time.ReadableInstant; @@ -413,25 +414,28 @@ public void writeMap(@Nullable Map map) throws IOException { } /** - * Writes a map of strings to string lists. + * Write a {@link Map} of {@code K}-type keys to {@code V}-type {@link List}s. + *

+     * Map<String, List<String>> map = ...;
+     * out.writeMapOfLists(map, StreamOutput::writeString, StreamOutput::writeString);
+     * 
+ * + * @param keyWriter The key writer + * @param valueWriter The value writer */ - public void writeMapOfLists(Map> map) throws IOException { + public void writeMapOfLists(final Map> map, final Writer keyWriter, final Writer valueWriter) + throws IOException { writeVInt(map.size()); - for (Map.Entry> entry : map.entrySet()) { - writeString(entry.getKey()); + for (final Map.Entry> entry : map.entrySet()) { + keyWriter.write(this, entry.getKey()); writeVInt(entry.getValue().size()); - for (String v : entry.getValue()) { - writeString(v); + for (final V value : entry.getValue()) { + valueWriter.write(this, value); } } } - @FunctionalInterface - interface Writer { - void write(StreamOutput o, Object value) throws IOException; - } - private static final Map, Writer> WRITERS; static { diff --git a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java index 16497533e29da..30607f3375909 100644 --- a/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java +++ b/core/src/main/java/org/elasticsearch/common/io/stream/Writeable.java @@ -25,26 +25,69 @@ * Implementers can be written to a {@linkplain StreamOutput} and read from a {@linkplain StreamInput}. This allows them to be "thrown * across the wire" using Elasticsearch's internal protocol. If the implementer also implements equals and hashCode then a copy made by * serializing and deserializing must be equal and have the same hashCode. It isn't required that such a copy be entirely unchanged. - * + *

* Prefer implementing this interface over implementing {@link Streamable} where possible. Lots of code depends on {@linkplain Streamable} * so this isn't always possible. */ public interface Writeable { + /** * Write this into the {@linkplain StreamOutput}. */ - void writeTo(StreamOutput out) throws IOException; + void writeTo(final StreamOutput out) throws IOException; + + /** + * Reference to a method that can write some object to a {@link StreamOutput}. + *

+ * By convention this is a method from {@link StreamOutput} itself (e.g., {@link StreamOutput#writeString}). If the value can be + * {@code null}, then the "optional" variant of methods should be used! + *

+ * Most classes should implement {@link Writeable} and the {@link Writeable#writeTo(StreamOutput)} method should use + * {@link StreamOutput} methods directly or this indirectly: + *


+     * public void writeTo(StreamOutput out) throws IOException {
+     *     out.writeVInt(someValue);
+     *     out.writeMapOfLists(someMap, StreamOutput::writeString, StreamOutput::writeString);
+     * }
+     * 
+ */ + @FunctionalInterface + interface Writer { + + /** + * Write {@code V}-type {@code value} to the {@code out}put stream. + * + * @param out Output to write the {@code value} too + * @param value The value to add + */ + void write(final StreamOutput out, final V value) throws IOException; + + } /** * Reference to a method that can read some object from a stream. By convention this is a constructor that takes * {@linkplain StreamInput} as an argument for most classes and a static method for things like enums. Returning null from one of these * is always wrong - for that we use methods like {@link StreamInput#readOptionalWriteable(Reader)}. + *

+ * As most classes will implement this via a constructor (or a static method in the case of enumerations), it's something that should + * look like: + *


+     * public MyClass(final StreamInput in) throws IOException {
+     *     this.someValue = in.readVInt();
+     *     this.someMap = in.readMapOfLists(StreamInput::readString, StreamInput::readString);
+     * }
+     * 
*/ @FunctionalInterface - interface Reader { + interface Reader { + /** - * Read R from a stream. + * Read {@code V}-type value from a stream. + * + * @param in Input to read the value from */ - R read(StreamInput in) throws IOException; + V read(final StreamInput in) throws IOException; + } + } diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java index bf1ef6a563f56..8c04c24ec5b6e 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/ThreadContext.java @@ -269,7 +269,7 @@ private ThreadContextStruct(StreamInput in) throws IOException { } this.requestHeaders = requestHeaders; - this.responseHeaders = in.readMapOfLists(); + this.responseHeaders = in.readMapOfLists(StreamInput::readString, StreamInput::readString); this.transientHeaders = Collections.emptyMap(); } @@ -370,7 +370,7 @@ private void writeTo(StreamOutput out, Map defaultHeaders) throw out.writeString(entry.getValue()); } - out.writeMapOfLists(responseHeaders); + out.writeMapOfLists(responseHeaders, StreamOutput::writeString, StreamOutput::writeString); } } diff --git a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java index 9e5beabd9b7db..52676444d3cab 100644 --- a/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java +++ b/core/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java @@ -30,7 +30,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -42,7 +41,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.startsWith; /** * Tests for {@link BytesStreamOutput} paging behaviour. @@ -462,11 +460,11 @@ public void testWriteMapOfLists() throws IOException { } final BytesStreamOutput out = new BytesStreamOutput(); - out.writeMapOfLists(expected); + out.writeMapOfLists(expected, StreamOutput::writeString, StreamOutput::writeString); final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes())); - final Map> loaded = in.readMapOfLists(); + final Map> loaded = in.readMapOfLists(StreamInput::readString, StreamInput::readString); assertThat(loaded.size(), equalTo(expected.size()));