Skip to content

Commit d3bf48c

Browse files
artembilangaryrussell
authored andcommitted
GH-3826: Fix SimplePool for resizing from MAX
Fixes #3826 By default, the `SimplePool` is used with an `Integer.MAX_VALUE` pool size. There is a performance degradation in the `setPoolSize()` when we try to decrease the pool size: the `while` for `permits.tryAcquire()` is too long close to the current `Integer.MAX_VALUE` pool size * Revise the logic in the `setPoolSize()` to use `Semaphore.reducePermits()` instead of the loop. * Change the calculation for a new pool size for the current pool state: or it is a size of a new request, or iti s equal to the `inUse.size()`. It will be reduced on subsequent `releaseItem()` calls * Reduce the number of `available` according a new pool size based on the `inUse`. So, if `inUse > newPoolSize`, the `available` is cleared. Otherewise, it is reduced to the number which would give `newPoolSize` together with the `inUse` size **Cherry-pick to `5.5.x`**
1 parent 404fce8 commit d3bf48c

File tree

2 files changed

+116
-34
lines changed

2 files changed

+116
-34
lines changed

Diff for: spring-integration-core/src/main/java/org/springframework/integration/util/SimplePool.java

+34-15
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -48,19 +48,19 @@ public class SimplePool<T> implements Pool<T> {
4848

4949
protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final
5050

51-
private final Semaphore permits = new Semaphore(0);
51+
private final PoolSemaphore permits = new PoolSemaphore(0);
5252

5353
private final AtomicInteger poolSize = new AtomicInteger();
5454

5555
private final AtomicInteger targetPoolSize = new AtomicInteger();
5656

5757
private long waitTimeout = Long.MAX_VALUE;
5858

59-
private final BlockingQueue<T> available = new LinkedBlockingQueue<T>();
59+
private final BlockingQueue<T> available = new LinkedBlockingQueue<>();
6060

61-
private final Set<T> allocated = Collections.synchronizedSet(new HashSet<T>());
61+
private final Set<T> allocated = Collections.synchronizedSet(new HashSet<>());
6262

63-
private final Set<T> inUse = Collections.synchronizedSet(new HashSet<T>());
63+
private final Set<T> inUse = Collections.synchronizedSet(new HashSet<>());
6464

6565
private final PoolItemCallback<T> callback;
6666

@@ -105,21 +105,27 @@ public synchronized void setPoolSize(int poolSize) {
105105
this.permits.release(delta);
106106
}
107107
else {
108-
while (delta < 0) {
109-
if (!this.permits.tryAcquire()) {
110-
break;
111-
}
108+
this.permits.reducePermits(-delta);
109+
110+
int inUseSize = this.inUse.size();
111+
int newPoolSize = Math.max(poolSize, inUseSize);
112+
this.poolSize.set(newPoolSize);
113+
114+
for (int i = this.available.size(); i > newPoolSize - inUseSize; i--) {
112115
T item = this.available.poll();
113116
if (item != null) {
114117
doRemoveItem(item);
115118
}
116-
this.poolSize.decrementAndGet();
117-
delta++;
119+
else {
120+
break;
121+
}
122+
}
123+
124+
int inUseDelta = poolSize - inUseSize;
125+
if (inUseDelta < 0 && this.logger.isDebugEnabled()) {
126+
this.logger.debug(String.format("Pool is overcommitted by %d; items will be removed when returned",
127+
-inUseDelta));
118128
}
119-
}
120-
if (delta < 0 && this.logger.isDebugEnabled()) {
121-
this.logger.debug(String.format("Pool is overcommitted by %d; items will be removed when returned",
122-
-delta));
123129
}
124130
}
125131

@@ -266,6 +272,19 @@ public synchronized void close() {
266272
removeAllIdleItems();
267273
}
268274

275+
private static class PoolSemaphore extends Semaphore {
276+
277+
PoolSemaphore(int permits) {
278+
super(permits);
279+
}
280+
281+
@Override
282+
public void reducePermits(int reduction) { // NOSONAR increases visibility
283+
super.reducePermits(reduction);
284+
}
285+
286+
}
287+
269288
/**
270289
* User of the pool provide an implementation of this interface; called during
271290
* various pool operations.

Diff for: spring-integration-core/src/test/java/org/springframework/integration/util/SimplePoolTests.java

+82-19
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,7 +20,6 @@
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
2222
import static org.assertj.core.api.Assertions.assertThatIllegalStateException;
23-
import static org.assertj.core.api.Assertions.fail;
2423

2524
import java.util.ArrayList;
2625
import java.util.HashSet;
@@ -36,14 +35,16 @@
3635
/**
3736
* @author Gary Russell
3837
* @author Sergey Bogatyrev
38+
* @author Artem Bilan
39+
*
3940
* @since 2.2
4041
*
4142
*/
4243
public class SimplePoolTests {
4344

4445
@Test
4546
public void testReuseAndStale() {
46-
final Set<String> strings = new HashSet<String>();
47+
final Set<String> strings = new HashSet<>();
4748
final AtomicBoolean stale = new AtomicBoolean();
4849
SimplePool<String> pool = stringPool(2, strings, stale);
4950
String s1 = pool.getItem();
@@ -62,7 +63,7 @@ public void testReuseAndStale() {
6263

6364
@Test
6465
public void testOverCommitAndResize() {
65-
final Set<String> strings = new HashSet<String>();
66+
final Set<String> strings = new HashSet<>();
6667
final AtomicBoolean stale = new AtomicBoolean();
6768
SimplePool<String> pool = stringPool(2, strings, stale);
6869
String s1 = pool.getItem();
@@ -83,13 +84,9 @@ public void testOverCommitAndResize() {
8384
assertThat(pool.getIdleCount()).isEqualTo(0);
8485
assertThat(pool.getActiveCount()).isEqualTo(2);
8586
assertThat(pool.getAllocatedCount()).isEqualTo(2);
86-
try {
87-
pool.getItem();
88-
fail("Expected exception");
89-
}
90-
catch (PoolItemNotAvailableException e) {
9187

92-
}
88+
assertThatExceptionOfType(PoolItemNotAvailableException.class)
89+
.isThrownBy(pool::getItem);
9390

9491
// resize up
9592
pool.setPoolSize(4);
@@ -131,7 +128,7 @@ public void testOverCommitAndResize() {
131128

132129
@Test
133130
public void testForeignObject() {
134-
final Set<String> strings = new HashSet<String>();
131+
final Set<String> strings = new HashSet<>();
135132
final AtomicBoolean stale = new AtomicBoolean();
136133
SimplePool<String> pool = stringPool(2, strings, stale);
137134
pool.getItem();
@@ -140,7 +137,7 @@ public void testForeignObject() {
140137

141138
@Test
142139
public void testDoubleReturn() {
143-
final Set<String> strings = new HashSet<String>();
140+
final Set<String> strings = new HashSet<>();
144141
final AtomicBoolean stale = new AtomicBoolean();
145142
SimplePool<String> pool = stringPool(2, strings, stale);
146143
Semaphore permits = TestUtils.getPropertyValue(pool, "permits", Semaphore.class);
@@ -168,7 +165,27 @@ public void testSizeUpdateIfNotAllocated() {
168165
assertThat(allocatedItems).hasSize(5);
169166

170167
// no more items can be allocated (indirect check of permits)
171-
assertThatExceptionOfType(PoolItemNotAvailableException.class).isThrownBy(() -> pool.getItem());
168+
assertThatExceptionOfType(PoolItemNotAvailableException.class)
169+
.isThrownBy(pool::getItem);
170+
}
171+
172+
@Test
173+
public void testMaxValueSizeUpdateIfNotAllocated() {
174+
SimplePool<String> pool = stringPool(0, new HashSet<>(), new AtomicBoolean());
175+
pool.setWaitTimeout(0);
176+
pool.setPoolSize(5);
177+
assertThat(pool.getPoolSize()).isEqualTo(5);
178+
179+
// allocating all available items to check permits
180+
Set<String> allocatedItems = new HashSet<>();
181+
for (int i = 0; i < 5; i++) {
182+
allocatedItems.add(pool.getItem());
183+
}
184+
assertThat(allocatedItems).hasSize(5);
185+
186+
// no more items can be allocated (indirect check of permits)
187+
assertThatExceptionOfType(PoolItemNotAvailableException.class)
188+
.isThrownBy(pool::getItem);
172189
}
173190

174191
@Test
@@ -207,7 +224,54 @@ public void testSizeUpdateIfPartiallyAllocated() {
207224
assertThat(pool.getActiveCount()).isEqualTo(5);
208225

209226
// no more items can be allocated (indirect check of permits)
210-
assertThatExceptionOfType(PoolItemNotAvailableException.class).isThrownBy(() -> pool.getItem());
227+
assertThatExceptionOfType(PoolItemNotAvailableException.class)
228+
.isThrownBy(pool::getItem);
229+
}
230+
231+
@Test
232+
public void testMaxValueSizeUpdateIfPartiallyAllocated() {
233+
SimplePool<String> pool = stringPool(0, new HashSet<>(), new AtomicBoolean());
234+
pool.setWaitTimeout(0);
235+
236+
List<String> allocated = new ArrayList<>();
237+
for (int i = 0; i < 10; i++) {
238+
allocated.add(pool.getItem());
239+
}
240+
241+
// release only 2 items
242+
for (int i = 0; i < 2; i++) {
243+
pool.releaseItem(allocated.get(i));
244+
}
245+
246+
// release only 2 items
247+
for (int i = 0; i < 2; i++) {
248+
pool.releaseItem(allocated.get(i));
249+
}
250+
251+
// trying to reduce pool size
252+
pool.setPoolSize(5);
253+
254+
// at this moment the actual pool size can be reduced only partially, because
255+
// only 2 items have been released, so 8 items are in use
256+
assertThat(pool.getPoolSize()).isEqualTo(8);
257+
assertThat(pool.getAllocatedCount()).isEqualTo(8);
258+
assertThat(pool.getIdleCount()).isEqualTo(0);
259+
assertThat(pool.getActiveCount()).isEqualTo(8);
260+
261+
// releasing 3 items
262+
for (int i = 2; i < 5; i++) {
263+
pool.releaseItem(allocated.get(i));
264+
}
265+
266+
// now pool size should be reduced
267+
assertThat(pool.getPoolSize()).isEqualTo(5);
268+
assertThat(pool.getAllocatedCount()).isEqualTo(5);
269+
assertThat(pool.getIdleCount()).isEqualTo(0);
270+
assertThat(pool.getActiveCount()).isEqualTo(5);
271+
272+
// no more items can be allocated (indirect check of permits)
273+
assertThatExceptionOfType(PoolItemNotAvailableException.class)
274+
.isThrownBy(pool::getItem);
211275
}
212276

213277
@Test
@@ -240,7 +304,8 @@ public void testSizeUpdateIfFullyAllocated() {
240304
assertThat(pool.getActiveCount()).isEqualTo(5);
241305

242306
// no more items can be allocated (indirect check of permits)
243-
assertThatExceptionOfType(PoolItemNotAvailableException.class).isThrownBy(() -> pool.getItem());
307+
assertThatExceptionOfType(PoolItemNotAvailableException.class)
308+
.isThrownBy(pool::getItem);
244309

245310
// releasing remaining items
246311
for (int i = 5; i < 10; i++) {
@@ -266,10 +331,9 @@ void testClose() {
266331
assertThatIllegalStateException().isThrownBy(pool::getItem);
267332
}
268333

269-
private SimplePool<String> stringPool(int size, final Set<String> strings,
270-
final AtomicBoolean stale) {
334+
private SimplePool<String> stringPool(int size, Set<String> strings, AtomicBoolean stale) {
335+
return new SimplePool<String>(size, new SimplePool.PoolItemCallback<String>() {
271336

272-
SimplePool<String> pool = new SimplePool<String>(size, new SimplePool.PoolItemCallback<String>() {
273337
private int i;
274338

275339
@Override
@@ -293,7 +357,6 @@ public void removedFromPool(String item) {
293357
}
294358

295359
});
296-
return pool;
297360
}
298361

299362
}

0 commit comments

Comments
 (0)