Skip to content

Commit 0dbc4a7

Browse files
committed
rate-limiting: propagate back-pressure from queue as HTTP 429's
Adds a proactive handler that rejects new requests with HTTP 429's when the queue has been blocking for more than 10 consecutive seconds, allowing back-pressure to propagate in advance of filling up the connection backlog queue.
1 parent aba42b3 commit 0dbc4a7

File tree

10 files changed

+634
-9
lines changed

10 files changed

+634
-9
lines changed

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,9 @@ Gemfile.bak
44
.bundle
55
vendor
66
.idea
7+
.ci
8+
build/*
9+
.ci/*
10+
.gradle/*
11+
lib/logstash-input-http_jars.rb
12+
logstash-input-http.iml

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 3.10.0
2+
- add improved proactive rate-limiting, rejecting new requests when queue has been actively blocking for more than 10 seconds [#179](https://github.com/logstash-plugins/logstash-input-http/pull/179)
3+
14
## 3.9.0
25
- Netty boss and worker groups are separated [#178](https://github.com/logstash-plugins/logstash-input-http/pull/178)
36
As a result, when shutdown requested incoming connections are closed first and improved graceful shutdown

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.9.0
1+
3.10.0

spec/inputs/helpers.rb

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,8 @@
33

44
# Resolves the on-disk path of a fixture certificate used by the specs.
# NOTE(review): relies on CERTS_DIR being defined earlier in this file — not
# visible in this hunk; confirm it points at the spec fixtures directory.
def certificate_path(filename)
  File.join(CERTS_DIR, filename)
end

# Use the documentation formatter so each example's description is printed,
# which makes CI logs easier to follow for these long-running rate-limit specs.
RSpec.configure do |config|
  config.formatter = :documentation
end

spec/inputs/http_spec.rb

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,23 +57,85 @@
5757
let(:config) { { "port" => port, "threads" => threads, "max_pending_requests" => max_pending_requests } }
5858

5959
context "when sending more requests than queue slots" do
60-
it "should block when the queue is full" do
60+
it "rejects additional incoming requests with HTTP 429" do
6161
# these will queue and return 200
6262
logstash_queue_size.times.each do |i|
6363
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
6464
expect(response.code).to eq(200)
6565
end
6666

6767
# these will block
68-
(threads + max_pending_requests).times.each do |i|
69-
expect {
70-
client.post("http://127.0.0.1:#{port}", :body => '{}').call
71-
}.to raise_error(Manticore::SocketTimeout)
68+
blocked_calls = (threads + max_pending_requests).times.map do
69+
Thread.new do
70+
begin
71+
{:result => client.post("http://127.0.0.1:#{port}", :body => '{}').call}
72+
rescue Manticore::SocketException, Manticore::SocketTimeout => e
73+
{:exception => e}
74+
end
75+
end
76+
end
77+
78+
sleep 1 # let those requests go, but not so long that our block-detector starts emitting 429's
79+
80+
# by now we should be rejecting with 429 since the backlog is full
81+
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
82+
expect(response.code).to eq(429)
83+
84+
# ensure that our blocked connections did block
85+
aggregate_failures do
86+
blocked_calls.map(&:value).each do |blocked|
87+
expect(blocked[:result]).to be_nil
88+
expect(blocked[:exception]).to be_a_kind_of Manticore::SocketTimeout
89+
end
90+
end
91+
end
92+
end
93+
end
94+
95+
describe "observing queue back-pressure" do
96+
let(:logstash_queue_size) { rand(10) + 1 }
97+
let(:max_pending_requests) { rand(5) + 1 }
98+
let(:threads) { rand(4) + 1 }
99+
let(:logstash_queue) { SizedQueue.new(logstash_queue_size) }
100+
let(:client_options) { {
101+
"request_timeout" => 0.1,
102+
"connect_timeout" => 3,
103+
"socket_timeout" => 0.1
104+
} }
105+
106+
let(:config) { { "port" => port, "threads" => threads, "max_pending_requests" => max_pending_requests } }
107+
108+
context "when sending request to an input that has blocked connections" do
109+
it "rejects incoming requests with HTTP 429" do
110+
# these will queue and return 200
111+
logstash_queue_size.times.each do |i|
112+
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
113+
expect(response.code).to eq(200)
72114
end
73115

74-
# by now we should be rejecting with 429
116+
# these will block
117+
blocked_call = Thread.new do
118+
begin
119+
{:result => client.post("http://127.0.0.1:#{port}", :body => '{}').call}
120+
rescue Manticore::SocketException, Manticore::SocketTimeout => e
121+
{:exception => e}
122+
end
123+
end
124+
125+
sleep 12 # let that request go, and ensure it is blocking long enough to be problematic
126+
127+
# by now we should be rejecting with 429 since at least one existing request is blocked
128+
# for more than 10s.
75129
response = client.post("http://127.0.0.1:#{port}", :body => '{}').call
76130
expect(response.code).to eq(429)
131+
132+
# ensure that our blocked connection did block
133+
aggregate_failures do
134+
blocked_call.value.tap do |blocked|
135+
expect(blocked[:result]).to be_nil
136+
expect(blocked[:exception]).to be_a_kind_of Manticore::SocketTimeout
137+
end
138+
end
77139
end
78140
end
79141
end

src/main/java/org/logstash/plugins/inputs/http/HttpInitializer.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,12 @@
88
import io.netty.handler.codec.http.HttpResponseStatus;
99
import io.netty.handler.codec.http.HttpServerCodec;
1010
import io.netty.handler.ssl.SslHandler;
11+
import org.logstash.plugins.inputs.http.util.ExecutionObserver;
12+
import org.logstash.plugins.inputs.http.util.ExecutionObservingMessageHandler;
13+
import org.logstash.plugins.inputs.http.util.RejectWhenBlockedInboundHandler;
1114
import org.logstash.plugins.inputs.http.util.SslHandlerProvider;
1215

16+
import java.time.Duration;
1317
import java.util.concurrent.ThreadPoolExecutor;
1418

1519
/**
@@ -22,9 +26,11 @@ public class HttpInitializer extends ChannelInitializer<SocketChannel> {
2226
private final HttpResponseStatus responseStatus;
2327
private final ThreadPoolExecutor executorGroup;
2428

29+
private final ExecutionObserver executionObserver = new ExecutionObserver();
30+
2531
public HttpInitializer(IMessageHandler messageHandler, ThreadPoolExecutor executorGroup,
2632
int maxContentLength, HttpResponseStatus responseStatus) {
27-
this.messageHandler = messageHandler;
33+
this.messageHandler = new ExecutionObservingMessageHandler(executionObserver, messageHandler);
2834
this.executorGroup = executorGroup;
2935
this.maxContentLength = maxContentLength;
3036
this.responseStatus = responseStatus;
@@ -37,7 +43,9 @@ protected void initChannel(SocketChannel socketChannel) throws Exception {
3743
SslHandler sslHandler = sslHandlerProvider.getSslHandler(socketChannel.alloc());
3844
pipeline.addLast(sslHandler);
3945
}
46+
4047
pipeline.addLast(new HttpServerCodec());
48+
pipeline.addLast(new RejectWhenBlockedInboundHandler(executionObserver, Duration.ofSeconds(10)));
4149
pipeline.addLast(new HttpContentDecompressor());
4250
pipeline.addLast(new HttpObjectAggregator(maxContentLength));
4351
pipeline.addLast(new HttpServerHandler(messageHandler.copy(), executorGroup, responseStatus));
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package org.logstash.plugins.inputs.http.util;

import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/**
 * An {@code ExecutionObserver} observes possibly-concurrent execution, and provides information about the
 * longest-running observed execution.
 *
 * <p>
 * It is concurrency-safe and non-blocking, and uses plain memory access where practical.
 * </p>
 *
 * <p>
 * Internally, executions are tracked as a singly-linked list of {@link Execution} nodes running from
 * {@code tail} (oldest) to {@code head} (newest). Completed nodes are lazily unlinked ("compacted")
 * so the tail converges toward the oldest still-running execution.
 * </p>
 */
public class ExecutionObserver {
    private final AtomicReference<Execution> head; // newest execution
    private final AtomicReference<Execution> tail; // oldest execution

    // source of monotonic timestamps; injectable for deterministic tests
    private final LongSupplier nanosSupplier;

    public ExecutionObserver() {
        this(System::nanoTime);
    }

    // visible for test: allows supplying a fake clock
    ExecutionObserver(final LongSupplier nanosSupplier) {
        this.nanosSupplier = nanosSupplier;
        // seed both ends of the list with a single already-complete anchor node,
        // so head/tail are never null and the list is never empty
        final Execution anchor = new Execution(nanosSupplier.getAsLong(), true);
        this.head = new AtomicReference<>(anchor);
        this.tail = new AtomicReference<>(anchor);
    }

    /**
     * @see ExecutionObserver#anyExecuting(Duration)
     * @return true if there are any active executions.
     */
    public boolean anyExecuting() {
        return this.anyExecuting(Duration.ZERO);
    }

    /**
     * @param minimumDuration a threshold to exclude young executions
     * @return true if any active execution has been running for at least the provided {@code Duration}
     */
    public boolean anyExecuting(final Duration minimumDuration) {
        // compacting first ensures the tail is the oldest execution that is still running
        final Execution tailExecution = compactTail();
        if (tailExecution.isComplete) {
            return false;
        } else {
            return nanosSupplier.getAsLong() - tailExecution.startNanos >= minimumDuration.toNanos();
        }
    }

    // visible for test
    /**
     * @return the elapsed duration of the oldest still-running execution,
     *         or {@link Optional#empty()} when nothing is executing.
     */
    Optional<Duration> longestExecuting() {
        final Execution tailExecution = compactTail();
        if (tailExecution.isComplete) {
            return Optional.empty();
        } else {
            return Optional.of(Duration.ofNanos(nanosSupplier.getAsLong() - tailExecution.startNanos));
        }
    }

