Skip to content

Commit da976d2

Browse files
authored
Improve robustness of Query Result serializations (elastic#54692) (elastic#55028)
Makes query result serialization more robust by propagating possible IOExceptions that can occur during shard level result serialization to the caller instead of throwing AssertionError that is not intercepted. Fixes elastic#54665
1 parent 17101d8 commit da976d2

File tree

5 files changed

+21
-15
lines changed

5 files changed

+21
-15
lines changed

server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -808,7 +808,7 @@ public void writeGenericValue(@Nullable Object value) throws IOException {
808808
if (writer != null) {
809809
writer.write(this, value);
810810
} else {
811-
throw new IOException("can not write type [" + type + "]");
811+
throw new IllegalArgumentException("can not write type [" + type + "]");
812812
}
813813
}
814814

server/src/main/java/org/elasticsearch/indices/IndicesRequestCache.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.lucene.index.IndexReader;
2929
import org.apache.lucene.util.Accountable;
3030
import org.apache.lucene.util.RamUsageEstimator;
31+
import org.elasticsearch.common.CheckedSupplier;
3132
import org.elasticsearch.common.bytes.BytesReference;
3233
import org.elasticsearch.common.cache.Cache;
3334
import org.elasticsearch.common.cache.CacheBuilder;
@@ -43,6 +44,7 @@
4344
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4445

4546
import java.io.Closeable;
47+
import java.io.IOException;
4648
import java.util.Collection;
4749
import java.util.Collections;
4850
import java.util.Iterator;
@@ -115,8 +117,8 @@ public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
115117
// NORELEASE The cacheKeyRenderer has been added in order to debug
116118
// https://github.com/elastic/elasticsearch/issues/32827, it should be
117119
// removed when this issue is solved
118-
BytesReference getOrCompute(CacheEntity cacheEntity, Supplier<BytesReference> loader,
119-
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
120+
BytesReference getOrCompute(CacheEntity cacheEntity, CheckedSupplier<BytesReference, IOException> loader,
121+
DirectoryReader reader, BytesReference cacheKey, Supplier<String> cacheKeyRenderer) throws Exception {
120122
assert reader.getReaderCacheHelper() != null;
121123
final Key key = new Key(cacheEntity, reader.getReaderCacheHelper().getKey(), cacheKey);
122124
Loader cacheLoader = new Loader(cacheEntity, loader);
@@ -157,10 +159,10 @@ void invalidate(CacheEntity cacheEntity, DirectoryReader reader, BytesReference
157159
private static class Loader implements CacheLoader<Key, BytesReference> {
158160

159161
private final CacheEntity entity;
160-
private final Supplier<BytesReference> loader;
162+
private final CheckedSupplier<BytesReference, IOException> loader;
161163
private boolean loaded;
162164

163-
Loader(CacheEntity entity, Supplier<BytesReference> loader) {
165+
Loader(CacheEntity entity, CheckedSupplier<BytesReference, IOException> loader) {
164166
this.entity = entity;
165167
this.loader = loader;
166168
}

server/src/main/java/org/elasticsearch/indices/IndicesService.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@
4545
import org.elasticsearch.cluster.routing.RecoverySource;
4646
import org.elasticsearch.cluster.routing.ShardRouting;
4747
import org.elasticsearch.cluster.service.ClusterService;
48+
import org.elasticsearch.common.CheckedConsumer;
4849
import org.elasticsearch.common.CheckedFunction;
50+
import org.elasticsearch.common.CheckedSupplier;
4951
import org.elasticsearch.common.Nullable;
5052
import org.elasticsearch.common.breaker.CircuitBreaker;
5153
import org.elasticsearch.common.bytes.BytesReference;
@@ -1376,12 +1378,7 @@ public void loadIntoContext(ShardSearchRequest request, SearchContext context, Q
13761378
() -> "Shard: " + request.shardId() + "\nSource:\n" + request.source(),
13771379
out -> {
13781380
queryPhase.execute(context);
1379-
try {
1380-
context.queryResult().writeToNoId(out);
1381-
1382-
} catch (IOException e) {
1383-
throw new AssertionError("Could not serialize response", e);
1384-
}
1381+
context.queryResult().writeToNoId(out);
13851382
loadedFromCache[0] = false;
13861383
});
13871384

@@ -1420,9 +1417,9 @@ public ByteSizeValue getTotalIndexingBufferBytes() {
14201417
* @return the contents of the cache or the result of calling the loader
14211418
*/
14221419
private BytesReference cacheShardLevelResult(IndexShard shard, DirectoryReader reader, BytesReference cacheKey,
1423-
Supplier<String> cacheKeyRenderer, Consumer<StreamOutput> loader) throws Exception {
1420+
Supplier<String> cacheKeyRenderer, CheckedConsumer<StreamOutput, IOException> loader) throws Exception {
14241421
IndexShardCacheEntity cacheEntity = new IndexShardCacheEntity(shard);
1425-
Supplier<BytesReference> supplier = () -> {
1422+
CheckedSupplier<BytesReference, IOException> supplier = () -> {
14261423
/* BytesStreamOutput allows to pass the expected size but by default uses
14271424
* BigArrays.PAGE_SIZE_IN_BYTES which is 16k. A common cached result ie.
14281425
* a date histogram with 3 buckets is ~100byte so 16k might be very wasteful

server/src/test/java/org/elasticsearch/common/io/stream/BytesStreamsTests.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,13 @@ public void testSimpleStreams() throws Exception {
349349
assertThat(jdt.getZonedDateTime().toInstant().toEpochMilli(), equalTo(123456L));
350350
assertThat(jdt.getZonedDateTime().getZone(), equalTo(ZoneId.of("America/Los_Angeles")));
351351
assertEquals(0, in.available());
352+
IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> out.writeGenericValue(new Object() {
353+
@Override
354+
public String toString() {
355+
return "This object cannot be serialized by writeGeneric method";
356+
}
357+
}));
358+
assertThat(ex.getMessage(), containsString("can not write type"));
352359
in.close();
353360
out.close();
354361
}

server/src/test/java/org/elasticsearch/indices/IndicesRequestCacheTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.lucene.search.TopDocs;
3333
import org.apache.lucene.store.Directory;
3434
import org.apache.lucene.util.BytesRef;
35+
import org.elasticsearch.common.CheckedSupplier;
3536
import org.elasticsearch.common.bytes.AbstractBytesReference;
3637
import org.elasticsearch.common.bytes.BytesReference;
3738
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@@ -49,7 +50,6 @@
4950
import java.io.IOException;
5051
import java.util.Arrays;
5152
import java.util.concurrent.atomic.AtomicBoolean;
52-
import java.util.function.Supplier;
5353

5454
public class IndicesRequestCacheTests extends ESTestCase {
5555

@@ -331,7 +331,7 @@ public Iterable<Field> newDoc(int id, String value) {
331331
StringField.TYPE_STORED));
332332
}
333333

334-
private static class Loader implements Supplier<BytesReference> {
334+
private static class Loader implements CheckedSupplier<BytesReference, IOException> {
335335

336336
private final DirectoryReader reader;
337337
private final int id;

0 commit comments

Comments
 (0)