Release tag 0.4.2: response from kafka is of unknown type while using kafka.Writer #487

Closed
riteshmodi opened this issue Aug 18, 2020 · 8 comments
@riteshmodi

Describe the bug
When publishing a message with kafka.Writer from release tag v0.4.2, I get the following error, even though the message is written to Kafka successfully: panic: BUG: promise resolved with impossible value of type <nil>

Kafka Version
kafka_2.11-2.0.0

To Reproduce
Set up a local Kafka broker and send a message using the following code:

import (
	"net/http"

	kafka "github.com/segmentio/kafka-go"
)

func GetKafkaProducer() *kafka.Writer {
	return &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"),
		Topic:    "topicA",
		Balancer: &kafka.LeastBytes{},
	}
}

func PublishMessage(r *http.Request, w *kafka.Writer, msg kafka.Message) {
	w.WriteMessages(r.Context(), msg)
}

Test code

import (
	"context"
	"net/http"
	"testing"
	"time"

	kafka "github.com/segmentio/kafka-go"
	"github.com/stretchr/testify/assert"
)

func TestWriteToKafka(t *testing.T) {

	kw := GetKafkaProducer()
	defer kw.Close()

	msg := kafka.Message{
		Key:   []byte("KeyA"),
		Value: []byte("It seems that issue is in latest release. Check again!!")}

	ctx, cancel := context.WithTimeout(context.Background(), 2000*time.Millisecond)
	defer cancel()
	req, _ := http.NewRequestWithContext(ctx, "POST", "http://google.com", nil)

	PublishMessage(req, kw, msg)
	assert := assert.New(t)
	assert.Fail("")

}

Expected behavior
I am not sure, but I tried the same code (provided in the README) with release tag 0.4.0 and there was no issue.

Additional context
I tried to debug the issue. It seems that we expect a response from Kafka of type Response (declared as type Response = protocol.Message) but receive something different. I checked the Kafka and ZooKeeper consoles and there seems to be no issue there.

Stack trace of the error:

panic: BUG: promise resolved with impossible value of type <nil>

goroutine 36 [running]:
github.com/segmentio/kafka-go.async.await(0xc00007a120, 0x1435480, 0xc00018c140, 0x142f040, 0xc00014a2d0, 0xc0001660c0, 0x0)
	/Users/riteshmodi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.2/transport.go:786 +0x287
github.com/segmentio/kafka-go.(*connPool).roundTrip(0xc0000b02c0, 0x1435480, 0xc00018c140, 0x142f040, 0xc00014a2d0, 0x0, 0x0, 0x0, 0x0)
	/Users/riteshmodi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.2/transport.go:361 +0x3cd
github.com/segmentio/kafka-go.(*Transport).RoundTrip(0x1663bc0, 0x1435480, 0xc00018c140, 0x1431d80, 0xc000128720, 0x142f040, 0xc00014a2d0, 0x0, 0x0, 0x0, ...)
	/Users/riteshmodi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.2/transport.go:157 +0xee
github.com/segmentio/kafka-go.(*Client).roundTrip(0xc000046d78, 0x1435480, 0xc00018c140, 0x0, 0x0, 0x142f040, 0xc00014a2d0, 0x0, 0x0, 0x0, ...)
	/Users/riteshmodi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.2/client.go:113 +0xc9
github.com/segmentio/kafka-go.(*Client).Produce(0xc000046d78, 0x1435500, 0xc0001663c0, 0xc0001b7da0, 0xc0001663c0, 0xc0001500e0, 0xc000046d88)
	/Users/riteshmodi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.2/produce.go:118 +0x1fc
github.com/segmentio/kafka-go.(*Writer).produce(0xc000126600, 0x0, 0xc0001a4000, 0x0, 0x0, 0x0)
	/Users/riteshmodi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.2/writer.go:802 +0x24e
github.com/segmentio/kafka-go.(*Writer).writeBatch(0xc000126600, 0xc000000000, 0xc0001a4000)
	/Users/riteshmodi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.2/writer.go:744 +0x30d
github.com/segmentio/kafka-go.(*Writer).newWriteBatch.func1(0xc000126600, 0x0, 0xc0001a4000)
	/Users/riteshmodi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.2/writer.go:685 +0x7a
created by github.com/segmentio/kafka-go.(*Writer).newWriteBatch
	/Users/riteshmodi/go/pkg/mod/github.com/segmentio/kafka-go@v0.4.2/writer.go:683 +0x16c
@riteshmodi riteshmodi added the bug label Aug 18, 2020
@riteshmodi riteshmodi changed the title Release tag 0.4.2: response from kafka is on unknown type while using kafka.Writer Release tag 0.4.2: response from kafka is of unknown type while using kafka.Writer Aug 18, 2020
@achille-roussel
Contributor

Thanks a lot for reporting @riteshmodi!

I left a bit of context relevant to this discussion in #488

@achille-roussel achille-roussel self-assigned this Aug 19, 2020
@KScaesar

KScaesar commented Aug 28, 2020

I have the same problem.

When I set either of the following, there is no panic:

  1. kafka.Writer.RequiredAcks=kafka.RequireOne

  2. kafka.Writer.RequiredAcks=kafka.RequireAll

When I set the following, I get panic: BUG: promise resolved with impossible value of type <nil>

  1. kafka.Writer.RequiredAcks=kafka.RequireNone
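
For anyone hitting this on an affected release, the observation above suggests a possible workaround: request an acknowledgement explicitly. The following is only a sketch under assumptions. It reuses the broker address and topic from the original report, the helper name newAckedWriter is hypothetical, and the reproduction code leaves RequiredAcks unset, which appears to behave like RequireNone. It has not been confirmed by the maintainers as a recommended fix.

```go
package main

import (
	kafka "github.com/segmentio/kafka-go"
)

// newAckedWriter is a hypothetical helper. It builds the same writer as in
// the report, but asks the broker to acknowledge each produce request.
func newAckedWriter() *kafka.Writer {
	return &kafka.Writer{
		Addr:         kafka.TCP("localhost:9092"), // address from the report
		Topic:        "topicA",                    // topic from the report
		Balancer:     &kafka.LeastBytes{},
		RequiredAcks: kafka.RequireOne, // kafka.RequireAll reportedly also avoids the panic
	}
}

func main() {
	w := newAckedWriter()
	defer w.Close()
	// Use w.WriteMessages(ctx, msg) as in the reproduction code.
}
```

Waiting for acknowledgements changes delivery semantics and latency, so this trades fire-and-forget behaviour for avoiding the panic.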

@neilcook
Contributor

neilcook commented Sep 5, 2020

I also get the same panic, except with 0.4.1

@neilcook
Contributor

neilcook commented Sep 8, 2020

It seems that the 0.4 branch does not panic when kafka.Writer.RequiredAcks=kafka.RequireNone. Any thoughts on when this will become a released version?

@neilcook
Contributor

neilcook commented Sep 8, 2020

Actually I was talking nonsense. The 0.4 branch still has this problem.

@neilcook
Contributor

neilcook commented Sep 8, 2020

It seems that even though the Client.Produce() method has the following comment:

// When the request is configured with RequiredAcks=none, both the response and
// the error will be nil on success.

the async.await() method does not expect a nil Response. Changing that method to handle a nil Response removes the panic, although I don't know if there would be side-effects from this:

func (p async) await(ctx context.Context) (Response, error) {
	select {
	case x := <-p:
		switch v := x.(type) {
		case nil:
			// Response will be nil if RequiredAcks was set to None.
			return nil, nil
		case Response:
			return v, nil
		case error:
			return nil, v
		default:
			panic(fmt.Errorf("BUG: promise resolved with impossible value of type %T", v))
		}
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

@neilcook
Contributor

neilcook commented Sep 9, 2020

Created PR #504 for this issue

@achille-roussel
Contributor

Closing since the issue was addressed by #504
