7
7
8
8
import org .elasticsearch .nio .FlushOperation ;
9
9
import org .elasticsearch .nio .InboundChannelBuffer ;
10
+ import org .elasticsearch .nio .Page ;
10
11
import org .elasticsearch .nio .utils .ExceptionsHelper ;
11
12
12
13
import javax .net .ssl .SSLEngine ;
32
33
*
33
34
* Producing writes for a channel is more complicated. The method {@link #needsNonApplicationWrite()} can be
34
35
* called to determine if this driver needs to produce more data to advance the handshake or close process.
35
- * If that method returns true, {@link #nonApplicationWrite(SSLOutboundBuffer )} should be called (and the
36
+ * If that method returns true, {@link #nonApplicationWrite()} should be called (and the
36
37
* data produced then flushed to the channel) until no further non-application writes are needed.
37
38
*
38
39
* If no non-application writes are needed, {@link #readyForApplicationWrites()} can be called to determine
39
40
* if the driver is ready to consume application data. (Note: It is possible that
40
41
* {@link #readyForApplicationWrites()} and {@link #needsNonApplicationWrite()} can both return false if the
41
42
* driver is waiting on non-application data from the peer.) If the driver indicates it is ready for
42
- * application writes, {@link #write(FlushOperation, SSLOutboundBuffer )} can be called. This method will
43
+ * application writes, {@link #write(FlushOperation)} can be called. This method will
43
44
* encrypt flush operation application data and place it in the outbound buffer for flushing to a channel.
44
45
*
45
46
* If you are ready to close the channel {@link #initiateClose()} should be called. After that is called, the
@@ -53,6 +54,8 @@ public class SSLDriver implements AutoCloseable {
53
54
private static final FlushOperation EMPTY_FLUSH_OPERATION = new FlushOperation (EMPTY_BUFFERS , (r , t ) -> {});
54
55
55
56
private final SSLEngine engine ;
57
+ // TODO: When the bytes are actually recycled, we need to test that they are released on driver close
58
+ private final SSLOutboundBuffer outboundBuffer = new SSLOutboundBuffer ((n ) -> new Page (ByteBuffer .allocate (n )));
56
59
private final boolean isClientMode ;
57
60
// This should only be accessed by the network thread associated with this channel, so nothing needs to
58
61
// be volatile.
@@ -107,6 +110,10 @@ public ByteBuffer getNetworkReadBuffer() {
107
110
return networkReadBuffer ;
108
111
}
109
112
113
+ public SSLOutboundBuffer getOutboundBuffer () {
114
+ return outboundBuffer ;
115
+ }
116
+
110
117
public void read (InboundChannelBuffer buffer ) throws SSLException {
111
118
Mode modePriorToRead ;
112
119
do {
@@ -125,14 +132,14 @@ public boolean needsNonApplicationWrite() {
125
132
return currentMode .needsNonApplicationWrite ();
126
133
}
127
134
128
- public int write (FlushOperation applicationBytes , SSLOutboundBuffer outboundBuffer ) throws SSLException {
129
- return currentMode .write (applicationBytes , outboundBuffer );
135
+ public int write (FlushOperation applicationBytes ) throws SSLException {
136
+ return currentMode .write (applicationBytes );
130
137
}
131
138
132
- public void nonApplicationWrite (SSLOutboundBuffer outboundBuffer ) throws SSLException {
139
+ public void nonApplicationWrite () throws SSLException {
133
140
assert currentMode .isApplication () == false : "Should not be called if driver is in application mode" ;
134
141
if (currentMode .isApplication () == false ) {
135
- currentMode .write (EMPTY_FLUSH_OPERATION , outboundBuffer );
142
+ currentMode .write (EMPTY_FLUSH_OPERATION );
136
143
} else {
137
144
throw new AssertionError ("Attempted to non-application write from invalid mode: " + currentMode .modeName ());
138
145
}
@@ -148,6 +155,7 @@ public boolean isClosed() {
148
155
149
156
@ Override
150
157
public void close () throws SSLException {
158
+ outboundBuffer .close ();
151
159
ArrayList <SSLException > closingExceptions = new ArrayList <>(2 );
152
160
closingInternal ();
153
161
CloseMode closeMode = (CloseMode ) this .currentMode ;
@@ -276,7 +284,7 @@ private interface Mode {
276
284
277
285
void read (InboundChannelBuffer buffer ) throws SSLException ;
278
286
279
- int write (FlushOperation applicationBytes , SSLOutboundBuffer outboundBuffer ) throws SSLException ;
287
+ int write (FlushOperation applicationBytes ) throws SSLException ;
280
288
281
289
boolean needsNonApplicationWrite ();
282
290
@@ -296,18 +304,17 @@ private class HandshakeMode implements Mode {
296
304
297
305
private void startHandshake () throws SSLException {
298
306
handshakeStatus = engine .getHandshakeStatus ();
299
- if (handshakeStatus != SSLEngineResult .HandshakeStatus .NEED_UNWRAP &&
300
- handshakeStatus != SSLEngineResult .HandshakeStatus .NEED_WRAP ) {
307
+ if (handshakeStatus != SSLEngineResult .HandshakeStatus .NEED_UNWRAP ) {
301
308
try {
302
- handshake (null );
309
+ handshake ();
303
310
} catch (SSLException e ) {
304
311
closingInternal ();
305
312
throw e ;
306
313
}
307
314
}
308
315
}
309
316
310
- private void handshake (SSLOutboundBuffer outboundBuffer ) throws SSLException {
317
+ private void handshake () throws SSLException {
311
318
boolean continueHandshaking = true ;
312
319
while (continueHandshaking ) {
313
320
switch (handshakeStatus ) {
@@ -316,15 +323,7 @@ private void handshake(SSLOutboundBuffer outboundBuffer) throws SSLException {
316
323
continueHandshaking = false ;
317
324
break ;
318
325
case NEED_WRAP :
319
- if (outboundBuffer != null ) {
320
- handshakeStatus = wrap (outboundBuffer ).getHandshakeStatus ();
321
- // If we need NEED_TASK we should run the tasks immediately
322
- if (handshakeStatus != SSLEngineResult .HandshakeStatus .NEED_TASK ) {
323
- continueHandshaking = false ;
324
- }
325
- } else {
326
- continueHandshaking = false ;
327
- }
326
+ handshakeStatus = wrap (outboundBuffer ).getHandshakeStatus ();
328
327
break ;
329
328
case NEED_TASK :
330
329
runTasks ();
@@ -351,7 +350,7 @@ public void read(InboundChannelBuffer buffer) throws SSLException {
351
350
try {
352
351
SSLEngineResult result = unwrap (buffer );
353
352
handshakeStatus = result .getHandshakeStatus ();
354
- handshake (null );
353
+ handshake ();
355
354
// If we are done handshaking we should exit the handshake read
356
355
continueUnwrap = result .bytesConsumed () > 0 && currentMode .isHandshake ();
357
356
} catch (SSLException e ) {
@@ -362,9 +361,9 @@ public void read(InboundChannelBuffer buffer) throws SSLException {
362
361
}
363
362
364
363
@ Override
365
- public int write (FlushOperation applicationBytes , SSLOutboundBuffer outboundBuffer ) throws SSLException {
364
+ public int write (FlushOperation applicationBytes ) throws SSLException {
366
365
try {
367
- handshake (outboundBuffer );
366
+ handshake ();
368
367
} catch (SSLException e ) {
369
368
closingInternal ();
370
369
throw e ;
@@ -444,7 +443,7 @@ public void read(InboundChannelBuffer buffer) throws SSLException {
444
443
}
445
444
446
445
@ Override
447
- public int write (FlushOperation applicationBytes , SSLOutboundBuffer outboundBuffer ) throws SSLException {
446
+ public int write (FlushOperation applicationBytes ) throws SSLException {
448
447
boolean continueWrap = true ;
449
448
int totalBytesProduced = 0 ;
450
449
while (continueWrap && applicationBytes .isFullyFlushed () == false ) {
@@ -538,7 +537,7 @@ public void read(InboundChannelBuffer buffer) throws SSLException {
538
537
}
539
538
540
539
@ Override
541
- public int write (FlushOperation applicationBytes , SSLOutboundBuffer outboundBuffer ) throws SSLException {
540
+ public int write (FlushOperation applicationBytes ) throws SSLException {
542
541
int bytesProduced = 0 ;
543
542
if (engine .isOutboundDone () == false ) {
544
543
bytesProduced += wrap (outboundBuffer ).bytesProduced ();
@@ -549,6 +548,8 @@ public int write(FlushOperation applicationBytes, SSLOutboundBuffer outboundBuff
549
548
closeInboundAndSwallowPeerDidNotCloseException ();
550
549
}
551
550
}
551
+ } else {
552
+ needToSendClose = false ;
552
553
}
553
554
return bytesProduced ;
554
555
}
0 commit comments