Skip to content

Commit 76691bc

Browse files
authored
Simplify reading a list and converting it to a map from stream (#84183)
This commit adds readMapFromList to StreamInput. This method allows to efficiently read list from the stream and create a presized map from it without having to allocate intermediate collection.
1 parent 42b0fd9 commit 76691bc

File tree

10 files changed

+91
-77
lines changed

10 files changed

+91
-77
lines changed

docs/changelog/84183.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 84183
2+
summary: Simplify reading a list and converting it to a map from stream
3+
area: Infra/Core
4+
type: enhancement
5+
issues: []

server/src/main/java/org/elasticsearch/cluster/health/ClusterIndexHealth.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,13 +171,7 @@ public ClusterIndexHealth(final StreamInput in) throws IOException {
171171
initializingShards = in.readVInt();
172172
unassignedShards = in.readVInt();
173173
status = ClusterHealthStatus.readFrom(in);
174-
175-
int size = in.readVInt();
176-
shards = Maps.newMapWithExpectedSize(size);
177-
for (int i = 0; i < size; i++) {
178-
ClusterShardHealth shardHealth = new ClusterShardHealth(in);
179-
shards.put(shardHealth.getShardId(), shardHealth);
180-
}
174+
shards = in.readMapValues(ClusterShardHealth::new, ClusterShardHealth::getShardId);
181175
}
182176

183177
/**
@@ -263,7 +257,7 @@ public void writeTo(final StreamOutput out) throws IOException {
263257
out.writeVInt(initializingShards);
264258
out.writeVInt(unassignedShards);
265259
out.writeByte(status.value());
266-
out.writeCollection(shards.values());
260+
out.writeMapValues(shards);
267261
}
268262

269263
@Override

server/src/main/java/org/elasticsearch/cluster/health/ClusterStateHealth.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
import org.elasticsearch.common.io.stream.StreamInput;
1515
import org.elasticsearch.common.io.stream.StreamOutput;
1616
import org.elasticsearch.common.io.stream.Writeable;
17-
import org.elasticsearch.common.util.Maps;
1817
import org.elasticsearch.rest.RestStatus;
1918

2019
import java.io.IOException;
@@ -119,12 +118,7 @@ public ClusterStateHealth(final StreamInput in) throws IOException {
119118
numberOfNodes = in.readVInt();
120119
numberOfDataNodes = in.readVInt();
121120
status = ClusterHealthStatus.readFrom(in);
122-
int size = in.readVInt();
123-
indices = Maps.newMapWithExpectedSize(size);
124-
for (int i = 0; i < size; i++) {
125-
ClusterIndexHealth indexHealth = new ClusterIndexHealth(in);
126-
indices.put(indexHealth.getIndex(), indexHealth);
127-
}
121+
indices = in.readMapValues(ClusterIndexHealth::new, ClusterIndexHealth::getIndex);
128122
activeShardsPercent = in.readDouble();
129123
}
130124

@@ -210,10 +204,7 @@ public void writeTo(final StreamOutput out) throws IOException {
210204
out.writeVInt(numberOfNodes);
211205
out.writeVInt(numberOfDataNodes);
212206
out.writeByte(status.value());
213-
out.writeVInt(indices.size());
214-
for (ClusterIndexHealth indexHealth : this) {
215-
indexHealth.writeTo(out);
216-
}
207+
out.writeMapValues(indices);
217208
out.writeDouble(activeShardsPercent);
218209
}
219210

server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1086,13 +1086,8 @@ public static Metadata readFrom(StreamInput in) throws IOException {
10861086
}
10871087
final Function<String, MappingMetadata> mappingLookup;
10881088
if (in.getVersion().onOrAfter(MAPPINGS_AS_HASH_VERSION)) {
1089-
final int mappings = in.readVInt();
1090-
if (mappings > 0) {
1091-
final Map<String, MappingMetadata> mappingMetadataMap = new HashMap<>(mappings);
1092-
for (int i = 0; i < mappings; i++) {
1093-
final MappingMetadata m = new MappingMetadata(in);
1094-
mappingMetadataMap.put(m.getSha256(), m);
1095-
}
1089+
final Map<String, MappingMetadata> mappingMetadataMap = in.readMapValues(MappingMetadata::new, MappingMetadata::getSha256);
1090+
if (mappingMetadataMap.size() > 0) {
10961091
mappingLookup = mappingMetadataMap::get;
10971092
} else {
10981093
mappingLookup = null;
@@ -1131,7 +1126,7 @@ public void writeTo(StreamOutput out) throws IOException {
11311126
// Starting in #MAPPINGS_AS_HASH_VERSION we write the mapping metadata first and then write the indices without metadata so that
11321127
// we avoid writing duplicate mappings twice
11331128
if (out.getVersion().onOrAfter(MAPPINGS_AS_HASH_VERSION)) {
1134-
out.writeCollection(mappingsByHash.values());
1129+
out.writeMapValues(mappingsByHash);
11351130
}
11361131
out.writeVInt(indices.size());
11371132
final boolean writeMappingsHash = out.getVersion().onOrAfter(MAPPINGS_AS_HASH_VERSION);

server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.util.Map;
6868
import java.util.Set;
6969
import java.util.concurrent.TimeUnit;
70+
import java.util.function.Function;
7071
import java.util.function.IntFunction;
7172

7273
import static org.elasticsearch.ElasticsearchException.readStackTrace;
@@ -652,6 +653,26 @@ public <K, V> Map<K, List<V>> readMapOfLists(final Writeable.Reader<K> keyReader
652653
return map;
653654
}
654655

656+
/**
657+
* Reads a multiple {@code V}-values and then converts them to a {@code Map} using keyMapper.
658+
*
659+
* @param valueReader The value reader
660+
* @param keyMapper function to create a key from a value
661+
* @return Never {@code null}.
662+
*/
663+
public <K, V> Map<K, V> readMapValues(final Writeable.Reader<V> valueReader, final Function<V, K> keyMapper) throws IOException {
664+
final int size = readArraySize();
665+
if (size == 0) {
666+
return Map.of();
667+
}
668+
final Map<K, V> map = Maps.newMapWithExpectedSize(size);
669+
for (int i = 0; i < size; i++) {
670+
V value = valueReader.read(this);
671+
map.put(keyMapper.apply(value), value);
672+
}
673+
return map;
674+
}
675+
655676
/**
656677
* If the returned map contains any entries it will be mutable. If it is empty it might be immutable.
657678
*/

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,20 @@ public void writeMapWithConsistentOrder(@Nullable Map<String, ? extends Object>
545545
}
546546
}
547547

