We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
type kafkaErrLogger struct{} func (l *kafkaErrLogger) Printf(f string, args ...interface{}) { glog.V(1).Infof("[ERROR]"+f, args...) } type kafkaInfoLogger struct{} func (l *kafkaInfoLogger) Printf(f string, args ...interface{}) { glog.V(2).Infof(f, args...) } func output(ctx context.Context, ch <-chan []byte) { w := kafka.NewWriter(kafka.WriterConfig{ Brokers: strings.Split(kafkaServers, ";"), Topic: kafkaTopic, Balancer: &kafka.LeastBytes{}, Async: true, QueueCapacity: defaultKafkaQueueCapacity, BatchSize: defaultKafkaBatchSize, Logger: &kafkaInfoLogger{}, ErrorLogger: &kafkaErrLogger{}, }) defer func() { w.Close() wg.Done() glog.V(1).Info("output goroutine exits") }() counter := 0 exit := false for !exit { select { case c := <-ch: w.WriteMessages(ctx, kafka.Message{ Key: []byte(fmt.Sprintf("%s_%d", host, time.Now().Unix())), Value: c, }, ) counter++ if glog.V(1) && counter%10 == 0 { prettyJSON, _ := json.MarshalIndent(w.Stats(), "", "\t") glog.V(1).Infof("Kafka Stats Summary:\n%s", string(prettyJSON)) } glog.V(2).Infof("success WriteMessage to kafka client") case <-ctx.Done(): exit = true } } }
If given an invalid broker address, I cannot find any error message or summary of error. The root cause may be is:
func (w *Writer) partitions() (partitions []int, err error) { for _, broker := range shuffledStrings(w.config.Brokers) { var conn *Conn var plist []Partition if conn, err = w.config.Dialer.Dial("tcp", broker); err != nil { continue } conn.SetReadDeadline(time.Now().Add(w.config.ReadTimeout)) plist, err = conn.ReadPartitions(w.config.Topic) conn.Close() if err == nil { partitions = make([]int, len(plist)) for i, p := range plist { partitions[i] = p.ID } break } } sort.Ints(partitions) return }
no error report out if dial is error
The text was updated successfully, but these errors were encountered:
Hello @nporsche, and thank you for reporting and identifying the source of the problem.
We always welcome contributions, would you be available to submit a fix for this?
Sorry, something went wrong.
Closing since we believe the issue was addressed by #461
achille-roussel
No branches or pull requests
If given an invalid broker address, I cannot find any error message or summary of error. The root cause may be is:
no error report out if dial is error
The text was updated successfully, but these errors were encountered: