Skip to content

Commit 5e09712

Browse files
committed
quic: fast path for stream writes
Similar to the fast-path for reads, writes are buffered in an unsynchronized []byte allowing for lock-free small writes. For golang/go#58547 Change-Id: I305cb5f91eff662a473f44a4bc051acc7c213e4c Reviewed-on: https://go-review.googlesource.com/c/net/+/564496 LUCI-TryBot-Result: Go LUCI <[email protected]> Reviewed-by: Jonathan Amsterdam <[email protected]>
1 parent 08d27e3 commit 5e09712

File tree

3 files changed

+62
-3
lines changed

3 files changed

+62
-3
lines changed

internal/quic/pipe.go

+12
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,18 @@ func (p *pipe) peek(n int64) []byte {
148148
return b[:min(int64(len(b)), n)]
149149
}
150150

151+
// availableBuffer returns the available contiguous, allocated buffer space
152+
// following the pipe window.
153+
//
154+
// This is used by the stream write fast path, which makes multiple writes into the pipe buffer
155+
// without a lock, and then adjusts p.end at a later time with a lock held.
156+
func (p *pipe) availableBuffer() []byte {
157+
if p.tail == nil {
158+
return nil
159+
}
160+
return p.tail.b[p.end-p.tail.off:]
161+
}
162+
151163
// discardBefore discards all data prior to off.
152164
func (p *pipe) discardBefore(off int64) {
153165
for p.head != nil && p.head.end() < off {

internal/quic/stream.go

+48-2
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,10 @@ type Stream struct {
5858
outdone chan struct{} // closed when all data sent
5959

6060
// Unsynchronized buffers, used for lock-free fast path.
61-
inbuf []byte // received data
62-
inbufoff int // bytes of inbuf which have been consumed
61+
inbuf []byte // received data
62+
inbufoff int // bytes of inbuf which have been consumed
63+
outbuf []byte // written data
64+
outbufoff int // bytes of outbuf which contain data to write
6365

6466
// Atomic stream state bits.
6567
//
@@ -313,7 +315,14 @@ func (s *Stream) Write(b []byte) (n int, err error) {
313315
if s.IsReadOnly() {
314316
return 0, errors.New("write to read-only stream")
315317
}
318+
if len(b) > 0 && len(s.outbuf)-s.outbufoff >= len(b) {
319+
// Fast path: The data to write fits in s.outbuf.
320+
copy(s.outbuf[s.outbufoff:], b)
321+
s.outbufoff += len(b)
322+
return len(b), nil
323+
}
316324
canWrite := s.outgate.lock()
325+
s.flushFastOutputBuffer()
317326
for {
318327
// The first time through this loop, we may or may not be write blocked.
319328
// We exit the loop after writing all data, so on subsequent passes through
@@ -373,17 +382,51 @@ func (s *Stream) Write(b []byte) (n int, err error) {
373382
// If we have bytes left to send, we're blocked.
374383
canWrite = false
375384
}
385+
if lim := s.out.start + s.outmaxbuf - s.out.end - 1; lim > 0 {
386+
// If s.out has space allocated and available to be written into,
387+
// then reference it in s.outbuf for fast-path writes.
388+
//
389+
// It's perhaps a bit pointless to limit s.outbuf to the send buffer limit.
390+
// We've already allocated this buffer so we aren't saving any memory
391+
// by not using it.
392+
// For now, we limit it anyway to make it easier to reason about limits.
393+
//
394+
// We set the limit to one less than the send buffer limit (the -1 above)
395+
// so that a write which completely fills the buffer will overflow
396+
// s.outbuf and trigger a flush.
397+
s.outbuf = s.out.availableBuffer()
398+
if int64(len(s.outbuf)) > lim {
399+
s.outbuf = s.outbuf[:lim]
400+
}
401+
}
376402
s.outUnlock()
377403
return n, nil
378404
}
379405

380406
// WriteBytes writes a single byte to the stream.
381407
func (s *Stream) WriteByte(c byte) error {
408+
if s.outbufoff < len(s.outbuf) {
409+
s.outbuf[s.outbufoff] = c
410+
s.outbufoff++
411+
return nil
412+
}
382413
b := [1]byte{c}
383414
_, err := s.Write(b[:])
384415
return err
385416
}
386417

418+
func (s *Stream) flushFastOutputBuffer() {
419+
if s.outbuf == nil {
420+
return
421+
}
422+
// Commit data previously written to s.outbuf.
423+
// s.outbuf is a reference to a buffer in s.out, so we just need to record
424+
// that the output buffer has been extended.
425+
s.out.end += int64(s.outbufoff)
426+
s.outbuf = nil
427+
s.outbufoff = 0
428+
}
429+
387430
// Flush flushes data written to the stream.
388431
// It does not wait for the peer to acknowledge receipt of the data.
389432
// Use Close to wait for the peer's acknowledgement.
@@ -394,6 +437,7 @@ func (s *Stream) Flush() {
394437
}
395438

396439
func (s *Stream) flushLocked() {
440+
s.flushFastOutputBuffer()
397441
s.outopened.set()
398442
if s.outflushed < s.outwin {
399443
s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
@@ -509,6 +553,8 @@ func (s *Stream) resetInternal(code uint64, userClosed bool) {
509553
// extra RESET_STREAM in this case is harmless.
510554
s.outreset.set()
511555
s.outresetcode = code
556+
s.outbuf = nil
557+
s.outbufoff = 0
512558
s.out.discardBefore(s.out.end)
513559
s.outunsent = rangeset[int64]{}
514560
s.outblocked.clear()

internal/quic/stream_test.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ func TestStreamWriteBlockedByStreamFlowControl(t *testing.T) {
100100
if err != nil {
101101
t.Fatalf("write with available output buffer: unexpected error: %v", err)
102102
}
103+
s.Flush()
103104
tc.wantFrame("write blocked by flow control triggers a STREAM_DATA_BLOCKED frame",
104105
packetType1RTT, debugFrameStreamDataBlocked{
105106
id: s.id,
@@ -111,6 +112,7 @@ func TestStreamWriteBlockedByStreamFlowControl(t *testing.T) {
111112
if err != nil {
112113
t.Fatalf("write with available output buffer: unexpected error: %v", err)
113114
}
115+
s.Flush()
114116
tc.wantIdle("adding more blocked data does not trigger another STREAM_DATA_BLOCKED")
115117

116118
// Provide some flow control window.
@@ -1349,7 +1351,6 @@ func TestStreamFlushImplicitExact(t *testing.T) {
13491351
id: s.id,
13501352
data: want[0:4],
13511353
})
1352-
13531354
})
13541355
}
13551356

0 commit comments

Comments
 (0)