diff --git a/pkg/sinks/loki.go b/pkg/sinks/loki.go index 4d0a0d93..25bccaf1 100644 --- a/pkg/sinks/loki.go +++ b/pkg/sinks/loki.go @@ -6,11 +6,12 @@ import ( "encoding/json" "errors" "fmt" - "github.com/resmoio/kubernetes-event-exporter/pkg/kube" - "io/ioutil" + "io" "net/http" "strconv" "time" + + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" "github.com/rs/zerolog/log" ) @@ -51,15 +52,32 @@ func generateTimestamp() string { return strconv.FormatInt(time.Now().Unix(), 10) + "000000000" } +func convertStreamTemplate(layout map[string]string, ev *kube.EnhancedEvent) (map[string]string, error) { + result := make(map[string]string) + for key, value := range layout { + rendered, err := GetString(ev, value) + if err != nil { + return nil, err + } + + result[key] = rendered + } + return result, nil +} + func (l *Loki) Send(ctx context.Context, ev *kube.EnhancedEvent) error { eventBody, err := serializeEventWithLayout(l.cfg.Layout, ev) if err != nil { return err } + streamLabels, err := convertStreamTemplate(l.cfg.StreamLabels, ev) + if err != nil { + return err + } timestamp := generateTimestamp() a := LokiMsg{ Streams: []promtailStream{{ - Stream: l.cfg.StreamLabels, + Stream: streamLabels, Values: [][]string{{timestamp, string(eventBody)}}, }}, } @@ -93,7 +111,7 @@ func (l *Loki) Send(ctx context.Context, ev *kube.EnhancedEvent) error { defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return err } diff --git a/pkg/sinks/loki_test.go b/pkg/sinks/loki_test.go new file mode 100644 index 00000000..6b08a7a8 --- /dev/null +++ b/pkg/sinks/loki_test.go @@ -0,0 +1,68 @@ +package sinks + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "time" + + "testing" + + "github.com/resmoio/kubernetes-event-exporter/pkg/kube" + "github.com/stretchr/testify/assert" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestLoki_Send(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + client := Loki{cfg: &LokiConfig{URL: ts.URL}} + + err := client.Send(context.Background(), &kube.EnhancedEvent{}) + + assert.NoError(t, err) +} + +func TestLoki_Send_StreamLabelsTemplated(t *testing.T) { + rr := httptest.NewRecorder() + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + result, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + rr.Write(result) + w.WriteHeader(http.StatusOK) + })) + defer ts.Close() + client := Loki{cfg: &LokiConfig{ + URL: ts.URL, + StreamLabels: map[string]string{ + "app": "kube-events", + "object_namespace": "{{ .InvolvedObject.Namespace }}", + }}} + + ev := &kube.EnhancedEvent{} + ev.Namespace = "default" + ev.Reason = "my reason" + ev.Type = "Warning" + ev.InvolvedObject.Kind = "Pod" + ev.InvolvedObject.Name = "nginx-server-123abc-456def" + ev.InvolvedObject.Namespace = "prod" + ev.Message = "Successfully pulled image \"nginx:latest\"" + ev.FirstTimestamp = v1.Time{Time: time.Now()} + + err := client.Send(context.Background(), ev) + assert.NoError(t, err) + + var res LokiMsg + err = json.Unmarshal(rr.Body.Bytes(), &res) + assert.NoError(t, err) + + assert.Equal(t, res.Streams[0].Stream["app"], "kube-events", "Non template labels should remain the same") + assert.Equal(t, res.Streams[0].Stream["object_namespace"], "prod", "Template labels should be templated") +}