Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Tracking exporter #3176

Draft
wants to merge 23 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
890f9a3
Use exporter.ExportableEvent instead of Any
thomaspoignant Mar 7, 2025
0024cd5
feat: Exporter support of TrackingEvents
thomaspoignant Mar 7, 2025
e4ca4b3
Merge branch 'main' into tracking-exporter
thomaspoignant Mar 11, 2025
56c1194
fix linter
thomaspoignant Mar 12, 2025
8dacaad
Merge branch 'main' into tracking-exporter
thomaspoignant Mar 13, 2025
e9b26ec
adding tests
thomaspoignant Mar 13, 2025
5cb715e
fix test to use ExportableEvent
thomaspoignant Mar 13, 2025
62f5a7e
Add 2nd deprecated mock
thomaspoignant Mar 13, 2025
b6d5024
Add test for trackingEvent
thomaspoignant Mar 13, 2025
c899dda
Add tracking function
thomaspoignant Mar 13, 2025
b5dfb94
fix lint
thomaspoignant Mar 13, 2025
a215304
Add parquet specific for tracking event
thomaspoignant Mar 13, 2025
103dc0c
Merge branch 'main' into tracking-exporter
thomaspoignant Mar 26, 2025
e36dfe2
Add global tracking func
thomaspoignant Mar 26, 2025
a2e458c
stop tracking event exporter when stop
thomaspoignant Mar 26, 2025
d451738
Merge branch 'main' into tracking-exporter
thomaspoignant Mar 27, 2025
a0f5a13
Rename method track
thomaspoignant Mar 27, 2025
c856b81
fix flaky tests
thomaspoignant Apr 1, 2025
fd3142f
manage both kind of events inside the collector events
thomaspoignant Apr 1, 2025
d9791ca
Merge remote-tracking branch 'origin/main' into tracking-exporter
thomaspoignant Apr 1, 2025
06b8bc3
fix linter
thomaspoignant Apr 2, 2025
dd732aa
WIP
thomaspoignant Apr 3, 2025
fc79fe5
Merge branch 'main' into tracking-exporter
thomaspoignant Apr 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ linters:
- lll
- path: _test\.go
linters:
- gosec
- errcheck
- funlen
- maligned
Expand Down
3 changes: 3 additions & 0 deletions cmd/relayproxy/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/knadh/koanf/providers/posflag"
"github.com/knadh/koanf/v2"
"github.com/spf13/pflag"
ffclient "github.com/thomaspoignant/go-feature-flag"
"github.com/xitongsys/parquet-go/parquet"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -45,6 +46,7 @@ var DefaultExporter = struct {
MaxEventInMemory int64
ParquetCompressionCodec string
LogLevel string
ExporterEventType ffclient.ExporterEventType
}{
Format: "JSON",
LogFormat: "[{{ .FormattedDate}}] user=\"{{ .UserKey}}\", flag=\"{{ .Key}}\", value=\"{{ .Value}}\"",
Expand All @@ -55,6 +57,7 @@ var DefaultExporter = struct {
MaxEventInMemory: 100000,
ParquetCompressionCodec: parquet.CompressionCodec_SNAPPY.String(),
LogLevel: DefaultLogLevel,
ExporterEventType: ffclient.FeatureEventExporter,
}

// New is reading the configuration file
Expand Down
1 change: 1 addition & 0 deletions cmd/relayproxy/config/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type ExporterConf struct {
AccountName string `mapstructure:"accountName" koanf:"accountname"`
AccountKey string `mapstructure:"accountKey" koanf:"accountkey"`
Container string `mapstructure:"container" koanf:"container"`
ExporterEventType string `mapstructure:"eventType" koanf:"eventtype"`
}

func (c *ExporterConf) IsValid() error {
Expand Down
96 changes: 79 additions & 17 deletions cmd/relayproxy/controller/collect_eval_data.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package controller

import (
"encoding/json"
"fmt"
"net/http"
"strconv"

"github.com/go-viper/mapstructure/v2"
"github.com/labstack/echo/v4"
ffclient "github.com/thomaspoignant/go-feature-flag"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/config"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/metric"
"github.com/thomaspoignant/go-feature-flag/cmd/relayproxy/model"
"github.com/thomaspoignant/go-feature-flag/exporter"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.uber.org/zap"
Expand Down Expand Up @@ -63,28 +66,87 @@ func (h *collectEvalData) Handler(c echo.Context) error {
_, span := tracer.Start(c.Request().Context(), "collectEventData")
defer span.End()
span.SetAttributes(attribute.Int("collectEventData.eventCollectionSize", len(reqBody.Events)))
counterTracking := 0
counterEvaluation := 0
for _, event := range reqBody.Events {
if event.Source == "" {
event.Source = "PROVIDER_CACHE"
switch event["kind"] {
case "tracking":
e, err := convertTrackingEvent(event, h.logger)
if err != nil {
h.logger.Error(
"impossible to convert the event to a tracking event",
zap.Error(err),
)
continue
}
h.goFF.CollectTrackingEventData(e)
counterTracking++
default:
e, err := convertFeatureEvent(event, reqBody.Meta, h.logger)
if err != nil {
h.logger.Error("impossible to convert the event to a feature event", zap.Error(err))
continue
}
h.goFF.CollectEventData(e)
counterEvaluation++
}
// force the creation date to be a unix timestamp
if event.CreationDate > 9999999999 {
h.logger.Warn(
"creationDate received is in milliseconds, we convert it to seconds",
zap.Int64("creationDate", event.CreationDate))
// if we receive a timestamp in milliseconds, we convert it to seconds
// but since it is totally possible to have a timestamp in seconds that is bigger than 9999999999
// we will accept timestamp up to 9999999999 (2286-11-20 18:46:39 +0100 CET)
event.CreationDate, _ = strconv.ParseInt(
strconv.FormatInt(event.CreationDate, 10)[:10], 10, 64)
}
if reqBody.Meta != nil {
event.Metadata = reqBody.Meta
}
h.goFF.CollectEventData(event)
}
span.SetAttributes(attribute.Int("collectEventData.trackingCollectionSize", counterTracking))
span.SetAttributes(
attribute.Int("collectEventData.evaluationCollectionSize", counterEvaluation),
)
h.metrics.IncCollectEvalData(float64(len(reqBody.Events)))
return c.JSON(http.StatusOK, model.CollectEvalDataResponse{
IngestedContentCount: len(reqBody.Events),
})
}

func convertTrackingEvent(
event map[string]any,
logger *zap.Logger,
) (exporter.TrackingEvent, error) {
var e exporter.TrackingEvent
marshalled, err := json.Marshal(event)
if err != nil {
return exporter.TrackingEvent{}, err
}
err = json.Unmarshal(marshalled, &e)
if err != nil {
return exporter.TrackingEvent{}, err
}
e.CreationDate = formatCreationDate(e.CreationDate, logger)
return e, nil
}

func convertFeatureEvent(event map[string]any,
metadata exporter.FeatureEventMetadata,
logger *zap.Logger) (exporter.FeatureEvent, error) {
var e exporter.FeatureEvent
err := mapstructure.Decode(event, &e)
if err != nil {
return exporter.FeatureEvent{}, err
}
if e.Source == "" {
e.Source = "PROVIDER_CACHE"
}
if metadata != nil {
e.Metadata = metadata
}
e.CreationDate = formatCreationDate(e.CreationDate, logger)
return e, nil
}

func formatCreationDate(input int64, logger *zap.Logger) int64 {
if input > 9999999999 {
logger.Warn(
"creationDate received is in milliseconds, we convert it to seconds",
zap.Int64("creationDate", input))
// if we receive a timestamp in milliseconds, we convert it to seconds
// but since it is totally possible to have a timestamp in seconds that is bigger than 9999999999
// we will accept timestamp up to 9999999999 (2286-11-20 18:46:39 +0100 CET)
converted, _ := strconv.ParseInt(
strconv.FormatInt(input, 10)[:10], 10, 64)
return converted
}
return input
}
74 changes: 74 additions & 0 deletions cmd/relayproxy/controller/collect_eval_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
Expand Down Expand Up @@ -184,3 +185,76 @@
})
}
}

