Skip to content

Make Write message type more flexble, address some feedback #1710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ fmt: common-format $(GOIMPORTS)
proto: ## Regenerate Go from remote write proto.
proto: $(BUF)
@echo ">> regenerating Prometheus Remote Write proto"
@cd api/prometheus/v1/genproto && $(BUF) generate
@cd api/prometheus/v1 && find genproto/ -type f -exec sed -i '' 's/protohelpers "github.com\/planetscale\/vtprotobuf\/protohelpers"/protohelpers "github.com\/prometheus\/client_golang\/internal\/github.com\/planetscale\/vtprotobuf\/protohelpers"/g' {} \;
@cd api/prometheus/v1/remote/genproto && $(BUF) generate
@cd api/prometheus/v1/remote && find genproto/ -type f -exec sed -i '' 's/protohelpers "github.com\/planetscale\/vtprotobuf\/protohelpers"/protohelpers "github.com\/prometheus\/client_golang\/internal\/github.com\/planetscale\/vtprotobuf\/protohelpers"/g' {} \;
# For some reasons buf generates this unused import, kill it manually for now and reformat.
@cd api/prometheus/v1 && find genproto/ -type f -exec sed -i '' 's/_ "github.com\/gogo\/protobuf\/gogoproto"//g' {} \;
@cd api/prometheus/v1 && go fmt ./genproto/...
@cd api/prometheus/v1/remote && find genproto/ -type f -exec sed -i '' 's/_ "github.com\/gogo\/protobuf\/gogoproto"//g' {} \;
@cd api/prometheus/v1/remote && go fmt ./genproto/...
5 changes: 2 additions & 3 deletions api/prometheus/v1/remote/genproto/v2/types.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions api/prometheus/v1/remote/genproto/v2/types_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

175 changes: 142 additions & 33 deletions api/prometheus/v1/remote/remote_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type apiOpts struct {
logger *slog.Logger
backoff backoff.Config
compression Compression
path string
retryOnRateLimit bool
}

Expand All @@ -63,6 +64,7 @@ var defaultAPIOpts = &apiOpts{
// Hardcoded for now.
retryOnRateLimit: true,
compression: SnappyBlockCompression,
path: "api/v1/write",
}

// WithAPILogger returns APIOption that allows providing slog logger.
Expand All @@ -74,6 +76,22 @@ func WithAPILogger(logger *slog.Logger) APIOption {
}
}

// WithAPIPath returns APIOption that allows providing path to send remote write requests to.
func WithAPIPath(path string) APIOption {
return func(o *apiOpts) error {
o.path = path
return nil
}
}

// WithAPIRetryOnRateLimit returns APIOption that disables retrying on rate limit status code.
func WithAPINoRetryOnRateLimit() APIOption {
return func(o *apiOpts) error {
o.retryOnRateLimit = false
return nil
}
}

type nopSlogHandler struct{}

func (n nopSlogHandler) Enabled(context.Context, slog.Level) bool { return false }
Expand Down Expand Up @@ -117,33 +135,67 @@ type vtProtoEnabled interface {
MarshalToSizedBufferVT(dAtA []byte) (int, error)
}

type gogoProtoEnabled interface {
Size() (n int)
MarshalToSizedBuffer(dAtA []byte) (n int, err error)
}

// Sort of a hack to identify v2 requests.
// Under any marshaling scheme, v2 requests have a `Symbols` field of type []string.
// So would always have a `GetSymbols()` method which doesn't rely on any other types.
type v2Request interface {
GetSymbols() []string
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems a bit hacky, but need it for the contentType detection. I wonder if there is a better way here

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting. I think this is acceptable for now.

}

