Skip to content

Commit e742e1b

Browse files
Deduplicate Strings in REST Bulk Request Parsing (#56506)
We can save a little memory here since these strings might live for quite a while on the coordinating node.
1 parent 92fcbd3 commit e742e1b

File tree

2 files changed

+31
-4
lines changed

2 files changed

+31
-4
lines changed

server/src/main/java/org/elasticsearch/action/bulk/BulkRequestParser.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,11 @@
3737

3838
import java.io.IOException;
3939
import java.io.InputStream;
40+
import java.util.HashMap;
41+
import java.util.Map;
4042
import java.util.function.BiConsumer;
4143
import java.util.function.Consumer;
44+
import java.util.function.Function;
4245

4346
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
4447

@@ -115,6 +118,10 @@ public void parse(
115118
int line = 0;
116119
int from = 0;
117120
byte marker = xContent.streamSeparator();
121+
// Bulk requests can contain a lot of repeated strings for the index, pipeline and routing parameters. This map is used to
122+
// deduplicate duplicate strings parsed for these parameters. While it does not prevent instantiating the duplicate strings, it
123+
// reduces their lifetime to the lifetime of this parse call instead of the lifetime of the full bulk request.
124+
final Map<String, String> stringDeduplicator = new HashMap<>();
118125
while (true) {
119126
int nextMarker = findNextMarker(marker, from, data);
120127
if (nextMarker == -1) {
@@ -174,17 +181,17 @@ public void parse(
174181
if (!allowExplicitIndex) {
175182
throw new IllegalArgumentException("explicit index in bulk is not allowed");
176183
}
177-
index = parser.text();
184+
index = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
178185
} else if (TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
179186
if (errorOnType) {
180187
throw new IllegalArgumentException("Action/metadata line [" + line + "] contains an unknown parameter ["
181188
+ currentFieldName + "]");
182189
}
183-
type = parser.text();
190+
type = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
184191
} else if (ID.match(currentFieldName, parser.getDeprecationHandler())) {
185192
id = parser.text();
186193
} else if (ROUTING.match(currentFieldName, parser.getDeprecationHandler())) {
187-
routing = parser.text();
194+
routing = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
188195
} else if (OP_TYPE.match(currentFieldName, parser.getDeprecationHandler())) {
189196
opType = parser.text();
190197
} else if (VERSION.match(currentFieldName, parser.getDeprecationHandler())) {
@@ -198,7 +205,7 @@ public void parse(
198205
} else if (RETRY_ON_CONFLICT.match(currentFieldName, parser.getDeprecationHandler())) {
199206
retryOnConflict = parser.intValue();
200207
} else if (PIPELINE.match(currentFieldName, parser.getDeprecationHandler())) {
201-
pipeline = parser.text();
208+
pipeline = stringDeduplicator.computeIfAbsent(parser.text(), Function.identity());
202209
} else if (SOURCE.match(currentFieldName, parser.getDeprecationHandler())) {
203210
fetchSourceContext = FetchSourceContext.fromXContent(parser);
204211
} else {

server/src/test/java/org/elasticsearch/action/bulk/BulkRequestParserTests.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22+
import org.elasticsearch.action.index.IndexRequest;
2223
import org.elasticsearch.common.bytes.BytesArray;
2324
import org.elasticsearch.common.xcontent.XContentType;
2425
import org.elasticsearch.test.ESTestCase;
26+
import org.hamcrest.Matchers;
2527

2628
import java.io.IOException;
29+
import java.util.ArrayList;
30+
import java.util.List;
2731
import java.util.concurrent.atomic.AtomicBoolean;
2832

2933
public class BulkRequestParserTests extends ESTestCase {
@@ -109,4 +113,20 @@ public void testTypesStillParsedForBulkMonitoring() throws IOException {
109113
assertTrue(parsed.get());
110114
}
111115

116+
public void testParseDeduplicatesParameterStrings() throws IOException {
117+
BytesArray request = new BytesArray(
118+
"{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\"} }\n{}\n"
119+
+ "{ \"index\":{ \"_index\": \"bar\", \"pipeline\": \"foo\", \"routing\": \"blub\" } }\n{}\n");
120+
BulkRequestParser parser = new BulkRequestParser(randomBoolean());
121+
final List<IndexRequest> indexRequests = new ArrayList<>();
122+
parser.parse(request, null, null, null, null, true, XContentType.JSON,
123+
(indexRequest, type) -> indexRequests.add(indexRequest),
124+
req -> fail(), req -> fail());
125+
assertThat(indexRequests, Matchers.hasSize(2));
126+
final IndexRequest first = indexRequests.get(0);
127+
final IndexRequest second = indexRequests.get(1);
128+
assertSame(first.index(), second.index());
129+
assertSame(first.getPipeline(), second.getPipeline());
130+
assertSame(first.routing(), second.routing());
131+
}
112132
}

0 commit comments

Comments
 (0)