Skip to content

Commit e0800f5

Browse files
authored
Make Write message type more flexble, address some feedback (#1710)
* Address remaining feedback Signed-off-by: Saswata Mukherjee <[email protected]> * Make Write message type more flexible Signed-off-by: Saswata Mukherjee <[email protected]> --------- Signed-off-by: Saswata Mukherjee <[email protected]>
1 parent bffa922 commit e0800f5

File tree

6 files changed

+154
-50
lines changed

6 files changed

+154
-50
lines changed

Makefile

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,8 @@ fmt: common-format $(GOIMPORTS)
5555
proto: ## Regenerate Go from remote write proto.
5656
proto: $(BUF)
5757
@echo ">> regenerating Prometheus Remote Write proto"
58-
@cd api/prometheus/v1/genproto && $(BUF) generate
59-
@cd api/prometheus/v1 && find genproto/ -type f -exec sed -i '' 's/protohelpers "github.com\/planetscale\/vtprotobuf\/protohelpers"/protohelpers "github.com\/prometheus\/client_golang\/internal\/github.com\/planetscale\/vtprotobuf\/protohelpers"/g' {} \;
58+
@cd api/prometheus/v1/remote/genproto && $(BUF) generate
59+
@cd api/prometheus/v1/remote && find genproto/ -type f -exec sed -i '' 's/protohelpers "github.com\/planetscale\/vtprotobuf\/protohelpers"/protohelpers "github.com\/prometheus\/client_golang\/internal\/github.com\/planetscale\/vtprotobuf\/protohelpers"/g' {} \;
6060
# For some reasons buf generates this unused import, kill it manually for now and reformat.
61-
@cd api/prometheus/v1 && find genproto/ -type f -exec sed -i '' 's/_ "github.com\/gogo\/protobuf\/gogoproto"//g' {} \;
62-
@cd api/prometheus/v1 && go fmt ./genproto/...
61+
@cd api/prometheus/v1/remote && find genproto/ -type f -exec sed -i '' 's/_ "github.com\/gogo\/protobuf\/gogoproto"//g' {} \;
62+
@cd api/prometheus/v1/remote && go fmt ./genproto/...

api/prometheus/v1/remote/genproto/v2/types.pb.go

Lines changed: 2 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/prometheus/v1/remote/genproto/v2/types_vtproto.pb.go

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/prometheus/v1/remote/remote_api.go

Lines changed: 142 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ type apiOpts struct {
5151
logger *slog.Logger
5252
backoff backoff.Config
5353
compression Compression
54+
path string
5455
retryOnRateLimit bool
5556
}
5657

@@ -63,6 +64,7 @@ var defaultAPIOpts = &apiOpts{
6364
// Hardcoded for now.
6465
retryOnRateLimit: true,
6566
compression: SnappyBlockCompression,
67+
path: "api/v1/write",
6668
}
6769

6870
// WithAPILogger returns APIOption that allows providing slog logger.
@@ -74,6 +76,22 @@ func WithAPILogger(logger *slog.Logger) APIOption {
7476
}
7577
}
7678

79+
// WithAPIPath returns APIOption that allows providing path to send remote write requests to.
80+
func WithAPIPath(path string) APIOption {
81+
return func(o *apiOpts) error {
82+
o.path = path
83+
return nil
84+
}
85+
}
86+
87+
// WithAPIRetryOnRateLimit returns APIOption that disables retrying on rate limit status code.
88+
func WithAPINoRetryOnRateLimit() APIOption {
89+
return func(o *apiOpts) error {
90+
o.retryOnRateLimit = false
91+
return nil
92+
}
93+
}
94+
7795
type nopSlogHandler struct{}
7896

7997
func (n nopSlogHandler) Enabled(context.Context, slog.Level) bool { return false }
@@ -117,33 +135,67 @@ type vtProtoEnabled interface {
117135
MarshalToSizedBufferVT(dAtA []byte) (int, error)
118136
}
119137

