From eb1a2ab1bbdd9ddc9d814029db7044665da62ed0 Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Thu, 25 Aug 2022 10:01:46 +0200 Subject: [PATCH] feat: added consistency params support --- CHANGELOG.md | 2 ++ api/write/options.go | 30 ++++++++++++++++++++++++++++++ api/write/options_test.go | 5 ++++- internal/write/service.go | 3 +++ internal/write/service_test.go | 12 ++++++++++++ 5 files changed, 51 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f3af327..6e1fff5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ ## [unreleased] +### Features +- [#348](https://github.com/influxdata/influxdb-client-go/pull/348) Added `write.Options.Consitency` parameter to support InfluxDB Enterprise. ## 2.9.2 [2022-07-29] ### Bug fixes diff --git a/api/write/options.go b/api/write/options.go index 04680d29..7d85ad31 100644 --- a/api/write/options.go +++ b/api/write/options.go @@ -33,8 +33,27 @@ type Options struct { maxRetryTime uint // The base for the exponential retry delay exponentialBase uint + // InfluxDB Enterprise write consistency as explained in https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/clustering/#write-consistency + consistency Consistency } +const ( + // ConsistencyOne requires at least one data node acknowledged a write. + ConsistencyOne Consistency = "one" + + // ConsistencyAll requires all data nodes to acknowledge a write. + ConsistencyAll Consistency = "all" + + // ConsistencyQuorum requires a quorum of data nodes to acknowledge a write. + ConsistencyQuorum Consistency = "quorum" + + // ConsistencyAny allows for hinted hand off, potentially no write happened yet. + ConsistencyAny Consistency = "any" +) + +// Consistency defines enum for allows consistency values for InfluxDB Enterprise, as explained https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/clustering/#write-consistency +type Consistency string + // BatchSize returns size of batch func (o *Options) BatchSize() uint { return o.batchSize @@ -162,6 +181,17 @@ func (o *Options) DefaultTags() map[string]string { return o.defaultTags } +// Consistency returns consistency for param value +func (o *Options) Consistency() Consistency { + return o.consistency +} + +// SetConsistency allows setting InfluxDB Enterprise write consistency, as explained in https://docs.influxdata.com/enterprise_influxdb/v1.9/concepts/clustering/#write-consistency */ +func (o *Options) SetConsistency(consistency Consistency) *Options { + o.consistency = consistency + return o +} + // DefaultOptions returns Options object with default values func DefaultOptions() *Options { return &Options{batchSize: 5_000, flushInterval: 1_000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 50_000, defaultTags: make(map[string]string), diff --git a/api/write/options_test.go b/api/write/options_test.go index a30f18c5..17260518 100644 --- a/api/write/options_test.go +++ b/api/write/options_test.go @@ -24,6 +24,7 @@ func TestDefaultOptions(t *testing.T) { assert.EqualValues(t, 125_000, opts.MaxRetryInterval()) assert.EqualValues(t, 180_000, opts.MaxRetryTime()) assert.EqualValues(t, 2, opts.ExponentialBase()) + assert.EqualValues(t, "", opts.Consistency()) assert.Len(t, opts.DefaultTags(), 0) } @@ -40,7 +41,8 @@ func TestSettingsOptions(t *testing.T) { SetExponentialBase(3). SetMaxRetryTime(200_000). AddDefaultTag("a", "1"). - AddDefaultTag("b", "2") + AddDefaultTag("b", "2"). + SetConsistency(write.ConsistencyOne) assert.EqualValues(t, 5, opts.BatchSize()) assert.EqualValues(t, true, opts.UseGZip()) assert.EqualValues(t, 5000, opts.FlushInterval()) @@ -51,5 +53,6 @@ func TestSettingsOptions(t *testing.T) { assert.EqualValues(t, 150_000, opts.MaxRetryInterval()) assert.EqualValues(t, 200_000, opts.MaxRetryTime()) assert.EqualValues(t, 3, opts.ExponentialBase()) + assert.EqualValues(t, "one", opts.Consistency()) assert.Len(t, opts.DefaultTags(), 2) } diff --git a/internal/write/service.go b/internal/write/service.go index e4885e41..b35ce154 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -79,6 +79,9 @@ func NewService(org string, bucket string, httpService http2.Service, options *w params.Set("org", org) params.Set("bucket", bucket) params.Set("precision", precisionToString(options.Precision())) + if options.Consistency() != "" { + params.Set("consistency", string(options.Consistency())) + } u.RawQuery = params.Encode() writeURL := u.String() return &Service{ diff --git a/internal/write/service_test.go b/internal/write/service_test.go index e3de0fa1..b27419e5 100644 --- a/internal/write/service_test.go +++ b/internal/write/service_test.go @@ -647,3 +647,15 @@ func TestFlush(t *testing.T) { assert.Len(t, hs.Lines(), 5) assert.Equal(t, 0, srv.retryQueue.list.Len()) } + +func TestConsistencyParam(t *testing.T) { + hs := test.NewTestService(t, "http://localhost:8888") + opts := write.DefaultOptions().SetConsistency(write.ConsistencyQuorum) + srv := NewService("org", "buc", hs, opts) + + require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&consistency=quorum&org=org&precision=ns", srv.WriteURL()) + opts = write.DefaultOptions() + srv = NewService("org", "buc", hs, opts) + + require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&org=org&precision=ns", srv.WriteURL()) +}