Skip to content

Commit a1e4948

Browse files
committed
init
0 parents  commit a1e4948

13 files changed

+1420
-0
lines changed

.gitignore

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
cortex-tenant
2+
sink
3+
config.yml
4+
.out
5+
*.rpm

LICENSE

+373
Large diffs are not rendered by default.

Makefile

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
# Package metadata; passed to fpm by the build-rpm target.
NAME := cortex-tenant
MAINTAINER := Igor Novgorodov
DESCRIPTION := Cortex tenant proxy
URL := https://github.com/blind-oracle/cortex-tenant
LICENSE := MPL

VERSION := 1.0.0
RELEASE := 1

# Go toolchain binary; can be overridden from the environment or command line.
GO ?= go

RPM := $(NAME)-$(VERSION)-$(RELEASE).x86_64.rpm
DIR := $(NAME)-git
# Staging directory (created inside deploy/) that mirrors the package layout.
OUT := .out

# Host and repository id used by rpm-upload.
REPO_HOST := tvovma-mgt035
REPO := cortex

# None of these targets produce a file named after themselves; declaring them
# phony keeps a stray file or directory with the same name from disabling them.
.PHONY: all build prepare rpm rpm-upload build-rpm

all: rpm

# Build a stripped, statically linked linux/amd64 binary with the version embedded.
build:
	GOARCH=amd64 \
	GOOS=linux \
	$(GO) build -ldflags "-s -w -extldflags \"-static\" -X main.version=$(VERSION)"

# Assemble the filesystem tree that fpm packages.
prepare:
	cd deploy && \
	rm -rf $(OUT) && \
	mkdir -p $(OUT)/usr/sbin $(OUT)/var/lib/$(NAME) $(OUT)/etc/sysconfig $(OUT)/usr/lib/systemd/system && \
	cp $(NAME).env $(OUT)/etc/sysconfig/$(NAME) && \
	cp $(NAME).service $(OUT)/usr/lib/systemd/system && \
	cp $(NAME).yml $(OUT)/etc/$(NAME).yml && \
	cp ../$(NAME) $(OUT)/usr/sbin

rpm: build prepare build-rpm

# Copy the RPM to the repository host, upload it and publish the repository.
rpm-upload:
	scp $(RPM) $(REPO_HOST):
	ssh $(REPO_HOST) sudo pulp-admin rpm repo uploads rpm --repo-id $(REPO) -f $(RPM)
	ssh $(REPO_HOST) sudo pulp-admin rpm repo publish run --repo-id $(REPO)

# Package the staged tree from deploy/$(OUT) into an RPM.
build-rpm:
	fpm \
		-s dir \
		--config-files etc/$(NAME).yml \
		--config-files etc/sysconfig/$(NAME) \
		-C deploy/$(OUT)/ \
		-t rpm \
		--after-install deploy/after_install.sh \
		-n $(NAME) \
		-v $(VERSION) \
		--iteration $(RELEASE) \
		--force \
		--rpm-compression bzip2 \
		--rpm-os linux \
		--url $(URL) \
		--description "$(DESCRIPTION)" \
		-m "$(MAINTAINER)" \
		--license "$(LICENSE)" \
		-a amd64 \
		.

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# cortex-tenant
2+
3+
A Prometheus remote-write proxy that groups incoming metrics by Cortex tenant, based on a label.

deploy/after_install.sh

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
#!/bin/bash
# Post-install hook (passed to fpm via --after-install): creates the service
# account and hands it ownership of its state directory.

# Dedicated variable names are used so the USER and HOME environment
# variables of the invoking shell are not overwritten for child processes.
svc_user="cortex-tenant"
svc_home="/var/lib/${svc_user}"

# Best effort: useradd fails when the account already exists (e.g. on
# upgrade), which is not an error for this script.
useradd -d "$svc_home" -s /bin/false -M "$svc_user" > /dev/null 2>&1 || true
chown "$svc_user:$svc_user" "$svc_home"

deploy/cortex-tenant.env

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
CONFIG_FILE="/etc/cortex-tenant.yml"

deploy/cortex-tenant.service

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[Unit]
Description=Cortex-Tenant
Documentation=https://github.com/blind-oracle/cortex-tenant
Wants=network-online.target
After=network-online.target