138+
type gogoProtoEnabled interface {
139+
Size() (n int)
140+
MarshalToSizedBuffer(dAtA []byte) (n int, err error)
141+
}
142+
143+
// Sort of a hack to identify v2 requests.
144+
// Under any marshaling scheme, v2 requests have a `Symbols` field of type []string.
145+
// So would always have a `GetSymbols()` method which doesn't rely on any other types.
146+
type v2Request interface {
147+
GetSymbols() []string
148+
}
149+
120150
// Write writes given, non-empty, protobuf message to a remote storage.
121-
// The https://github.com/planetscale/vtprotobuf methods will be used if your msg
122-
// supports those (e.g. SizeVT() and MarshalToSizedBufferVT(...)), for efficiency.
123-
func (r *API) Write(ctx context.Context, msg proto.Message) (_ WriteResponseStats, err error) {
151+
//
152+
// Depending on serialization methods,
153+
// - https://github.com/planetscale/vtprotobuf methods will be used if your msg
154+
// supports those (e.g. SizeVT() and MarshalToSizedBufferVT(...)), for efficiency
155+
// - Otherwise https://github.com/gogo/protobuf methods (e.g. Size() and MarshalToSizedBuffer(...))
156+
// will be used
157+
// - If neither is supported, it will marshaled using generic google.golang.org/protobuf methods and
158+
// error out on unknown scheme.
159+
func (r *API) Write(ctx context.Context, msg any) (_ WriteResponseStats, err error) {
124160
// Detect content-type.
125-
cType := WriteProtoFullName(proto.MessageName(msg))
161+
cType := WriteProtoFullNameV1
162+
if _, ok := msg.(v2Request); ok {
163+
cType = WriteProtoFullNameV2
164+
}
165+
126166
if err := cType.Validate(); err != nil {
127167
return WriteResponseStats{}, err
128168
}
129169

130170
// Encode the payload.
131-
if emsg, ok := msg.(vtProtoEnabled); ok {
171+
switch m := msg.(type) {
172+
case vtProtoEnabled:
132173
// Use optimized vtprotobuf if supported.
133-
size := emsg.SizeVT()
174+
size := m.SizeVT()
134175
if len(r.reqBuf) < size {
135176
r.reqBuf = make([]byte, size)
136177
}
137-
if _, err := emsg.MarshalToSizedBufferVT(r.reqBuf[:size]); err != nil {
178+
if _, err := m.MarshalToSizedBufferVT(r.reqBuf[:size]); err != nil {
138179
return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
139180
}
140-
} else {
181+
case gogoProtoEnabled:
182+
// Gogo proto if supported.
183+
size := m.Size()
184+
if len(r.reqBuf) < size {
185+
r.reqBuf = make([]byte, size)
186+
}
187+
if _, err := m.MarshalToSizedBuffer(r.reqBuf[:size]); err != nil {
188+
return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
189+
}
190+
case proto.Message:
141191
// Generic proto.
142192
r.reqBuf = r.reqBuf[:0]
143-
r.reqBuf, err = (proto.MarshalOptions{}).MarshalAppend(r.reqBuf, msg)
193+
r.reqBuf, err = (proto.MarshalOptions{}).MarshalAppend(r.reqBuf, m)
144194
if err != nil {
145195
return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
146196
}
197+
default:
198+
return WriteResponseStats{}, fmt.Errorf("unknown message type %T", m)
147199
}
148200

149201
payload, err := compressPayload(&r.comprBuf, r.opts.compression, r.reqBuf)
@@ -214,7 +266,7 @@ func compressPayload(tmpbuf *[]byte, enc Compression, inp []byte) (compressed []
214266
}
215267

216268
func (r *API) attemptWrite(ctx context.Context, compr Compression, proto WriteProtoFullName, payload []byte, attempt int) (WriteResponseStats, error) {
217-
u := r.client.URL("api/v1/write", nil)
269+
u := r.client.URL(r.opts.path, nil)
218270
req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(payload))
219271
if err != nil {
220272
// Errors from NewRequest are from unparsable URLs, so are not
@@ -241,9 +293,12 @@ func (r *API) attemptWrite(ctx context.Context, compr Compression, proto WritePr
241293
return WriteResponseStats{}, retryableError{err, 0}
242294
}
243295

244-
rs, err := parseWriteResponseStats(resp)
245-
if err != nil {
246-
r.opts.logger.Warn("parsing rw write statistics failed; partial or no stats", "err", err)
296+
rs := WriteResponseStats{}
297+
if proto == WriteProtoFullNameV2 {
298+
rs, err = parseWriteResponseStats(resp)
299+
if err != nil {
300+
r.opts.logger.Warn("parsing rw write statistics failed; partial or no stats", "err", err)
301+
}
247302
}
248303

249304
if resp.StatusCode/100 == 2 {
@@ -279,18 +334,59 @@ type writeStorage interface {
279334
Store(ctx context.Context, proto WriteProtoFullName, serializedRequest []byte) (_ WriteResponseStats, code int, _ error)
280335
}
281336

337+
// remoteWriteDecompressor is an interface that allows decompressing the body of the request.
338+
type remoteWriteDecompressor interface {
339+
Decompress(ctx context.Context, body io.ReadCloser) (decompressed []byte, _ error)
340+
}
341+
282342
type handler struct {
283-
logger *slog.Logger
284-
store writeStorage
343+
store writeStorage
344+
opts handlerOpts
345+
}
346+
347+
type handlerOpts struct {
348+
logger *slog.Logger
349+
decompressor remoteWriteDecompressor
350+
}
351+
352+
// HandlerOption represents an option for the handler.
353+
type HandlerOption func(o *handlerOpts)
354+
355+
// WithHandlerLogger returns HandlerOption that allows providing slog logger.
356+
// By default, nothing is logged.
357+
func WithHandlerLogger(logger *slog.Logger) HandlerOption {
358+
return func(o *handlerOpts) {
359+
o.logger = logger
360+
}
361+
}
362+
363+
// WithHandlerDecompressor returns HandlerOption that allows providing remoteWriteDecompressor.
364+
// By default, SimpleSnappyDecompressor is used.
365+
func WithHandlerDecompressor(decompressor remoteWriteDecompressor) HandlerOption {
366+
return func(o *handlerOpts) {
367+
o.decompressor = decompressor
368+
}
285369
}
286370

287371
// NewRemoteWriteHandler returns HTTP handler that receives Remote Write 2.0
288372
// protocol https://prometheus.io/docs/specs/remote_write_spec_2_0/.
289-
func NewRemoteWriteHandler(logger *slog.Logger, store writeStorage) http.Handler {
290-
return &handler{logger: logger, store: store}
373+
func NewRemoteWriteHandler(store writeStorage, opts ...HandlerOption) http.Handler {
374+
o := handlerOpts{logger: slog.New(nopSlogHandler{}), decompressor: &SimpleSnappyDecompressor{}}
375+
for _, opt := range opts {
376+
opt(&o)
377+
}
378+
return &handler{opts: o, store: store}
291379
}
292380

293-
func parseProtoMsg(contentType string) (WriteProtoFullName, error) {
381+
// ParseProtoMsg parses the content-type header and returns the proto message type.
382+
//
383+
// The expected content-type will be of the form,
384+
// - `application/x-protobuf;proto=io.prometheus.write.v2.Request` which will be treated as RW2.0 request,
385+
// - `application/x-protobuf;proto=prometheus.WriteRequest` which will be treated as RW1.0 request,
386+
// - `application/x-protobuf` which will be treated as RW1.0 request.
387+
//
388+
// If the content-type is not of the above forms, it will return an error.
389+
func ParseProtoMsg(contentType string) (WriteProtoFullName, error) {
294390
contentType = strings.TrimSpace(contentType)
295391

296392
parts := strings.Split(contentType, ";")
@@ -323,9 +419,9 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
323419
contentType = appProtoContentType
324420
}
325421

326-
msgType, err := parseProtoMsg(contentType)
422+
msgType, err := ParseProtoMsg(contentType)
327423
if err != nil {
328-
h.logger.Error("Error decoding remote write request", "err", err)
424+
h.opts.logger.Error("Error decoding remote write request", "err", err)
329425
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
330426
return
331427
}
@@ -337,22 +433,14 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
337433
// We could give http.StatusUnsupportedMediaType, but let's assume snappy by default.
338434
} else if enc != string(SnappyBlockCompression) {
339435
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, SnappyBlockCompression)
340-
h.logger.Error("Error decoding remote write request", "err", err)
436+
h.opts.logger.Error("Error decoding remote write request", "err", err)
341437
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
342438
}
343439

344-
// Read the request body.
345-
body, err := io.ReadAll(r.Body)
440+
// Decompress the request body.
441+
decompressed, err := h.opts.decompressor.Decompress(r.Context(), r.Body)
346442
if err != nil {
347-
h.logger.Error("Error decoding remote write request", "err", err.Error())
348-
http.Error(w, err.Error(), http.StatusBadRequest)
349-
return
350-
}
351-
352-
decompressed, err := snappy.Decode(nil, body)
353-
if err != nil {
354-
// TODO(bwplotka): Add more context to responded error?
355-
h.logger.Error("Error decompressing remote write request", "err", err.Error())
443+
h.opts.logger.Error("Error decompressing remote write request", "err", err.Error())
356444
http.Error(w, err.Error(), http.StatusBadRequest)
357445
return
358446
}
@@ -367,10 +455,31 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
367455
code = http.StatusInternalServerError
368456
}
369457
if code/5 == 100 { // 5xx
370-
h.logger.Error("Error while remote writing the v2 request", "err", storeErr.Error())
458+
h.opts.logger.Error("Error while storing the remote write request", "err", storeErr.Error())
371459
}
372460
http.Error(w, storeErr.Error(), code)
373461
return
374462
}
375463
w.WriteHeader(http.StatusNoContent)
376464
}
465+
466+
// SimpleSnappyDecompressor is a simple implementation of the remoteWriteDecompressor interface.
467+
type SimpleSnappyDecompressor struct{}
468+
469+
func (s *SimpleSnappyDecompressor) Decompress(ctx context.Context, body io.ReadCloser) (decompressed []byte, _ error) {
470+
// Read the request body.
471+
bodyBytes, err := io.ReadAll(body)
472+
if err != nil {
473+
return nil, fmt.Errorf("error reading request body: %w", err)
474+
}
475+
476+
decompressed, err = snappy.Decode(nil, bodyBytes)
477+
if err != nil {
478+
// TODO(bwplotka): Add more context to responded error?
479+
return nil, fmt.Errorf("error snappy decoding request body: %w", err)
480+
}
481+
482+
return decompressed, nil
483+
}
484+
485+
var _ remoteWriteDecompressor = &SimpleSnappyDecompressor{}

api/prometheus/v1/remote/remote_api_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func stats(req *writev2.Request) (s WriteResponseStats) {
125125
func TestRemoteAPI_Write_WithHandler(t *testing.T) {
126126
tLogger := slog.Default()
127127
mStore := &mockStorage{}
128-
srv := httptest.NewServer(NewRemoteWriteHandler(tLogger, mStore))
128+
srv := httptest.NewServer(NewRemoteWriteHandler(mStore, WithHandlerLogger(tLogger)))
129129
t.Cleanup(srv.Close)
130130

131131
cl, err := api.NewClient(api.Config{
@@ -135,7 +135,7 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
135135
if err != nil {
136136
t.Fatal(err)
137137
}
138-
client, err := NewAPI(cl, WithAPILogger(tLogger))
138+
client, err := NewAPI(cl, WithAPILogger(tLogger), WithAPIPath("api/v1/write"))
139139
if err != nil {
140140
t.Fatal(err)
141141
}
@@ -149,7 +149,7 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
149149
t.Fatal("unexpected stats", diff)
150150
}
151151
if len(mStore.v2Reqs) != 1 {
152-
t.Fatal("expected 1 request stored, got", mStore.v2Reqs)
152+
t.Fatal("expected 1 v2 request stored, got", mStore.v2Reqs)
153153
}
154154
if diff := cmp.Diff(req, mStore.v2Reqs[0], protocmp.Transform()); diff != "" {
155155
t.Fatal("unexpected request received", diff)

api/prometheus/v1/remote/remote_headers.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ import (
1919
"net/http"
2020
"strconv"
2121
"strings"
22-
23-
"google.golang.org/protobuf/reflect/protoreflect"
2422
)
2523

2624
const (
@@ -43,7 +41,7 @@ const (
4341
// WriteProtoFullName represents the fully qualified name of the protobuf message
4442
// to use in Remote write 1.0 and 2.0 protocols.
4543
// See https://prometheus.io/docs/specs/remote_write_spec_2_0/#protocol.
46-
type WriteProtoFullName protoreflect.FullName
44+
type WriteProtoFullName string
4745

4846
const (
4947
// WriteProtoFullNameV1 represents the `prometheus.WriteRequest` protobuf

0 commit comments

Comments
 (0)