func Test_collect_tracking_and_evaluation_events(t *testing.T) {
//
//
//
//
// PLEASE REWORK THIS TEST
//
// TODO: Do some tests that the exporterEventType is correctly set in the exporter

Check failure on line 196 in cmd/relayproxy/controller/collect_eval_data_test.go

View workflow job for this annotation

GitHub Actions / Lint

cmd/relayproxy/controller/collect_eval_data_test.go:196: Line contains TODO/BUG/FIXME: "TODO: Do some tests that the exporterEve..." (godox)
//
//
//

evalExporter, err := os.CreateTemp("", "evalExport.json")
assert.NoError(t, err)
trackingExporter, err := os.CreateTemp("", "trackExport.json")
assert.NoError(t, err)
defer func() {
_ = os.Remove(evalExporter.Name())
_ = os.Remove(trackingExporter.Name())
}()

// init go-feature-flag
goFF, _ := ffclient.New(ffclient.Config{
PollingInterval: 10 * time.Second,
LeveledLogger: slog.Default(),
Context: context.Background(),
Retriever: &fileretriever.Retriever{
Path: configFlagsLocation,
},

DataExporters: []ffclient.DataExporter{
{
FlushInterval: 10 * time.Second,
MaxEventInMemory: 10000,
Exporter: &fileexporter.Exporter{Filename: evalExporter.Name()},
},
{
FlushInterval: 10 * time.Second,
MaxEventInMemory: 10000,
Exporter: &fileexporter.Exporter{Filename: trackingExporter.Name()},
ExporterEventType: ffclient.TrackingEventExporter,
},
},
})
logger, err := zap.NewDevelopment()
require.NoError(t, err)
ctrl := controller.NewCollectEvalData(goFF, metric.Metrics{}, logger)

bodyReq, err := os.ReadFile(
"../testdata/controller/collect_eval_data/valid_request_mix_tracking_evaluation.json")
assert.NoError(t, err)
e := echo.New()
rec := httptest.NewRecorder()

req := httptest.NewRequest(echo.POST, "/v1/data/collector", strings.NewReader(string(bodyReq)))
req.Header.Set(echo.HeaderContentType, echo.MIMEApplicationJSON)
c := e.NewContext(req, rec)
c.SetPath("/v1/data/collector")
handlerErr := ctrl.Handler(c)
assert.NoError(t, handlerErr)
goFF.Close()

fmt.Println("Evaluation events:")
evalEvents, err := os.ReadFile(evalExporter.Name())
assert.NoError(t, err)
fmt.Println(string(evalEvents))

fmt.Println("Tracking events:")
trackingEvents, err := os.ReadFile(trackingExporter.Name())
assert.NoError(t, err)
fmt.Println(string(trackingEvents))
}
3 changes: 2 additions & 1 deletion cmd/relayproxy/model/collect_eval_data_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ type CollectEvalDataRequest struct {
Meta exporter.FeatureEventMetadata `json:"meta"`

// Events is the list of the event we send in the payload
Events []exporter.FeatureEvent `json:"events"`
// here the type is any because we will unmarshal later in the different event types
Events []map[string]any `json:"events"`
}
5 changes: 5 additions & 0 deletions cmd/relayproxy/service/gofeatureflag.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func initDataExporters(proxyConf *config.Config) ([]ffclient.DataExporter, error
}