    // test inspections: walks the whole list counting nodes and in-flight executions
    Stats stats() {
        int nodes = 0;
        int executing = 0;

        Execution candidate = this.tail.get();
        while (candidate != null) {
            nodes += 1;
            if (!candidate.isComplete) {
                executing += 1;
            }
            candidate = candidate.getNextPlain();
        }
        return new Stats(nodes, executing);
    }

    /** Immutable snapshot of list size and in-flight count, for test inspection only. */
    static class Stats {
        final int nodes;
        final int executing;

        Stats(int nodes, int executing) {
            this.nodes = nodes;
            this.executing = executing;
        }
    }

    /** A {@link java.util.function.Supplier}-alike whose {@code get} may throw a checked exception. */
    @FunctionalInterface
    public interface ExceptionalSupplier<T, E extends Throwable> {
        T get() throws E;
    }

    /**
     * Runs the supplier while tracking it as an in-flight execution.
     *
     * @param supplier the work to observe
     * @return the supplier's result
     * @throws E whatever the supplier throws; the execution is marked complete either way
     */
    public <T,E extends Throwable> T observeExecution(final ExceptionalSupplier<T,E> supplier) throws E {
        final Execution execution = startExecution();
        try {
            return supplier.get();
        } finally {
            final boolean isCompact = execution.markComplete();
            if (!isCompact) {
                // markComplete could not unlink completed nodes itself; advance the shared tail instead
                this.compactTail();
            }
        }
    }

    /** A {@link Runnable}-alike whose {@code run} may throw a checked exception. */
    @FunctionalInterface
    public interface ExceptionalRunnable<E extends Throwable> {
        void run() throws E;
    }

    /**
     * Runs the runnable while tracking it as an in-flight execution.
     *
     * @param runnable the work to observe
     * @throws E whatever the runnable throws; the execution is marked complete either way
     */
    public <E extends Throwable> void observeExecution(final ExceptionalRunnable<E> runnable) throws E {
        observeExecution(() -> { runnable.run(); return null; });
    }

    // visible for test
    /**
     * Creates a new in-flight {@link Execution} and appends it at the head of the list.
     */
    Execution startExecution() {
        final Execution newHead = new Execution(nanosSupplier.getAsLong());

        // atomically attach the new execution as a new (detached) head
        final Execution oldHead = this.head.getAndSet(newHead);
        // attach our new head to the old one
        oldHead.linkNext(newHead);

        return newHead;
    }

    /**
     * Advances the shared tail past completed nodes.
     *
     * @return the (possibly unchanged) oldest relevant execution
     */
    private Execution compactTail() {
        return this.tail.updateAndGet(Execution::chaseTail);
    }

    /**
     * A single node in the observer's linked list: a start timestamp, a completion
     * flag, and a link to the next-newer execution.
     */
    static class Execution {
        // VarHandle over `next` enables CAS-based unlinking and plain-memory reads
        private static final java.lang.invoke.VarHandle NEXT;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                NEXT = l.findVarHandle(Execution.class, "next", Execution.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }

        private final long startNanos;

        private volatile boolean isComplete;
        private volatile Execution next;

        Execution(long startNanos) {
            this(startNanos, false);
        }

        Execution(final long startNanos,
                  final boolean isComplete) {
            this.startNanos = startNanos;
            this.isComplete = isComplete;
        }

        /**
         * marks this execution as complete
         * @return true if the completion resulted in a compaction
         */
        boolean markComplete() {
            isComplete = true;

            // concurrency: use plain memory for reads because we can tolerate
            // completed nodes remaining as the result of a race
            final Execution preCompletionNext = this.getNextPlain();
            if (preCompletionNext != null) {
                final Execution newNext = preCompletionNext.chaseTail();
                // best-effort CAS: on failure a concurrent writer already adjusted `next`
                return (newNext != preCompletionNext) && NEXT.compareAndSet(this, preCompletionNext, newNext);
            }
            return false;
        }

        /**
         * Links the given node as this node's successor; only ever called once per node
         * by {@code startExecution}, so an existing different successor is a bug.
         */
        private void linkNext(final Execution proposedNext) {
            final Execution witness = (Execution)NEXT.compareAndExchange(this, null, proposedNext);
            if (witness != null && witness != proposedNext) {
                throw new IllegalStateException();
            }
        }

        /**
         * @return the next {@code Execution} that is either not yet complete
         *         or is the current head, using plain memory access.
         */
        private Execution chaseTail() {
            Execution compactedTail = this;
            Execution candidate = this.getNextPlain();
            while (candidate != null && compactedTail.isComplete) {
                compactedTail = candidate;
                candidate = candidate.getNextPlain();
            }
            return compactedTail;
        }

        // plain-memory (non-volatile) read of `next`; staleness is tolerated by design
        private Execution getNextPlain() {
            return (Execution) NEXT.get(this);
        }
    }
}

0 commit comments

Comments
 (0)