|
34 | 34 | import com.google.common.base.Ticker;
|
35 | 35 | import com.google.common.cache.Cache;
|
36 | 36 | import com.google.common.cache.CacheBuilder;
|
| 37 | +import com.google.common.cache.RemovalCause; |
37 | 38 | import com.google.common.cache.RemovalListener;
|
38 | 39 | import com.google.common.cache.RemovalNotification;
|
39 | 40 | import com.google.common.collect.ImmutableList;
|
@@ -313,30 +314,55 @@ public void close() {
|
313 | 314 | }
|
314 | 315 |
|
315 | 316 | private void stopTimedOutSession(RemovalNotification<SessionId, SessionSlot> notification) {
|
316 |
| - if (notification.getKey() != null && notification.getValue() != null) { |
317 |
| - SessionSlot slot = notification.getValue(); |
318 |
| - SessionId id = notification.getKey(); |
319 |
| - if (notification.wasEvicted()) { |
320 |
| - // Session is timing out, stopping it by sending a DELETE |
321 |
| - LOG.log(Level.INFO, () -> String.format("Session id %s timed out, stopping...", id)); |
322 |
| - try { |
323 |
| - slot.execute(new HttpRequest(DELETE, "/session/" + id)); |
324 |
| - } catch (Exception e) { |
325 |
| - LOG.log(Level.WARNING, String.format("Exception while trying to stop session %s", id), e); |
| 317 | + try (Span span = tracer.getCurrentContext().createSpan("node.stop_session")) { |
| 318 | + AttributeMap attributeMap = tracer.createAttributeMap(); |
| 319 | + attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(), getClass().getName()); |
| 320 | + if (notification.getKey() != null && notification.getValue() != null) { |
| 321 | + SessionSlot slot = notification.getValue(); |
| 322 | + SessionId id = notification.getKey(); |
| 323 | + attributeMap.put("node.id", getId().toString()); |
| 324 | + attributeMap.put("session.slotId", slot.getId().toString()); |
| 325 | + attributeMap.put("session.id", id.toString()); |
| 326 | + attributeMap.put("session.timeout_in_seconds", getSessionTimeout().toSeconds()); |
| 327 | + attributeMap.put("session.remove.cause", notification.getCause().name()); |
| 328 | + if (notification.wasEvicted() && notification.getCause() == RemovalCause.EXPIRED) { |
| 329 | + // Session is timing out, stopping it by sending a DELETE |
| 330 | + LOG.log(Level.INFO, () -> String.format("Session id %s timed out, stopping...", id)); |
| 331 | + span.setStatus(Status.CANCELLED); |
| 332 | + span.addEvent(String.format("Stopping the the timed session %s", id), attributeMap); |
| 333 | + } else { |
| 334 | + LOG.log(Level.INFO, () -> String.format("Session id %s is stopping on demand...", id)); |
| 335 | + span.addEvent(String.format("Stopping the session %s on demand", id), attributeMap); |
326 | 336 | }
|
327 |
| - } |
328 |
| - // Attempt to stop the session |
329 |
| - slot.stop(); |
330 |
| - // Decrement pending sessions if Node is draining |
331 |
| - if (this.isDraining()) { |
332 |
| - int done = pendingSessions.decrementAndGet(); |
333 |
| - if (done <= 0) { |
334 |
| - LOG.info("Node draining complete!"); |
335 |
| - bus.fire(new NodeDrainComplete(this.getId())); |
| 337 | + if (notification.wasEvicted()) { |
| 338 | + try { |
| 339 | + slot.execute(new HttpRequest(DELETE, "/session/" + id)); |
| 340 | + } catch (Exception e) { |
| 341 | + LOG.log( |
| 342 | + Level.WARNING, String.format("Exception while trying to stop session %s", id), e); |
| 343 | + span.setStatus(Status.INTERNAL); |
| 344 | + span.addEvent( |
| 345 | + String.format("Exception while trying to stop session %s", id), attributeMap); |
| 346 | + } |
336 | 347 | }
|
| 348 | + // Attempt to stop the session |
| 349 | + slot.stop(); |
| 350 | + // Decrement pending sessions if Node is draining |
| 351 | + if (this.isDraining()) { |
| 352 | + int done = pendingSessions.decrementAndGet(); |
| 353 | + attributeMap.put("current.session.count", done); |
| 354 | + attributeMap.put("node.drain_after_session_count", this.configuredSessionCount); |
| 355 | + if (done <= 0) { |
| 356 | + LOG.info("Node draining complete!"); |
| 357 | + bus.fire(new NodeDrainComplete(this.getId())); |
| 358 | + span.addEvent("Node draining complete!", attributeMap); |
| 359 | + } |
| 360 | + } |
| 361 | + } else { |
| 362 | + LOG.log(Debug.getDebugLogLevel(), "Received stop session notification with null values"); |
| 363 | + span.setStatus(Status.INVALID_ARGUMENT); |
| 364 | + span.addEvent("Received stop session notification with null values", attributeMap); |
337 | 365 | }
|
338 |
| - } else { |
339 |
| - LOG.log(Debug.getDebugLogLevel(), "Received stop session notification with null values"); |
340 | 366 | }
|
341 | 367 | }
|
342 | 368 |
|
@@ -985,17 +1011,27 @@ public HealthCheck getHealthCheck() {
|
985 | 1011 |
|
986 | 1012 | @Override
|
987 | 1013 | public void drain() {
|
988 |
| - bus.fire(new NodeDrainStarted(getId())); |
989 |
| - draining = true; |
990 |
| - // Ensure the pendingSessions counter will not be decremented by timed out sessions not included |
991 |
| - // in the currentSessionCount and the NodeDrainComplete will be raised to early. |
992 |
| - currentSessions.cleanUp(); |
993 |
| - int currentSessionCount = getCurrentSessionCount(); |
994 |
| - if (currentSessionCount == 0) { |
995 |
| - LOG.info("Firing node drain complete message"); |
996 |
| - bus.fire(new NodeDrainComplete(getId())); |
997 |
| - } else { |
998 |
| - pendingSessions.set(currentSessionCount); |
| 1014 | + try (Span span = tracer.getCurrentContext().createSpan("node.drain")) { |
| 1015 | + AttributeMap attributeMap = tracer.createAttributeMap(); |
| 1016 | + attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(), getClass().getName()); |
| 1017 | + bus.fire(new NodeDrainStarted(getId())); |
| 1018 | + draining = true; |
| 1019 | + // Ensure the pendingSessions counter will not be decremented by timed out sessions not |
| 1020 | + // included |
| 1021 | + // in the currentSessionCount and the NodeDrainComplete will be raised to early. |
| 1022 | + currentSessions.cleanUp(); |
| 1023 | + int currentSessionCount = getCurrentSessionCount(); |
| 1024 | + attributeMap.put("current.session.count", currentSessionCount); |
| 1025 | + attributeMap.put("node.id", getId().toString()); |
| 1026 | + attributeMap.put("node.drain_after_session_count", this.configuredSessionCount); |
| 1027 | + if (currentSessionCount == 0) { |
| 1028 | + LOG.info("Firing node drain complete message"); |
| 1029 | + bus.fire(new NodeDrainComplete(getId())); |
| 1030 | + span.addEvent("Node drain complete", attributeMap); |
| 1031 | + } else { |
| 1032 | + pendingSessions.set(currentSessionCount); |
| 1033 | + span.addEvent(String.format("%s session(s) pending before draining Node", attributeMap)); |
| 1034 | + } |
999 | 1035 | }
|
1000 | 1036 | }
|
1001 | 1037 |
|
|
0 commit comments