From 7520d5aac24803ded0f37fef62bbcf1873e11fd0 Mon Sep 17 00:00:00 2001 From: Bas Breijer Date: Thu, 17 Apr 2025 15:25:22 +0200 Subject: [PATCH] revive PR 200: feat(loki): support go template in stream labels --- pkg/sinks/loki.go | 23 ++++++++++++-- pkg/sinks/loki_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 3 deletions(-) create mode 100644 pkg/sinks/loki_test.go diff --git a/pkg/sinks/loki.go b/pkg/sinks/loki.go index 4d0a0d93..5f3c85a2 100644 --- a/pkg/sinks/loki.go +++ b/pkg/sinks/loki.go @@ -7,7 +7,7 @@ import ( "errors" "fmt" "github.com/resmoio/kubernetes-event-exporter/pkg/kube" - "io/ioutil" + "io" "net/http" "strconv" "time" @@ -51,15 +51,32 @@ func generateTimestamp() string { return strconv.FormatInt(time.Now().Unix(), 10) + "000000000" } +func convertStreamTemplate(layout map[string]string, ev *kube.EnhancedEvent) (map[string]string, error) { + result := make(map[string]string) + for key, value := range layout { + rendered, err := GetString(ev, value) + if err != nil { + return nil, err + } + + result[key] = rendered + } + return result, nil +} + func (l *Loki) Send(ctx context.Context, ev *kube.EnhancedEvent) error { eventBody, err := serializeEventWithLayout(l.cfg.Layout, ev) if err != nil { return err } + streamLabels, err := convertStreamTemplate(l.cfg.StreamLabels, ev) + if err != nil { + return err + } timestamp := generateTimestamp() a := LokiMsg{ Streams: []promtailStream{{ - Stream: l.cfg.StreamLabels, + Stream: streamLabels, Values: [][]string{{timestamp, string(eventBody)}}, }}, } @@ -93,7 +110,7 @@ func (l *Loki) Send(ctx context.Context, ev *kube.EnhancedEvent) error { defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return err } diff --git a/pkg/sinks/loki_test.go b/pkg/sinks/loki_test.go new file mode 100644 index 00000000..6b08a7a8 --- /dev/null +++ b/pkg/sinks/loki_test.go @@ -0,0 +1,68 @@ +package sinks + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "time" + + "testing" + + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" + "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestLoki_Send(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + client := Loki{cfg: &LokiConfig{URL: ts.URL}} + + err := client.Send(context.Background(), &kube.EnhancedEvent{}) + + assert.NoError(t, err) +} + +func TestLoki_Send_StreamLabelsTemplated(t *testing.T) { + rr := httptest.NewRecorder() + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + result, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + rr.Write(result) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + client := Loki{cfg: &LokiConfig{ + URL: ts.URL, + StreamLabels: map[string]string{ + "app": "kube-events", + "object_namespace": "{{ .InvolvedObject.Namespace }}", + }}} + + ev := &kube.EnhancedEvent{} + ev.Namespace = "default" + ev.Reason = "my reason" + ev.Type = "Warning" + ev.InvolvedObject.Kind = "Pod" + ev.InvolvedObject.Name = "nginx-server-123abc-456def" + ev.InvolvedObject.Namespace = "prod" + ev.Message = "Successfully pulled image \"nginx:latest\"" + ev.FirstTimestamp = v1.Time{Time: time.Now()} + + err := client.Send(context.Background(), ev) + assert.NoError(t, err) + + var res LokiMsg + err = json.Unmarshal(rr.Body.Bytes(), &res) + assert.NoError(t, err) + + assert.Equal(t, res.Streams[0].Stream["app"], "kube-events", "Non template labels should remain the same") + assert.Equal(t, res.Streams[0].Stream["object_namespace"], "prod", "Template labels should be templated") +}