Skip to content

Commit 541cb69

Browse files
committed
SpigotMC#3392: Consolidate flushes, leading to reduces syscalls and higher performance
Based on Netty FlushConsolidationHandler
1 parent 71990e3 commit 541cb69

File tree

6 files changed

+386
-1
lines changed

6 files changed

+386
-1
lines changed

pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
<url>https://github.com/SpigotMC/BungeeCord/blob/master/LICENSE</url>
2323
<distribution>repo</distribution>
2424
</license>
25+
<license>
26+
<name>Apache License Version 2.0</name>
27+
<url>https://www.apache.org/licenses/LICENSE-2.0</url>
28+
<comments>Applies only to original versions of files in proxy/src/main/java/net/md_5/bungee/netty/flush/</comments>
29+
</license>
2530
</licenses>
2631

2732
<developers>

proxy/src/main/java/net/md_5/bungee/UserConnection.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import net.md_5.bungee.netty.ChannelWrapper;
4848
import net.md_5.bungee.netty.HandlerBoss;
4949
import net.md_5.bungee.netty.PipelineUtils;
50+
import net.md_5.bungee.netty.flush.BungeeFlushConsolidationHandler;
51+
import net.md_5.bungee.netty.flush.FlushSignalingHandler;
5052
import net.md_5.bungee.protocol.DefinedPacket;
5153
import net.md_5.bungee.protocol.MinecraftDecoder;
5254
import net.md_5.bungee.protocol.MinecraftEncoder;
@@ -315,7 +317,7 @@ public void connect(final ServerConnectRequest request)
315317

316318
pendingConnects.add( target );
317319

318-
ChannelInitializer initializer = new ChannelInitializer()
320+
ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>()
319321
{
320322
@Override
321323
protected void initChannel(Channel ch) throws Exception
@@ -324,6 +326,11 @@ protected void initChannel(Channel ch) throws Exception
324326
ch.pipeline().addAfter( PipelineUtils.FRAME_DECODER, PipelineUtils.PACKET_DECODER, new MinecraftDecoder( Protocol.HANDSHAKE, false, getPendingConnection().getVersion() ) );
325327
ch.pipeline().addAfter( PipelineUtils.FRAME_PREPENDER, PipelineUtils.PACKET_ENCODER, new MinecraftEncoder( Protocol.HANDSHAKE, false, getPendingConnection().getVersion() ) );
326328
ch.pipeline().get( HandlerBoss.class ).setHandler( new ServerConnector( bungee, UserConnection.this, target ) );
329+
330+
ch.pipeline().addFirst( PipelineUtils.FLUSH_CONSOLIDATION, BungeeFlushConsolidationHandler.newInstance( false ) );
331+
ch.pipeline().addFirst( PipelineUtils.FLUSH_SIGNALING, new FlushSignalingHandler( UserConnection.this.ch.getFlushConsolidationHandler( true ) ) );
332+
333+
UserConnection.this.ch.setFlushSignalingTarget( ch.pipeline().get( BungeeFlushConsolidationHandler.class ) );
327334
}
328335
};
329336
ChannelFutureListener listener = new ChannelFutureListener()

proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java

+22
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import lombok.Setter;
1212
import net.md_5.bungee.compress.PacketCompressor;
1313
import net.md_5.bungee.compress.PacketDecompressor;
14+
import net.md_5.bungee.netty.flush.BungeeFlushConsolidationHandler;
15+
import net.md_5.bungee.netty.flush.FlushSignalingHandler;
1416
import net.md_5.bungee.protocol.MinecraftDecoder;
1517
import net.md_5.bungee.protocol.MinecraftEncoder;
1618
import net.md_5.bungee.protocol.PacketWrapper;
@@ -47,6 +49,26 @@ public void setVersion(int protocol)
4749
ch.pipeline().get( MinecraftEncoder.class ).setProtocolVersion( protocol );
4850
}
4951

