Skip to content

Commit c6f56b4

Browse files
committed
Modify BigArrays to take name of circuit breaker (elastic#36461)
This commit modifies BigArrays to take a circuit breaker name and the circuit breaking service. The default instance of BigArrays that is passed around everywhere always uses the request breaker. At the network level, we want to be using the inflight request breaker. So this change will allow that. Additionally, as this change moves away from a single instance of BigArrays, the class is modified to not be a Releasable anymore. Releasing big arrays was always dispatching to the PageCacheRecycler, so this change makes the PageCacheRecycler the class that needs to be managed and torn-down. Finally, this commit closes elastic#31435 be making the serialization of transport messages use the inflight request breaker. With this change, we no longer push the global BigArrays instnace to the network level.
1 parent fc4ec5d commit c6f56b4

File tree

29 files changed

+100
-100
lines changed

29 files changed

+100
-100
lines changed

modules/transport-netty4/src/main/java/org/elasticsearch/transport/Netty4Plugin.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,11 @@ public Settings additionalSettings() {
7979
}
8080

8181
@Override
82-
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
83-
PageCacheRecycler pageCacheRecycler,
82+
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
8483
CircuitBreakerService circuitBreakerService,
85-
NamedWriteableRegistry namedWriteableRegistry,
86-
NetworkService networkService) {
84+
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
8785
return Collections.singletonMap(NETTY_TRANSPORT_NAME, () -> new Netty4Transport(settings, Version.CURRENT, threadPool,
88-
networkService, bigArrays, namedWriteableRegistry, circuitBreakerService));
86+
networkService, pageCacheRecycler, namedWriteableRegistry, circuitBreakerService));
8987
}
9088

9189
@Override

modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@
5151
import org.elasticsearch.common.settings.Settings;
5252
import org.elasticsearch.common.unit.ByteSizeUnit;
5353
import org.elasticsearch.common.unit.ByteSizeValue;
54-
import org.elasticsearch.common.util.BigArrays;
54+
import org.elasticsearch.common.util.PageCacheRecycler;
5555
import org.elasticsearch.common.util.concurrent.EsExecutors;
5656
import org.elasticsearch.indices.breaker.CircuitBreakerService;
5757
import org.elasticsearch.threadpool.ThreadPool;
@@ -104,9 +104,10 @@ public class Netty4Transport extends TcpTransport {
104104
private volatile Bootstrap clientBootstrap;
105105
private volatile NioEventLoopGroup eventLoopGroup;
106106

107-
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
108-
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
109-
super("netty", settings, version, threadPool, bigArrays, circuitBreakerService, namedWriteableRegistry, networkService);
107+
public Netty4Transport(Settings settings, Version version, ThreadPool threadPool, NetworkService networkService,
108+
PageCacheRecycler pageCacheRecycler, NamedWriteableRegistry namedWriteableRegistry,
109+
CircuitBreakerService circuitBreakerService) {
110+
super("netty", settings, version, threadPool, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, networkService);
110111
Netty4Utils.setAvailableProcessors(EsExecutors.PROCESSORS_SETTING.get(settings));
111112
this.workerCount = WORKER_COUNT.get(settings);
112113

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4SizeHeaderFrameDecoderTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@
2424
import org.elasticsearch.common.network.NetworkService;
2525
import org.elasticsearch.common.settings.Settings;
2626
import org.elasticsearch.common.transport.TransportAddress;
27-
import org.elasticsearch.common.util.BigArrays;
28-
import org.elasticsearch.common.util.MockBigArrays;
2927
import org.elasticsearch.common.util.MockPageCacheRecycler;
28+
import org.elasticsearch.common.util.PageCacheRecycler;
3029
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
3130
import org.elasticsearch.mocksocket.MockSocket;
3231
import org.elasticsearch.test.ESTestCase;
@@ -65,8 +64,8 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
6564
public void startThreadPool() {
6665
threadPool = new ThreadPool(settings);
6766
NetworkService networkService = new NetworkService(Collections.emptyList());
68-
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
69-
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, bigArrays,
67+
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
68+
nettyTransport = new Netty4Transport(settings, Version.CURRENT, threadPool, networkService, recycler,
7069
new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
7170
nettyTransport.start();
7271

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4TransportIT.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.elasticsearch.common.network.NetworkModule;
3030
import org.elasticsearch.common.network.NetworkService;
3131
import org.elasticsearch.common.settings.Settings;
32-
import org.elasticsearch.common.util.BigArrays;
3332
import org.elasticsearch.common.util.PageCacheRecycler;
3433
import org.elasticsearch.indices.breaker.CircuitBreakerService;
3534
import org.elasticsearch.plugins.NetworkPlugin;
@@ -90,13 +89,13 @@ public static final class ExceptionThrowingNetty4Transport extends Netty4Transpo
9089
public static class TestPlugin extends Plugin implements NetworkPlugin {
9190

9291
@Override
93-
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
92+
public Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool,
9493
PageCacheRecycler pageCacheRecycler,
9594
CircuitBreakerService circuitBreakerService,
9695
NamedWriteableRegistry namedWriteableRegistry,
9796
NetworkService networkService) {
9897
return Collections.singletonMap("exception-throwing",
99-
() -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, bigArrays,
98+
() -> new ExceptionThrowingNetty4Transport(settings, threadPool, networkService, pageCacheRecycler,
10099
namedWriteableRegistry, circuitBreakerService));
101100
}
102101
}
@@ -105,10 +104,10 @@ public ExceptionThrowingNetty4Transport(
105104
Settings settings,
106105
ThreadPool threadPool,
107106
NetworkService networkService,
108-
BigArrays bigArrays,
107+
PageCacheRecycler recycler,
109108
NamedWriteableRegistry namedWriteableRegistry,
110109
CircuitBreakerService circuitBreakerService) {
111-
super(settings, Version.CURRENT, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
110+
super(settings, Version.CURRENT, threadPool, networkService, recycler, namedWriteableRegistry, circuitBreakerService);
112111
}
113112

114113
@Override

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4UtilsTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import io.netty.buffer.CompositeByteBuf;
2424
import io.netty.buffer.Unpooled;
2525
import org.apache.lucene.util.BytesRef;
26+
import org.elasticsearch.common.breaker.CircuitBreaker;
2627
import org.elasticsearch.common.bytes.AbstractBytesReferenceTestCase;
2728
import org.elasticsearch.common.bytes.BytesArray;
2829
import org.elasticsearch.common.bytes.BytesReference;
@@ -36,7 +37,7 @@
3637
public class Netty4UtilsTests extends ESTestCase {
3738

3839
private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
39-
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), false);
40+
private final BigArrays bigarrays = new BigArrays(null, new NoneCircuitBreakerService(), CircuitBreaker.REQUEST);
4041

4142
public void testToChannelBufferWithEmptyRef() throws IOException {
4243
ByteBuf buffer = Netty4Utils.toByteBuf(getRandomizedBytesReference(0));

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyTransportMultiPortTests.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,8 @@
2424
import org.elasticsearch.common.network.NetworkService;
2525
import org.elasticsearch.common.network.NetworkUtils;
2626
import org.elasticsearch.common.settings.Settings;
27-
import org.elasticsearch.common.util.BigArrays;
28-
import org.elasticsearch.common.util.MockBigArrays;
2927
import org.elasticsearch.common.util.MockPageCacheRecycler;
28+
import org.elasticsearch.common.util.PageCacheRecycler;
3029
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
3130
import org.elasticsearch.test.ESTestCase;
3231
import org.elasticsearch.threadpool.TestThreadPool;
@@ -118,9 +117,9 @@ public void testThatDefaultProfilePortOverridesGeneralConfiguration() throws Exc
118117
}
119118

120119
private TcpTransport startTransport(Settings settings, ThreadPool threadPool) {
121-
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
120+
PageCacheRecycler recycler = new MockPageCacheRecycler(Settings.EMPTY);
122121
TcpTransport transport = new Netty4Transport(settings, Version.CURRENT, threadPool, new NetworkService(Collections.emptyList()),
123-
bigArrays, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
122+
recycler, new NamedWriteableRegistry(Collections.emptyList()), new NoneCircuitBreakerService());
124123
transport.start();
125124

126125
assertThat(transport.lifecycleState(), is(Lifecycle.State.STARTED));

modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import org.elasticsearch.common.settings.ClusterSettings;
2828
import org.elasticsearch.common.settings.Settings;
2929
import org.elasticsearch.common.transport.TransportAddress;
30-
import org.elasticsearch.common.util.BigArrays;
30+
import org.elasticsearch.common.util.PageCacheRecycler;
3131
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
3232
import org.elasticsearch.node.Node;
3333
import org.elasticsearch.test.transport.MockTransportService;
@@ -55,7 +55,7 @@ public static MockTransportService nettyFromThreadPool(Settings settings, Thread
5555
ClusterSettings clusterSettings, boolean doHandshake) {
5656
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry(Collections.emptyList());
5757
Transport transport = new Netty4Transport(settings, version, threadPool, new NetworkService(Collections.emptyList()),
58-
BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
58+
PageCacheRecycler.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) {
5959

6060
@Override
6161
public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile,

server/src/main/java/org/elasticsearch/client/transport/TransportClient.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.cluster.ClusterModule;
3131
import org.elasticsearch.cluster.node.DiscoveryNode;
3232
import org.elasticsearch.common.UUIDs;
33+
import org.elasticsearch.common.breaker.CircuitBreaker;
3334
import org.elasticsearch.common.component.LifecycleComponent;
3435
import org.elasticsearch.common.inject.Injector;
3536
import org.elasticsearch.common.inject.Module;
@@ -183,8 +184,8 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
183184
settingsModule.getClusterSettings());
184185
resourcesToClose.add(circuitBreakerService);
185186
PageCacheRecycler pageCacheRecycler = new PageCacheRecycler(settings);
186-
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService);
187-
resourcesToClose.add(bigArrays);
187+
BigArrays bigArrays = new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
188+
resourcesToClose.add(pageCacheRecycler);
188189
modules.add(settingsModule);
189190
NetworkModule networkModule = new NetworkModule(settings, true, pluginsService.filterPlugins(NetworkPlugin.class), threadPool,
190191
bigArrays, pageCacheRecycler, circuitBreakerService, namedWriteableRegistry, xContentRegistry, networkService, null);
@@ -195,6 +196,7 @@ private static ClientTemplate buildTemplate(Settings providedSettings, Settings
195196
UUIDs.randomBase64UUID()), null, Collections.emptySet());
196197
modules.add((b -> {
197198
b.bind(BigArrays.class).toInstance(bigArrays);
199+
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
198200
b.bind(PluginsService.class).toInstance(pluginsService);
199201
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
200202
b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
@@ -375,7 +377,7 @@ public void close() {
375377
closeables.add(plugin);
376378
}
377379
closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS));
378-
closeables.add(injector.getInstance(BigArrays.class));
380+
closeables.add(injector.getInstance(PageCacheRecycler.class));
379381
IOUtils.closeWhileHandlingException(closeables);
380382
}
381383

server/src/main/java/org/elasticsearch/common/network/NetworkModule.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ public NetworkModule(Settings settings, boolean transportClient, List<NetworkPlu
124124
registerHttpTransport(entry.getKey(), entry.getValue());
125125
}
126126
}
127-
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, bigArrays, pageCacheRecycler,
127+
Map<String, Supplier<Transport>> transportFactory = plugin.getTransports(settings, threadPool, pageCacheRecycler,
128128
circuitBreakerService, namedWriteableRegistry, networkService);
129129
for (Map.Entry<String, Supplier<Transport>> entry : transportFactory.entrySet()) {
130130
registerTransport(entry.getKey(), entry.getValue());

server/src/main/java/org/elasticsearch/common/util/BigArrays.java

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,9 @@
3333
import java.util.Arrays;
3434

3535
/** Utility class to work with arrays. */
36-
public class BigArrays implements Releasable {
36+
public class BigArrays {
3737

38-
public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, false);
38+
public static final BigArrays NON_RECYCLING_INSTANCE = new BigArrays(null, null, CircuitBreaker.REQUEST);
3939

4040
/** Page size in bytes: 16KB */
4141
public static final int PAGE_SIZE_IN_BYTES = 1 << 14;
@@ -83,11 +83,6 @@ static boolean indexIsInt(long index) {
8383
return index == (int) index;
8484
}
8585

86-
@Override
87-
public void close() {
88-
recycler.close();
89-
}
90-
9186
private abstract static class AbstractArrayWrapper extends AbstractArray implements BigArray {
9287

9388
static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ByteArrayWrapper.class);
@@ -369,24 +364,26 @@ public T set(long index, T value) {
369364
}
370365

371366
final PageCacheRecycler recycler;
372-
final CircuitBreakerService breakerService;
373-
final boolean checkBreaker;
367+
private final CircuitBreakerService breakerService;
368+
private final boolean checkBreaker;
374369
private final BigArrays circuitBreakingInstance;
370+
private final String breakerName;
375371

376-
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService) {
372+
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName) {
377373
// Checking the breaker is disabled if not specified
378-
this(recycler, breakerService, false);
374+
this(recycler, breakerService, breakerName, false);
379375
}
380376

381-
// public for tests
382-
public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, boolean checkBreaker) {
377+
protected BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerService breakerService, String breakerName,
378+
boolean checkBreaker) {
383379
this.checkBreaker = checkBreaker;
384380
this.recycler = recycler;
385381
this.breakerService = breakerService;
382+
this.breakerName = breakerName;
386383
if (checkBreaker) {
387384
this.circuitBreakingInstance = this;
388385
} else {
389-
this.circuitBreakingInstance = new BigArrays(recycler, breakerService, true);
386+
this.circuitBreakingInstance = new BigArrays(recycler, breakerService, breakerName, true);
390387
}
391388
}
392389

@@ -400,7 +397,7 @@ public BigArrays(PageCacheRecycler recycler, @Nullable final CircuitBreakerServi
400397
*/
401398
void adjustBreaker(final long delta, final boolean isDataAlreadyCreated) {
402399
if (this.breakerService != null) {
403-
CircuitBreaker breaker = this.breakerService.getBreaker(CircuitBreaker.REQUEST);
400+
CircuitBreaker breaker = this.breakerService.getBreaker(breakerName);
404401
if (this.checkBreaker) {
405402
// checking breaker means potentially tripping, but it doesn't
406403
// have to if the delta is negative

server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,12 @@ public class PageCacheRecycler implements Releasable {
5959
private final Recycler<long[]> longPage;
6060
private final Recycler<Object[]> objectPage;
6161

62+
public static final PageCacheRecycler NON_RECYCLING_INSTANCE;
63+
64+
static {
65+
NON_RECYCLING_INSTANCE = new PageCacheRecycler(Settings.builder().put(LIMIT_HEAP_SETTING.getKey(), "0%").build());
66+
}
67+
6268
@Override
6369
public void close() {
6470
Releasables.close(true, bytePage, intPage, longPage, objectPage);

server/src/main/java/org/elasticsearch/node/Node.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
5959
import org.elasticsearch.cluster.service.ClusterService;
6060
import org.elasticsearch.common.StopWatch;
61+
import org.elasticsearch.common.breaker.CircuitBreaker;
6162
import org.elasticsearch.common.component.Lifecycle;
6263
import org.elasticsearch.common.component.LifecycleComponent;
6364
import org.elasticsearch.common.inject.Binder;
@@ -410,7 +411,7 @@ protected Node(
410411

411412
PageCacheRecycler pageCacheRecycler = createPageCacheRecycler(settings);
412413
BigArrays bigArrays = createBigArrays(pageCacheRecycler, circuitBreakerService);
413-
resourcesToClose.add(bigArrays);
414+
resourcesToClose.add(pageCacheRecycler);
414415
modules.add(settingsModule);
415416
List<NamedWriteableRegistry.Entry> namedWriteables = Stream.of(
416417
NetworkModule.getNamedWriteables().stream(),
@@ -563,6 +564,7 @@ protected Node(
563564
b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
564565
b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
565566
b.bind(BigArrays.class).toInstance(bigArrays);
567+
b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
566568
b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
567569
b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
568570
b.bind(IngestService.class).toInstance(ingestService);
@@ -917,7 +919,7 @@ public synchronized void close() throws IOException {
917919

918920

919921
toClose.add(injector.getInstance(NodeEnvironment.class));
920-
toClose.add(injector.getInstance(BigArrays.class));
922+
toClose.add(injector.getInstance(PageCacheRecycler.class));
921923

922924
if (logger.isTraceEnabled()) {
923925
logger.trace("Close times for each service:\n{}", stopWatch.prettyPrint());
@@ -1000,7 +1002,7 @@ public static CircuitBreakerService createCircuitBreakerService(Settings setting
10001002
* This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing
10011003
*/
10021004
BigArrays createBigArrays(PageCacheRecycler pageCacheRecycler, CircuitBreakerService circuitBreakerService) {
1003-
return new BigArrays(pageCacheRecycler, circuitBreakerService);
1005+
return new BigArrays(pageCacheRecycler, circuitBreakerService, CircuitBreaker.REQUEST);
10041006
}
10051007

10061008
/**

server/src/main/java/org/elasticsearch/plugins/NetworkPlugin.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,9 @@ default List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegist
5858
* Returns a map of {@link Transport} suppliers.
5959
* See {@link org.elasticsearch.common.network.NetworkModule#TRANSPORT_TYPE_KEY} to configure a specific implementation.
6060
*/
61-
default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, BigArrays bigArrays,
62-
PageCacheRecycler pageCacheRecycler,
61+
default Map<String, Supplier<Transport>> getTransports(Settings settings, ThreadPool threadPool, PageCacheRecycler pageCacheRecycler,
6362
CircuitBreakerService circuitBreakerService,
64-
NamedWriteableRegistry namedWriteableRegistry,
65-
NetworkService networkService) {
63+
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService) {
6664
return Collections.emptyMap();
6765
}
6866

0 commit comments

Comments
 (0)