548+
/**
549+
* Writes values of a map as a collection
550+
*/
551+
public final <V> void writeMapValues(final Map<?, V> map, final Writer<V> valueWriter) throws IOException {
552+
writeCollection(map.values(), valueWriter);
553+
}
554+
555+
/**
556+
* Writes values of a map as a collection
557+
*/
558+
public final <V extends Writeable> void writeMapValues(final Map<?, V> map) throws IOException {
559+
writeMapValues(map, (o, v) -> v.writeTo(o));
560+
}
561+
548562
/**
549563
* Write a {@link Map} of {@code K}-type keys to {@code V}-type {@link List}s.
550564
* <pre><code>

server/src/main/java/org/elasticsearch/index/get/GetResult.java

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import org.elasticsearch.common.io.stream.StreamOutput;
1919
import org.elasticsearch.common.io.stream.Writeable;
2020
import org.elasticsearch.common.logging.DeprecationLogger;
21-
import org.elasticsearch.common.util.Maps;
2221
import org.elasticsearch.common.xcontent.XContentHelper;
2322
import org.elasticsearch.core.RestApiVersion;
2423
import org.elasticsearch.index.mapper.IgnoredFieldMapper;
@@ -80,8 +79,8 @@ public GetResult(StreamInput in) throws IOException {
8079
if (source.length() == 0) {
8180
source = null;
8281
}
83-
documentFields = readFields(in);
84-
metaFields = readFields(in);
82+
documentFields = in.readMapValues(DocumentField::new, DocumentField::getName);
83+
metaFields = in.readMapValues(DocumentField::new, DocumentField::getName);
8584
} else {
8685
metaFields = Collections.emptyMap();
8786
documentFields = Collections.emptyMap();
@@ -384,21 +383,6 @@ public static GetResult fromXContent(XContentParser parser) throws IOException {
384383
return fromXContentEmbedded(parser);
385384
}
386385

387-
private Map<String, DocumentField> readFields(StreamInput in) throws IOException {
388-
Map<String, DocumentField> fields;
389-
int size = in.readVInt();
390-
if (size == 0) {
391-
fields = emptyMap();
392-
} else {
393-
fields = Maps.newMapWithExpectedSize(size);
394-
for (int i = 0; i < size; i++) {
395-
DocumentField field = new DocumentField(in);
396-
fields.put(field.getName(), field);
397-
}
398-
}
399-
return fields;
400-
}
401-
402386
@Override
403387
public void writeTo(StreamOutput out) throws IOException {
404388
out.writeString(index);
@@ -412,19 +396,8 @@ public void writeTo(StreamOutput out) throws IOException {
412396
out.writeBoolean(exists);
413397
if (exists) {
414398
out.writeBytesReference(source);
415-
writeFields(out, documentFields);
416-
writeFields(out, metaFields);
417-
}
418-
}
419-
420-
private void writeFields(StreamOutput out, Map<String, DocumentField> fields) throws IOException {
421-
if (fields == null) {
422-
out.writeVInt(0);
423-
} else {
424-
out.writeVInt(fields.size());
425-
for (DocumentField field : fields.values()) {
426-
field.writeTo(out);
427-
}
399+
out.writeMapValues(documentFields);
400+
out.writeMapValues(metaFields);
428401
}
429402
}
430403

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@
5252
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
5353
import org.elasticsearch.common.settings.Setting;
5454
import org.elasticsearch.common.settings.Setting.Property;
55-
import org.elasticsearch.common.util.Maps;
5655
import org.elasticsearch.core.AbstractRefCounted;
5756
import org.elasticsearch.core.Nullable;
5857
import org.elasticsearch.core.RefCounted;
@@ -780,16 +779,11 @@ public MetadataSnapshot(Map<String, StoreFileMetadata> metadata, Map<String, Str
780779
}
781780

782781
public static MetadataSnapshot readFrom(StreamInput in) throws IOException {
783-
final int metadataSize = in.readVInt();
784-
final Map<String, StoreFileMetadata> metadata = metadataSize == 0 ? emptyMap() : Maps.newMapWithExpectedSize(metadataSize);
785-
for (int i = 0; i < metadataSize; i++) {
786-
final var storeFileMetadata = new StoreFileMetadata(in);
787-
metadata.put(storeFileMetadata.name(), storeFileMetadata);
788-
}
782+
final Map<String, StoreFileMetadata> metadata = in.readMapValues(StoreFileMetadata::new, StoreFileMetadata::name);
789783
final var commitUserData = in.readMap(StreamInput::readString, StreamInput::readString);
790784
final var numDocs = in.readLong();
791785

792-
if (metadataSize == 0 && commitUserData.size() == 0 && numDocs == 0) {
786+
if (metadata.size() == 0 && commitUserData.size() == 0 && numDocs == 0) {
793787
return MetadataSnapshot.EMPTY;
794788
} else {
795789
return new MetadataSnapshot(metadata, commitUserData, numDocs);
@@ -798,15 +792,8 @@ public static MetadataSnapshot readFrom(StreamInput in) throws IOException {
798792

799793
@Override
800794
public void writeTo(StreamOutput out) throws IOException {
801-
out.writeVInt(this.metadata.size());
802-
for (StoreFileMetadata meta : this) {
803-
meta.writeTo(out);
804-
}
805-
out.writeVInt(commitUserData.size());
806-
for (Map.Entry<String, String> entry : commitUserData.entrySet()) {
807-
out.writeString(entry.getKey());
808-
out.writeString(entry.getValue());
809-
}
795+
out.writeMapValues(metadata);
796+
out.writeMap(commitUserData, StreamOutput::writeString, StreamOutput::writeString);
810797
out.writeLong(numDocs);
811798
}
812799

server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -498,7 +498,7 @@ public void testWriteWriteableList() throws IOException {
498498

499499
public void testWriteMap() throws IOException {
500500
final int size = randomIntBetween(0, 100);
501-
final Map<String, String> expected = Maps.newMapWithExpectedSize(randomIntBetween(0, 100));
501+
final Map<String, String> expected = Maps.newMapWithExpectedSize(size);
502502
for (int i = 0; i < size; ++i) {
503503
expected.put(randomAlphaOfLength(2), randomAlphaOfLength(5));
504504
}
@@ -586,6 +586,23 @@ public void testWriteMapOfLists() throws IOException {
586586
out.close();
587587
}
588588

589+
public void testWriteMapAsList() throws IOException {
590+
final int size = randomIntBetween(0, 100);
591+
final Map<String, String> expected = Maps.newMapWithExpectedSize(size);
592+
for (int i = 0; i < size; ++i) {
593+
final String value = randomAlphaOfLength(5);
594+
expected.put("key_" + value, value);
595+
}
596+
597+
final BytesStreamOutput out = new BytesStreamOutput();
598+
out.writeMapValues(expected, StreamOutput::writeString);
599+
final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes()));
600+
final Map<String, String> loaded = in.readMapValues(StreamInput::readString, value -> "key_" + value);
601+
602+
assertThat(loaded.size(), equalTo(expected.size()));
603+
assertThat(expected, equalTo(loaded));
604+
}
605+
589606
private abstract static class BaseNamedWriteable implements NamedWriteable {
590607

591608
}

server/src/test/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutputTests.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,7 @@ public void testWriteWriteableList() throws IOException {
503503

504504
public void testWriteMap() throws IOException {
505505
final int size = randomIntBetween(0, 100);
506-
final Map<String, String> expected = Maps.newMapWithExpectedSize(randomIntBetween(0, 100));
506+
final Map<String, String> expected = Maps.newMapWithExpectedSize(size);
507507
for (int i = 0; i < size; ++i) {
508508
expected.put(randomAlphaOfLength(2), randomAlphaOfLength(5));
509509
}
@@ -591,6 +591,23 @@ public void testWriteMapOfLists() throws IOException {
591591
out.close();
592592
}
593593

594+
public void testWriteMapAsList() throws IOException {
595+
final int size = randomIntBetween(0, 100);
596+
final Map<String, String> expected = Maps.newMapWithExpectedSize(size);
597+
for (int i = 0; i < size; ++i) {
598+
final String value = randomAlphaOfLength(5);
599+
expected.put("key_" + value, value);
600+
}
601+
602+
final RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
603+
out.writeMapValues(expected, StreamOutput::writeString);
604+
final StreamInput in = StreamInput.wrap(BytesReference.toBytes(out.bytes()));
605+
final Map<String, String> loaded = in.readMapValues(StreamInput::readString, value -> "key_" + value);
606+
607+
assertThat(loaded.size(), equalTo(expected.size()));
608+
assertThat(expected, equalTo(loaded));
609+
}
610+
594611
private abstract static class BaseNamedWriteable implements NamedWriteable {
595612

596613
}

0 commit comments

Comments
 (0)