52+
public void setFlushSignalingTarget(BungeeFlushConsolidationHandler target)
53+
{
54+
FlushSignalingHandler handler = ch.pipeline().get( FlushSignalingHandler.class );
55+
if ( handler == null )
56+
{
57+
ch.pipeline().addFirst( PipelineUtils.FLUSH_SIGNALING, handler = new FlushSignalingHandler( target ) );
58+
}
59+
handler.setTarget( target );
60+
}
61+
62+
public BungeeFlushConsolidationHandler getFlushConsolidationHandler(boolean toClient)
63+
{
64+
BungeeFlushConsolidationHandler handler = ch.pipeline().get( BungeeFlushConsolidationHandler.class );
65+
if ( handler == null )
66+
{
67+
ch.pipeline().addFirst( PipelineUtils.FLUSH_CONSOLIDATION, handler = BungeeFlushConsolidationHandler.newInstance( toClient ) );
68+
}
69+
return handler;
70+
}
71+
5072
public void write(Object packet)
5173
{
5274
if ( !closed )

proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java

+2
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ protected void initChannel(Channel ch) throws Exception
9595
public static final String FRAME_PREPENDER = "frame-prepender";
9696
public static final String LEGACY_DECODER = "legacy-decoder";
9797
public static final String LEGACY_KICKER = "legacy-kick";
98+
public static final String FLUSH_CONSOLIDATION = "flush-consolidation";
99+
public static final String FLUSH_SIGNALING = "flush-signaling";
98100

99101
private static boolean epoll;
100102

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* The original file is licensed under the following license:
3+
*
4+
* Copyright 2016 The Netty Project
5+
*
6+
* The Netty Project licenses this file to you under the Apache License,
7+
* version 2.0 (the "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at:
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations
16+
* under the License.
17+
*
18+
* ---
19+
*
20+
* This file is based on the io/netty/handler/flush/FlushConsolidationHandler.java file from Netty (v4.1).
21+
* It was modified to fit to bungee's use of forwarded connections.
22+
* All modifications are licensed under the "Modified BSD 3-Clause License" to be found at
23+
* https://github.com/SpigotMC/BungeeCord/blob/master/LICENSE
24+
*/
25+
package net.md_5.bungee.netty.flush;
26+
27+
import io.netty.channel.Channel;
28+
import io.netty.channel.ChannelDuplexHandler;
29+
import io.netty.channel.ChannelHandler;
30+
import io.netty.channel.ChannelHandlerContext;
31+
import io.netty.channel.ChannelOutboundHandler;
32+
import io.netty.channel.ChannelOutboundInvoker;
33+
import io.netty.channel.ChannelPipeline;
34+
import io.netty.channel.ChannelPromise;
35+
import io.netty.util.internal.ObjectUtil;
36+
import java.util.concurrent.Future;
37+
38+
/**
39+
* {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
40+
* operations (which also includes
41+
* {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and
42+
* {@link ChannelOutboundInvoker#writeAndFlush(Object)} /
43+
* {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}).
44+
* <p>
45+
* Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
46+
* in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
47+
* as much as possible.
48+
* <p>
49+
* If a {@link FlushSignalingHandler} signalises a read loop is currently ongoing,
50+
* {@link #flush(ChannelHandlerContext)} will not be passed on to the next {@link ChannelOutboundHandler} in the
51+
* {@link ChannelPipeline}, as it will pick up any pending flushes when
52+
* {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
53+
* If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
54+
* <ul>
55+
* <li>if {@code false}, flushes are passed on to the next handler directly;</li>
56+
* <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
57+
* high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
58+
* batching multiple flushes into one.</li>
59+
* </ul>
60+
* If {@code explicitFlushAfterFlushes} is reached the flush will be forwarded as well (whether while in a read loop, or
61+
* while batching outside of a read loop).
62+
* <p>
63+
* If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
64+
* <p>
65+
* The {@link BungeeFlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
66+
* {@link ChannelPipeline} to have the best effect.
67+
*/
68+
public final class BungeeFlushConsolidationHandler extends ChannelDuplexHandler
69+
{
70+
private final int explicitFlushAfterFlushes;
71+
private final boolean consolidateWhenNoReadInProgress;
72+
private final Runnable flushTask;
73+
private int flushPendingCount;
74+
boolean readInProgress;
75+
ChannelHandlerContext ctx;
76+
private Future<?> nextScheduledFlush;
77+
78+
/**
79+
* The default number of flushes after which a flush will be forwarded to downstream handlers (whether while in a
80+
* read loop, or while batching outside of a read loop).
81+
*/
82+
public static final int DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES = 256;
83+
84+
/**
85+
* Creates a new instance with bungee's default values
86+
*
87+
* @param toClient whether this handler is for a client connection
88+
* @return a new instance of BungeeFlushConsolidationHandler
89+
*/
90+
public static BungeeFlushConsolidationHandler newInstance(boolean toClient)
91+
{
92+
// Currently the toClient boolean is ignored. It is present in case we find different parameters neccessary
93+
// for client and server connections.
94+
return new BungeeFlushConsolidationHandler( 20, false );
95+
}
96+
97+
/**
98+
* Create new instance which explicit flush after {@value DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES} pending flush
99+
* operations at the latest.
100+
*/
101+
private BungeeFlushConsolidationHandler()
102+
{
103+
this( DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, false );
104+
}
105+
106+
/**
107+
* Create new instance which doesn't consolidate flushes when no read is in progress.
108+
*
109+
* @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
110+
*/
111+
private BungeeFlushConsolidationHandler(int explicitFlushAfterFlushes)
112+
{
113+
this( explicitFlushAfterFlushes, false );
114+
}
115+
116+
/**
117+
* Create new instance.
118+
*
119+
* @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
120+
* @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
121+
* ongoing.
122+
*/
123+
private BungeeFlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress)
124+
{
125+
this.explicitFlushAfterFlushes = ObjectUtil.checkPositive( explicitFlushAfterFlushes, "explicitFlushAfterFlushes" );
126+
this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
127+
this.flushTask = consolidateWhenNoReadInProgress ? new Runnable()
128+
{
129+
@Override
130+
public void run()
131+
{
132+
if ( flushPendingCount > 0 && !readInProgress )
133+
{
134+
flushPendingCount = 0;
135+
nextScheduledFlush = null;
136+
ctx.flush();
137+
} // else we'll flush when the read completes
138+
}
139+
} : null;
140+
}
141+
142+
@Override
143+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception
144+
{
145+
this.ctx = ctx;
146+
}
147+
148+
@Override
149+
public void flush(ChannelHandlerContext ctx) throws Exception
150+
{
151+
if ( readInProgress )
152+
{
153+
// If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
154+
// we only need to flush if we reach the explicitFlushAfterFlushes limit.
155+
if ( ++flushPendingCount == explicitFlushAfterFlushes )
156+
{
157+
flushNow( ctx );
158+
}
159+
} else if ( consolidateWhenNoReadInProgress )
160+
{
161+
// Flush immediately if we reach the threshold, otherwise schedule
162+
if ( ++flushPendingCount == explicitFlushAfterFlushes )
163+
{
164+
flushNow( ctx );
165+
} else
166+
{
167+
scheduleFlush( ctx );
168+
}
169+
} else
170+
{
171+
// Always flush directly
172+
flushNow( ctx );
173+
}
174+
}
175+
176+
@Override
177+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
178+
{
179+
// To ensure we not miss to flush anything, do it now.
180+
resetReadAndFlushIfNeeded( ctx );
181+
ctx.fireExceptionCaught( cause );
182+
}
183+
184+
@Override
185+
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
186+
{
187+
// Try to flush one last time if flushes are pending before disconnect the channel.
188+
resetReadAndFlushIfNeeded( ctx );
189+
ctx.disconnect( promise );
190+
}
191+
192+
@Override
193+
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
194+
{
195+
// Try to flush one last time if flushes are pending before close the channel.
196+
resetReadAndFlushIfNeeded( ctx );
197+
ctx.close( promise );
198+
}
199+
200+
@Override
201+
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
202+
{
203+
if ( !ctx.channel().isWritable() )
204+
{
205+
// The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
206+
flushIfNeeded( ctx );
207+
}
208+
ctx.fireChannelWritabilityChanged();
209+
}
210+
211+
@Override
212+
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
213+
{
214+
flushIfNeeded( ctx );
215+
}
216+
217+
void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx)
218+
{
219+
readInProgress = false;
220+
flushIfNeeded( ctx );
221+
}
222+
223+
void flushIfNeeded(ChannelHandlerContext ctx)
224+
{
225+
if ( flushPendingCount > 0 )
226+
{
227+
flushNow( ctx );
228+
}
229+
}
230+
231+
private void flushNow(ChannelHandlerContext ctx)
232+
{
233+
cancelScheduledFlush();
234+
flushPendingCount = 0;
235+
ctx.flush();
236+
}
237+
238+
private void scheduleFlush(final ChannelHandlerContext ctx)
239+
{
240+
if ( nextScheduledFlush == null )
241+
{
242+
// Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
243+
nextScheduledFlush = ctx.channel().eventLoop().submit( flushTask );
244+
}
245+
}
246+
247+
private void cancelScheduledFlush()
248+
{
249+
if ( nextScheduledFlush != null )
250+
{
251+
nextScheduledFlush.cancel( false );
252+
nextScheduledFlush = null;
253+
}
254+
}
255+
}

0 commit comments

Comments
 (0)