59
59
import java .util .concurrent .Executors ;
60
60
import java .util .concurrent .ScheduledExecutorService ;
61
61
import java .util .concurrent .TimeUnit ;
62
- import java .util .concurrent .atomic .AtomicBoolean ;
63
62
import java .util .concurrent .atomic .AtomicInteger ;
64
63
import java .util .concurrent .atomic .AtomicLong ;
64
+ import java .util .concurrent .locks .Lock ;
65
+ import java .util .concurrent .locks .ReadWriteLock ;
66
+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
65
67
import java .util .logging .Level ;
66
68
import java .util .logging .Logger ;
67
69
import java .util .stream .Collectors ;
@@ -133,7 +135,7 @@ public class LocalNode extends Node implements Closeable {
133
135
private final int connectionLimitPerSession ;
134
136
135
137
private final boolean bidiEnabled ;
136
- private final AtomicBoolean drainAfterSessions = new AtomicBoolean () ;
138
+ private final boolean drainAfterSessions ;
137
139
private final List <SessionSlot > factories ;
138
140
private final Cache <SessionId , SessionSlot > currentSessions ;
139
141
private final Cache <SessionId , TemporaryFilesystem > uploadsTempFileSystem ;
@@ -142,6 +144,7 @@ public class LocalNode extends Node implements Closeable {
142
144
private final AtomicInteger pendingSessions = new AtomicInteger ();
143
145
private final AtomicInteger sessionCount = new AtomicInteger ();
144
146
private final Runnable shutdown ;
147
+ private final ReadWriteLock drainLock = new ReentrantReadWriteLock ();
145
148
146
149
protected LocalNode (
147
150
Tracer tracer ,
@@ -177,7 +180,7 @@ protected LocalNode(
177
180
this .factories = ImmutableList .copyOf (factories );
178
181
Require .nonNull ("Registration secret" , registrationSecret );
179
182
this .configuredSessionCount = drainAfterSessionCount ;
180
- this .drainAfterSessions . set ( this .configuredSessionCount > 0 ) ;
183
+ this .drainAfterSessions = this .configuredSessionCount > 0 ;
181
184
this .sessionCount .set (drainAfterSessionCount );
182
185
this .cdpEnabled = cdpEnabled ;
183
186
this .bidiEnabled = bidiEnabled ;
@@ -443,6 +446,9 @@ public Either<WebDriverException, CreateSessionResponse> newSession(
443
446
CreateSessionRequest sessionRequest ) {
444
447
Require .nonNull ("Session request" , sessionRequest );
445
448
449
+ Lock lock = drainLock .readLock ();
450
+ lock .lock ();
451
+
446
452
try (Span span = tracer .getCurrentContext ().createSpan ("node.new_session" )) {
447
453
AttributeMap attributeMap = tracer .createAttributeMap ();
448
454
attributeMap .put (AttributeKey .LOGGER_CLASS .getKey (), getClass ().getName ());
@@ -455,13 +461,14 @@ public Either<WebDriverException, CreateSessionResponse> newSession(
455
461
span .setAttribute ("current.session.count" , currentSessionCount );
456
462
attributeMap .put ("current.session.count" , currentSessionCount );
457
463
458
- if (getCurrentSessionCount () >= maxSessionCount ) {
464
+ if (currentSessionCount >= maxSessionCount ) {
459
465
span .setAttribute (AttributeKey .ERROR .getKey (), true );
460
466
span .setStatus (Status .RESOURCE_EXHAUSTED );
461
467
attributeMap .put ("max.session.count" , maxSessionCount );
462
468
span .addEvent ("Max session count reached" , attributeMap );
463
469
return Either .left (new RetrySessionRequestException ("Max session count reached." ));
464
470
}
471
+
465
472
if (isDraining ()) {
466
473
span .setStatus (
467
474
Status .UNAVAILABLE .withDescription (
@@ -492,6 +499,15 @@ public Either<WebDriverException, CreateSessionResponse> newSession(
492
499
new RetrySessionRequestException ("No slot matched the requested capabilities." ));
493
500
}
494
501
502
+ if (!decrementSessionCount ()) {
503
+ slotToUse .release ();
504
+ span .setAttribute (AttributeKey .ERROR .getKey (), true );
505
+ span .setStatus (Status .RESOURCE_EXHAUSTED );
506
+ attributeMap .put ("drain.after.session.count" , configuredSessionCount );
507
+ span .addEvent ("Drain after session count reached" , attributeMap );
508
+ return Either .left (new RetrySessionRequestException ("Drain after session count reached." ));
509
+ }
510
+
495
511
UUID uuidForSessionDownloads = UUID .randomUUID ();
496
512
Capabilities desiredCapabilities = sessionRequest .getDesiredCapabilities ();
497
513
if (managedDownloadsRequested (desiredCapabilities )) {
@@ -548,6 +564,7 @@ public Either<WebDriverException, CreateSessionResponse> newSession(
548
564
return Either .left (possibleSession .left ());
549
565
}
550
566
} finally {
567
+ lock .unlock ();
551
568
checkSessionCount ();
552
569
}
553
570
}
@@ -1020,20 +1037,40 @@ public void drain() {
1020
1037
}
1021
1038
1022
1039
private void checkSessionCount () {
1023
- if (this .drainAfterSessions .get ()) {
1040
+ if (this .drainAfterSessions ) {
1041
+ Lock lock = drainLock .writeLock ();
1042
+ if (!lock .tryLock ()) {
1043
+ // in case we can't get a write lock another thread does hold a read lock and will call
1044
+ // checkSessionCount as soon as he releases the read lock. So we do not need to wait here
1045
+ // for the other session to start and release the lock, just continue and let the other
1046
+ // session start to drain the node.
1047
+ return ;
1048
+ }
1049
+ try {
1050
+ int remainingSessions = this .sessionCount .get ();
1051
+ if (remainingSessions <= 0 ) {
1052
+ LOG .info (
1053
+ String .format (
1054
+ "Draining Node, configured sessions value (%s) has been reached." ,
1055
+ this .configuredSessionCount ));
1056
+ drain ();
1057
+ }
1058
+ } finally {
1059
+ lock .unlock ();
1060
+ }
1061
+ }
1062
+ }
1063
+
1064
+ private boolean decrementSessionCount () {
1065
+ if (this .drainAfterSessions ) {
1024
1066
int remainingSessions = this .sessionCount .decrementAndGet ();
1025
1067
LOG .log (
1026
1068
Debug .getDebugLogLevel (),
1027
1069
"{0} remaining sessions before draining Node" ,
1028
1070
remainingSessions );
1029
- if (remainingSessions <= 0 ) {
1030
- LOG .info (
1031
- String .format (
1032
- "Draining Node, configured sessions value (%s) has been reached." ,
1033
- this .configuredSessionCount ));
1034
- drain ();
1035
- }
1071
+ return remainingSessions >= 0 ;
1036
1072
}
1073
+ return true ;
1037
1074
}
1038
1075
1039
1076
private Map <String , Object > toJson () {
0 commit comments