Skip to content

Netty4 HTTP authn enhancements (#92220) #96703

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
1009ac5
Add mechanism for early http header validation (#92220)
Tim-Brooks Mar 31, 2023
85bf361
Audit authn success in the `SecurityRestFilter` (#94120)
albertzaharovits Apr 3, 2023
5d57628
Use the remote address from thread context for rest request auditing …
albertzaharovits Apr 3, 2023
5a6881f
Make Authn and Audit take a HTTP interface when dealing with Rest req…
albertzaharovits Apr 4, 2023
7c956e9
Parameterize the `Security` plugin with the request-wise thread conte…
albertzaharovits Apr 10, 2023
cd0262a
Only remove telltale response headers in case of 401 or 403 (#95189)
albertzaharovits Jun 13, 2023
265b2de
Ensure http channel validator does not alter ES thread context (#95498)
albertzaharovits Apr 25, 2023
f409613
Security authn via netty channel validator (#95112)
albertzaharovits May 4, 2023
8d6f2d5
Still authn NIO requests in the SecurityRestFilter
albertzaharovits Jun 15, 2023
250dcf8
Handle Unauthenticated OPTIONS requests (#96061)
albertzaharovits May 12, 2023
f76d8a0
Reject OPTIONS requests with a body (#96357)
albertzaharovits May 29, 2023
928d910
Test that malformed HTTP request is not validated (#95886)
albertzaharovits May 8, 2023
1483bf7
Fix SecurityNetty4HttpServerTransportTests (#96178)
albertzaharovits May 17, 2023
578a06a
Fix SecurityNetty4HttpServerTransportTests (#95913)
albertzaharovits May 8, 2023
f1daa20
Test validation fails for unwrapped requests (#95949)
albertzaharovits May 9, 2023
c616d6b
Test large request is never dispatched (#95962)
albertzaharovits May 10, 2023
140f323
Make the HttpHeadersValidationException a RuntimeException
albertzaharovits Jun 20, 2023
74bced2
Reinstate authn failure for SecurityRestFilterTests
albertzaharovits Jun 20, 2023
aeb8443
Merge branch '7.17' into backport-7.17-DOS715
albertzaharovits Aug 23, 2023
b0c7b65
Nit test
albertzaharovits Aug 23, 2023
70c8225
Nit: assert request type in SecurityRestFilter
albertzaharovits Aug 23, 2023
0a6e189
Fix assert request type fallout
albertzaharovits Aug 23, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.http.netty4;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.DecoderResult;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ContextPreservingActionListener;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.http.netty4.internal.HttpValidator;
import org.elasticsearch.transport.Transports;

import java.util.ArrayDeque;

import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_PERMANENTLY;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.DROPPING_DATA_UNTIL_NEXT_REQUEST;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.FORWARDING_DATA_UNTIL_NEXT_REQUEST;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.QUEUEING_DATA;
import static org.elasticsearch.http.netty4.Netty4HttpHeaderValidator.State.WAITING_TO_START;

public class Netty4HttpHeaderValidator extends ChannelInboundHandlerAdapter {

private final HttpValidator validator;
private final ThreadContext threadContext;
private ArrayDeque<HttpObject> pending = new ArrayDeque<>(4);
private State state = WAITING_TO_START;

public Netty4HttpHeaderValidator(HttpValidator validator, ThreadContext threadContext) {
this.validator = validator;
this.threadContext = threadContext;
}

State getState() {
return state;
}

@SuppressWarnings("fallthrough")
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
assert msg instanceof HttpObject;
final HttpObject httpObject = (HttpObject) msg;

switch (state) {
case WAITING_TO_START:
assert pending.isEmpty();
pending.add(ReferenceCountUtil.retain(httpObject));
requestStart(ctx);
assert state == QUEUEING_DATA;
break;
case QUEUEING_DATA:
pending.add(ReferenceCountUtil.retain(httpObject));
break;
case FORWARDING_DATA_UNTIL_NEXT_REQUEST:
assert pending.isEmpty();
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
ctx.fireChannelRead(httpObject);
break;
case DROPPING_DATA_UNTIL_NEXT_REQUEST:
assert pending.isEmpty();
if (httpObject instanceof LastHttpContent) {
state = WAITING_TO_START;
}
// fall-through
case DROPPING_DATA_PERMANENTLY:
assert pending.isEmpty();
ReferenceCountUtil.release(httpObject); // consume without enqueuing
break;
}

setAutoReadForState(ctx, state);
}

private void requestStart(ChannelHandlerContext ctx) {
assert state == WAITING_TO_START;

if (pending.isEmpty()) {
return;
}

final HttpObject httpObject = pending.getFirst();
final HttpRequest httpRequest;
if (httpObject instanceof HttpRequest && httpObject.decoderResult().isSuccess()) {
// a properly decoded HTTP start message is expected to begin validation
// anything else is probably an error that the downstream HTTP message aggregator will have to handle
httpRequest = (HttpRequest) httpObject;
} else {
httpRequest = null;
}

state = QUEUEING_DATA;

if (httpRequest == null) {
// this looks like a malformed request and will forward without validation
ctx.channel().eventLoop().submit(() -> forwardFullRequest(ctx));
} else {
Transports.assertDefaultThreadContext(threadContext);
// this prevents thread-context changes to propagate to the validation listener
// atm, the validation listener submits to the event loop executor, which doesn't know about the ES thread-context,
// so this is just a defensive play, in case the code inside the listener changes to not use the event loop executor
ContextPreservingActionListener<Void> contextPreservingActionListener = new ContextPreservingActionListener<>(
threadContext.wrapRestorable(threadContext.newStoredContext(false)),
ActionListener.wrap(aVoid ->
// Always use "Submit" to prevent reentrancy concerns if we are still on event loop
ctx.channel().eventLoop().submit(() -> forwardFullRequest(ctx)),
e -> ctx.channel().eventLoop().submit(() -> forwardRequestWithDecoderExceptionAndNoContent(ctx, e))
)
);
// this prevents thread-context changes to propagate beyond the validation, as netty worker threads are reused
try (ThreadContext.StoredContext ignore = threadContext.newStoredContext(false)) {
validator.validate(httpRequest, ctx.channel(), contextPreservingActionListener);
}
}
}

private void forwardFullRequest(ChannelHandlerContext ctx) {
Transports.assertDefaultThreadContext(threadContext);
assert ctx.channel().eventLoop().inEventLoop();
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;

boolean fullRequestForwarded = forwardData(ctx, pending);

assert fullRequestForwarded || pending.isEmpty();
if (fullRequestForwarded) {
state = WAITING_TO_START;
requestStart(ctx);
} else {
state = FORWARDING_DATA_UNTIL_NEXT_REQUEST;
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == FORWARDING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

private void forwardRequestWithDecoderExceptionAndNoContent(ChannelHandlerContext ctx, Exception e) {
Transports.assertDefaultThreadContext(threadContext);
assert ctx.channel().eventLoop().inEventLoop();
assert ctx.channel().config().isAutoRead() == false;
assert state == QUEUEING_DATA;

HttpObject messageToForward = pending.getFirst();
boolean fullRequestDropped = dropData(pending);
if (messageToForward instanceof HttpContent) {
// if the request to forward contained data (which got dropped), replace with empty data
messageToForward = ((HttpContent) messageToForward).replace(Unpooled.EMPTY_BUFFER);
}
messageToForward.setDecoderResult(DecoderResult.failure(e));
ctx.fireChannelRead(messageToForward);

assert fullRequestDropped || pending.isEmpty();
if (fullRequestDropped) {
state = WAITING_TO_START;
requestStart(ctx);
} else {
state = DROPPING_DATA_UNTIL_NEXT_REQUEST;
}

assert state == WAITING_TO_START || state == QUEUEING_DATA || state == DROPPING_DATA_UNTIL_NEXT_REQUEST;
setAutoReadForState(ctx, state);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
state = DROPPING_DATA_PERMANENTLY;
while (true) {
if (dropData(pending) == false) {
break;
}
}
super.channelInactive(ctx);
}

private static boolean forwardData(ChannelHandlerContext ctx, ArrayDeque<HttpObject> pending) {
final int pendingMessages = pending.size();
try {
HttpObject toForward;
while ((toForward = pending.poll()) != null) {
ctx.fireChannelRead(toForward);
ReferenceCountUtil.release(toForward); // reference cnt incremented when enqueued
if (toForward instanceof LastHttpContent) {
return true;
}
}
return false;
} finally {
maybeResizePendingDown(pendingMessages, pending);
}
}

private static boolean dropData(ArrayDeque<HttpObject> pending) {
final int pendingMessages = pending.size();
try {
HttpObject toDrop;
while ((toDrop = pending.poll()) != null) {
ReferenceCountUtil.release(toDrop, 2); // 1 for enqueuing, 1 for consuming
if (toDrop instanceof LastHttpContent) {
return true;
}
}
return false;
} finally {
maybeResizePendingDown(pendingMessages, pending);
}
}

private static void maybeResizePendingDown(int largeSize, ArrayDeque<HttpObject> pending) {
if (pending.size() <= 4 && largeSize > 32) {
// Prevent the ArrayDeque from becoming forever large due to a single large message.
ArrayDeque<HttpObject> old = pending;
pending = new ArrayDeque<>(4);
pending.addAll(old);
}
}

private static void setAutoReadForState(ChannelHandlerContext ctx, State state) {
ctx.channel().config().setAutoRead((state == QUEUEING_DATA || state == DROPPING_DATA_PERMANENTLY) == false);
}

enum State {
WAITING_TO_START,
QUEUEING_DATA,
FORWARDING_DATA_UNTIL_NEXT_REQUEST,
DROPPING_DATA_UNTIL_NEXT_REQUEST,
DROPPING_DATA_PERMANENTLY
}
}
Loading