Skip to content

Consumer doesn't receive any messages #483

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
vikusku opened this issue Aug 14, 2020 · 1 comment
Closed

Consumer doesn't receive any messages #483

vikusku opened this issue Aug 14, 2020 · 1 comment

Comments

@vikusku
Copy link

vikusku commented Aug 14, 2020

Started having issues after updating kafka-go to v0.4.1 release. First Writer was throwing nil pointer exception (similar to #477). I reverted back to v0.3.7 and then upgraded it to v0.3.8 but I still have problem with Reader not getting any messages. I can see in kafka UI tool that messages are pushed. I made a small test app using your code examples outside of my main code base just to test whether any of my code causes the problem, but it didn't help. Could you please check if it's kafka-go problem or give me tips on how I can debug. Here is a code of my test app.

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/plain"
	"log"
	"time"
)

var topic = "my-test-topic"

func main() {
	createTopic()
	produceMessage()
	go consumeMessage()

	time.Sleep(5 * time.Minute)
}

func createTopic() {
	dialer := &kafka.Dialer{
		SASLMechanism: plain.Mechanism{
			Username: <USERNAME_VALUE>,
			Password: <PASSWORD_VALUE>
		},
		DualStack: true,
		// In order to enable TLS support TLS config must be initialized.
		// If the TLS field is nil, it will not connect with TLS.
		TLS: &tls.Config{},
	}

	conn, err := dialer.DialContext(context.Background(), "tcp", <SERVER_URL>)
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	config := kafka.TopicConfig{
		Topic:             topic,
		NumPartitions:     6,
		ReplicationFactor: 3,
	}

	conn.CreateTopics(config)
}

func produceMessage() {
	w := kafka.NewWriter(kafka.WriterConfig{
		Brokers: []string{<SERVER_URL>},
		Topic:   topic,
		Dialer: &kafka.Dialer{
			SASLMechanism: plain.Mechanism{
				Username: <USERNAME_VALUE>,
				Password: <PASSWORD_VALUE>,
			},
			DualStack: true,
			// In order to enable TLS support TLS config must be initialized.
			// If the TLS field is nil, it will not connect with TLS.
			TLS: &tls.Config{},
		},
		Balancer: &kafka.Hash{},
		Async:    false,
	})

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Key:   []byte("Key-A"),
			Value: []byte("Hello World!"),
		},
		kafka.Message{
			Key:   []byte("Key-B"),
			Value: []byte("One!"),
		},
		kafka.Message{
			Key:   []byte("Key-C"),
			Value: []byte("Two!"),
		},
	)

	if err != nil {
		panic(err)
	}

	w.Close()
}

func consumeMessage() {
	log.Println("Consuming messages")
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{<SERVER_URL>},
		GroupID:   "consumer-hospitality-group-id",
		Topic:     topic,
		MinBytes:  10e3, // 10KB
		MaxBytes:  10e6, // 10MB
		Dialer: &kafka.Dialer{
			SASLMechanism: plain.Mechanism{
				Username: <USERNAME_VALUE>,
				Password: <PASSWORD_VALUE>,
			},
			DualStack: true,
			// In order to enable TLS support TLS config must be initialized.
			// If the TLS field is nil, it will not connect with TLS.
			TLS: &tls.Config{},
		},
	})

	for {
		m, err := r.ReadMessage(context.Background())
		if err != nil {
			break
		}
		fmt.Printf("message at topic/partition/offset %v/%v/%v: %s = %s\n", m.Topic, m.Partition, m.Offset, string(m.Key), string(m.Value))
	}

	r.Close()
}

UPD. I tested again an actual app. It seems that Reader gets messages only from 2 partitions out of 6.

@vikusku
Copy link
Author

vikusku commented Aug 14, 2020

Seems like those were issues on the broker side. Problem automagically got resolved on its own.

@vikusku vikusku closed this as completed Aug 14, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant