Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[grid] decrement the connection per session counter on close #14842 #14854

Merged
merged 2 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions java/src/org/openqa/selenium/grid/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@
* by {@code sessionId}. This returns a boolean.</td>
* </tr>
* <tr>
* <td>DELETE</td>
* <td>/se/grid/node/connection/{sessionId}</td>
* <td>Notifies the node about closure of a websocket connection for the {@link Session}
* identified by {@code sessionId}.</td>
* </tr>
* <tr>
* <td>POST</td>
* <td>/se/grid/node/connection/{sessionId}</td>
* <td>Allows the node to be ask about whether or not new websocket connections are allowed for the {@link Session}
Expand Down Expand Up @@ -173,6 +179,9 @@ protected Node(
get("/se/grid/node/owner/{sessionId}")
.to(params -> new IsSessionOwner(this, sessionIdFrom(params)))
.with(spanDecorator("node.is_session_owner").andThen(requiresSecret)),
delete("/se/grid/node/connection/{sessionId}")
.to(params -> new ReleaseConnection(this, sessionIdFrom(params)))
.with(spanDecorator("node.is_session_owner").andThen(requiresSecret)),
post("/se/grid/node/connection/{sessionId}")
.to(params -> new TryAcquireConnection(this, sessionIdFrom(params)))
.with(spanDecorator("node.is_session_owner").andThen(requiresSecret)),
Expand Down Expand Up @@ -250,6 +259,8 @@ public TemporaryFilesystem getDownloadsFilesystem(UUID uuid) throws IOException

public abstract boolean tryAcquireConnection(SessionId id);

public abstract void releaseConnection(SessionId id);

public abstract boolean isSupporting(Capabilities capabilities);

public abstract NodeStatus getStatus();
Expand Down
11 changes: 8 additions & 3 deletions java/src/org/openqa/selenium/grid/node/ProxyNodeWebsockets.java
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private Consumer<Message> createWsEndPoint(
WebSocket upstream =
client.openSocket(
new HttpRequest(GET, uri.toString()),
new ForwardingListener(downstream, sessionConsumer, sessionId));
new ForwardingListener(node, downstream, sessionConsumer, sessionId));

return (msg) -> {
try {
Expand All @@ -260,12 +260,17 @@ private Consumer<Message> createWsEndPoint(
}

private static class ForwardingListener implements WebSocket.Listener {
private final Node node;
private final Consumer<Message> downstream;
private final Consumer<SessionId> sessionConsumer;
private final SessionId sessionId;

public ForwardingListener(
Consumer<Message> downstream, Consumer<SessionId> sessionConsumer, SessionId sessionId) {
Node node,
Consumer<Message> downstream,
Consumer<SessionId> sessionConsumer,
SessionId sessionId) {
this.node = node;
this.downstream = Objects.requireNonNull(downstream);
this.sessionConsumer = Objects.requireNonNull(sessionConsumer);
this.sessionId = Objects.requireNonNull(sessionId);
Expand All @@ -280,7 +285,7 @@ public void onBinary(byte[] data) {
@Override
public void onClose(int code, String reason) {
downstream.accept(new CloseMessage(code, reason));
sessionConsumer.accept(sessionId);
node.releaseConnection(sessionId);
}

@Override
Expand Down
43 changes: 43 additions & 0 deletions java/src/org/openqa/selenium/grid/node/ReleaseConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.grid.node;

import java.io.UncheckedIOException;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.SessionId;
import org.openqa.selenium.remote.http.HttpHandler;
import org.openqa.selenium.remote.http.HttpRequest;
import org.openqa.selenium.remote.http.HttpResponse;

class ReleaseConnection implements HttpHandler {

private final Node node;
private final SessionId id;

ReleaseConnection(Node node, SessionId id) {
this.node = Require.nonNull("Node", node);
this.id = Require.nonNull("Session id", id);
}

@Override
public HttpResponse execute(HttpRequest req) throws UncheckedIOException {
node.releaseConnection(id);

return new HttpResponse().setStatus(200);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@

import static org.openqa.selenium.remote.http.Contents.asJson;

import com.google.common.collect.ImmutableMap;
import java.io.UncheckedIOException;
import java.util.Map;
import org.openqa.selenium.internal.Require;
import org.openqa.selenium.remote.SessionId;
import org.openqa.selenium.remote.http.HttpHandler;
Expand All @@ -39,7 +39,6 @@ class TryAcquireConnection implements HttpHandler {

@Override
public HttpResponse execute(HttpRequest req) throws UncheckedIOException {
return new HttpResponse()
.setContent(asJson(ImmutableMap.of("value", node.tryAcquireConnection(id))));
return new HttpResponse().setContent(asJson(Map.of("value", node.tryAcquireConnection(id))));
}
}
4 changes: 2 additions & 2 deletions java/src/org/openqa/selenium/grid/node/config/NodeFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ public class NodeFlags implements HasRoles {
@Parameter(
names = {"--connection-limit-per-session"},
description =
"Let X be the maximum number of websocket connections per session.This will ensure one"
+ " session is not able to exhaust the connection limit of the host")
"Let X be the maximum number of concurrent websocket connections per session. This will"
+ " ensure one session is not able to exhaust the connection limit of the host")
@ConfigValue(section = NODE_SECTION, name = "connection-limit-per-session", example = "8")
public int connectionLimitPerSession = DEFAULT_CONNECTION_LIMIT;

Expand Down
19 changes: 18 additions & 1 deletion java/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,24 @@ public boolean isSessionOwner(SessionId id) {

@Override
public boolean tryAcquireConnection(SessionId id) {
return sessionId.equals(id) && connectionLimitPerSession > connectionCounter.getAndIncrement();
if (!sessionId.equals(id)) {
return false;
}

if (connectionLimitPerSession > connectionCounter.getAndIncrement()) {
return true;
}

// ensure a rejected connection will not be counted
connectionCounter.getAndDecrement();
return false;
}

@Override
public void releaseConnection(SessionId id) {
if (sessionId.equals(id)) {
connectionCounter.getAndDecrement();
}
}

@Override
Expand Down
26 changes: 25 additions & 1 deletion java/src/org/openqa/selenium/grid/node/local/LocalNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,31 @@ public boolean tryAcquireConnection(SessionId id) throws NoSuchSessionException

AtomicLong counter = slot.getConnectionCounter();

return connectionLimitPerSession > counter.getAndIncrement();
if (connectionLimitPerSession > counter.getAndIncrement()) {
return true;
}

// ensure a rejected connection will not be counted
counter.getAndDecrement();
return false;
}

@Override
public void releaseConnection(SessionId id) {
SessionSlot slot = currentSessions.getIfPresent(id);

if (slot == null) {
return;
}

if (connectionLimitPerSession == -1) {
// no limit
return;
}

AtomicLong counter = slot.getConnectionCounter();

counter.decrementAndGet();
}

@Override
Expand Down
12 changes: 12 additions & 0 deletions java/src/org/openqa/selenium/grid/node/remote/RemoteNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,18 @@ public boolean tryAcquireConnection(SessionId id) {
return Boolean.TRUE.equals(Values.get(res, Boolean.class));
}

@Override
public void releaseConnection(SessionId id) {
Require.nonNull("Session ID", id);

HttpRequest req = new HttpRequest(DELETE, "/se/grid/node/connection/" + id);
HttpTracing.inject(tracer, tracer.getCurrentContext(), req);

HttpResponse res = client.with(addSecret).execute(req);

Values.get(res, Void.class);
}

@Override
public Session getSession(SessionId id) throws NoSuchSessionException {
Require.nonNull("Session ID", id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ public boolean tryAcquireConnection(SessionId id) {
return false;
}

@Override
public void releaseConnection(SessionId id) {}

@Override
public boolean isSupporting(Capabilities capabilities) {
return Objects.equals("cake", capabilities.getCapability("cheese"));
Expand Down
51 changes: 50 additions & 1 deletion java/test/org/openqa/selenium/grid/router/DistributedTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.openqa.selenium.Capabilities;
import org.openqa.selenium.HasCapabilities;
import org.openqa.selenium.SessionNotCreatedException;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.bidi.BiDi;
import org.openqa.selenium.bidi.BiDiProvider;
import org.openqa.selenium.grid.config.MapConfig;
import org.openqa.selenium.grid.config.MemoizedConfig;
import org.openqa.selenium.grid.config.TomlConfig;
Expand All @@ -43,6 +47,7 @@
import org.openqa.selenium.netty.server.NettyServer;
import org.openqa.selenium.remote.RemoteWebDriver;
import org.openqa.selenium.remote.http.ClientConfig;
import org.openqa.selenium.remote.http.ConnectionFailedException;
import org.openqa.selenium.remote.http.Contents;
import org.openqa.selenium.remote.http.HttpClient;
import org.openqa.selenium.remote.http.HttpMethod;
Expand Down Expand Up @@ -76,7 +81,9 @@ public void setupServers() {
+ "\n"
+ "override-max-sessions = true"
+ "\n"
+ "max-sessions = 2")));
+ "max-sessions = 2"
+ "\n"
+ "connection-limit-per-session = 3")));
tearDowns.add(deployment);

server = deployment.getServer();
Expand Down Expand Up @@ -192,4 +199,46 @@ void clientTimeoutDoesNotLeakARunningBrowser() throws Exception {
Safely.safelyCall(healthy::quit);
}
}

@Test
void connectionLimitIsRespected() throws Exception {
assertThat(server.isStarted()).isTrue();

// don't use the RemoteWebDriver.builder here, using it does create an unknown number of
// connections
WebDriver driver = new RemoteWebDriver(server.getUrl(), browser.getCapabilities());

try {
Capabilities caps = ((HasCapabilities) driver).getCapabilities();
BiDiProvider biDiProvider = new BiDiProvider();

BiDi cnn1 = biDiProvider.getImplementation(caps, null).getBiDi();
BiDi cnn2 = biDiProvider.getImplementation(caps, null).getBiDi();
BiDi cnn3 = biDiProvider.getImplementation(caps, null).getBiDi();

Assertions.assertThrows(
ConnectionFailedException.class,
() -> biDiProvider.getImplementation(caps, null).getBiDi());
cnn1.close();
BiDi cnn4 = biDiProvider.getImplementation(caps, null).getBiDi();

Assertions.assertThrows(
ConnectionFailedException.class,
() -> biDiProvider.getImplementation(caps, null).getBiDi());
cnn2.close();
cnn3.close();
BiDi cnn5 = biDiProvider.getImplementation(caps, null).getBiDi();
BiDi cnn6 = biDiProvider.getImplementation(caps, null).getBiDi();

Assertions.assertThrows(
ConnectionFailedException.class,
() -> biDiProvider.getImplementation(caps, null).getBiDi());

cnn4.close();
cnn5.close();
cnn6.close();
} finally {
Safely.safelyCall(driver::quit);
}
}
}
Loading