Skip to content

Commit 25356a0

Browse files
feat: support for multiple exporters (#3171)
* Use new data structure for exporters Signed-off-by: Thomas Poignant <[email protected]> * Avoid breaking change Signed-off-by: Thomas Poignant <[email protected]> * add test for multiple exporters Signed-off-by: Thomas Poignant <[email protected]> * add test with legacy exporter format Signed-off-by: Thomas Poignant <[email protected]> * fix swagger Signed-off-by: Thomas Poignant <[email protected]> * fix error messages Signed-off-by: Thomas Poignant <[email protected]> * Use consumerID instead of consumerId Signed-off-by: Thomas Poignant <[email protected]> * test both exporter errors Signed-off-by: Thomas Poignant <[email protected]> * Support multiple exporters Signed-off-by: Thomas Poignant <[email protected]> * remove go routine Signed-off-by: Thomas Poignant <[email protected]> * fix swagger changes Signed-off-by: Thomas Poignant <[email protected]> * add test for legacy exporter Signed-off-by: Thomas Poignant <[email protected]> * Handle multiple providers in relay proxy Signed-off-by: Thomas Poignant <[email protected]> * use interface for DataExporter Signed-off-by: Thomas Poignant <[email protected]> * Do not start the periodic Flush for live exporters Signed-off-by: Thomas Poignant <[email protected]> * Rename start stop func Signed-off-by: Thomas Poignant <[email protected]> * Add doc Signed-off-by: Thomas Poignant <[email protected]> * Configure cleanQueueInterval Signed-off-by: Thomas Poignant <[email protected]> * Update doc using exporters Signed-off-by: Thomas Poignant <[email protected]> * allow exporters to be configured with env variables Signed-off-by: Thomas Poignant <[email protected]> * fix broken link Signed-off-by: Thomas Poignant <[email protected]> * fix broken link Signed-off-by: Thomas Poignant <[email protected]> * adding godoc Signed-off-by: Thomas Poignant <[email protected]> * fix code style (lll) Signed-off-by: Thomas Poignant <[email protected]> * remove unused file Signed-off-by: Thomas Poignant <[email protected]> * use a lock for fetching and updating offset Signed-off-by: Thomas Poignant <[email protected]> * fix code style Signed-off-by: Thomas Poignant <[email protected]> * add getConsumer func in eventStore Signed-off-by: Thomas Poignant <[email protected]> * remove debug line Signed-off-by: Thomas Poignant <[email protected]> * adding test for initretrievers Signed-off-by: Thomas Poignant <[email protected]> * fix lint Signed-off-by: Thomas Poignant <[email protected]> * add coverage Signed-off-by: Thomas Poignant <[email protected]> --------- Signed-off-by: Thomas Poignant <[email protected]>
1 parent 5caf22c commit 25356a0

34 files changed

+2012
-417
lines changed

cmd/relayproxy/config/config.go

+51-19
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ func New(flagSet *pflag.FlagSet, log *zap.Logger, version string) (*Config, erro
101101
// Map environment variables
102102
_ = k.Load(env.ProviderWithValue("", ".", func(s string, v string) (string, interface{}) {
103103
if strings.HasPrefix(s, "RETRIEVERS") ||
104-
strings.HasPrefix(s, "NOTIFIERS") {
104+
strings.HasPrefix(s, "NOTIFIERS") ||
105+
strings.HasPrefix(s, "EXPORTERS") {
105106
configMap := k.Raw()
106107
err := loadArrayEnv(s, v, configMap)
107108
if err != nil {
@@ -256,6 +257,14 @@ type Config struct {
256257
// Exporter is the configuration on how to export data
257258
Exporter *ExporterConf `mapstructure:"exporter" koanf:"exporter"`
258259

260+
// Exporters is the exact same things than Exporter but allows to give more than 1 exporter at the time.
261+
Exporters *[]ExporterConf `mapstructure:"exporters" koanf:"exporters"`
262+
263+
// ExporterCleanQueueInterval (optional) is the duration between each cleaning of the queue by the thread in charge
264+
// of removing the old events.
265+
// Default: 1 minute
266+
ExporterCleanQueueInterval time.Duration `mapstructure:"exporterCleanQueueInterval" koanf:"exportercleanqueueinterval"`
267+
259268
// Notifiers is the configuration on where to notify a flag change
260269
Notifiers []NotifierConf `mapstructure:"notifier" koanf:"notifier"`
261270

@@ -413,15 +422,41 @@ func (c *Config) IsValid() error {
413422
if c == nil {
414423
return fmt.Errorf("empty config")
415424
}
416-
417425
if c.ListenPort == 0 {
418426
return fmt.Errorf("invalid port %d", c.ListenPort)
419427
}
428+
if c.LogLevel != "" {
429+
if _, err := zapcore.ParseLevel(c.LogLevel); err != nil {
430+
return err
431+
}
432+
}
433+
if err := c.validateRetrievers(); err != nil {
434+
return err
435+
}
436+
437+
if err := c.validateExporters(); err != nil {
438+
return err
439+
}
440+
441+
if err := c.validateNotifiers(); err != nil {
442+
return err
443+
}
444+
445+
// log format validation
446+
switch strings.ToLower(c.LogFormat) {
447+
case "json", "logfmt", "":
448+
break
449+
default:
450+
return fmt.Errorf("invalid log format %s", c.LogFormat)
451+
}
452+
453+
return nil
454+
}
420455

456+
func (c *Config) validateRetrievers() error {
421457
if c.Retriever == nil && c.Retrievers == nil {
422458
return fmt.Errorf("no retriever available in the configuration")
423459
}
424-
425460
if c.Retriever != nil {
426461
if err := c.Retriever.IsValid(); err != nil {
427462
return err
@@ -435,35 +470,32 @@ func (c *Config) IsValid() error {
435470
}
436471
}
437472
}
473+
return nil
474+
}
438475

439-
// Exporter is optional
476+
func (c *Config) validateExporters() error {
440477
if c.Exporter != nil {
441478
if err := c.Exporter.IsValid(); err != nil {
442479
return err
443480
}
444481
}
445-
482+
if c.Exporters != nil {
483+
for _, exporter := range *c.Exporters {
484+
if err := exporter.IsValid(); err != nil {
485+
return err
486+
}
487+
}
488+
}
489+
return nil
490+
}
491+
func (c *Config) validateNotifiers() error {
446492
if c.Notifiers != nil {
447493
for _, notif := range c.Notifiers {
448494
if err := notif.IsValid(); err != nil {
449495
return err
450496
}
451497
}
452498
}
453-
if c.LogLevel != "" {
454-
if _, err := zapcore.ParseLevel(c.LogLevel); err != nil {
455-
return err
456-
}
457-
}
458-
459-
// log format validation
460-
switch strings.ToLower(c.LogFormat) {
461-
case "json", "logfmt", "":
462-
break
463-
default:
464-
return fmt.Errorf("invalid log format %s", c.LogFormat)
465-
}
466-
467499
return nil
468500
}
469501

cmd/relayproxy/config/config_test.go

+78
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,84 @@ func TestParseConfig_fileFromPflag(t *testing.T) {
8787
},
8888
wantErr: assert.NoError,
8989
},
90+
{
91+
name: "Valid yaml file with multiple exporters",
92+
fileLocation: "../testdata/config/valid-yaml-multiple-exporters.yaml",
93+
want: &config.Config{
94+
ListenPort: 1031,
95+
PollingInterval: 1000,
96+
FileFormat: "yaml",
97+
Host: "localhost",
98+
Retriever: &config.RetrieverConf{
99+
Kind: "http",
100+
URL: "https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml",
101+
},
102+
Exporters: &[]config.ExporterConf{
103+
{
104+
Kind: "log",
105+
},
106+
{
107+
Kind: "file",
108+
OutputDir: "./",
109+
},
110+
},
111+
StartWithRetrieverError: false,
112+
Version: "1.X.X",
113+
EnableSwagger: true,
114+
AuthorizedKeys: config.APIKeys{
115+
Admin: []string{
116+
"apikey3",
117+
},
118+
Evaluation: []string{
119+
"apikey1",
120+
"apikey2",
121+
},
122+
},
123+
LogLevel: "info",
124+
},
125+
wantErr: assert.NoError,
126+
},
127+
{
128+
name: "Valid yaml file with both exporter and exporters",
129+
fileLocation: "../testdata/config/valid-yaml-exporter-and-exporters.yaml",
130+
want: &config.Config{
131+
ListenPort: 1031,
132+
PollingInterval: 1000,
133+
FileFormat: "yaml",
134+
Host: "localhost",
135+
Retriever: &config.RetrieverConf{
136+
Kind: "http",
137+
URL: "https://raw.githubusercontent.com/thomaspoignant/go-feature-flag/main/examples/retriever_file/flags.goff.yaml",
138+
},
139+
Exporter: &config.ExporterConf{
140+
Kind: "log",
141+
},
142+
Exporters: &[]config.ExporterConf{
143+
{
144+
Kind: "webhook",
145+
EndpointURL: "https://example.com/webhook",
146+
},
147+
{
148+
Kind: "file",
149+
OutputDir: "./",
150+
},
151+
},
152+
StartWithRetrieverError: false,
153+
Version: "1.X.X",
154+
EnableSwagger: true,
155+
AuthorizedKeys: config.APIKeys{
156+
Admin: []string{
157+
"apikey3",
158+
},
159+
Evaluation: []string{
160+
"apikey1",
161+
"apikey2",
162+
},
163+
},
164+
LogLevel: "info",
165+
},
166+
wantErr: assert.NoError,
167+
},
90168
{
91169
name: "Valid json file",
92170
fileLocation: "../testdata/config/valid-file.json",

cmd/relayproxy/service/gofeatureflag.go

+53-27
Original file line numberDiff line numberDiff line change
@@ -47,38 +47,19 @@ func NewGoFeatureFlagClient(
4747
logger *zap.Logger,
4848
notifiers []notifier.Notifier,
4949
) (*ffclient.GoFeatureFlag, error) {
50-
var mainRetriever retriever.Retriever
5150
var err error
52-
5351
if proxyConf == nil {
5452
return nil, fmt.Errorf("proxy config is empty")
5553
}
5654

57-
if proxyConf.Retriever != nil {
58-
mainRetriever, err = initRetriever(proxyConf.Retriever)
59-
if err != nil {
60-
return nil, err
61-
}
62-
}
63-
64-
// Manage if we have more than 1 retriever
65-
retrievers := make([]retriever.Retriever, 0)
66-
if proxyConf.Retrievers != nil {
67-
for _, r := range *proxyConf.Retrievers {
68-
currentRetriever, err := initRetriever(&r)
69-
if err != nil {
70-
return nil, err
71-
}
72-
retrievers = append(retrievers, currentRetriever)
73-
}
55+
retrievers, err := initRetrievers(proxyConf)
56+
if err != nil {
57+
return nil, err
7458
}
7559

76-
var exp ffclient.DataExporter
77-
if proxyConf.Exporter != nil {
78-
exp, err = initDataExporter(proxyConf.Exporter)
79-
if err != nil {
80-
return nil, err
81-
}
60+
exporters, err := initDataExporters(proxyConf)
61+
if err != nil {
62+
return nil, err
8263
}
8364

8465
notif, err := initNotifier(proxyConf.Notifiers)
@@ -91,11 +72,10 @@ func NewGoFeatureFlagClient(
9172
PollingInterval: time.Duration(proxyConf.PollingInterval) * time.Millisecond,
9273
LeveledLogger: slog.New(slogzap.Option{Level: slog.LevelDebug, Logger: logger}.NewZapHandler()),
9374
Context: context.Background(),
94-
Retriever: mainRetriever,
9575
Retrievers: retrievers,
9676
Notifiers: notif,
9777
FileFormat: proxyConf.FileFormat,
98-
DataExporter: exp,
78+
DataExporters: exporters,
9979
StartWithRetrieverError: proxyConf.StartWithRetrieverError,
10080
EnablePollingJitter: proxyConf.EnablePollingJitter,
10181
DisableNotifierOnInit: proxyConf.DisableNotifierOnInit,
@@ -106,6 +86,29 @@ func NewGoFeatureFlagClient(
10686
return ffclient.New(f)
10787
}
10888

89+
// initRetrievers initialize the retrievers based on the configuration
90+
// it handles both the `retriever` and `retrievers` fields
91+
func initRetrievers(proxyConf *config.Config) ([]retriever.Retriever, error) {
92+
retrievers := make([]retriever.Retriever, 0)
93+
if proxyConf.Retriever != nil {
94+
currentRetriever, err := initRetriever(proxyConf.Retriever)
95+
if err != nil {
96+
return nil, err
97+
}
98+
retrievers = append(retrievers, currentRetriever)
99+
}
100+
if proxyConf.Retrievers != nil {
101+
for _, r := range *proxyConf.Retrievers {
102+
currentRetriever, err := initRetriever(&r)
103+
if err != nil {
104+
return nil, err
105+
}
106+
retrievers = append(retrievers, currentRetriever)
107+
}
108+
}
109+
return retrievers, nil
110+
}
111+
109112
// initRetriever initialize the retriever based on the configuration
110113
func initRetriever(c *config.RetrieverConf) (retriever.Retriever, error) {
111114
retrieverTimeout := config.DefaultRetriever.Timeout
@@ -200,6 +203,29 @@ func initRetriever(c *config.RetrieverConf) (retriever.Retriever, error) {
200203
}
201204
}
202205

206+
// initDataExporters initialize the exporters based on the configuration
207+
// it handles both the `exporter` and `exporters` fields.
208+
func initDataExporters(proxyConf *config.Config) ([]ffclient.DataExporter, error) {
209+
exporters := make([]ffclient.DataExporter, 0)
210+
if proxyConf.Exporter != nil {
211+
exp, err := initDataExporter(proxyConf.Exporter)
212+
if err != nil {
213+
return nil, err
214+
}
215+
exporters = append(exporters, exp)
216+
}
217+
if proxyConf.Exporters != nil {
218+
for _, e := range *proxyConf.Exporters {
219+
currentExporter, err := initDataExporter(&e)
220+
if err != nil {
221+
return nil, err
222+
}
223+
exporters = append(exporters, currentExporter)
224+
}
225+
}
226+
return exporters, nil
227+
}
228+
203229
func initDataExporter(c *config.ExporterConf) (ffclient.DataExporter, error) {
204230
dataExp := ffclient.DataExporter{
205231
FlushInterval: func() time.Duration {

0 commit comments

Comments
 (0)