|
19 | 19 |
|
20 | 20 | package org.elasticsearch.transport.nio;
|
21 | 21 |
|
22 |
| -import org.apache.logging.log4j.LogManager; |
23 |
| -import org.apache.logging.log4j.Logger; |
24 |
| -import org.apache.logging.log4j.message.ParameterizedMessage; |
25 | 22 | import org.elasticsearch.nio.ChannelContext;
|
26 | 23 | import org.elasticsearch.nio.EventHandler;
|
27 | 24 | import org.elasticsearch.nio.NioSelector;
|
|
32 | 29 | import java.util.Collections;
|
33 | 30 | import java.util.Set;
|
34 | 31 | import java.util.WeakHashMap;
|
35 |
| -import java.util.concurrent.TimeUnit; |
36 | 32 | import java.util.function.Consumer;
|
37 |
| -import java.util.function.LongSupplier; |
38 | 33 | import java.util.function.Supplier;
|
39 | 34 |
|
40 | 35 | public class TestEventHandler extends EventHandler {
|
41 | 36 |
|
42 |
| - private static final Logger logger = LogManager.getLogger(TestEventHandler.class); |
43 |
| - |
44 | 37 | private final Set<SocketChannelContext> hasConnectedMap = Collections.newSetFromMap(new WeakHashMap<>());
|
45 | 38 | private final Set<SocketChannelContext> hasConnectExceptionMap = Collections.newSetFromMap(new WeakHashMap<>());
|
46 |
| - private final LongSupplier relativeNanosSupplier; |
| 39 | + private final MockNioTransport.TransportThreadWatchdog transportThreadWatchdog; |
47 | 40 |
|
48 |
| - TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier, LongSupplier relativeNanosSupplier) { |
| 41 | + TestEventHandler(Consumer<Exception> exceptionHandler, Supplier<NioSelector> selectorSupplier, |
| 42 | + MockNioTransport.TransportThreadWatchdog transportThreadWatchdog) { |
49 | 43 | super(exceptionHandler, selectorSupplier);
|
50 |
| - this.relativeNanosSupplier = relativeNanosSupplier; |
| 44 | + this.transportThreadWatchdog = transportThreadWatchdog; |
51 | 45 | }
|
52 | 46 |
|
53 | 47 | @Override
|
54 | 48 | protected void acceptChannel(ServerChannelContext context) throws IOException {
|
55 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 49 | + final boolean registered = transportThreadWatchdog.register(); |
56 | 50 | try {
|
57 | 51 | super.acceptChannel(context);
|
58 | 52 | } finally {
|
59 |
| - maybeLogElapsedTime(startTime); |
| 53 | + if (registered) { |
| 54 | + transportThreadWatchdog.unregister(); |
| 55 | + } |
60 | 56 | }
|
61 | 57 | }
|
62 | 58 |
|
63 | 59 | @Override
|
64 | 60 | protected void acceptException(ServerChannelContext context, Exception exception) {
|
65 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 61 | + final boolean registered = transportThreadWatchdog.register(); |
66 | 62 | try {
|
67 | 63 | super.acceptException(context, exception);
|
68 | 64 | } finally {
|
69 |
| - maybeLogElapsedTime(startTime); |
| 65 | + if (registered) { |
| 66 | + transportThreadWatchdog.unregister(); |
| 67 | + } |
70 | 68 | }
|
71 | 69 | }
|
72 | 70 |
|
73 | 71 | @Override
|
74 | 72 | protected void handleRegistration(ChannelContext<?> context) throws IOException {
|
75 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 73 | + final boolean registered = transportThreadWatchdog.register(); |
76 | 74 | try {
|
77 | 75 | super.handleRegistration(context);
|
78 | 76 | } finally {
|
79 |
| - maybeLogElapsedTime(startTime); |
| 77 | + if (registered) { |
| 78 | + transportThreadWatchdog.unregister(); |
| 79 | + } |
80 | 80 | }
|
81 | 81 | }
|
82 | 82 |
|
83 | 83 | @Override
|
84 | 84 | protected void registrationException(ChannelContext<?> context, Exception exception) {
|
85 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 85 | + final boolean registered = transportThreadWatchdog.register(); |
86 | 86 | try {
|
87 | 87 | super.registrationException(context, exception);
|
88 | 88 | } finally {
|
89 |
| - maybeLogElapsedTime(startTime); |
| 89 | + if (registered) { |
| 90 | + transportThreadWatchdog.unregister(); |
| 91 | + } |
90 | 92 | }
|
91 | 93 | }
|
92 | 94 |
|
93 | 95 | public void handleConnect(SocketChannelContext context) throws IOException {
|
94 | 96 | assert hasConnectedMap.contains(context) == false : "handleConnect should only be called is a channel is not yet connected";
|
95 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 97 | + final boolean registered = transportThreadWatchdog.register(); |
96 | 98 | try {
|
97 | 99 | super.handleConnect(context);
|
98 | 100 | if (context.isConnectComplete()) {
|
99 | 101 | hasConnectedMap.add(context);
|
100 | 102 | }
|
101 | 103 | } finally {
|
102 |
| - maybeLogElapsedTime(startTime); |
| 104 | + if (registered) { |
| 105 | + transportThreadWatchdog.unregister(); |
| 106 | + } |
103 | 107 | }
|
104 | 108 | }
|
105 | 109 |
|
106 | 110 | public void connectException(SocketChannelContext context, Exception e) {
|
107 | 111 | assert hasConnectExceptionMap.contains(context) == false : "connectException should only called at maximum once per channel";
|
| 112 | + final boolean registered = transportThreadWatchdog.register(); |
108 | 113 | hasConnectExceptionMap.add(context);
|
109 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
110 | 114 | try {
|
111 | 115 | super.connectException(context, e);
|
112 | 116 | } finally {
|
113 |
| - maybeLogElapsedTime(startTime); |
| 117 | + if (registered) { |
| 118 | + transportThreadWatchdog.unregister(); |
| 119 | + } |
114 | 120 | }
|
115 | 121 | }
|
116 | 122 |
|
117 | 123 | @Override
|
118 | 124 | protected void handleRead(SocketChannelContext context) throws IOException {
|
119 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 125 | + final boolean registered = transportThreadWatchdog.register(); |
120 | 126 | try {
|
121 | 127 | super.handleRead(context);
|
122 | 128 | } finally {
|
123 |
| - maybeLogElapsedTime(startTime); |
| 129 | + if (registered) { |
| 130 | + transportThreadWatchdog.unregister(); |
| 131 | + } |
124 | 132 | }
|
125 | 133 | }
|
126 | 134 |
|
127 | 135 | @Override
|
128 | 136 | protected void readException(SocketChannelContext context, Exception exception) {
|
129 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 137 | + final boolean registered = transportThreadWatchdog.register(); |
130 | 138 | try {
|
131 | 139 | super.readException(context, exception);
|
132 | 140 | } finally {
|
133 |
| - maybeLogElapsedTime(startTime); |
| 141 | + if (registered) { |
| 142 | + transportThreadWatchdog.unregister(); |
| 143 | + } |
134 | 144 | }
|
135 | 145 | }
|
136 | 146 |
|
137 | 147 | @Override
|
138 | 148 | protected void handleWrite(SocketChannelContext context) throws IOException {
|
139 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 149 | + final boolean registered = transportThreadWatchdog.register(); |
140 | 150 | try {
|
141 | 151 | super.handleWrite(context);
|
142 | 152 | } finally {
|
143 |
| - maybeLogElapsedTime(startTime); |
| 153 | + if (registered) { |
| 154 | + transportThreadWatchdog.unregister(); |
| 155 | + } |
144 | 156 | }
|
145 | 157 | }
|
146 | 158 |
|
147 | 159 | @Override
|
148 | 160 | protected void writeException(SocketChannelContext context, Exception exception) {
|
149 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 161 | + final boolean registered = transportThreadWatchdog.register(); |
150 | 162 | try {
|
151 | 163 | super.writeException(context, exception);
|
152 | 164 | } finally {
|
153 |
| - maybeLogElapsedTime(startTime); |
| 165 | + if (registered) { |
| 166 | + transportThreadWatchdog.unregister(); |
| 167 | + } |
154 | 168 | }
|
155 | 169 | }
|
156 | 170 |
|
157 | 171 | @Override
|
158 | 172 | protected void handleTask(Runnable task) {
|
159 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 173 | + final boolean registered = transportThreadWatchdog.register(); |
160 | 174 | try {
|
161 | 175 | super.handleTask(task);
|
162 | 176 | } finally {
|
163 |
| - maybeLogElapsedTime(startTime); |
| 177 | + if (registered) { |
| 178 | + transportThreadWatchdog.unregister(); |
| 179 | + } |
164 | 180 | }
|
165 | 181 | }
|
166 | 182 |
|
167 | 183 | @Override
|
168 | 184 | protected void taskException(Exception exception) {
|
169 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 185 | + final boolean registered = transportThreadWatchdog.register(); |
170 | 186 | try {
|
171 | 187 | super.taskException(exception);
|
172 | 188 | } finally {
|
173 |
| - maybeLogElapsedTime(startTime); |
| 189 | + if (registered) { |
| 190 | + transportThreadWatchdog.unregister(); |
| 191 | + } |
174 | 192 | }
|
175 | 193 | }
|
176 | 194 |
|
177 | 195 | @Override
|
178 | 196 | protected void handleClose(ChannelContext<?> context) throws IOException {
|
179 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 197 | + final boolean registered = transportThreadWatchdog.register(); |
180 | 198 | try {
|
181 | 199 | super.handleClose(context);
|
182 | 200 | } finally {
|
183 |
| - maybeLogElapsedTime(startTime); |
| 201 | + if (registered) { |
| 202 | + transportThreadWatchdog.unregister(); |
| 203 | + } |
184 | 204 | }
|
185 | 205 | }
|
186 | 206 |
|
187 | 207 | @Override
|
188 | 208 | protected void closeException(ChannelContext<?> context, Exception exception) {
|
189 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 209 | + final boolean registered = transportThreadWatchdog.register(); |
190 | 210 | try {
|
191 | 211 | super.closeException(context, exception);
|
192 | 212 | } finally {
|
193 |
| - maybeLogElapsedTime(startTime); |
| 213 | + if (registered) { |
| 214 | + transportThreadWatchdog.unregister(); |
| 215 | + } |
194 | 216 | }
|
195 | 217 | }
|
196 | 218 |
|
197 | 219 | @Override
|
198 | 220 | protected void genericChannelException(ChannelContext<?> context, Exception exception) {
|
199 |
| - long startTime = relativeNanosSupplier.getAsLong(); |
| 221 | + final boolean registered = transportThreadWatchdog.register(); |
200 | 222 | try {
|
201 | 223 | super.genericChannelException(context, exception);
|
202 | 224 | } finally {
|
203 |
| - maybeLogElapsedTime(startTime); |
204 |
| - } |
205 |
| - } |
206 |
| - |
207 |
| - private static final long WARN_THRESHOLD = 150; |
208 |
| - |
209 |
| - private void maybeLogElapsedTime(long startTime) { |
210 |
| - long elapsedTime = TimeUnit.NANOSECONDS.toMillis(relativeNanosSupplier.getAsLong() - startTime); |
211 |
| - if (elapsedTime > WARN_THRESHOLD) { |
212 |
| - logger.warn(new ParameterizedMessage("Slow execution on network thread [{} milliseconds]", elapsedTime), |
213 |
| - new RuntimeException("Slow exception on network thread")); |
| 225 | + if (registered) { |
| 226 | + transportThreadWatchdog.unregister(); |
| 227 | + } |
214 | 228 | }
|
215 | 229 | }
|
216 | 230 | }
|
0 commit comments