Skip to content

Added BlockingConnectionSemaphoreFactory #1586

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ public interface AsyncHttpClientConfig {
*/
int getMaxConnectionsPerHost();

/**
* Return the maximum duration in milliseconds an {@link AsyncHttpClient} can wait to acquire a free channel
*
* @return Return the maximum duration in milliseconds an {@link AsyncHttpClient} can wait to acquire a free channel
*/
int getAcquireFreeChannelTimeout();


/**
* Return the maximum time in millisecond an {@link AsyncHttpClient} can wait when connecting to a remote host
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public class DefaultAsyncHttpClientConfig implements AsyncHttpClientConfig {
private final int connectionTtl;
private final int maxConnections;
private final int maxConnectionsPerHost;
private final int acquireFreeChannelTimeout;
private final ChannelPool channelPool;
private final ConnectionSemaphoreFactory connectionSemaphoreFactory;
private final KeepAliveStrategy keepAliveStrategy;
Expand Down Expand Up @@ -163,6 +164,7 @@ private DefaultAsyncHttpClientConfig(// http
int connectionTtl,
int maxConnections,
int maxConnectionsPerHost,
int acquireFreeChannelTimeout,
ChannelPool channelPool,
ConnectionSemaphoreFactory connectionSemaphoreFactory,
KeepAliveStrategy keepAliveStrategy,
Expand Down Expand Up @@ -250,6 +252,7 @@ private DefaultAsyncHttpClientConfig(// http
this.connectionTtl = connectionTtl;
this.maxConnections = maxConnections;
this.maxConnectionsPerHost = maxConnectionsPerHost;
this.acquireFreeChannelTimeout = acquireFreeChannelTimeout;
this.channelPool = channelPool;
this.connectionSemaphoreFactory = connectionSemaphoreFactory;
this.keepAliveStrategy = keepAliveStrategy;
Expand Down Expand Up @@ -445,6 +448,9 @@ public int getMaxConnectionsPerHost() {
return maxConnectionsPerHost;
}

@Override
public int getAcquireFreeChannelTimeout() { return acquireFreeChannelTimeout; }

@Override
public ChannelPool getChannelPool() {
return channelPool;
Expand Down Expand Up @@ -696,6 +702,7 @@ public static class Builder {
private int connectionTtl = defaultConnectionTtl();
private int maxConnections = defaultMaxConnections();
private int maxConnectionsPerHost = defaultMaxConnectionsPerHost();
private int acquireFreeChannelTimeout = defaultAcquireFreeChannelTimeout();
private ChannelPool channelPool;
private ConnectionSemaphoreFactory connectionSemaphoreFactory;
private KeepAliveStrategy keepAliveStrategy = new DefaultKeepAliveStrategy();
Expand Down Expand Up @@ -991,6 +998,16 @@ public Builder setMaxConnectionsPerHost(int maxConnectionsPerHost) {
return this;
}

/**
* Sets the maximum duration in milliseconds to acquire a free channel to send a request
* @param acquireFreeChannelTimeout maximum duration in milliseconds to acquire a free channel to send a request
* @return the same builder instance
*/
public Builder setAcquireFreeChannelTimeout(int acquireFreeChannelTimeout) {
this.acquireFreeChannelTimeout = acquireFreeChannelTimeout;
return this;
}

public Builder setChannelPool(ChannelPool channelPool) {
this.channelPool = channelPool;
return this;
Expand Down Expand Up @@ -1249,6 +1266,7 @@ public DefaultAsyncHttpClientConfig build() {
connectionTtl,
maxConnections,
maxConnectionsPerHost,
acquireFreeChannelTimeout,
channelPool,
connectionSemaphoreFactory,
keepAliveStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public final class AsyncHttpClientConfigDefaults {
public static final String THREAD_POOL_NAME_CONFIG = "threadPoolName";
public static final String MAX_CONNECTIONS_CONFIG = "maxConnections";
public static final String MAX_CONNECTIONS_PER_HOST_CONFIG = "maxConnectionsPerHost";
public static final String ACQUIRE_FREE_CHANNEL_TIMEOUT = "acquireFreeChannelTimeout";
public static final String CONNECTION_TIMEOUT_CONFIG = "connectTimeout";
public static final String POOLED_CONNECTION_IDLE_TIMEOUT_CONFIG = "pooledConnectionIdleTimeout";
public static final String CONNECTION_POOL_CLEANER_PERIOD_CONFIG = "connectionPoolCleanerPeriod";
Expand All @@ -39,7 +40,7 @@ public final class AsyncHttpClientConfigDefaults {
public static final String USE_PROXY_PROPERTIES_CONFIG = "useProxyProperties";
public static final String VALIDATE_RESPONSE_HEADERS_CONFIG = "validateResponseHeaders";
public static final String AGGREGATE_WEBSOCKET_FRAME_FRAGMENTS_CONFIG = "aggregateWebSocketFrameFragments";
public static final String ENABLE_WEBSOCKET_COMPRESSION_CONFIG= "enableWebSocketCompression";
public static final String ENABLE_WEBSOCKET_COMPRESSION_CONFIG = "enableWebSocketCompression";
public static final String STRICT_302_HANDLING_CONFIG = "strict302Handling";
public static final String KEEP_ALIVE_CONFIG = "keepAlive";
public static final String MAX_REQUEST_RETRY_CONFIG = "maxRequestRetry";
Expand Down Expand Up @@ -97,6 +98,10 @@ public static int defaultMaxConnectionsPerHost() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + MAX_CONNECTIONS_PER_HOST_CONFIG);
}

public static int defaultAcquireFreeChannelTimeout() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + ACQUIRE_FREE_CHANNEL_TIMEOUT);
}

public static int defaultConnectTimeout() {
return AsyncHttpClientConfigHelper.getAsyncHttpClientConfig().getInt(ASYNC_CLIENT_CONFIG_ROOT + CONNECTION_TIMEOUT_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.netty.channel;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
* A combined {@link ConnectionSemaphore} with two limits - a global limit and a per-host limit
*/
public class CombinedConnectionSemaphore extends PerHostConnectionSemaphore {
protected final MaxConnectionSemaphore globalMaxConnectionSemaphore;

CombinedConnectionSemaphore(int maxConnections, int maxConnectionsPerHost, int acquireTimeout) {
super(maxConnectionsPerHost, acquireTimeout);
this.globalMaxConnectionSemaphore = new MaxConnectionSemaphore(maxConnections, acquireTimeout);
}

@Override
public void acquireChannelLock(Object partitionKey) throws IOException {
long remainingTime = super.acquireTimeout > 0 ? acquireGlobalTimed(partitionKey) : acquireGlobal(partitionKey);

try {
if (remainingTime < 0 || !getFreeConnectionsForHost(partitionKey).tryAcquire(remainingTime, TimeUnit.MILLISECONDS)) {
releaseGlobal(partitionKey);
throw tooManyConnectionsPerHost;
}
} catch (InterruptedException e) {
releaseGlobal(partitionKey);
throw new RuntimeException(e);
}
}

protected void releaseGlobal(Object partitionKey) {
this.globalMaxConnectionSemaphore.releaseChannelLock(partitionKey);
}

protected long acquireGlobal(Object partitionKey) throws IOException {
this.globalMaxConnectionSemaphore.acquireChannelLock(partitionKey);
return 0;
}

/*
* Acquires the global lock and returns the remaining time, in millis, to acquire the per-host lock
*/
protected long acquireGlobalTimed(Object partitionKey) throws IOException {
long beforeGlobalAcquire = System.currentTimeMillis();
acquireGlobal(partitionKey);
long lockTime = System.currentTimeMillis() - beforeGlobalAcquire;
return this.acquireTimeout - lockTime;
}

@Override
public void releaseChannelLock(Object partitionKey) {
this.globalMaxConnectionSemaphore.releaseChannelLock(partitionKey);
super.releaseChannelLock(partitionKey);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@

public class DefaultConnectionSemaphoreFactory implements ConnectionSemaphoreFactory {

public ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) {
ConnectionSemaphore semaphore = new NoopConnectionSemaphore();
if (config.getMaxConnections() > 0) {
semaphore = new MaxConnectionSemaphore(config.getMaxConnections());
}
if (config.getMaxConnectionsPerHost() > 0) {
semaphore = new PerHostConnectionSemaphore(config.getMaxConnectionsPerHost(), semaphore);
}
return semaphore;
public ConnectionSemaphore newConnectionSemaphore(AsyncHttpClientConfig config) {
int acquireFreeChannelTimeout = Math.max(0, config.getAcquireFreeChannelTimeout());
int maxConnections = config.getMaxConnections();
int maxConnectionsPerHost = config.getMaxConnectionsPerHost();

if (maxConnections > 0 && maxConnectionsPerHost > 0) {
return new CombinedConnectionSemaphore(maxConnections, maxConnectionsPerHost, acquireFreeChannelTimeout);
}
if (maxConnections > 0) {
return new MaxConnectionSemaphore(maxConnections, acquireFreeChannelTimeout);
}
if (maxConnectionsPerHost > 0) {
return new CombinedConnectionSemaphore(maxConnections, maxConnectionsPerHost, acquireFreeChannelTimeout);
}

return new NoopConnectionSemaphore();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright (c) 2018 AsyncHttpClient Project. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package org.asynchttpclient.netty.channel;

import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
* A java.util.concurrent.Semaphore that always has Integer.Integer.MAX_VALUE free permits
*
* @author Alex Maltinsky
*/
public class InfiniteSemaphore extends Semaphore {

public static final InfiniteSemaphore INSTANCE = new InfiniteSemaphore();
private static final long serialVersionUID = 1L;

private InfiniteSemaphore() {
super(Integer.MAX_VALUE);
}

@Override
public void acquire() {
// NO-OP
}

@Override
public void acquireUninterruptibly() {
// NO-OP
}

@Override
public boolean tryAcquire() {
return true;
}

@Override
public boolean tryAcquire(long timeout, TimeUnit unit) {
return true;
}

@Override
public void release() {
// NO-OP
}

@Override
public void acquire(int permits) {
// NO-OP
}

@Override
public void acquireUninterruptibly(int permits) {
// NO-OP
}

@Override
public boolean tryAcquire(int permits) {
return true;
}

@Override
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
return true;
}

@Override
public void release(int permits) {
// NO-OP
}

@Override
public int availablePermits() {
return Integer.MAX_VALUE;
}

@Override
public int drainPermits() {
return Integer.MAX_VALUE;
}

@Override
protected void reducePermits(int reduction) {
// NO-OP
}

@Override
public boolean isFair() {
return true;
}

@Override
protected Collection<Thread> getQueuedThreads() {
return Collections.emptyList();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,38 @@
import org.asynchttpclient.exception.TooManyConnectionsException;

import java.io.IOException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import static org.asynchttpclient.util.ThrowableUtil.unknownStackTrace;

/**
* Max connections limiter.
*
* @author Stepan Koltsov
* @author Alex Maltinsky
*/
public class MaxConnectionSemaphore implements ConnectionSemaphore {

private final NonBlockingSemaphoreLike freeChannels;
private final IOException tooManyConnections;
protected final Semaphore freeChannels;
protected final IOException tooManyConnections;
protected final int acquireTimeout;

MaxConnectionSemaphore(int maxConnections) {
MaxConnectionSemaphore(int maxConnections, int acquireTimeout) {
tooManyConnections = unknownStackTrace(new TooManyConnectionsException(maxConnections), MaxConnectionSemaphore.class, "acquireChannelLock");
freeChannels = maxConnections > 0 ? new NonBlockingSemaphore(maxConnections) : NonBlockingSemaphoreInfinite.INSTANCE;
freeChannels = maxConnections > 0 ? new Semaphore(maxConnections) : InfiniteSemaphore.INSTANCE;
this.acquireTimeout = Math.max(0, acquireTimeout);
}

@Override
public void acquireChannelLock(Object partitionKey) throws IOException {
if (!freeChannels.tryAcquire())
throw tooManyConnections;
try {
if (!freeChannels.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) {
throw tooManyConnections;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down
Loading