Skip to content

Commit 16bca5c

Browse files
maltalexslandelle
authored andcommittedDec 6, 2018
Added BlockingConnectionSemaphoreFactory (#1586)
* Added BlockingConnectionSemaphoreFactory * Added missing copyright to BlockingSemaphoreInfinite * Added acquireFreeChannelTimeout configuration value * Implemented acquireFreeChannelTimeout by replacing existing NonBlocking semaphores with regular Semaphores * ConnectionSemaphore tests
1 parent f61f88e commit 16bca5c

16 files changed

+463
-228
lines changed
 

‎client/src/main/java/org/asynchttpclient/AsyncHttpClientConfig.java

+8
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,14 @@ public interface AsyncHttpClientConfig {
6565
*/
6666
int getMaxConnectionsPerHost();
6767

68+
/**
69+
* Return the maximum duration in milliseconds an {@link AsyncHttpClient} can wait to acquire a free channel
70+
*
71+
* @return Return the maximum duration in milliseconds an {@link AsyncHttpClient} can wait to acquire a free channel
72+
*/
73+
int getAcquireFreeChannelTimeout();
74+
75+
6876
/**
6977
* Return the maximum time in millisecond an {@link AsyncHttpClient} can wait when connecting to a remote host
7078
*

‎client/src/main/java/org/asynchttpclient/DefaultAsyncHttpClientConfig.java

+18
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
8484
private final int connectionTtl;
8585
private final int maxConnections;
8686
private final int maxConnectionsPerHost;
87+
private final int acquireFreeChannelTimeout;
8788
private final ChannelPool channelPool;
8889
private final ConnectionSemaphoreFactory connectionSemaphoreFactory;
8990
private final KeepAliveStrategy keepAliveStrategy;
@@ -163,6 +164,7 @@ private DefaultAsyncHttpClientConfig(// http
163164
int connectionTtl,
164165
int maxConnections,
165166
int maxConnectionsPerHost,
167+
int acquireFreeChannelTimeout,
166168
ChannelPool channelPool,
167169
ConnectionSemaphoreFactory connectionSemaphoreFactory,
168170
KeepAliveStrategy keepAliveStrategy,
@@ -250,6 +252,7 @@ private DefaultAsyncHttpClientConfig(// http
250252
this.connectionTtl = connectionTtl;
251253
this.maxConnections = maxConnections;
252254
this.maxConnectionsPerHost = maxConnectionsPerHost;
255+
this.acquireFreeChannelTimeout = acquireFreeChannelTimeout;
253256
this.channelPool = channelPool;
254257
this.connectionSemaphoreFactory = connectionSemaphoreFactory;
255258
this.keepAliveStrategy = keepAliveStrategy;
@@ -445,6 +448,9 @@ public int getMaxConnectionsPerHost() {
445448
return maxConnectionsPerHost;
446449
}
447450

451+
@Override
452+
public int getAcquireFreeChannelTimeout() { return acquireFreeChannelTimeout; }
453+
448454
@Override
449455
public ChannelPool getChannelPool() {
450456
return channelPool;
@@ -696,6 +702,7 @@ public static class Builder {
696702
private int connectionTtl = defaultConnectionTtl();
697703
private int maxConnections = defaultMaxConnections();
698704
private int maxConnectionsPerHost = defaultMaxConnectionsPerHost();
705+
private int acquireFreeChannelTimeout = defaultAcquireFreeChannelTimeout();
699706
private ChannelPool channelPool;
700707
private ConnectionSemaphoreFactory connectionSemaphoreFactory;
701708
private KeepAliveStrategy keepAliveStrategy = new DefaultKeepAliveStrategy();
@@ -991,6 +998,16 @@ public Builder setMaxConnectionsPerHost(int maxConnectionsPerHost) {
991998
return this;
992999
}
9931000

1001+
/**
1002+
* Sets the maximum duration in milliseconds to acquire a free channel to send a request
1003+
* @param acquireFreeChannelTimeout maximum duration in milliseconds to acquire a free channel to send a request
1004+
* @return the same builder instance
1005+
*/
1006+
public Builder setAcquireFreeChannelTimeout(int acquireFreeChannelTimeout) {
1007+
this.acquireFreeChannelTimeout = acquireFreeChannelTimeout;
1008+
return this;
1009+
}
1010+
9941011
public Builder setChannelPool(ChannelPool channelPool) {
9951012
this.channelPool = channelPool;
9961013
return this;
@@ -1249,6 +1266,7 @@ public DefaultAsyncHttpClientConfig build() {
12491266
connectionTtl,
12501267
maxConnections,
12511268
maxConnectionsPerHost,
1269+
acquireFreeChannelTimeout,
12521270
channelPool,
12531271
connectionSemaphoreFactory,
12541272
keepAliveStrategy,

‎client/src/main/java/org/asynchttpclient/config/AsyncHttpClientConfigDefaults.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public final class AsyncHttpClientConfigDefaults {
2222
public static final String THREAD_POOL_NAME_CONFIG = "threadPoolName";
2323
public static final String MAX_CONNECTIONS_CONFIG = "maxConnections";
2424
public static final String MAX_CONNECTIONS_PER_HOST_CONFIG = "maxConnectionsPerHost";
25+
public static final String ACQUIRE_FREE_CHANNEL_TIMEOUT = "acquireFreeChannelTimeout";
2526
public static final String CONNECTION_TIMEOUT_CONFIG = "connectTimeout";
2627
public static final String POOLED_CONNECTION_IDLE_TIMEOUT_CONFIG = "pooledConnectionIdleTimeout";
2728
public static final String CONNECTION_POOL_CLEANER_PERIOD_CONFIG = "connectionPoolCleanerPeriod";
@@ -39,7 +40,7 @@ public final class AsyncHttpClientConfigDefaults {
3940
public static final String USE_PROXY_PROPERTIES_CONFIG = "useProxyProperties";
4041
public static final String VALIDATE_RESPONSE_HEADERS_CONFIG = "validateResponseHeaders";
4142
public static final String AGGREGATE_WEBSOCKET_FRAME_FRAGMENTS_CONFIG = "aggregateWebSocketFrameFragments";
42-
public static final String ENABLE_WEBSOCKET_COMPRESSION_CONFIG= "enableWebSocketCompression";
43+
public static final String ENABLE_WEBSOCKET_COMPRESSION_CONFIG = "enableWebSocketCompression";
4344
public static final String STRICT_302_HANDLING_CONFIG = "strict302Handling";
4445
public static final String KEEP_ALIVE_CONFIG = "keepAlive";
4546
public static final String MAX_REQUEST_RETRY_CONFIG = "maxRequestRetry";
@@ -97,6 +98,10 @@ public static int defaultMaxConnectionsPerHost() {
9798
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + MAX_CONNECTIONS_PER_HOST_CONFIG);
9899
}
99100

101+
public static int defaultAcquireFreeChannelTimeout() {
102+
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + ACQUIRE_FREE_CHANNEL_TIMEOUT);
103+
}
104+
100105
public static int defaultConnectTimeout() {
101106
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + CONNECTION_TIMEOUT_CONFIG);
102107
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import java.io.IOException;
17+
import java.util.concurrent.TimeUnit;
18+
19+
/**
20+
* A combined {@link ConnectionSemaphore} with two limits - a global limit and a per-host limit
21+
*/
22+
public class CombinedConnectionSemaphore extends PerHostConnectionSemaphore {
23+
protected final MaxConnectionSemaphore globalMaxConnectionSemaphore;
24+
25+
CombinedConnectionSemaphore(int maxConnections, int maxConnectionsPerHost, int acquireTimeout) {
26+
super(maxConnectionsPerHost, acquireTimeout);
27+
this.globalMaxConnectionSemaphore = new MaxConnectionSemaphore(maxConnections, acquireTimeout);
28+
}
29+
30+
@Override
31+
public void acquireChannelLock(Object partitionKey) throws IOException {
32+
long remainingTime = super.acquireTimeout > 0 ? acquireGlobalTimed(partitionKey) : acquireGlobal(partitionKey);
33+
34+
try {
35+
if (remainingTime < 0 || !getFreeConnectionsForHost(partitionKey).tryAcquire(remainingTime, TimeUnit.MILLISECONDS)) {
36+
releaseGlobal(partitionKey);
37+
throw tooManyConnectionsPerHost;
38+
}
39+
} catch (InterruptedException e) {
40+
releaseGlobal(partitionKey);
41+
throw new RuntimeException(e);
42+
}
43+
}
44+
45+
protected void releaseGlobal(Object partitionKey) {
46+
this.globalMaxConnectionSemaphore.releaseChannelLock(partitionKey);
47+
}
48+
49+
protected long acquireGlobal(Object partitionKey) throws IOException {
50+
this.globalMaxConnectionSemaphore.acquireChannelLock(partitionKey);
51+
return 0;
52+
}
53+
54+
/*
55+
* Acquires the global lock and returns the remaining time, in millis, to acquire the per-host lock
56+
*/
57+
protected long acquireGlobalTimed(Object partitionKey) throws IOException {
58+
long beforeGlobalAcquire = System.currentTimeMillis();
59+
acquireGlobal(partitionKey);
60+
long lockTime = System.currentTimeMillis() - beforeGlobalAcquire;
61+
return this.acquireTimeout - lockTime;
62+
}
63+
64+
@Override
65+
public void releaseChannelLock(Object partitionKey) {
66+
this.globalMaxConnectionSemaphore.releaseChannelLock(partitionKey);
67+
super.releaseChannelLock(partitionKey);
68+
}
69+
}

‎client/src/main/java/org/asynchttpclient/netty/channel/DefaultConnectionSemaphoreFactory.java

+16-9
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,21 @@
1717

1818
public class DefaultConnectionSemaphoreFactory implements ConnectionSemaphoreFactory {
1919

20-
public ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) {
21-
ConnectionSemaphore semaphore = new NoopConnectionSemaphore();
22-
if (config.getMaxConnections() > 0) {
23-
semaphore = new MaxConnectionSemaphore(config.getMaxConnections());
24-
}
25-
if (config.getMaxConnectionsPerHost() > 0) {
26-
semaphore = new PerHostConnectionSemaphore(config.getMaxConnectionsPerHost(), semaphore);
27-
}
28-
return semaphore;
20+
public ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) {
21+
int acquireFreeChannelTimeout = Math.max(0, config.getAcquireFreeChannelTimeout());
22+
int maxConnections = config.getMaxConnections();
23+
int maxConnectionsPerHost = config.getMaxConnectionsPerHost();
24+
25+
if (maxConnections > 0 && maxConnectionsPerHost > 0) {
26+
return new CombinedConnectionSemaphore(maxConnections, maxConnectionsPerHost, acquireFreeChannelTimeout);
27+
}
28+
if (maxConnections > 0) {
29+
return new MaxConnectionSemaphore(maxConnections, acquireFreeChannelTimeout);
2930
}
31+
if (maxConnectionsPerHost > 0) {
32+
return new CombinedConnectionSemaphore(maxConnections, maxConnectionsPerHost, acquireFreeChannelTimeout);
33+
}
34+
35+
return new NoopConnectionSemaphore();
36+
}
3037
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
package org.asynchttpclient.netty.channel;
15+
16+
import java.util.Collection;
17+
import java.util.Collections;
18+
import java.util.concurrent.Semaphore;
19+
import java.util.concurrent.TimeUnit;
20+
21+
/**
22+
* A java.util.concurrent.Semaphore that always has Integer.Integer.MAX_VALUE free permits
23+
*
24+
* @author Alex Maltinsky
25+
*/
26+
public class InfiniteSemaphore extends Semaphore {
27+
28+
public static final InfiniteSemaphore INSTANCE = new InfiniteSemaphore();
29+
private static final long serialVersionUID = 1L;
30+
31+
private InfiniteSemaphore() {
32+
super(Integer.MAX_VALUE);
33+
}
34+
35+
@Override
36+
public void acquire() {
37+
// NO-OP
38+
}
39+
40+
@Override
41+
public void acquireUninterruptibly() {
42+
// NO-OP
43+
}
44+
45+
@Override
46+
public boolean tryAcquire() {
47+
return true;
48+
}
49+
50+
@Override
51+
public boolean tryAcquire(long timeout, TimeUnit unit) {
52+
return true;
53+
}
54+
55+
@Override
56+
public void release() {
57+
// NO-OP
58+
}
59+
60+
@Override
61+
public void acquire(int permits) {
62+
// NO-OP
63+
}
64+
65+
@Override
66+
public void acquireUninterruptibly(int permits) {
67+
// NO-OP
68+
}
69+
70+
@Override
71+
public boolean tryAcquire(int permits) {
72+
return true;
73+
}
74+
75+
@Override
76+
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
77+
return true;
78+
}
79+
80+
@Override
81+
public void release(int permits) {
82+
// NO-OP
83+
}
84+
85+
@Override
86+
public int availablePermits() {
87+
return Integer.MAX_VALUE;
88+
}
89+
90+
@Override
91+
public int drainPermits() {
92+
return Integer.MAX_VALUE;
93+
}
94+
95+
@Override
96+
protected void reducePermits(int reduction) {
97+
// NO-OP
98+
}
99+
100+
@Override
101+
public boolean isFair() {
102+
return true;
103+
}
104+
105+
@Override
106+
protected Collection<Thread> getQueuedThreads() {
107+
return Collections.emptyList();
108+
}
109+
}
110+

‎client/src/main/java/org/asynchttpclient/netty/channel/MaxConnectionSemaphore.java

+16-6
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,38 @@
1616
import org.asynchttpclient.exception.TooManyConnectionsException;
1717

1818
import java.io.IOException;
19+
import java.util.concurrent.Semaphore;
20+
import java.util.concurrent.TimeUnit;
1921

2022
import static org.asynchttpclient.util.ThrowableUtil.unknownStackTrace;
2123

2224
/**
2325
* Max connections limiter.
2426
*
2527
* @author Stepan Koltsov
28+
* @author Alex Maltinsky
2629
*/
2730
public class MaxConnectionSemaphore implements ConnectionSemaphore {
2831

29-
private final NonBlockingSemaphoreLike freeChannels;
30-
private final IOException tooManyConnections;
32+
protected final Semaphore freeChannels;
33+
protected final IOException tooManyConnections;
34+
protected final int acquireTimeout;
3135

32-
MaxConnectionSemaphore(int maxConnections) {
36+
MaxConnectionSemaphore(int maxConnections, int acquireTimeout) {
3337
tooManyConnections = unknownStackTrace(new TooManyConnectionsException(maxConnections), MaxConnectionSemaphore.class, "acquireChannelLock");
34-
freeChannels = maxConnections > 0 ? new NonBlockingSemaphore(maxConnections) : NonBlockingSemaphoreInfinite.INSTANCE;
38+
freeChannels = maxConnections > 0 ? new Semaphore(maxConnections) : InfiniteSemaphore.INSTANCE;
39+
this.acquireTimeout = Math.max(0, acquireTimeout);
3540
}
3641

3742
@Override
3843
public void acquireChannelLock(Object partitionKey) throws IOException {
39-
if (!freeChannels.tryAcquire())
40-
throw tooManyConnections;
44+
try {
45+
if (!freeChannels.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) {
46+
throw tooManyConnections;
47+
}
48+
} catch (InterruptedException e) {
49+
throw new RuntimeException(e);
50+
}
4151
}
4252

4353
@Override

0 commit comments

Comments
 (0)