// Write writes given, non-empty, protobuf message to a remote storage.
// The https://github.com/planetscale/vtprotobuf methods will be used if your msg
// supports those (e.g. SizeVT() and MarshalToSizedBufferVT(...)), for efficiency.
func (r *API) Write(ctx context.Context, msg proto.Message) (_ WriteResponseStats, err error) {
//
// Depending on serialization methods,
// - https://github.com/planetscale/vtprotobuf methods will be used if your msg
// supports those (e.g. SizeVT() and MarshalToSizedBufferVT(...)), for efficiency
// - Otherwise https://github.com/gogo/protobuf methods (e.g. Size() and MarshalToSizedBuffer(...))
// will be used
// - If neither is supported, it will marshaled using generic google.golang.org/protobuf methods and
// error out on unknown scheme.
func (r *API) Write(ctx context.Context, msg any) (_ WriteResponseStats, err error) {
// Detect content-type.
cType := WriteProtoFullName(proto.MessageName(msg))
cType := WriteProtoFullNameV1
if _, ok := msg.(v2Request); ok {
cType = WriteProtoFullNameV2
}

if err := cType.Validate(); err != nil {
return WriteResponseStats{}, err
}

// Encode the payload.
if emsg, ok := msg.(vtProtoEnabled); ok {
switch m := msg.(type) {
case vtProtoEnabled:
// Use optimized vtprotobuf if supported.
size := emsg.SizeVT()
size := m.SizeVT()
if len(r.reqBuf) < size {
r.reqBuf = make([]byte, size)
}
if _, err := emsg.MarshalToSizedBufferVT(r.reqBuf[:size]); err != nil {
if _, err := m.MarshalToSizedBufferVT(r.reqBuf[:size]); err != nil {
return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
}
} else {
case gogoProtoEnabled:
// Gogo proto if supported.
size := m.Size()
if len(r.reqBuf) < size {
r.reqBuf = make([]byte, size)
}
if _, err := m.MarshalToSizedBuffer(r.reqBuf[:size]); err != nil {
return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
}
case proto.Message:
// Generic proto.
r.reqBuf = r.reqBuf[:0]
r.reqBuf, err = (proto.MarshalOptions{}).MarshalAppend(r.reqBuf, msg)
r.reqBuf, err = (proto.MarshalOptions{}).MarshalAppend(r.reqBuf, m)
if err != nil {
return WriteResponseStats{}, fmt.Errorf("encoding request %w", err)
}
default:
return WriteResponseStats{}, fmt.Errorf("unknown message type %T", m)
}

payload, err := compressPayload(&r.comprBuf, r.opts.compression, r.reqBuf)
Expand Down Expand Up @@ -214,7 +266,7 @@ func compressPayload(tmpbuf *[]byte, enc Compression, inp []byte) (compressed []
}

func (r *API) attemptWrite(ctx context.Context, compr Compression, proto WriteProtoFullName, payload []byte, attempt int) (WriteResponseStats, error) {
u := r.client.URL("api/v1/write", nil)
u := r.client.URL(r.opts.path, nil)
req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(payload))
if err != nil {
// Errors from NewRequest are from unparsable URLs, so are not
Expand All @@ -241,9 +293,12 @@ func (r *API) attemptWrite(ctx context.Context, compr Compression, proto WritePr
return WriteResponseStats{}, retryableError{err, 0}
}

rs, err := parseWriteResponseStats(resp)
if err != nil {
r.opts.logger.Warn("parsing rw write statistics failed; partial or no stats", "err", err)
rs := WriteResponseStats{}
if proto == WriteProtoFullNameV2 {
rs, err = parseWriteResponseStats(resp)
if err != nil {
r.opts.logger.Warn("parsing rw write statistics failed; partial or no stats", "err", err)
}
}

if resp.StatusCode/100 == 2 {
Expand Down Expand Up @@ -279,18 +334,59 @@ type writeStorage interface {
Store(ctx context.Context, proto WriteProtoFullName, serializedRequest []byte) (_ WriteResponseStats, code int, _ error)
}

// remoteWriteDecompressor is an interface that allows decompressing the body of the request.
type remoteWriteDecompressor interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you remind me why we need this?

Another option is to use server middleware interface for this. There are common middlewares for this work ppl use e.g. Otel

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some downstream users have their own request body size limiter+instrumentaion impls, that they would probably want to use. I know Thanos and Cortex have them #1658 (comment)

I don't think compression-wise things would be too different (maybe using s2).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, I will merge for now, but we need to switch to middleware injection technique

Decompress(ctx context.Context, body io.ReadCloser) (decompressed []byte, _ error)
}

type handler struct {
logger *slog.Logger
store writeStorage
store writeStorage
opts handlerOpts
}

type handlerOpts struct {
logger *slog.Logger
decompressor remoteWriteDecompressor
}

// HandlerOption represents an option for the handler.
type HandlerOption func(o *handlerOpts)

// WithHandlerLogger returns HandlerOption that allows providing slog logger.
// By default, nothing is logged.
func WithHandlerLogger(logger *slog.Logger) HandlerOption {
return func(o *handlerOpts) {
o.logger = logger
}
}

// WithHandlerDecompressor returns HandlerOption that allows providing remoteWriteDecompressor.
// By default, SimpleSnappyDecompressor is used.
func WithHandlerDecompressor(decompressor remoteWriteDecompressor) HandlerOption {
return func(o *handlerOpts) {
o.decompressor = decompressor
}
}

// NewRemoteWriteHandler returns HTTP handler that receives Remote Write 2.0
// protocol https://prometheus.io/docs/specs/remote_write_spec_2_0/.
func NewRemoteWriteHandler(logger *slog.Logger, store writeStorage) http.Handler {
return &handler{logger: logger, store: store}
func NewRemoteWriteHandler(store writeStorage, opts ...HandlerOption) http.Handler {
o := handlerOpts{logger: slog.New(nopSlogHandler{}), decompressor: &SimpleSnappyDecompressor{}}
for _, opt := range opts {
opt(&o)
}
return &handler{opts: o, store: store}
}

func parseProtoMsg(contentType string) (WriteProtoFullName, error) {
// ParseProtoMsg parses the content-type header and returns the proto message type.
//
// The expected content-type will be of the form,
// - `application/x-protobuf;proto=io.prometheus.write.v2.Request` which will be treated as RW2.0 request,
// - `application/x-protobuf;proto=prometheus.WriteRequest` which will be treated as RW1.0 request,
// - `application/x-protobuf` which will be treated as RW1.0 request.
//
// If the content-type is not of the above forms, it will return an error.
func ParseProtoMsg(contentType string) (WriteProtoFullName, error) {
contentType = strings.TrimSpace(contentType)

parts := strings.Split(contentType, ";")
Expand Down Expand Up @@ -323,9 +419,9 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
contentType = appProtoContentType
}

msgType, err := parseProtoMsg(contentType)
msgType, err := ParseProtoMsg(contentType)
if err != nil {
h.logger.Error("Error decoding remote write request", "err", err)
h.opts.logger.Error("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}
Expand All @@ -337,22 +433,14 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// We could give http.StatusUnsupportedMediaType, but let's assume snappy by default.
} else if enc != string(SnappyBlockCompression) {
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, SnappyBlockCompression)
h.logger.Error("Error decoding remote write request", "err", err)
h.opts.logger.Error("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
}

// Read the request body.
body, err := io.ReadAll(r.Body)
// Decompress the request body.
decompressed, err := h.opts.decompressor.Decompress(r.Context(), r.Body)
if err != nil {
h.logger.Error("Error decoding remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

decompressed, err := snappy.Decode(nil, body)
if err != nil {
// TODO(bwplotka): Add more context to responded error?
h.logger.Error("Error decompressing remote write request", "err", err.Error())
h.opts.logger.Error("Error decompressing remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
Expand All @@ -367,10 +455,31 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
code = http.StatusInternalServerError
}
if code/5 == 100 { // 5xx
h.logger.Error("Error while remote writing the v2 request", "err", storeErr.Error())
h.opts.logger.Error("Error while storing the remote write request", "err", storeErr.Error())
}
http.Error(w, storeErr.Error(), code)
return
}
w.WriteHeader(http.StatusNoContent)
}

// SimpleSnappyDecompressor is a simple implementation of the remoteWriteDecompressor interface.
type SimpleSnappyDecompressor struct{}

func (s *SimpleSnappyDecompressor) Decompress(ctx context.Context, body io.ReadCloser) (decompressed []byte, _ error) {
// Read the request body.
bodyBytes, err := io.ReadAll(body)
if err != nil {
return nil, fmt.Errorf("error reading request body: %w", err)
}

decompressed, err = snappy.Decode(nil, bodyBytes)
if err != nil {
// TODO(bwplotka): Add more context to responded error?
return nil, fmt.Errorf("error snappy decoding request body: %w", err)
}

return decompressed, nil
}

var _ remoteWriteDecompressor = &SimpleSnappyDecompressor{}
6 changes: 3 additions & 3 deletions api/prometheus/v1/remote/remote_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func stats(req *writev2.Request) (s WriteResponseStats) {
func TestRemoteAPI_Write_WithHandler(t *testing.T) {
tLogger := slog.Default()
mStore := &mockStorage{}
srv := httptest.NewServer(NewRemoteWriteHandler(tLogger, mStore))
srv := httptest.NewServer(NewRemoteWriteHandler(mStore, WithHandlerLogger(tLogger)))
t.Cleanup(srv.Close)

cl, err := api.NewClient(api.Config{
Expand All @@ -135,7 +135,7 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
if err != nil {
t.Fatal(err)
}
client, err := NewAPI(cl, WithAPILogger(tLogger))
client, err := NewAPI(cl, WithAPILogger(tLogger), WithAPIPath("api/v1/write"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -149,7 +149,7 @@ func TestRemoteAPI_Write_WithHandler(t *testing.T) {
t.Fatal("unexpected stats", diff)
}
if len(mStore.v2Reqs) != 1 {
t.Fatal("expected 1 request stored, got", mStore.v2Reqs)
t.Fatal("expected 1 v2 request stored, got", mStore.v2Reqs)
}
if diff := cmp.Diff(req, mStore.v2Reqs[0], protocmp.Transform()); diff != "" {
t.Fatal("unexpected request received", diff)
Expand Down
4 changes: 1 addition & 3 deletions api/prometheus/v1/remote/remote_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"net/http"
"strconv"
"strings"

"google.golang.org/protobuf/reflect/protoreflect"
)

const (
Expand All @@ -43,7 +41,7 @@ const (
// WriteProtoFullName represents the fully qualified name of the protobuf message
// to use in Remote write 1.0 and 2.0 protocols.
// See https://prometheus.io/docs/specs/remote_write_spec_2_0/#protocol.
type WriteProtoFullName protoreflect.FullName
type WriteProtoFullName string

const (
// WriteProtoFullNameV1 represents the `prometheus.WriteRequest` protobuf
Expand Down
Loading