-
Notifications
You must be signed in to change notification settings - Fork 311
Fix net.Conn deadlines tearing down the connection when no reads or writes are active #350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,7 +6,9 @@ import ( | |
"io" | ||
"math" | ||
"net" | ||
"os" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
) | ||
|
||
|
@@ -43,15 +45,26 @@ func NetConn(ctx context.Context, c *Conn, msgType MessageType) net.Conn { | |
msgType: msgType, | ||
} | ||
|
||
var cancel context.CancelFunc | ||
nc.writeContext, cancel = context.WithCancel(ctx) | ||
nc.writeTimer = time.AfterFunc(math.MaxInt64, cancel) | ||
var writeCancel context.CancelFunc | ||
nc.writeContext, writeCancel = context.WithCancel(ctx) | ||
nc.writeTimer = time.AfterFunc(math.MaxInt64, func() { | ||
nc.afterWriteDeadline.Store(true) | ||
if nc.writing.Load() { | ||
writeCancel() | ||
} | ||
}) | ||
if !nc.writeTimer.Stop() { | ||
<-nc.writeTimer.C | ||
} | ||
|
||
nc.readContext, cancel = context.WithCancel(ctx) | ||
nc.readTimer = time.AfterFunc(math.MaxInt64, cancel) | ||
var readCancel context.CancelFunc | ||
nc.readContext, readCancel = context.WithCancel(ctx) | ||
nc.readTimer = time.AfterFunc(math.MaxInt64, func() { | ||
nc.afterReadDeadline.Store(true) | ||
if nc.reading.Load() { | ||
readCancel() | ||
} | ||
}) | ||
if !nc.readTimer.Stop() { | ||
<-nc.readTimer.C | ||
} | ||
|
@@ -63,11 +76,15 @@ type netConn struct { | |
c *Conn | ||
msgType MessageType | ||
|
||
writeTimer *time.Timer | ||
writeContext context.Context | ||
writeTimer *time.Timer | ||
writeContext context.Context | ||
writing atomic.Bool | ||
afterWriteDeadline atomic.Bool | ||
|
||
readTimer *time.Timer | ||
readContext context.Context | ||
readTimer *time.Timer | ||
readContext context.Context | ||
reading atomic.Bool | ||
afterReadDeadline atomic.Bool | ||
|
||
readMu sync.Mutex | ||
eofed bool | ||
|
@@ -81,16 +98,34 @@ func (c *netConn) Close() error { | |
} | ||
|
||
func (c *netConn) Write(p []byte) (int, error) { | ||
if c.afterWriteDeadline.Load() { | ||
return 0, os.ErrDeadlineExceeded | ||
} | ||
|
||
if swapped := c.writing.CompareAndSwap(false, true); !swapped { | ||
panic("Concurrent writes not allowed") | ||
} | ||
defer c.writing.Store(false) | ||
|
||
err := c.c.Write(c.writeContext, c.msgType, p) | ||
if err != nil { | ||
return 0, err | ||
} | ||
|
||
return len(p), nil | ||
} | ||
|
||
func (c *netConn) Read(p []byte) (int, error) { | ||
if c.afterReadDeadline.Load() { | ||
return 0, os.ErrDeadlineExceeded | ||
} | ||
|
||
c.readMu.Lock() | ||
defer c.readMu.Unlock() | ||
if swapped := c.reading.CompareAndSwap(false, true); !swapped { | ||
panic("Concurrent reads not allowed") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. concurrent reads should be allowed |
||
} | ||
defer c.reading.Store(false) | ||
|
||
if c.eofed { | ||
return 0, io.EOF | ||
|
@@ -151,16 +186,18 @@ func (c *netConn) SetWriteDeadline(t time.Time) error { | |
if t.IsZero() { | ||
c.writeTimer.Stop() | ||
} else { | ||
c.writeTimer.Reset(t.Sub(time.Now())) | ||
c.writeTimer.Reset(time.Until(t)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice, didn't realize we had |
||
} | ||
c.afterWriteDeadline.Store(false) | ||
return nil | ||
} | ||
|
||
func (c *netConn) SetReadDeadline(t time.Time) error { | ||
if t.IsZero() { | ||
c.readTimer.Stop() | ||
} else { | ||
c.readTimer.Reset(t.Sub(time.Now())) | ||
c.readTimer.Reset(time.Until(t)) | ||
} | ||
c.afterReadDeadline.Store(false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. while highly unlikely this can race with the timer routine as it's possible the timer has already set off and the timer goroutine has already set |
||
return nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
concurrent writes should be allowed