|
19 | 19 |
|
20 | 20 | package org.elasticsearch.transport.netty4;
|
21 | 21 |
|
22 |
| -import io.netty.buffer.ByteBuf; |
23 |
| -import io.netty.channel.ChannelHandlerContext; |
24 | 22 | import io.netty.handler.logging.LogLevel;
|
25 | 23 | import io.netty.handler.logging.LoggingHandler;
|
26 |
| -import org.elasticsearch.Version; |
27 |
| -import org.elasticsearch.common.compress.Compressor; |
28 |
| -import org.elasticsearch.common.compress.CompressorFactory; |
29 |
| -import org.elasticsearch.common.io.stream.StreamInput; |
30 |
| -import org.elasticsearch.common.settings.Settings; |
31 |
| -import org.elasticsearch.common.util.concurrent.ThreadContext; |
32 |
| -import org.elasticsearch.transport.TcpHeader; |
33 |
| -import org.elasticsearch.transport.TcpTransport; |
34 |
| -import org.elasticsearch.transport.TransportStatus; |
35 |
| - |
36 |
| -import java.io.IOException; |
37 | 24 |
|
38 | 25 | final class ESLoggingHandler extends LoggingHandler {
|
39 | 26 |
|
40 | 27 | ESLoggingHandler() {
|
41 | 28 | super(LogLevel.TRACE);
|
42 | 29 | }
|
43 | 30 |
|
44 |
| - @Override |
45 |
| - protected String format(final ChannelHandlerContext ctx, final String eventName, final Object arg) { |
46 |
| - if (arg instanceof ByteBuf) { |
47 |
| - try { |
48 |
| - return format(ctx, eventName, (ByteBuf) arg); |
49 |
| - } catch (final Exception e) { |
50 |
| - // we really do not want to allow a bug in the formatting handling to escape |
51 |
| - logger.trace("an exception occurred formatting a trace message", e); |
52 |
| - // we are going to let this be formatted via the default formatting |
53 |
| - return super.format(ctx, eventName, arg); |
54 |
| - } |
55 |
| - } else { |
56 |
| - return super.format(ctx, eventName, arg); |
57 |
| - } |
58 |
| - } |
59 |
| - |
60 |
| - private static final int MESSAGE_LENGTH_OFFSET = TcpHeader.MARKER_BYTES_SIZE; |
61 |
| - private static final int REQUEST_ID_OFFSET = MESSAGE_LENGTH_OFFSET + TcpHeader.MESSAGE_LENGTH_SIZE; |
62 |
| - private static final int STATUS_OFFSET = REQUEST_ID_OFFSET + TcpHeader.REQUEST_ID_SIZE; |
63 |
| - private static final int VERSION_ID_OFFSET = STATUS_OFFSET + TcpHeader.STATUS_SIZE; |
64 |
| - private static final int ACTION_OFFSET = VERSION_ID_OFFSET + TcpHeader.VERSION_ID_SIZE; |
65 |
| - |
66 |
| - private String format(final ChannelHandlerContext ctx, final String eventName, final ByteBuf arg) throws IOException { |
67 |
| - final int readableBytes = arg.readableBytes(); |
68 |
| - if (readableBytes == 0) { |
69 |
| - return super.format(ctx, eventName, arg); |
70 |
| - } else if (readableBytes >= 2) { |
71 |
| - final StringBuilder sb = new StringBuilder(); |
72 |
| - sb.append(ctx.channel().toString()); |
73 |
| - final int offset = arg.readerIndex(); |
74 |
| - // this might be an ES message, check the header |
75 |
| - if (arg.getByte(offset) == (byte) 'E' && arg.getByte(offset + 1) == (byte) 'S') { |
76 |
| - if (readableBytes == TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE) { |
77 |
| - final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET); |
78 |
| - if (length == TcpTransport.PING_DATA_SIZE) { |
79 |
| - sb.append(" [ping]").append(' ').append(eventName).append(": ").append(readableBytes).append('B'); |
80 |
| - return sb.toString(); |
81 |
| - } |
82 |
| - } |
83 |
| - else if (readableBytes >= TcpHeader.HEADER_SIZE) { |
84 |
| - // we are going to try to decode this as an ES message |
85 |
| - final int length = arg.getInt(offset + MESSAGE_LENGTH_OFFSET); |
86 |
| - final long requestId = arg.getLong(offset + REQUEST_ID_OFFSET); |
87 |
| - final byte status = arg.getByte(offset + STATUS_OFFSET); |
88 |
| - final boolean isRequest = TransportStatus.isRequest(status); |
89 |
| - final String type = isRequest ? "request" : "response"; |
90 |
| - final String version = Version.fromId(arg.getInt(offset + VERSION_ID_OFFSET)).toString(); |
91 |
| - sb.append(" [length: ").append(length); |
92 |
| - sb.append(", request id: ").append(requestId); |
93 |
| - sb.append(", type: ").append(type); |
94 |
| - sb.append(", version: ").append(version); |
95 |
| - if (isRequest) { |
96 |
| - // it looks like an ES request, try to decode the action |
97 |
| - final int remaining = readableBytes - ACTION_OFFSET; |
98 |
| - final ByteBuf slice = arg.slice(offset + ACTION_OFFSET, remaining); |
99 |
| - // the stream might be compressed |
100 |
| - try (StreamInput in = in(status, slice, remaining)) { |
101 |
| - // the first bytes in the message is the context headers |
102 |
| - try (ThreadContext context = new ThreadContext(Settings.EMPTY)) { |
103 |
| - context.readHeaders(in); |
104 |
| - } |
105 |
| - // now we decode the features |
106 |
| - if (in.getVersion().onOrAfter(Version.V_6_3_0)) { |
107 |
| - in.readStringArray(); |
108 |
| - } |
109 |
| - // now we can decode the action name |
110 |
| - sb.append(", action: ").append(in.readString()); |
111 |
| - } |
112 |
| - } |
113 |
| - sb.append(']'); |
114 |
| - sb.append(' ').append(eventName).append(": ").append(readableBytes).append('B'); |
115 |
| - return sb.toString(); |
116 |
| - } |
117 |
| - } |
118 |
| - } |
119 |
| - // we could not decode this as an ES message, use the default formatting |
120 |
| - return super.format(ctx, eventName, arg); |
121 |
| - } |
122 |
| - |
123 |
| - private StreamInput in(final Byte status, final ByteBuf slice, final int remaining) throws IOException { |
124 |
| - final ByteBufStreamInput in = new ByteBufStreamInput(slice, remaining); |
125 |
| - if (TransportStatus.isCompress(status)) { |
126 |
| - final Compressor compressor = CompressorFactory.compressor(Netty4Utils.toBytesReference(slice)); |
127 |
| - return compressor.streamInput(in); |
128 |
| - } else { |
129 |
| - return in; |
130 |
| - } |
131 |
| - } |
132 |
| - |
133 | 31 | }
|
0 commit comments