Skip to content

Commit 20f17fc

Browse files
committed
Revert "move writer retries so async messages are retried (#382)"
This reverts commit bf3be08.
1 parent d5019b0 commit 20f17fc

File tree

1 file changed

+63
-48
lines changed

1 file changed

+63
-48
lines changed

writer.go

+63-48
Original file line numberDiff line numberDiff line change
@@ -308,50 +308,83 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
308308
res = make(chan error, len(msgs))
309309
}
310310
t0 := time.Now()
311-
defer w.stats.writeTime.observeDuration(time.Since(t0))
312311

313-
w.mutex.RLock()
314-
closed := w.closed
315-
w.mutex.RUnlock()
312+
for attempt := 0; attempt < w.config.MaxAttempts; attempt++ {
313+
w.mutex.RLock()
316314

317-
if closed {
318-
return io.ErrClosedPipe
319-
}
320-
321-
for i, msg := range msgs {
315+
if w.closed {
316+
w.mutex.RUnlock()
317+
return io.ErrClosedPipe
318+
}
322319

323-
if int(msg.size()) > w.config.BatchBytes {
324-
err := MessageTooLargeError{
325-
Message: msg,
326-
Remaining: msgs[i+1:],
320+
for i, msg := range msgs {
321+
if int(msg.size()) > w.config.BatchBytes {
322+
err := MessageTooLargeError{
323+
Message: msg,
324+
Remaining: msgs[i+1:],
325+
}
326+
w.mutex.RUnlock()
327+
return err
328+
}
329+
select {
330+
case w.msgs <- writerMessage{
331+
msg: msg,
332+
res: res,
333+
}:
334+
case <-ctx.Done():
335+
w.mutex.RUnlock()
336+
return ctx.Err()
327337
}
328-
return err
329338
}
330339

331-
wm := writerMessage{msg: msg, res: res}
340+
w.mutex.RUnlock()
332341

333-
select {
334-
case w.msgs <- wm:
335-
case <-ctx.Done():
336-
return ctx.Err()
342+
if w.config.Async {
343+
break
337344
}
338-
}
339345

340-
if w.config.Async {
341-
return nil
342-
}
346+
var retry []Message
347+
348+
for i := 0; i != len(msgs); i++ {
349+
select {
350+
case e := <-res:
351+
if e != nil {
352+
if we, ok := e.(*writerError); ok {
353+
w.stats.retries.observe(1)
354+
retry, err = append(retry, we.msg), we.err
355+
} else {
356+
err = e
357+
}
358+
}
359+
case <-ctx.Done():
360+
return ctx.Err()
361+
}
362+
}
343363

344-
for i := 0; i != len(msgs); i++ {
364+
if msgs = retry; len(msgs) == 0 {
365+
break
366+
}
367+
368+
timer := time.NewTimer(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
345369
select {
346-
case e := <-res:
347-
if e != nil {
348-
err = e
370+
case <-timer.C:
371+
// Only clear the error (so we retry the loop) if we have more retries, otherwise
372+
// we risk silencing the error.
373+
if attempt < w.config.MaxAttempts-1 {
374+
err = nil
349375
}
350376
case <-ctx.Done():
351-
return ctx.Err()
377+
err = ctx.Err()
378+
case <-w.done:
379+
err = io.ErrClosedPipe
352380
}
353-
}
381+
timer.Stop()
354382

383+
if err != nil {
384+
break
385+
}
386+
}
387+
w.stats.writeTime.observeDuration(time.Since(t0))
355388
return err
356389
}
357390

@@ -538,7 +571,6 @@ type writer struct {
538571
codec CompressionCodec
539572
logger Logger
540573
errorLogger Logger
541-
maxAttempts int
542574
}
543575

544576
func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
@@ -558,7 +590,6 @@ func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
558590
codec: config.CompressionCodec,
559591
logger: config.Logger,
560592
errorLogger: config.ErrorLogger,
561-
maxAttempts: config.MaxAttempts,
562593
}
563594
w.join.Add(1)
564595
go w.run()
@@ -670,15 +701,13 @@ func (w *writer) run() {
670701
if len(batch) == 0 {
671702
continue
672703
}
673-
674704
var err error
675-
if conn, err = w.writeWithRetries(conn, batch, resch); err != nil {
705+
if conn, err = w.write(conn, batch, resch); err != nil {
676706
if conn != nil {
677707
conn.Close()
678708
conn = nil
679709
}
680710
}
681-
682711
idleConnDeadline = time.Now().Add(w.idleConnTimeout)
683712
for i := range batch {
684713
batch[i] = Message{}
@@ -708,20 +737,6 @@ func (w *writer) dial() (conn *Conn, err error) {
708737
return
709738
}
710739

711-
func (w *writer) writeWithRetries(conn *Conn, batch []Message, resch [](chan<- error)) (*Conn, error) {
712-
var err error
713-
714-
for attempt := 0; attempt < w.maxAttempts; attempt++ {
715-
conn, err = w.write(conn, batch, resch)
716-
if err == nil {
717-
break
718-
}
719-
w.stats.retries.observe(1)
720-
time.Sleep(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
721-
}
722-
return conn, err
723-
}
724-
725740
func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret *Conn, err error) {
726741
w.stats.writes.observe(1)
727742
if conn == nil {

0 commit comments

Comments
 (0)