Skip to content

async writer retries #382

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 22, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 48 additions & 63 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,83 +308,50 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
res = make(chan error, len(msgs))
}
t0 := time.Now()
defer w.stats.writeTime.observeDuration(time.Since(t0))

for attempt := 0; attempt < w.config.MaxAttempts; attempt++ {
w.mutex.RLock()
w.mutex.RLock()
closed := w.closed
w.mutex.RUnlock()

if w.closed {
w.mutex.RUnlock()
return io.ErrClosedPipe
}
if closed {
return io.ErrClosedPipe
}

for i, msg := range msgs {
if int(msg.size()) > w.config.BatchBytes {
err := MessageTooLargeError{
Message: msg,
Remaining: msgs[i+1:],
}
w.mutex.RUnlock()
return err
}
select {
case w.msgs <- writerMessage{
msg: msg,
res: res,
}:
case <-ctx.Done():
w.mutex.RUnlock()
return ctx.Err()
for i, msg := range msgs {

if int(msg.size()) > w.config.BatchBytes {
err := MessageTooLargeError{
Message: msg,
Remaining: msgs[i+1:],
}
return err
}

w.mutex.RUnlock()

if w.config.Async {
break
}
wm := writerMessage{msg: msg, res: res}

var retry []Message

for i := 0; i != len(msgs); i++ {
select {
case e := <-res:
if e != nil {
if we, ok := e.(*writerError); ok {
w.stats.retries.observe(1)
retry, err = append(retry, we.msg), we.err
} else {
err = e
}
}
case <-ctx.Done():
return ctx.Err()
}
select {
case w.msgs <- wm:
case <-ctx.Done():
return ctx.Err()
}
}

if msgs = retry; len(msgs) == 0 {
break
}
if w.config.Async {
return nil
}

timer := time.NewTimer(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
for i := 0; i != len(msgs); i++ {
select {
case <-timer.C:
// Only clear the error (so we retry the loop) if we have more retries, otherwise
// we risk silencing the error.
if attempt < w.config.MaxAttempts-1 {
err = nil
case e := <-res:
if e != nil {
err = e
}
case <-ctx.Done():
err = ctx.Err()
case <-w.done:
err = io.ErrClosedPipe
}
timer.Stop()

if err != nil {
break
return ctx.Err()
}
}
w.stats.writeTime.observeDuration(time.Since(t0))

return err
}

Expand Down Expand Up @@ -571,6 +538,7 @@ type writer struct {
codec CompressionCodec
logger Logger
errorLogger Logger
maxAttempts int
}

func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
Expand All @@ -590,6 +558,7 @@ func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
codec: config.CompressionCodec,
logger: config.Logger,
errorLogger: config.ErrorLogger,
maxAttempts: config.MaxAttempts,
}
w.join.Add(1)
go w.run()
Expand Down Expand Up @@ -701,13 +670,15 @@ func (w *writer) run() {
if len(batch) == 0 {
continue
}

var err error
if conn, err = w.write(conn, batch, resch); err != nil {
if conn, err = w.writeWithRetries(conn, batch, resch); err != nil {
if conn != nil {
conn.Close()
conn = nil
}
}

idleConnDeadline = time.Now().Add(w.idleConnTimeout)
for i := range batch {
batch[i] = Message{}
Expand Down Expand Up @@ -737,6 +708,20 @@ func (w *writer) dial() (conn *Conn, err error) {
return
}

func (w *writer) writeWithRetries(conn *Conn, batch []Message, resch [](chan<- error)) (*Conn, error) {
var err error

for attempt := 0; attempt < w.maxAttempts; attempt++ {
conn, err = w.write(conn, batch, resch)
if err == nil {
break
}
w.stats.retries.observe(1)
time.Sleep(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
}
return conn, err
}

func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret *Conn, err error) {
w.stats.writes.observe(1)
if conn == nil {
Expand Down