Skip to content

Handling 0 as possible required ack value and InvalidRequiredAcks errors #176

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message)
// connection requests when producing messages.
func (c *Conn) SetRequiredAcks(n int) error {
switch n {
case -1, 1:
case -1, 0, 1:
atomic.StoreInt32(&c.requiredAcks, int32(n))
return nil
default:
Expand Down Expand Up @@ -939,21 +939,23 @@ func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func
return err
}

deadline, size, lock, err := c.waitResponse(d, id)
if err != nil {
return err
}
if c.requiredAcks != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we have to check that this is a produce request? do is used for all operations, but "required acks" is only employed when send messages isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I totally agree with you but the waitResponse is done at connection level (so in the conn.go file) where as far as I can see from the code we don't have information if it's a producer or a consumer request (which is at higher level). We should do a refactoring for moving the waiting for the response at the upper layer.
Is that really a problem? I mean, the conn is always initialized with -1 as a default value so other requests will work out of the box (and for consumer we cannot change it).
Btw if you think there is a simple way to check for producer request, feedback are really appreciated :-)

Copy link
Contributor Author

@ppatierno ppatierno Jan 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just adding that if the conn is there for representing a connection to the broker, the requiredAcks should not be an information at this so low level imho ... but it was already there.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could thing to add a new waitResponse bool parameter for the do function passed by the writeOperation and readOperation used by higher level protocol operation for which we know if it's needed to wait a response or not.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we're pretty much forced to handle it at this level since it's core to how the kafka protocol works (some requests have responses, some don't, currently we only support the former), and we use the Conn abstraction for all interactions with kafka brokers.

We don't know how the connection is being handled, but we do know what kafka API call is being made, so I think we can figure out whether the "required ack" applies or not based on that, can't we?

It's OK to modify internal APIs to support new use cases as well, if we need to pass more info down to this method then let's do it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't know how the connection is being handled, but we do know what kafka API call is being made, so I think we can figure out whether the "required ack" applies or not based on that, can't we?

Which information inside conn.go provides the information about which API is called. Isn't it an information of higher level?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entry point for producing messages is here https://github.com/segmentio/kafka-go/blob/master/conn.go#L828

So this is where we have most of the context for deciding whether waiting for a ack is required or not.

I think the simplest is going to be copying the content of the do method into WriteCompressedMessages, in replacement for the call to writeOperation, and based on whether "required acks" are set or not we wait for the response.

What do you think about this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ppatierno do you still have interest in pushing this through the finish line?

deadline, size, lock, err := c.waitResponse(d, id)
if err != nil {
return err
}

if err = read(deadline, size); err != nil {
switch err.(type) {
case Error:
default:
c.conn.Close()
if err = read(deadline, size); err != nil {
switch err.(type) {
case Error:
default:
c.conn.Close()
}
}
lock.Unlock()
}

d.unsetConnReadDeadline()
lock.Unlock()
return err
}

Expand Down
3 changes: 2 additions & 1 deletion writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,8 @@ func (w *writer) dial() (conn *Conn, err error) {
t1 := time.Now()
w.stats.dials.observe(1)
w.stats.dialTime.observeDuration(t1.Sub(t0))
conn.SetRequiredAcks(w.requiredAcks)
// set the error for a potential invalid requiredAcks value
err = conn.SetRequiredAcks(w.requiredAcks)
break
}
}
Expand Down