Skip to content

Commit f652014

Browse files
committed
SpigotMC#3392: Consolidate flushes to reduce syscalls, improves performance
Based on Netty FlushConsolidationHandler
1 parent f486a25 commit f652014

File tree

7 files changed

+395
-1
lines changed

7 files changed

+395
-1
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
<url>https://github.com/SpigotMC/BungeeCord/blob/master/LICENSE</url>
2323
<distribution>repo</distribution>
2424
</license>
25+
<license>
26+
<name>Apache License Version 2.0</name>
27+
<url>https://www.apache.org/licenses/LICENSE-2.0</url>
28+
<comments>Applies only to original versions of files in proxy/src/main/java/net/md_5/bungee/netty/flush/</comments>
29+
</license>
2530
</licenses>
2631

2732
<developers>

proxy/src/main/java/net/md_5/bungee/ServerConnector.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,10 @@ private void cutThrough(ServerConnection server)
367367
user.setServer( server );
368368
ch.getHandle().pipeline().get( HandlerBoss.class ).setHandler( new DownstreamBridge( bungee, user, server ) );
369369

370+
// Updates the flush handler connections (the get/set methods add the channel handlers if needed)
371+
ch.setFlushSignalingTarget( user.getCh().getFlushConsolidationHandler( false ) );
372+
user.getCh().setFlushSignalingTarget( ch.getFlushConsolidationHandler( true ) );
373+
370374
bungee.getPluginManager().callEvent( new ServerSwitchEvent( user, from ) );
371375

372376
thisState = State.FINISHED;

proxy/src/main/java/net/md_5/bungee/UserConnection.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.Queue;
2222
import java.util.UUID;
2323
import java.util.logging.Level;
24+
import lombok.AccessLevel;
2425
import lombok.Getter;
2526
import lombok.NonNull;
2627
import lombok.RequiredArgsConstructor;
@@ -319,7 +320,7 @@ public void connect(final ServerConnectRequest request)
319320

320321
pendingConnects.add( target );
321322

322-
ChannelInitializer initializer = new ChannelInitializer()
323+
ChannelInitializer<Channel> initializer = new ChannelInitializer<Channel>()
323324
{
324325
@Override
325326
protected void initChannel(Channel ch) throws Exception

proxy/src/main/java/net/md_5/bungee/netty/ChannelWrapper.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import lombok.Setter;
1212
import net.md_5.bungee.compress.PacketCompressor;
1313
import net.md_5.bungee.compress.PacketDecompressor;
14+
import net.md_5.bungee.netty.flush.BungeeFlushConsolidationHandler;
15+
import net.md_5.bungee.netty.flush.FlushSignalingHandler;
1416
import net.md_5.bungee.protocol.DefinedPacket;
1517
import net.md_5.bungee.protocol.MinecraftDecoder;
1618
import net.md_5.bungee.protocol.MinecraftEncoder;
@@ -69,6 +71,37 @@ public void setVersion(int protocol)
6971
ch.pipeline().get( MinecraftEncoder.class ).setProtocolVersion( protocol );
7072
}
7173

