@@ -96,6 +96,7 @@ public class LocalNewSessionQueue extends NewSessionQueue implements Closeable {
96
96
private static final String NAME = "Local New Session Queue" ;
97
97
private final SlotMatcher slotMatcher ;
98
98
private final Duration requestTimeout ;
99
+ private final Duration maximumResponseDelay ;
99
100
private final int batchSize ;
100
101
private final Map <RequestId , Data > requests ;
101
102
private final Map <RequestId , TraceContext > contexts ;
@@ -115,6 +116,7 @@ public LocalNewSessionQueue(
115
116
SlotMatcher slotMatcher ,
116
117
Duration requestTimeoutCheck ,
117
118
Duration requestTimeout ,
119
+ Duration maximumResponseDelay ,
118
120
Secret registrationSecret ,
119
121
int batchSize ) {
120
122
super (tracer , registrationSecret );
@@ -123,6 +125,7 @@ public LocalNewSessionQueue(
123
125
Require .nonNegative ("Retry period" , requestTimeoutCheck );
124
126
125
127
this .requestTimeout = Require .positive ("Request timeout" , requestTimeout );
128
+ this .maximumResponseDelay = Require .positive ("Maximum response delay" , maximumResponseDelay );
126
129
127
130
this .requests = new ConcurrentHashMap <>();
128
131
this .queue = new ConcurrentLinkedDeque <>();
@@ -152,6 +155,7 @@ public static NewSessionQueue create(Config config) {
152
155
slotMatcher ,
153
156
newSessionQueueOptions .getSessionRequestTimeoutPeriod (),
154
157
newSessionQueueOptions .getSessionRequestTimeout (),
158
+ newSessionQueueOptions .getMaximumResponseDelay (),
155
159
secretOptions .getRegistrationSecret (),
156
160
newSessionQueueOptions .getBatchSize ());
157
161
}
@@ -234,7 +238,9 @@ public HttpResponse addToQueue(SessionRequest request) {
234
238
}
235
239
236
240
Lock writeLock = this .lock .writeLock ();
237
- writeLock .lock ();
241
+ if (!writeLock .tryLock ()) {
242
+ writeLock .lock ();
243
+ }
238
244
try {
239
245
requests .remove (request .getRequestId ());
240
246
queue .remove (request );
@@ -268,7 +274,9 @@ Data injectIntoQueue(SessionRequest request) {
268
274
Data data = new Data (request .getEnqueued ());
269
275
270
276
Lock writeLock = lock .writeLock ();
271
- writeLock .lock ();
277
+ if (!writeLock .tryLock ()) {
278
+ writeLock .lock ();
279
+ }
272
280
try {
273
281
requests .put (request .getRequestId (), data );
274
282
queue .addLast (request );
@@ -288,7 +296,9 @@ public boolean retryAddToQueue(SessionRequest request) {
288
296
contexts .getOrDefault (request .getRequestId (), tracer .getCurrentContext ());
289
297
try (Span ignored = context .createSpan ("sessionqueue.retry" )) {
290
298
Lock writeLock = lock .writeLock ();
291
- writeLock .lock ();
299
+ if (!writeLock .tryLock ()) {
300
+ writeLock .lock ();
301
+ }
292
302
try {
293
303
if (!requests .containsKey (request .getRequestId ())) {
294
304
return false ;
@@ -319,7 +329,9 @@ public Optional<SessionRequest> remove(RequestId reqId) {
319
329
Require .nonNull ("Request ID" , reqId );
320
330
321
331
Lock writeLock = lock .writeLock ();
322
- writeLock .lock ();
332
+ if (!writeLock .tryLock ()) {
333
+ writeLock .lock ();
334
+ }
323
335
try {
324
336
Iterator <SessionRequest > iterator = queue .iterator ();
325
337
while (iterator .hasNext ()) {
@@ -340,6 +352,29 @@ public Optional<SessionRequest> remove(RequestId reqId) {
340
352
public List <SessionRequest > getNextAvailable (Map <Capabilities , Long > stereotypes ) {
341
353
Require .nonNull ("Stereotypes" , stereotypes );
342
354
355
+ // use nano time to avoid issues with a jumping clock e.g. on WSL2 or due to time-sync
356
+ long started = System .nanoTime ();
357
+ // delay the response to avoid heavy polling via http
358
+ while (maximumResponseDelay .toNanos () > System .nanoTime () - started ) {
359
+ Lock readLock = lock .readLock ();
360
+ readLock .lock ();
361
+
362
+ try {
363
+ if (!queue .isEmpty ()) {
364
+ break ;
365
+ }
366
+ } finally {
367
+ readLock .unlock ();
368
+ }
369
+
370
+ try {
371
+ Thread .sleep (10 );
372
+ } catch (InterruptedException ex ) {
373
+ Thread .currentThread ().interrupt ();
374
+ break ;
375
+ }
376
+ }
377
+
343
378
Predicate <Capabilities > matchesStereotype =
344
379
caps ->
345
380
stereotypes .entrySet ().stream ()
@@ -355,7 +390,9 @@ public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes
355
390
});
356
391
357
392
Lock writeLock = lock .writeLock ();
358
- writeLock .lock ();
393
+ if (!writeLock .tryLock ()) {
394
+ writeLock .lock ();
395
+ }
359
396
try {
360
397
List <SessionRequest > availableRequests =
361
398
queue .stream ()
@@ -381,7 +418,9 @@ public boolean complete(
381
418
try (Span ignored = context .createSpan ("sessionqueue.completed" )) {
382
419
Data data ;
383
420
Lock writeLock = lock .writeLock ();
384
- writeLock .lock ();
421
+ if (!writeLock .tryLock ()) {
422
+ writeLock .lock ();
423
+ }
385
424
try {
386
425
data = requests .remove (reqId );
387
426
queue .removeIf (req -> reqId .equals (req .getRequestId ()));
@@ -401,7 +440,9 @@ public boolean complete(
401
440
@ Override
402
441
public int clearQueue () {
403
442
Lock writeLock = lock .writeLock ();
404
- writeLock .lock ();
443
+ if (!writeLock .tryLock ()) {
444
+ writeLock .lock ();
445
+ }
405
446
406
447
try {
407
448
int size = queue .size ();
0 commit comments