[Service]
Restart=always
User=cortex-tenant
# Provides CONFIG_FILE.
EnvironmentFile=/etc/sysconfig/cortex-tenant
# The package installs the binary as /usr/sbin/cortex-tenant.
ExecStart=/usr/sbin/cortex-tenant -config $CONFIG_FILE
ExecReload=/bin/kill -HUP $MAINPID
TimeoutStopSec=30
SendSIGKILL=no
WorkingDirectory=/var/lib/cortex-tenant

[Install]
WantedBy=multi-user.target

deploy/cortex-tenant.yml

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
listen: 0.0.0.0:8080
2+
target: http://127.0.0.1:9091/receive
3+
4+
log_level: warn
5+
6+
max_tenants: 100
7+
buffer_size: 10000
8+
batch_size: 100
9+
flush_interval: 1s
10+
timeout: 10s
11+
12+
tenant:
13+
label: tenant
14+
label_remove: true
15+
header: X-Scope-OrgID
16+
default: default
17+
recycle_age: 10m

go.mod

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
module github.com/blind-oracle/cortex-tenant
2+
3+
go 1.15
4+
5+
require (
6+
github.com/andybalholm/brotli v1.0.1 // indirect
7+
github.com/blind-oracle/go-common v1.0.5
8+
github.com/gogo/protobuf v1.3.1
9+
github.com/golang/snappy v0.0.2
10+
github.com/grpc-ecosystem/grpc-gateway v1.15.0 // indirect
11+
github.com/hashicorp/errwrap v1.1.0 // indirect
12+
github.com/hashicorp/go-multierror v1.1.0
13+
github.com/klauspost/compress v1.11.1 // indirect
14+
github.com/prometheus/common v0.14.0
15+
github.com/prometheus/prometheus v2.5.0+incompatible
16+
github.com/sirupsen/logrus v1.7.0
17+
github.com/valyala/fasthttp v1.16.0
18+
golang.org/x/net v0.0.0-20201002202402-0a1ea396d57c // indirect
19+
golang.org/x/sys v0.0.0-20201005172224-997123666555 // indirect
20+
golang.org/x/text v0.3.3 // indirect
21+
google.golang.org/genproto v0.0.0-20201006033701-bcad7cf615f2 // indirect
22+
google.golang.org/protobuf v1.25.0 // indirect
23+
gopkg.in/yaml.v2 v2.3.0
24+
)

go.sum

+472
Large diffs are not rendered by default.

http.go