74+
/**
75+
* Set the {@link FlushSignalingHandler} target. If the handler is absent, one will be added.
76+
* @param target the (new) target for the flush signaling handler
77+
*/
78+
public void setFlushSignalingTarget(BungeeFlushConsolidationHandler target)
79+
{
80+
FlushSignalingHandler handler = ch.pipeline().get( FlushSignalingHandler.class );
81+
if ( handler == null )
82+
{
83+
ch.pipeline().addFirst( PipelineUtils.FLUSH_SIGNALING, new FlushSignalingHandler( target ) );
84+
} else
85+
{
86+
handler.setTarget( target );
87+
}
88+
}
89+
90+
/**
91+
* Get the flush consolidation handler of this channel. If none is present, one will be added.
92+
* @param toClient whether this channel is a bungee-client connection
93+
* @return the flush consolidation handler for this channel
94+
*/
95+
public BungeeFlushConsolidationHandler getFlushConsolidationHandler(boolean toClient)
96+
{
97+
BungeeFlushConsolidationHandler handler = ch.pipeline().get( BungeeFlushConsolidationHandler.class );
98+
if ( handler == null )
99+
{
100+
ch.pipeline().addFirst( PipelineUtils.FLUSH_CONSOLIDATION, handler = BungeeFlushConsolidationHandler.newInstance( toClient ) );
101+
}
102+
return handler;
103+
}
104+
72105
public void write(Object packet)
73106
{
74107
if ( !closed )

proxy/src/main/java/net/md_5/bungee/netty/PipelineUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ protected void initChannel(Channel ch) throws Exception
100100
public static final String FRAME_PREPENDER = "frame-prepender";
101101
public static final String LEGACY_DECODER = "legacy-decoder";
102102
public static final String LEGACY_KICKER = "legacy-kick";
103+
public static final String FLUSH_CONSOLIDATION = "flush-consolidation";
104+
public static final String FLUSH_SIGNALING = "flush-signaling";
103105

104106
private static boolean epoll;
105107

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
/*
2+
* The original file is licensed under the following license:
3+
*
4+
* Copyright 2016 The Netty Project
5+
*
6+
* The Netty Project licenses this file to you under the Apache License,
7+
* version 2.0 (the "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at:
9+
*
10+
* https://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
* License for the specific language governing permissions and limitations
16+
* under the License.
17+
*
18+
* ---
19+
*
20+
* This file is based on the io/netty/handler/flush/FlushConsolidationHandler.java file from Netty (v4.1).
21+
* It was modified to fit to bungee's use of forwarded connections.
22+
* All modifications are licensed under the "Modified BSD 3-Clause License" to be found at
23+
* https://github.com/SpigotMC/BungeeCord/blob/master/LICENSE
24+
*/
25+
package net.md_5.bungee.netty.flush;
26+
27+
import io.netty.channel.Channel;
28+
import io.netty.channel.ChannelDuplexHandler;
29+
import io.netty.channel.ChannelHandler;
30+
import io.netty.channel.ChannelHandlerContext;
31+
import io.netty.channel.ChannelOutboundHandler;
32+
import io.netty.channel.ChannelOutboundInvoker;
33+
import io.netty.channel.ChannelPipeline;
34+
import io.netty.channel.ChannelPromise;
35+
import io.netty.util.internal.ObjectUtil;
36+
import java.util.concurrent.Future;
37+
38+
/**
39+
* {@link ChannelDuplexHandler} which consolidates {@link Channel#flush()} / {@link ChannelHandlerContext#flush()}
40+
* operations (which also includes
41+
* {@link Channel#writeAndFlush(Object)} / {@link Channel#writeAndFlush(Object, ChannelPromise)} and
42+
* {@link ChannelOutboundInvoker#writeAndFlush(Object)} /
43+
* {@link ChannelOutboundInvoker#writeAndFlush(Object, ChannelPromise)}).
44+
* <p>
45+
* Flush operations are generally speaking expensive as these may trigger a syscall on the transport level. Thus it is
46+
* in most cases (where write latency can be traded with throughput) a good idea to try to minimize flush operations
47+
* as much as possible.
48+
* <p>
49+
* If a {@link FlushSignalingHandler} signalises a read loop is currently ongoing,
50+
* {@link #flush(ChannelHandlerContext)} will not be passed on to the next {@link ChannelOutboundHandler} in the
51+
* {@link ChannelPipeline}, as it will pick up any pending flushes when
52+
* {@link #channelReadComplete(ChannelHandlerContext)} is triggered.
53+
* If no read loop is ongoing, the behavior depends on the {@code consolidateWhenNoReadInProgress} constructor argument:
54+
* <ul>
55+
* <li>if {@code false}, flushes are passed on to the next handler directly;</li>
56+
* <li>if {@code true}, the invocation of the next handler is submitted as a separate task on the event loop. Under
57+
* high throughput, this gives the opportunity to process other flushes before the task gets executed, thus
58+
* batching multiple flushes into one.</li>
59+
* </ul>
60+
* If {@code explicitFlushAfterFlushes} is reached the flush will be forwarded as well (whether while in a read loop, or
61+
* while batching outside of a read loop).
62+
* <p>
63+
* If the {@link Channel} becomes non-writable it will also try to execute any pending flush operations.
64+
* <p>
65+
* The {@link BungeeFlushConsolidationHandler} should be put as first {@link ChannelHandler} in the
66+
* {@link ChannelPipeline} to have the best effect.
67+
*/
68+
public final class BungeeFlushConsolidationHandler extends ChannelDuplexHandler
69+
{
70+
private final int explicitFlushAfterFlushes;
71+
private final boolean consolidateWhenNoReadInProgress;
72+
private final Runnable flushTask;
73+
private int flushPendingCount;
74+
boolean readInProgress;
75+
ChannelHandlerContext ctx;
76+
private Future<?> nextScheduledFlush;
77+
78+
/**
79+
* The default number of flushes after which a flush will be forwarded to downstream handlers (whether while in a
80+
* read loop, or while batching outside of a read loop).
81+
*/
82+
public static final int DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES = 256;
83+
84+
/**
85+
* Creates a new instance with bungee's default values
86+
*
87+
* @param toClient whether this handler is for a bungee-client connection
88+
* @return a new instance of BungeeFlushConsolidationHandler
89+
*/
90+
public static BungeeFlushConsolidationHandler newInstance(boolean toClient)
91+
{
92+
// Currently the toClient boolean is ignored. It is present in case we find different parameters neccessary
93+
// for client and server connections.
94+
return new BungeeFlushConsolidationHandler( 20, false );
95+
}
96+
97+
/**
98+
* Create new instance which explicit flush after {@value DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES} pending flush
99+
* operations at the latest.
100+
*/
101+
private BungeeFlushConsolidationHandler()
102+
{
103+
this( DEFAULT_EXPLICIT_FLUSH_AFTER_FLUSHES, false );
104+
}
105+
106+
/**
107+
* Create new instance which doesn't consolidate flushes when no read is in progress.
108+
*
109+
* @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
110+
*/
111+
private BungeeFlushConsolidationHandler(int explicitFlushAfterFlushes)
112+
{
113+
this( explicitFlushAfterFlushes, false );
114+
}
115+
116+
/**
117+
* Create new instance.
118+
*
119+
* @param explicitFlushAfterFlushes the number of flushes after which an explicit flush will be done.
120+
* @param consolidateWhenNoReadInProgress whether to consolidate flushes even when no read loop is currently
121+
* ongoing.
122+
*/
123+
private BungeeFlushConsolidationHandler(int explicitFlushAfterFlushes, boolean consolidateWhenNoReadInProgress)
124+
{
125+
this.explicitFlushAfterFlushes = ObjectUtil.checkPositive( explicitFlushAfterFlushes, "explicitFlushAfterFlushes" );
126+
this.consolidateWhenNoReadInProgress = consolidateWhenNoReadInProgress;
127+
this.flushTask = consolidateWhenNoReadInProgress ? new Runnable()
128+
{
129+
@Override
130+
public void run()
131+
{
132+
if ( flushPendingCount > 0 && !readInProgress )
133+
{
134+
flushPendingCount = 0;
135+
nextScheduledFlush = null;
136+
ctx.flush();
137+
} // else we'll flush when the read completes
138+
}
139+
} : null;
140+
}
141+
142+
@Override
143+
public void handlerAdded(ChannelHandlerContext ctx) throws Exception
144+
{
145+
this.ctx = ctx;
146+
}
147+
148+
@Override
149+
public void flush(ChannelHandlerContext ctx) throws Exception
150+
{
151+
if ( readInProgress )
152+
{
153+
// If there is still a read in progress we are sure we will see a channelReadComplete(...) call. Thus
154+
// we only need to flush if we reach the explicitFlushAfterFlushes limit.
155+
if ( ++flushPendingCount == explicitFlushAfterFlushes )
156+
{
157+
flushNow( ctx );
158+
}
159+
} else if ( consolidateWhenNoReadInProgress )
160+
{
161+
// Flush immediately if we reach the threshold, otherwise schedule
162+
if ( ++flushPendingCount == explicitFlushAfterFlushes )
163+
{
164+
flushNow( ctx );
165+
} else
166+
{
167+
scheduleFlush( ctx );
168+
}
169+
} else
170+
{
171+
// Always flush directly
172+
flushNow( ctx );
173+
}
174+
}
175+
176+
@Override
177+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
178+
{
179+
// To ensure we not miss to flush anything, do it now.
180+
resetReadAndFlushIfNeeded( ctx );
181+
ctx.fireExceptionCaught( cause );
182+
}
183+
184+
@Override
185+
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
186+
{
187+
// Try to flush one last time if flushes are pending before disconnect the channel.
188+
resetReadAndFlushIfNeeded( ctx );
189+
ctx.disconnect( promise );
190+
}
191+
192+
@Override
193+
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception
194+
{
195+
// Try to flush one last time if flushes are pending before close the channel.
196+
resetReadAndFlushIfNeeded( ctx );
197+
ctx.close( promise );
198+
}
199+
200+
@Override
201+
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception
202+
{
203+
if ( !ctx.channel().isWritable() )
204+
{
205+
// The writability of the channel changed to false, so flush all consolidated flushes now to free up memory.
206+
flushIfNeeded( ctx );
207+
}
208+
ctx.fireChannelWritabilityChanged();
209+
}
210+
211+
@Override
212+
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception
213+
{
214+
flushIfNeeded( ctx );
215+
}
216+
217+
void resetReadAndFlushIfNeeded(ChannelHandlerContext ctx)
218+
{
219+
readInProgress = false;
220+
flushIfNeeded( ctx );
221+
}
222+
223+
void flushIfNeeded(ChannelHandlerContext ctx)
224+
{
225+
if ( flushPendingCount > 0 )
226+
{
227+
flushNow( ctx );
228+
}
229+
}
230+
231+
private void flushNow(ChannelHandlerContext ctx)
232+
{
233+
cancelScheduledFlush();
234+
flushPendingCount = 0;
235+
ctx.flush();
236+
}
237+
238+
private void scheduleFlush(final ChannelHandlerContext ctx)
239+
{
240+
if ( nextScheduledFlush == null )
241+
{
242+
// Run as soon as possible, but still yield to give a chance for additional writes to enqueue.
243+
nextScheduledFlush = ctx.channel().eventLoop().submit( flushTask );
244+
}
245+
}
246+
247+
private void cancelScheduledFlush()
248+
{
249+
if ( nextScheduledFlush != null )
250+
{
251+
nextScheduledFlush.cancel( false );
252+
nextScheduledFlush = null;
253+
}
254+
}
255+
}

0 commit comments

Comments
 (0)