16
16
import org .elasticsearch .nio .WriteOperation ;
17
17
18
18
import javax .net .ssl .SSLEngine ;
19
+ import javax .net .ssl .SSLException ;
19
20
import java .io .IOException ;
20
21
import java .nio .channels .ClosedChannelException ;
21
22
import java .util .LinkedList ;
33
34
public final class SSLChannelContext extends SocketChannelContext {
34
35
35
36
private static final long CLOSE_TIMEOUT_NANOS = new TimeValue (10 , TimeUnit .SECONDS ).nanos ();
36
- private static final Runnable DEFAULT_TIMEOUT_CANCELLER = () -> {};
37
+ private static final Runnable DEFAULT_TIMEOUT_CANCELLER = () -> {
38
+ };
37
39
38
40
private final SSLDriver sslDriver ;
39
41
private final InboundChannelBuffer networkReadBuffer ;
@@ -68,9 +70,17 @@ public void register() throws IOException {
68
70
public void queueWriteOperation (WriteOperation writeOperation ) {
69
71
getSelector ().assertOnSelectorThread ();
70
72
if (writeOperation instanceof CloseNotifyOperation ) {
71
- sslDriver .initiateClose ();
72
- long relativeNanos = CLOSE_TIMEOUT_NANOS + System .nanoTime ();
73
- closeTimeoutCanceller = getSelector ().getTaskScheduler ().scheduleAtRelativeTime (this ::channelCloseTimeout , relativeNanos );
73
+ try {
74
+ sslDriver .initiateClose ();
75
+ SSLOutboundBuffer outboundBuffer = sslDriver .getOutboundBuffer ();
76
+ if (outboundBuffer .hasEncryptedBytesToFlush ()) {
77
+ encryptedFlushes .addLast (outboundBuffer .buildNetworkFlushOperation ());
78
+ }
79
+ long relativeNanos = CLOSE_TIMEOUT_NANOS + System .nanoTime ();
80
+ closeTimeoutCanceller = getSelector ().getTaskScheduler ().scheduleAtRelativeTime (this ::channelCloseTimeout , relativeNanos );
81
+ } catch (SSLException e ) {
82
+ handleException (e );
83
+ }
74
84
} else {
75
85
super .queueWriteOperation (writeOperation );
76
86
}
@@ -92,39 +102,25 @@ public void flushChannel() throws IOException {
92
102
}
93
103
94
104
// If the driver is ready for application writes, we can attempt to proceed with any queued writes.
95
- if (sslDriver .readyForApplicationWrites ()) {
96
- FlushOperation unencryptedFlush ;
97
- while (pendingChannelFlush () == false && (unencryptedFlush = getPendingFlush ()) != null ) {
98
- if (unencryptedFlush .isFullyFlushed ()) {
99
- currentFlushOperationComplete ();
100
- } else {
101
- try {
102
- // Attempt to encrypt application write data. The encrypted data ends up in the
103
- // outbound write buffer.
104
- sslDriver .write (unencryptedFlush );
105
- SSLOutboundBuffer outboundBuffer = sslDriver .getOutboundBuffer ();
106
- if (outboundBuffer .hasEncryptedBytesToFlush () == false ) {
107
- break ;
108
- }
109
- encryptedFlushes .addLast (outboundBuffer .buildNetworkFlushOperation ());
110
- // Flush the write buffer to the channel
111
- flushEncryptedOperation ();
112
- } catch (IOException e ) {
113
- currentFlushOperationFailed (e );
114
- throw e ;
105
+ FlushOperation unencryptedFlush ;
106
+ while (pendingChannelFlush () == false && (unencryptedFlush = getPendingFlush ()) != null ) {
107
+ if (unencryptedFlush .isFullyFlushed ()) {
108
+ currentFlushOperationComplete ();
109
+ } else {
110
+ try {
111
+ // Attempt to encrypt application write data. The encrypted data ends up in the
112
+ // outbound write buffer.
113
+ sslDriver .write (unencryptedFlush );
114
+ SSLOutboundBuffer outboundBuffer = sslDriver .getOutboundBuffer ();
115
+ if (outboundBuffer .hasEncryptedBytesToFlush () == false ) {
116
+ break ;
115
117
}
116
- }
117
- }
118
- } else {
119
- // We are not ready for application writes, check if the driver has non-application writes. We
120
- // only want to continue producing new writes if the outbound write buffer is fully flushed.
121
- while (pendingChannelFlush () == false && sslDriver .needsNonApplicationWrite ()) {
122
- sslDriver .nonApplicationWrite ();
123
- // If non-application writes were produced, flush the outbound write buffer.
124
- SSLOutboundBuffer outboundBuffer = sslDriver .getOutboundBuffer ();
125
- if (outboundBuffer .hasEncryptedBytesToFlush ()) {
126
- encryptedFlushes .addFirst (outboundBuffer .buildNetworkFlushOperation ());
118
+ encryptedFlushes .addLast (outboundBuffer .buildNetworkFlushOperation ());
119
+ // Flush the write buffer to the channel
127
120
flushEncryptedOperation ();
121
+ } catch (IOException e ) {
122
+ currentFlushOperationFailed (e );
123
+ throw e ;
128
124
}
129
125
}
130
126
}
@@ -147,10 +143,10 @@ private void flushEncryptedOperation() throws IOException {
147
143
@ Override
148
144
public boolean readyForFlush () {
149
145
getSelector ().assertOnSelectorThread ();
150
- if (sslDriver .readyForApplicationWrites ()) {
146
+ if (sslDriver .readyForApplicationData ()) {
151
147
return pendingChannelFlush () || super .readyForFlush ();
152
148
} else {
153
- return pendingChannelFlush () || sslDriver . needsNonApplicationWrite () ;
149
+ return pendingChannelFlush ();
154
150
}
155
151
}
156
152
0 commit comments