Never block the event loop acquiring a connection permit by pavel-ptashyts · Pull Request #2226 · AsyncHttpClient/async-http-client · GitHub
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ public void acquireChannelLock(Object partitionKey) throws IOException {
}
}

@Override
public void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException {
if (!nonBlocking) {
acquireChannelLock(partitionKey);
return;
}
// nonBlocking (the caller is on the event loop): take the global permit without waiting, then the
// per-host permit without waiting, releasing the global one if the per-host permit is unavailable.
globalMaxConnectionSemaphore.acquireChannelLock(partitionKey, true);
if (!getFreeConnectionsForHost(partitionKey).tryAcquire()) {
releaseGlobal(partitionKey);
throw tooManyConnectionsPerHost;
}
}

protected void releaseGlobal(Object partitionKey) {
globalMaxConnectionSemaphore.releaseChannelLock(partitionKey);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,27 @@ public interface ConnectionSemaphore {

void acquireChannelLock(Object partitionKey) throws IOException;

/**
* Acquires a connection permit for {@code partitionKey}, optionally without blocking.
*
* <p>When {@code nonBlocking} is {@code true} a permit must be taken without waiting — the
* too-many-connections exception is thrown immediately if none is free. This is used when the caller
* runs on a Netty event-loop thread (a redirect / 401 / 407 / retry replay re-enters the send path on
* the event loop), where waiting for the configured acquire timeout would freeze the loop and stall
* every other connection it serves. When {@code false} this behaves exactly like
* {@link #acquireChannelLock(Object)}.
*
* <p>The default implementation ignores the hint and delegates to the blocking
* {@link #acquireChannelLock(Object)}, preserving the behaviour of custom implementations; the built-in
* limiters override it to honour {@code nonBlocking}.
*
* @param partitionKey the per-host partition key the permit is scoped to
* @param nonBlocking {@code true} to fail fast instead of waiting for a permit
* @throws IOException if no permit could be acquired
*/
default void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException {
acquireChannelLock(partitionKey);
}

void releaseChannelLock(Object partitionKey);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,18 @@ public class MaxConnectionSemaphore implements ConnectionSemaphore {

@Override
public void acquireChannelLock(Object partitionKey) throws IOException {
acquireChannelLock(partitionKey, false);
}

@Override
public void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException {
try {
if (!freeChannels.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) {
// nonBlocking (the caller is on the event loop): try once and fail fast rather than parking
// the loop for up to acquireTimeout.
boolean acquired = nonBlocking
? freeChannels.tryAcquire()
: freeChannels.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
if (!acquired) {
throw tooManyConnections;
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,19 @@ public class PerHostConnectionSemaphore implements ConnectionSemaphore {

@Override
public void acquireChannelLock(Object partitionKey) throws IOException {
acquireChannelLock(partitionKey, false);
}

@Override
public void acquireChannelLock(Object partitionKey, boolean nonBlocking) throws IOException {
try {
if (!getFreeConnectionsForHost(partitionKey).tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS)) {
Semaphore freeConnections = getFreeConnectionsForHost(partitionKey);
// nonBlocking (the caller is on the event loop): try once and fail fast rather than parking
// the loop for up to acquireTimeout.
boolean acquired = nonBlocking
? freeConnections.tryAcquire()
: freeConnections.tryAcquire(acquireTimeout, TimeUnit.MILLISECONDS);
if (!acquired) {
throw tooManyConnectionsPerHost;
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,12 @@ private <T> ListenableFuture<T> sendRequestWithNewChannel(Request request, Proxy
// Do not throw an exception when we need an extra connection for a
// redirect.
try {
future.acquirePartitionLockLazily();
// On the event loop (a redirect / 401 / 407 / retry replay re-enters sendRequest here),
// acquire the connection permit WITHOUT blocking: parking the loop for
// acquireFreeChannelTimeout would stall every other connection it serves (and the permit
// may be released only by a task queued on this same loop). Off the loop — the initial
// execute() on the caller thread — keep the configured blocking wait.
future.acquirePartitionLockLazily(isOnEventLoop());
} catch (IOException semaphoreException) {
// If HTTP/2 is enabled, another thread may be establishing an H2 connection.
// Poll the H2 registry with brief retries before giving up.
Expand Down
Loading