Consumer group offset getting reset occasionally #410


Closed
rjshrjndrn opened this issue Feb 29, 2020 · 6 comments

@rjshrjndrn

Hi,
Thank you for this awesome library.
I am facing a small issue: when I run this application from 2 different places (one at a time), occasionally I see the consumer group offset get reset, and reading starts from zero.

E.g.: I have a container which runs on k8s, which reads and commits to the consumer group.
For debugging, I scaled that container to zero and ran the application (main.go) from my local machine, and it started to read from the beginning. Any ideas why that is?
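A possibly related knob, as a minimal sketch: assuming kafka-go's ReaderConfig exposes a StartOffset field, it is only consulted when the group has no committed offset, which matches the "reading starts from zero" symptom; kafka.FirstOffset (the default, offset zero) and kafka.LastOffset (the tail) are the two choices.

// Minimal sketch, assuming kafka-go's StartOffset field and offset constants.
// StartOffset only applies when the consumer group has no committed offset,
// so it decides where a fresh (or reset) group begins reading.
reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers:     []string{"10.0.0.9:9092"},
	GroupID:     "metrics-reader-test",
	Topic:       "sunbird.metrics.topic",
	StartOffset: kafka.LastOffset, // start at the tail instead of offset zero when no commit exists
})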

// This package reads the topic from Kafka and parses it as Prometheus metrics with an optional timestamp.
// The JSON format should be as described below.
// {
// 	"system": "<category of the metrics, for example: samza>",
// 	"subsystem": "<producer of the metrics, for example: pipeline-metrics>",
// 	"metricts": "<timestamp of the metrics, which should be passed to Prometheus. This should be in epoch milliseconds>", // This is an optional field
// 	"metrics": [
// 		{
// 			"id": "<name of the metric>", // It can contain letters, '-' and '_', and should start with a letter
// 			"value": "<value of the metric>" // Should be an int or float64
// 		},
// 		{
// 			...
// 			...
// 			...
// 		}
// 	],
// 	"dimensions": [ // Labels which will get injected into each of the above metrics
// 		{
// 			"id": "<name of the label>", // It can contain letters, '-' and '_', and should start with a letter
// 			"value": "<value of the label>"
// 		},
// 		{
// 			...
// 			...
// 			...
// 		}
// 	]
// }
//
// Example:
// {
//     "system": "samza",
//     "subsystem": "pipeline-metrics",
//     "metricts" : 1582521646464,
//     "metrics": [
//         {
//             "id": "success-message-count",
//             "value": 1
//         },
//         {
//             "id": "skipped-message-count",
//             "value": 1
//         }
//     ],
//     "dimensions": [
//         {
//             "id": "job-name",
//             "value": "test-job"
//         },
//         {
//             "id": "partition",
//             "value": 0
//         }
//     ]
// }
//
// kafka_host and kafka_topic environment variables should be set
//
// Example:
// export kafka_host=10.0.0.9:9092
// export kafka_topic=sunbird.metrics.topic
//

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"net/http"
	"os"
	"strings"
	"time"

	kafka "github.com/rjshrjndrn/kafka-go"
)

// Metrics structure
type Metrics struct {
	System    string      `json:"system"`
	SubSystem string      `json:"subsystem"`
	MetricTS  json.Number `json:"metricTs"`
	Metrics   []struct {
		ID    string  `json:"id"`
		Value float64 `json:"value"`
	} `json:"metrics"`
	// Labels to be added for the metric
	Labels []struct {
		ID string `json:"id"`
		// Even numbers will be read as strings (json.Number)
		Value json.Number `json:"value"`
	} `json:"dimensions"`
}

// Validates metric names.
// Takes a list of strings and concatenates them with '_' after
// replacing every '-' in the provided names with '_'.
func metricsNameValidator(names ...string) string {
	retName := ""
	for _, name := range names {
		retName += strings.ReplaceAll(name, "-", "_") + "_"
	}
	return strings.TrimRight(retName, "_")
}
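
// Hypothetical usage example, derived from the function above:
//
//	metricsNameValidator("samza", "pipeline-metrics", "success-message-count")
//	// returns "samza_pipeline_metrics_success_message_count"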

// pushMetrics takes the metrics input, renders Prometheus-formatted
// lines, and sends them to the metrics channel so that the HTTP
// endpoint can serve the data.
func (metrics *Metrics) pushMetrics() (err error) {
	label := fmt.Sprintf("system=%q,subsystem=%q,", metrics.System, metrics.SubSystem)
	// Creating dictionary of labels
	for _, labels := range metrics.Labels {
		// Labels can't have '-' in them
		label += fmt.Sprintf("%v=%q,", metricsNameValidator(labels.ID), labels.Value)
	}
	// Generating metrics
	for _, metric := range metrics.Metrics {
		retMetrics := ""
		// Adding optional timestamp
		switch metrics.MetricTS {
		case "":
			retMetrics = fmt.Sprintf("%s{%s} %.2f", metricsNameValidator(metrics.System, metrics.SubSystem, metric.ID), strings.TrimRight(label, ","), metric.Value)
		default:
			retMetrics = fmt.Sprintf("%s{%s} %.2f %s", metricsNameValidator(metrics.System, metrics.SubSystem, metric.ID), strings.TrimRight(label, ","), metric.Value, metrics.MetricTS)
		}
		fmt.Printf("%s\n", retMetrics)
		promMetricsChannel <- retMetrics
	}
	return nil
}
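
// For the example payload in the header comment, pushMetrics emits one
// exposition line per metric, e.g. (illustration, not captured output):
//
//	samza_pipeline_metrics_success_message_count{system="samza",subsystem="pipeline-metrics",job_name="test-job",partition="0"} 1.00 1582521646464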

// Channel to hold metrics until Prometheus scrapes them
// (unbuffered, so sends block until the handler receives)
var promMetricsChannel = make(chan string)

func metricsCreation(data []byte) error {
	metrics := Metrics{}
	// Creating metrics struct
	if err := json.Unmarshal(data, &metrics); err != nil {
		fmt.Printf("Unmarshal error: %q\n", err)
		return err
	}
	metrics.pushMetrics()
	return nil
}

// HTTP handler
func serve(w http.ResponseWriter, r *http.Request) {
	// Channel to keep track of offset lag
	lagChannel := make(chan int64, 1)
	ctx := r.Context()
	// Reading the topic in a goroutine; the select below drains metrics
	// until the lag hits zero or the request is cancelled
	go func(ctx context.Context, r *kafka.Reader) {
		for {
			m, err := r.ReadMessage(ctx)
			if err != nil {
				fmt.Printf("err reading message: %v\n", err)
				break
			}
			fmt.Printf("topic: %q partition: %v offset: %v lag: %d\n ", m.Topic, m.Partition, m.Offset, r.Lag())
			go metricsCreation(m.Value)
			fmt.Println(r.Lag())
			lagChannel <- r.Lag()
		}
	}(ctx, kafkaReader)
	for {
		select {
		case message := <-promMetricsChannel:
			fmt.Println("queue length before printing: ", len(promMetricsChannel))
			fmt.Fprintf(w, "%s\n", message)
			fmt.Println("queue length in printing: ", len(promMetricsChannel))
		case <-ctx.Done():
			fmt.Println("done")
			return
		case lag := <-lagChannel:
			if lag == 0 {
				fmt.Println("queue length: ", len(promMetricsChannel))
				return
			}
		}
	}
}

var kafkaReader *kafka.Reader

func main() {
	// Getting kafka host and topic from the environment
	kafka_host := os.Getenv("kafka_host")
	kafka_topic := os.Getenv("kafka_topic")
	if kafka_topic == "" || kafka_host == "" {
		log.Fatalf(`kafka_topic or kafka_host environment variables not set.
For example,
	export kafka_host=10.0.0.9:9092
	export kafka_topic=sunbird.metrics.topic`)
	}
	fmt.Printf("kafak_host: %s\nkafka_topic: %s\n", kafka_host, kafka_topic)
	// Checking that the Kafka host and port are accessible
	fmt.Println("Checking connection to kafka")
	conn, err := net.DialTimeout("tcp", kafka_host, 10*time.Second)
	if err != nil {
		log.Fatalf("Connection error: %s", err)
	}
	conn.Close()
	fmt.Println("kafka is accessible")
	// Initializing kafka
	kafkaReader = kafka.NewReader(kafka.ReaderConfig{
		Brokers:  []string{kafka_host},
		GroupID:  "metrics-reader-test", // Consumer group ID
		Topic:    kafka_topic,
		MinBytes: 10e3, // 10KB
		MaxBytes: 10e6, // 10MB
	})
	defer kafkaReader.Close()
	http.HandleFunc("/metrics", serve)
	log.Fatal(http.ListenAndServe(":8000", nil))
}
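
A quick way to exercise the endpoint once the process is up, reusing the example values from the header comment (hypothetical broker and topic):

export kafka_host=10.0.0.9:9092
export kafka_topic=sunbird.metrics.topic
go run main.go &
curl http://localhost:8000/metrics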
@rjshrjndrn rjshrjndrn added the bug label Feb 29, 2020
@anhtuanluu36

I have the same issue with Rancher 2.0 and K8s. If I redeploy and keep one pod running until a new pod is available, the consumer resets the offset. However, if I stop the old pod and then start a new one, it works fine.
Do you have any ideas?

@718529333
Contributor

#424
This PR may help you; give it a try.

@stevevls
Copy link
Contributor

stevevls commented Apr 6, 2020

Hi @rjshrjndrn and @anhtuanluu36. Thanks for reporting the issue. I just merged the fix contributed by @718529333 into master, which should fix the problem. I'll leave the issue open for now pending confirmation.
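Until that lands in a tagged release, pinning to master should pick up the fix; assuming Go modules and the github.com/segmentio/kafka-go module path:

go get github.com/segmentio/kafka-go@master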

@anhtuanluu36

Thanks @718529333 and @stevevls. It works fine now. However, I have another issue: #426

@stevevls stevevls closed this as completed Apr 9, 2020
@madneal

madneal commented May 26, 2020

@stevevls Sorry to bother you, but there is a problem troubling me. When I use a consumer group to consume, I can no longer use kafka-consumer-groups.sh to obtain the CURRENT-OFFSET.
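For reference, the invocation that stops returning CURRENT-OFFSET looks like this (broker address and group ID taken from the example code above):

kafka-consumer-groups.sh --bootstrap-server 10.0.0.9:9092 --describe --group metrics-reader-test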

@stevevls
Contributor

@neal1991 I think we're talking about the same question in #453. If not, let's open a new issue to discuss. Thanks!