func initDataExporter(c *config.ExporterConf) (ffclient.DataExporter, error) {
exporterEventType := c.ExporterEventType
if exporterEventType == "" {
exporterEventType = config.DefaultExporter.ExporterEventType
}
dataExp := ffclient.DataExporter{
FlushInterval: func() time.Duration {
if c.FlushInterval != 0 {
Expand All @@ -181,6 +185,7 @@ func initDataExporter(c *config.ExporterConf) (ffclient.DataExporter, error) {
}
return config.DefaultExporter.MaxEventInMemory
}(),
ExporterEventType: exporterEventType,
}

var err error
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"events": [
{
"contextKind": "user",
"creationDate": 1680246000,
"default": false,
"key": "my-feature-flag",
"kind": "feature",
"userKey": "94a25909-20d8-40cc-8500-fee99b569345",
"value": "string",
"variation": "admin-variation",
"version": "v1.0.0"
},
{
"kind": "tracking",
"creationDate": 1680246020,
"contextKind": "user",
"userKey": "94a25909-20d8-40cc-8500-fee99b569345",
"key": "my-feature-flag",
"evaluationContext": {
"targetingKey": "94a25909-20d8-40cc-8500-fee99b569345",
"name": "john doe",
"admin": true
},
"trackingEventDetails": {
"value": "string",
"version": "v1.0.0"
}
}
],
"meta": {
"environment": "production",
"sdkVersion": "v1.0.0",
"source": "my-source",
"timestamp": 1680246000
}
}
11 changes: 11 additions & 0 deletions config_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ import (
"github.com/thomaspoignant/go-feature-flag/exporter"
)

type ExporterEventType = string

const (
TrackingEventExporter ExporterEventType = "tracking"
FeatureEventExporter ExporterEventType = "feature"
)

// DataExporter is the configuration of your export target.
type DataExporter struct {
// FlushInterval is the interval we are waiting to export the data.
Expand All @@ -22,4 +29,8 @@ type DataExporter struct {
// Exporter is the configuration of your exporter.
// You can see all available exporter in the exporter package.
Exporter exporter.CommonExporter

// ExporterEventType is the type of event the exporter is expecting.
// The default type if not set is FeatureEventExporter.
ExporterEventType ExporterEventType
}
2 changes: 1 addition & 1 deletion exporter/azureexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (f *Exporter) initializeAzureClient() (*azblob.Client, error) {
func (f *Exporter) Export(
ctx context.Context,
logger *fflog.FFLogger,
featureEvents []exporter.FeatureEvent,
featureEvents []exporter.ExportableEvent,
) error {
if f.AccountName == "" {
return fmt.Errorf("you should specify an AccountName. %v is invalid", f.AccountName)
Expand Down
Loading
Loading