From fae0c9893324887c52ed5c2f1389ed9993e2fb0f Mon Sep 17 00:00:00 2001 From: Paolo Patierno Date: Sat, 5 Jan 2019 22:42:31 +0100 Subject: [PATCH] Added 0 handled as valid value for required ack Added handling of InvalidRequiredAcks Added handling of producer not waiting for response if required ack is 0 --- conn.go | 24 +++++++++++++----------- writer.go | 3 ++- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/conn.go b/conn.go index 853ea6945..7802188c9 100644 --- a/conn.go +++ b/conn.go @@ -870,7 +870,7 @@ func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) // connection requests when producing messages. func (c *Conn) SetRequiredAcks(n int) error { switch n { - case -1, 1: + case -1, 0, 1: atomic.StoreInt32(&c.requiredAcks, int32(n)) return nil default: @@ -939,21 +939,23 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func return err } - deadline, size, lock, err := c.waitResponse(d, id) - if err != nil { - return err - } + if c.requiredAcks != 0 { + deadline, size, lock, err := c.waitResponse(d, id) + if err != nil { + return err + } - if err = read(deadline, size); err != nil { - switch err.(type) { - case Error: - default: - c.conn.Close() + if err = read(deadline, size); err != nil { + switch err.(type) { + case Error: + default: + c.conn.Close() + } } + lock.Unlock() } d.unsetConnReadDeadline() - lock.Unlock() return err } diff --git a/writer.go b/writer.go index e183e636f..9340509cf 100644 --- a/writer.go +++ b/writer.go @@ -636,7 +636,8 @@ func (w *writer) dial() (conn *Conn, err error) { t1 := time.Now() w.stats.dials.observe(1) w.stats.dialTime.observeDuration(t1.Sub(t0)) - conn.SetRequiredAcks(w.requiredAcks) + // set the error for a potential invalid requiredAcks value + err = conn.SetRequiredAcks(w.requiredAcks) break } }