59
59
import java .util .concurrent .TimeUnit ;
60
60
import java .util .concurrent .atomic .AtomicBoolean ;
61
61
import java .util .concurrent .atomic .AtomicInteger ;
62
+ import java .util .concurrent .atomic .AtomicLong ;
62
63
import java .util .logging .Level ;
63
64
import java .util .logging .Logger ;
64
65
import java .util .stream .Collectors ;
@@ -127,6 +128,7 @@ public class LocalNode extends Node {
127
128
private final int configuredSessionCount ;
128
129
private final boolean cdpEnabled ;
129
130
private final boolean managedDownloadsEnabled ;
131
+ private final int connectionLimitPerSession ;
130
132
131
133
private final boolean bidiEnabled ;
132
134
private final AtomicBoolean drainAfterSessions = new AtomicBoolean ();
@@ -153,7 +155,8 @@ protected LocalNode(
153
155
Duration heartbeatPeriod ,
154
156
List <SessionSlot > factories ,
155
157
Secret registrationSecret ,
156
- boolean managedDownloadsEnabled ) {
158
+ boolean managedDownloadsEnabled ,
159
+ int connectionLimitPerSession ) {
157
160
super (
158
161
tracer ,
159
162
new NodeId (UUID .randomUUID ()),
@@ -176,6 +179,7 @@ protected LocalNode(
176
179
this .cdpEnabled = cdpEnabled ;
177
180
this .bidiEnabled = bidiEnabled ;
178
181
this .managedDownloadsEnabled = managedDownloadsEnabled ;
182
+ this .connectionLimitPerSession = connectionLimitPerSession ;
179
183
180
184
this .healthCheck =
181
185
healthCheck == null
@@ -579,6 +583,24 @@ public boolean isSessionOwner(SessionId id) {
579
583
return currentSessions .getIfPresent (id ) != null ;
580
584
}
581
585
586
+ @ Override
587
+ public boolean tryAcquireConnection (SessionId id ) throws NoSuchSessionException {
588
+ SessionSlot slot = currentSessions .getIfPresent (id );
589
+
590
+ if (slot == null ) {
591
+ return false ;
592
+ }
593
+
594
+ if (connectionLimitPerSession == -1 ) {
595
+ // no limit
596
+ return true ;
597
+ }
598
+
599
+ AtomicLong counter = slot .getConnectionCounter ();
600
+
601
+ return connectionLimitPerSession > counter .getAndIncrement ();
602
+ }
603
+
582
604
@ Override
583
605
public Session getSession (SessionId id ) throws NoSuchSessionException {
584
606
Require .nonNull ("Session ID" , id );
@@ -987,6 +1009,7 @@ public static class Builder {
987
1009
private HealthCheck healthCheck ;
988
1010
private Duration heartbeatPeriod = Duration .ofSeconds (NodeOptions .DEFAULT_HEARTBEAT_PERIOD );
989
1011
private boolean managedDownloadsEnabled = false ;
1012
+ private int connectionLimitPerSession = -1 ;
990
1013
991
1014
private Builder (Tracer tracer , EventBus bus , URI uri , URI gridUri , Secret registrationSecret ) {
992
1015
this .tracer = Require .nonNull ("Tracer" , tracer );
@@ -1041,6 +1064,11 @@ public Builder enableManagedDownloads(boolean enable) {
1041
1064
return this ;
1042
1065
}
1043
1066
1067
+ public Builder connectionLimitPerSession (int connectionLimitPerSession ) {
1068
+ this .connectionLimitPerSession = connectionLimitPerSession ;
1069
+ return this ;
1070
+ }
1071
+
1044
1072
public LocalNode build () {
1045
1073
return new LocalNode (
1046
1074
tracer ,
@@ -1057,7 +1085,8 @@ public LocalNode build() {
1057
1085
heartbeatPeriod ,
1058
1086
factories .build (),
1059
1087
registrationSecret ,
1060
- managedDownloadsEnabled );
1088
+ managedDownloadsEnabled ,
1089
+ connectionLimitPerSession );
1061
1090
}
1062
1091
1063
1092
public Advanced advanced () {
0 commit comments