+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package main
2+
3+
import (
4+
"bytes"
5+
"net"
6+
"sync"
7+
"time"
8+
9+
"github.com/blind-oracle/go-common/logger"
10+
"github.com/gogo/protobuf/proto"
11+
"github.com/golang/snappy"
12+
me "github.com/hashicorp/go-multierror"
13+
"github.com/prometheus/prometheus/prompb"
14+
fh "github.com/valyala/fasthttp"
15+
)
16+
17+
// httpServer receives snappy-compressed Prometheus write requests over HTTP
// and hands every contained time series to the tenant it belongs to.
type httpServer struct {
	// cfg is the configuration the server was created with.
	cfg config
	// srv is the underlying fasthttp server.
	srv *fh.Server

	// tenants maps a tenant name to its instance; access is guarded by the
	// embedded RWMutex.
	tenants map[string]*tenant
	// chClose is closed by close() to stop the recycler goroutine.
	chClose chan struct{}

	// wg tracks the recycler goroutine so close() can wait for it to exit.
	wg sync.WaitGroup
	sync.RWMutex
	logger.Logger
}
28+
29+
func newHTTPServer(c config) (s *httpServer, err error) {
30+
s = &httpServer{
31+
cfg: c,
32+
tenants: make(map[string]*tenant, c.MaxTenants),
33+
chClose: make(chan struct{}),
34+
Logger: logger.NewSimpleLogger("http"),
35+
}
36+
37+
s.srv = &fh.Server{
38+
Name: "cortex-tenant",
39+
Handler: s.handle,
40+
41+
MaxRequestBodySize: 8 * 1024 * 1024,
42+
43+
ReadTimeout: c.Timeout,
44+
WriteTimeout: c.Timeout,
45+
IdleTimeout: 60 * time.Second,
46+
}
47+
48+
l, err := net.Listen("tcp", c.Listen)
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
go s.srv.Serve(l)
54+
s.wg.Add(1)
55+
go s.recycler()
56+
return
57+
}
58+
59+
func (s *httpServer) handle(ctx *fh.RequestCtx) {
60+
var (
61+
rq prompb.WriteRequest
62+
buf *buffer
63+
decoded []byte
64+
err error
65+
)
66+
67+
if !bytes.Equal(ctx.Request.Header.Method(), []byte("POST")) {
68+
goto bad
69+
}
70+
71+
if !bytes.Equal(ctx.Path(), []byte("/push")) {
72+
goto bad
73+
}
74+
75+
buf = bufferPool.Get().(*buffer)
76+
defer bufferPool.Put(buf)
77+
78+
buf.grow()
79+
if decoded, err = snappy.Decode(buf.b, ctx.Request.Body()); err != nil {
80+
goto bad
81+
}
82+
83+
if err = proto.Unmarshal(decoded, &rq); err != nil {
84+
goto bad
85+
}
86+
87+
for _, ts := range rq.Timeseries {
88+
s.processTimeseries(ts)
89+
}
90+
91+
return
92+
93+
bad:
94+
msg := "Bad request"
95+
if err != nil {
96+
msg += ": " + err.Error()
97+
}
98+
99+
ctx.Error(msg, fh.StatusBadRequest)
100+
}
101+
102+
func (s *httpServer) processTimeseries(ts *prompb.TimeSeries) {
103+
var (
104+
tenant string
105+
idx int
106+
)
107+
108+
for i, l := range ts.Labels {
109+
if l.Name == s.cfg.Tenant.Label {
110+
tenant = l.Value
111+
idx = i
112+
break
113+
}
114+
}
115+
116+
if tenant == "" {
117+
tenant = s.cfg.Tenant.Default
118+
} else if s.cfg.Tenant.LabelRemove {
119+
cnt := len(ts.Labels)
120+
ts.Labels[idx] = ts.Labels[cnt-1]
121+
ts.Labels = ts.Labels[:cnt-1]
122+
}
123+
124+
s.RLock()
125+
t := s.tenants[tenant]
126+
s.RUnlock()
127+
128+
if t == nil {
129+
s.Lock()
130+
if s.cfg.MaxTenants > 0 && len(s.tenants) >= s.cfg.MaxTenants {
131+
s.Unlock()
132+
s.Errorf("MaxTenants (%s) reached, new tenant (%s) dropped", s.cfg.MaxTenants, tenant)
133+
return
134+
}
135+
136+
s.Warnf("Creating tenant '%s'", tenant)
137+
t = newTenant(tenant, s.cfg)
138+
139+
s.tenants[tenant] = t
140+
s.Unlock()
141+
}
142+
143+
t.push(ts)
144+
}
145+
146+
func (s *httpServer) close() (err error) {
147+
close(s.chClose)
148+
s.wg.Wait()
149+
150+
var errs *me.Error
151+
if err = s.srv.Shutdown(); err != nil {
152+
me.Append(errs, err)
153+
}
154+
155+
s.RLock()
156+
for _, t := range s.tenants {
157+
if err = t.close(); err != nil {
158+
me.Append(errs, err)
159+
}
160+
}
161+
s.RUnlock()
162+
163+
return errs.ErrorOrNil()
164+
}
165+
166+
func (s *httpServer) recycler() {
167+
ticker := time.NewTicker(10 * time.Second)
168+
169+
defer func() {
170+
ticker.Stop()
171+
s.wg.Done()
172+
}()
173+
174+
for {
175+
select {
176+
case <-ticker.C:
177+
now := time.Now()
178+
179+
toClose := []*tenant{}
180+
s.Lock()
181+
for tn, t := range s.tenants {
182+
if now.Sub(t.lastFlush.Load().(time.Time)) >= s.cfg.Tenant.RecycleAge {
183+
toClose = append(toClose, t)
184+
delete(s.tenants, tn)
185+
}
186+
}
187+
s.Unlock()
188+
189+
for _, t := range toClose {
190+
s.Warnf("Recycling tenant '%s'", t.name)
191+
192+
if err := t.close(); err != nil {
193+
s.Errorf("Errors while closing tenant '%s': %s", t.name, err)
194+
}
195+
}
196+
197+
case <-s.chClose:
198+
return
199+
}
200+
}
201+
}

0 commit comments

Comments
 (0)