Skip to content

Remove nonApplicationWrite from SSLDriver #41829

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public boolean isOpen() {
return closeContext.isDone() == false;
}

void handleException(Exception e) {
protected void handleException(Exception e) {
exceptionHandler.accept(e);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.nio.WriteOperation;

import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.LinkedList;
Expand All @@ -33,7 +34,8 @@
public final class SSLChannelContext extends SocketChannelContext {

private static final long CLOSE_TIMEOUT_NANOS = new TimeValue(10, TimeUnit.SECONDS).nanos();
private static final Runnable DEFAULT_TIMEOUT_CANCELLER = () -> {};
private static final Runnable DEFAULT_TIMEOUT_CANCELLER = () -> {
};

private final SSLDriver sslDriver;
private final InboundChannelBuffer networkReadBuffer;
Expand Down Expand Up @@ -68,9 +70,17 @@ public void register() throws IOException {
public void queueWriteOperation(WriteOperation writeOperation) {
getSelector().assertOnSelectorThread();
if (writeOperation instanceof CloseNotifyOperation) {
sslDriver.initiateClose();
long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime();
closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos);
try {
sslDriver.initiateClose();
SSLOutboundBuffer outboundBuffer = sslDriver.getOutboundBuffer();
if (outboundBuffer.hasEncryptedBytesToFlush()) {
encryptedFlushes.addLast(outboundBuffer.buildNetworkFlushOperation());
}
long relativeNanos = CLOSE_TIMEOUT_NANOS + System.nanoTime();
closeTimeoutCanceller = getSelector().getTaskScheduler().scheduleAtRelativeTime(this::channelCloseTimeout, relativeNanos);
} catch (SSLException e) {
handleException(e);
}
} else {
super.queueWriteOperation(writeOperation);
}
Expand All @@ -92,39 +102,25 @@ public void flushChannel() throws IOException {
}

// If the driver is ready for application writes, we can attempt to proceed with any queued writes.
if (sslDriver.readyForApplicationWrites()) {
FlushOperation unencryptedFlush;
while (pendingChannelFlush() == false && (unencryptedFlush = getPendingFlush()) != null) {
if (unencryptedFlush.isFullyFlushed()) {
currentFlushOperationComplete();
} else {
try {
// Attempt to encrypt application write data. The encrypted data ends up in the
// outbound write buffer.
sslDriver.write(unencryptedFlush);
SSLOutboundBuffer outboundBuffer = sslDriver.getOutboundBuffer();
if (outboundBuffer.hasEncryptedBytesToFlush() == false) {
break;
}
encryptedFlushes.addLast(outboundBuffer.buildNetworkFlushOperation());
// Flush the write buffer to the channel
flushEncryptedOperation();
} catch (IOException e) {
currentFlushOperationFailed(e);
throw e;
FlushOperation unencryptedFlush;
while (pendingChannelFlush() == false && (unencryptedFlush = getPendingFlush()) != null) {
if (unencryptedFlush.isFullyFlushed()) {
currentFlushOperationComplete();
} else {
try {
// Attempt to encrypt application write data. The encrypted data ends up in the
// outbound write buffer.
sslDriver.write(unencryptedFlush);
SSLOutboundBuffer outboundBuffer = sslDriver.getOutboundBuffer();
if (outboundBuffer.hasEncryptedBytesToFlush() == false) {
break;
}
}
}
} else {
// We are not ready for application writes, check if the driver has non-application writes. We
// only want to continue producing new writes if the outbound write buffer is fully flushed.
while (pendingChannelFlush() == false && sslDriver.needsNonApplicationWrite()) {
sslDriver.nonApplicationWrite();
// If non-application writes were produced, flush the outbound write buffer.
SSLOutboundBuffer outboundBuffer = sslDriver.getOutboundBuffer();
if (outboundBuffer.hasEncryptedBytesToFlush()) {
encryptedFlushes.addFirst(outboundBuffer.buildNetworkFlushOperation());
encryptedFlushes.addLast(outboundBuffer.buildNetworkFlushOperation());
// Flush the write buffer to the channel
flushEncryptedOperation();
} catch (IOException e) {
currentFlushOperationFailed(e);
throw e;
}
}
}
Expand All @@ -147,10 +143,10 @@ private void flushEncryptedOperation() throws IOException {
@Override
public boolean readyForFlush() {
getSelector().assertOnSelectorThread();
if (sslDriver.readyForApplicationWrites()) {
if (sslDriver.readyForApplicationData()) {
return pendingChannelFlush() || super.readyForFlush();
} else {
return pendingChannelFlush() || sslDriver.needsNonApplicationWrite();
return pendingChannelFlush();
}
}

Expand Down
Loading