Skip to content

Commit d82c40d

Browse files
authored
Implement byte array reusage in NioTransport (elastic#27696)
This is related to elastic#27563. This commit modifies the InboundChannelBuffer to support releasable byte pages. These byte pages are provided by the PageCacheRecycler. The PageCacheRecycler must be passed to the Transport with this change.
1 parent d21167e commit d82c40d

File tree

44 files changed

+280
-91
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+280
-91
lines changed

core/src/main/java/org/elasticsearch/client/transport/TransportClient.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.common.transport.TransportAddress;
4444
import org.elasticsearch.common.unit.TimeValue;
4545
import org.elasticsearch.common.util.BigArrays;
46+
import org.elasticsearch.common.util.PageCacheRecycler;
4647
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
4748
import org.elasticsearch.indices.breaker.CircuitBreakerService;
4849
import org.elasticsearch.node.InternalSettingsPreparer;
@@ -169,11 +170,12 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
169170
CircuitBreakerService circuitBreakerService = Node.createCircuitBreakerService(settingsModule.getSettings(),
170171
settingsModule.getClusterSettings());
171172
resourcesToClose.add(circuitBreakerService);
172-
BigArrays bigArrays = new BigArrays(settings, circuitBreakerService);
173+
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
174+
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
173175
resourcesToClose.add(bigArrays);
174176
modules.add(settingsModule);
175177
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
176-
bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
178+
bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
177179
final Transport transport = networkModule.getTransportSupplier().get();
178180
final TransportService transportService = new TransportService(settings, transport, threadPool,
179181
networkModule.getTransportInterceptor(),

core/src/main/java/org/elasticsearch/common/network/NetworkModule.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.common.settings.Setting.Property;
3535
import org.elasticsearch.common.settings.Settings;
3636
import org.elasticsearch.common.util.BigArrays;
37+
import org.elasticsearch.common.util.PageCacheRecycler;
3738
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3839
import org.elasticsearch.common.xcontent.XContentParser;
3940
import org.elasticsearch.http.HttpServerTransport;
@@ -107,6 +108,7 @@ public final class NetworkModule {
107108
*/
108109
public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlugin> plugins, ThreadPool threadPool,
109110
BigArrays bigArrays,
111+
PageCacheRecycler pageCacheRecycler,
110112
CircuitBreakerService circuitBreakerService,
111113
NamedWriteableRegistry namedWriteableRegistry,
112114
NamedXContentRegistry xContentRegistry,
@@ -121,9 +123,9 @@ public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlu
121123
registerHttpTransport(entry.getKey(), entry.getValue());
122124
}
123125
}
124-
Map<String, Supplier<Transport>> httpTransportFactory = plugin.getTransports(settings, threadPool, bigArrays,
126+
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler,
125127
circuitBreakerService, namedWriteableRegistry, networkService);
126-
for (Map.Entry<String, Supplier<Transport>> entry : httpTransportFactory.entrySet()) {
128+
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
127129
registerTransport(entry.getKey(), entry.getValue());
128130
}
129131
List<TransportInterceptor> transportInterceptors = plugin.getTransportInterceptors(namedWriteableRegistry,

core/src/main/java/org/elasticsearch/common/util/BigArrays.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,9 +372,9 @@ public T set(long index, T value) {
372372
final boolean checkBreaker;
373373
private final BigArrays circuitBreakingInstance;
374374

375-
public BigArrays(Settings settings, @Nullable final CircuitBreakerService breakerService) {
375+
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) {
376376
// Checking the breaker is disabled if not specified
377-
this(new PageCacheRecycler(settings), breakerService, false);
377+
this(recycler, breakerService, false);
378378
}
379379

380380
// public for tests

core/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@ public void close() {
6565
Releasables.close(true, bytePage, intPage, longPage, objectPage);
6666
}
6767

68-
protected PageCacheRecycler(Settings settings) {
68+
public PageCacheRecycler(Settings settings) {
6969
super(settings);
70-
final Type type = TYPE_SETTING .get(settings);
71-
final long limit = LIMIT_HEAP_SETTING .get(settings).getBytes();
70+
final Type type = TYPE_SETTING.get(settings);
71+
final long limit = LIMIT_HEAP_SETTING.get(settings).getBytes();
7272
final int availableProcessors = EsExecutors.numberOfProcessors(settings);
7373

7474
// We have a global amount of memory that we need to divide across data types.

core/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
import org.elasticsearch.common.transport.TransportAddress;
8181
import org.elasticsearch.common.unit.TimeValue;
8282
import org.elasticsearch.common.util.BigArrays;
83+
import org.elasticsearch.common.util.PageCacheRecycler;
8384
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
8485
import org.elasticsearch.discovery.Discovery;
8586
import org.elasticsearch.discovery.DiscoveryModule;
@@ -363,7 +364,8 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
363364
modules.add(new GatewayModule());
364365

365366

366-
BigArrays bigArrays = createBigArrays(settings, circuitBreakerService);
367+
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
368+
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
367369
resourcesToClose.add(bigArrays);
368370
modules.add(settingsModule);
369371
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
@@ -403,7 +405,8 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
403405

404406
final RestController restController = actionModule.getRestController();
405407
final NetworkModule networkModule = new NetworkModule(settings, false, pluginsService.filterPlugins(NetworkPlugin.class),
406-
threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, restController);
408+
threadPool, bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry,
409+
networkService, restController);
407410
Collection<UnaryOperator<Map<String, MetaData.Custom>>> customMetaDataUpgraders =
408411
pluginsService.filterPlugins(Plugin.class).stream()
409412
.map(Plugin::getCustomMetaDataUpgrader)
@@ -898,8 +901,16 @@ public static CircuitBreakerService createCircuitBreakerService(Settings setting
898901
* Creates a new {@link BigArrays} instance used for this node.
899902
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
900903
*/
901-
BigArrays createBigArrays(Settings settings, CircuitBreakerService circuitBreakerService) {
902-
return new BigArrays(settings, circuitBreakerService);
904+
BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
905+
return new BigArrays(pageCacheRecycler, circuitBreakerService);
906+
}
907+
908+
/**
909+
* Creates a new {@link BigArrays} instance used for this node.
910+
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
911+
*/
912+
PageCacheRecycler createPageCacheRecycler(Settings settings) {
913+
return new PageCacheRecycler(settings);
903914
}
904915

905916
/**

core/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.network.NetworkService;
2828
import org.elasticsearch.common.settings.Settings;
2929
import org.elasticsearch.common.util.BigArrays;
30+
import org.elasticsearch.common.util.PageCacheRecycler;
3031
import org.elasticsearch.common.util.concurrent.ThreadContext;
3132
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3233
import org.elasticsearch.http.HttpServerTransport;
@@ -58,6 +59,7 @@ default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegist
5859
* See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
5960
*/
6061
default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
62+
PageCacheRecycler pageCacheRecycler,
6163
CircuitBreakerService circuitBreakerService,
6264
NamedWriteableRegistry namedWriteableRegistry,
6365
NetworkService networkService) {

core/src/test/java/org/elasticsearch/common/io/stream/ReleasableBytesStreamOutputTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.elasticsearch.common.settings.Settings;
2323
import org.elasticsearch.common.util.MockBigArrays;
24+
import org.elasticsearch.common.util.MockPageCacheRecycler;
2425
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
2526
import org.elasticsearch.test.ESTestCase;
2627

@@ -30,7 +31,7 @@ public class ReleasableBytesStreamOutputTests extends ESTestCase {
3031

3132
public void testRelease() throws Exception {
3233
MockBigArrays mockBigArrays =
33-
new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
34+
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
3435
try (ReleasableBytesStreamOutput output =
3536
getRandomReleasableBytesStreamOutput(mockBigArrays)) {
3637
output.writeBoolean(randomBoolean());

core/src/test/java/org/elasticsearch/common/network/NetworkModuleTests.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.elasticsearch.common.settings.Settings;
2828
import org.elasticsearch.common.transport.BoundTransportAddress;
2929
import org.elasticsearch.common.util.BigArrays;
30+
import org.elasticsearch.common.util.PageCacheRecycler;
3031
import org.elasticsearch.common.util.concurrent.ThreadContext;
3132
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3233
import org.elasticsearch.http.HttpInfo;
@@ -133,6 +134,7 @@ public void testRegisterTransport() {
133134
NetworkPlugin plugin = new NetworkPlugin() {
134135
@Override
135136
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
137+
PageCacheRecycler pageCacheRecycler,
136138
CircuitBreakerService circuitBreakerService,
137139
NamedWriteableRegistry namedWriteableRegistry,
138140
NetworkService networkService) {
@@ -193,6 +195,7 @@ public void testOverrideDefault() {
193195
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
194196
@Override
195197
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
198+
PageCacheRecycler pageCacheRecycler,
196199
CircuitBreakerService circuitBreakerService,
197200
NamedWriteableRegistry namedWriteableRegistry,
198201
NetworkService networkService) {
@@ -227,6 +230,7 @@ public void testDefaultKeys() {
227230
NetworkModule module = newNetworkModule(settings, false, new NetworkPlugin() {
228231
@Override
229232
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
233+
PageCacheRecycler pageCacheRecycler,
230234
CircuitBreakerService circuitBreakerService,
231235
NamedWriteableRegistry namedWriteableRegistry,
232236
NetworkService networkService) {
@@ -306,7 +310,7 @@ public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistr
306310
}
307311

308312
private NetworkModule newNetworkModule(Settings settings, boolean transportClient, NetworkPlugin... plugins) {
309-
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, xContentRegistry(), null,
310-
new NullDispatcher());
313+
return new NetworkModule(settings, transportClient, Arrays.asList(plugins), threadPool, null, null, null, null,
314+
xContentRegistry(), null, new NullDispatcher());
311315
}
312316
}

core/src/test/java/org/elasticsearch/common/util/BigArraysTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
public class BigArraysTests extends ESTestCase {
4343

4444
private BigArrays randombigArrays() {
45-
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
45+
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
4646
}
4747

4848
private BigArrays bigArrays;

core/src/test/java/org/elasticsearch/common/util/BytesRefHashTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public class BytesRefHashTests extends ESSingleNodeTestCase {
4141
BytesRefHash hash;
4242

4343
private BigArrays randombigArrays() {
44-
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
44+
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
4545
}
4646

4747
private void newHash() {

core/src/test/java/org/elasticsearch/common/util/LongHashTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public class LongHashTests extends ESSingleNodeTestCase {
3636
LongHash hash;
3737

3838
private BigArrays randombigArrays() {
39-
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
39+
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
4040
}
4141

4242
private void newHash() {

core/src/test/java/org/elasticsearch/common/util/LongObjectHashMapTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
public class LongObjectHashMapTests extends ESSingleNodeTestCase {
2828

2929
private BigArrays randombigArrays() {
30-
return new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
30+
return new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
3131
}
3232

3333
public void testDuel() {

core/src/test/java/org/elasticsearch/index/IndexModuleTests.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.common.settings.Setting.Property;
4040
import org.elasticsearch.common.settings.Settings;
4141
import org.elasticsearch.common.util.BigArrays;
42+
import org.elasticsearch.common.util.PageCacheRecycler;
4243
import org.elasticsearch.env.Environment;
4344
import org.elasticsearch.env.NodeEnvironment;
4445
import org.elasticsearch.env.ShardLock;
@@ -124,7 +125,8 @@ public void setUp() throws Exception {
124125
emptyMap(), emptyMap(), emptyMap());
125126
threadPool = new TestThreadPool("test");
126127
circuitBreakerService = new NoneCircuitBreakerService();
127-
bigArrays = new BigArrays(settings, circuitBreakerService);
128+
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
129+
bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
128130
scriptService = new ScriptService(settings, Collections.emptyMap(), Collections.emptyMap());
129131
clusterService = ClusterServiceUtils.createClusterService(threadPool);
130132
nodeEnvironment = new NodeEnvironment(settings, environment);

core/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.elasticsearch.common.unit.TimeValue;
3333
import org.elasticsearch.common.util.BigArrays;
3434
import org.elasticsearch.common.util.MockBigArrays;
35+
import org.elasticsearch.common.util.MockPageCacheRecycler;
3536
import org.elasticsearch.index.IndexService;
3637
import org.elasticsearch.index.IndexSettings;
3738
import org.elasticsearch.index.cache.IndexCache;
@@ -104,7 +105,7 @@ public void testPreProcess() throws Exception {
104105
when(indexService.getIndexSettings()).thenReturn(indexSettings);
105106
when(mapperService.getIndexSettings()).thenReturn(indexSettings);
106107

107-
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
108+
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
108109

109110
try (Directory dir = newDirectory();
110111
RandomIndexWriter w = new RandomIndexWriter(random(), dir);

core/src/test/java/org/elasticsearch/search/aggregations/MultiBucketAggregatorWrapperTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.common.util.BigArrays;
2727
import org.elasticsearch.common.util.MockBigArrays;
28+
import org.elasticsearch.common.util.MockPageCacheRecycler;
2829
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
2930
import org.elasticsearch.search.internal.SearchContext;
3031
import org.elasticsearch.test.ESTestCase;
@@ -46,7 +47,7 @@ public class MultiBucketAggregatorWrapperTests extends ESTestCase {
4647
public void testNoNullScorerIsDelegated() throws Exception {
4748
LeafReaderContext leafReaderContext = MemoryIndex.fromDocument(Collections.emptyList(), new MockAnalyzer(random()))
4849
.createSearcher().getIndexReader().leaves().get(0);
49-
BigArrays bigArrays = new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService());
50+
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
5051
SearchContext searchContext = mock(SearchContext.class);
5152
when(searchContext.bigArrays()).thenReturn(bigArrays);
5253

core/src/test/java/org/elasticsearch/search/aggregations/bucket/sampler/BestDocsDeferringCollectorTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.lucene.store.Directory;
3434
import org.elasticsearch.common.settings.Settings;
3535
import org.elasticsearch.common.util.MockBigArrays;
36+
import org.elasticsearch.common.util.MockPageCacheRecycler;
3637
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
3738
import org.elasticsearch.search.aggregations.AggregatorTestCase;
3839
import org.elasticsearch.search.aggregations.BucketCollector;
@@ -63,8 +64,8 @@ public void testReplay() throws Exception {
6364
TermQuery termQuery = new TermQuery(new Term("field", String.valueOf(randomInt(maxNumValues))));
6465
TopDocs topDocs = indexSearcher.search(termQuery, numDocs);
6566

66-
BestDocsDeferringCollector collector =
67-
new BestDocsDeferringCollector(numDocs, new MockBigArrays(Settings.EMPTY, new NoneCircuitBreakerService()));
67+
BestDocsDeferringCollector collector = new BestDocsDeferringCollector(numDocs,
68+
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()));
6869
Set<Integer> deferredCollectedDocIds = new HashSet<>();
6970
collector.setDeferredCollector(Collections.singleton(testCollector(deferredCollectedDocIds)));
7071
collector.preCollection();

core/src/test/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.elasticsearch.common.network.InetAddresses;
3939
import org.elasticsearch.common.settings.Settings;
4040
import org.elasticsearch.common.util.MockBigArrays;
41+
import org.elasticsearch.common.util.MockPageCacheRecycler;
4142
import org.elasticsearch.index.IndexSettings;
4243
import org.elasticsearch.index.mapper.IpFieldMapper;
4344
import org.elasticsearch.index.mapper.KeywordFieldMapper;
@@ -912,7 +913,7 @@ public void testMixLongAndDouble() throws Exception {
912913
dir.close();
913914
}
914915
InternalAggregation.ReduceContext ctx =
915-
new InternalAggregation.ReduceContext(new MockBigArrays(Settings.EMPTY,
916+
new InternalAggregation.ReduceContext(new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY),
916917
new NoneCircuitBreakerService()), null, true);
917918
for (InternalAggregation internalAgg : aggs) {
918919
InternalAggregation mergedAggs = internalAgg.doReduce(aggs, ctx);

0 commit comments

Comments
 (0)