8
8
"io/ioutil"
9
9
"math/rand"
10
10
"net/http"
11
+ "net/url"
12
+ "path"
11
13
"sync"
12
14
"time"
13
15
@@ -57,6 +59,7 @@ type Config struct {
57
59
CacheResults bool `yaml:"cache_results"`
58
60
CompressResponses bool `yaml:"compress_responses"`
59
61
queryrange.ResultsCacheConfig `yaml:"results_cache"`
62
+ DownstreamURL string `yaml:"downstream"`
60
63
}
61
64
62
65
// RegisterFlags adds the flags required to config this to the given FlagSet.
@@ -68,6 +71,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
68
71
f .BoolVar (& cfg .CacheResults , "querier.cache-results" , false , "Cache query results." )
69
72
f .BoolVar (& cfg .CompressResponses , "querier.compress-http-responses" , false , "Compress HTTP responses." )
70
73
cfg .ResultsCacheConfig .RegisterFlags (f )
74
+ f .StringVar (& cfg .DownstreamURL , "querier.downstream-url" , "" , "URL of downstream Prometheus." )
71
75
}
72
76
73
77
// Frontend queues HTTP requests, dispatches them to backends, and handles retries
@@ -99,6 +103,7 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
99
103
log : log ,
100
104
queues : map [string ]chan * request {},
101
105
}
106
+ f .cond = sync .NewCond (& f .mtx )
102
107
103
108
// Stack up the pipeline of various query range middlewares.
104
109
var queryRangeMiddleware []queryrange.Middleware
@@ -119,8 +124,24 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
119
124
queryRangeMiddleware = append (queryRangeMiddleware , queryrange .InstrumentMiddleware ("retry" , queryRangeDuration ), queryrange .NewRetryMiddleware (log , cfg .MaxRetries ))
120
125
}
121
126
122
- // Finally, if the user selected any query range middleware, stitch it in.
127
+ // If the user has specified a downstream Prometheus, then we should
128
+ // forward requests to that. Otherwise we will wait for queries to
129
+ // contact us.
123
130
var roundTripper http.RoundTripper = f
131
+ if cfg .DownstreamURL != "" {
132
+ u , err := url .Parse (cfg .DownstreamURL )
133
+ if err != nil {
134
+ return nil , err
135
+ }
136
+
137
+ roundTripper = RoundTripFunc (func (r * http.Request ) (* http.Response , error ) {
138
+ r .URL .Host = u .Host
139
+ r .URL .Path = path .Join (u .Path , r .URL .Path )
140
+ return http .DefaultTransport .RoundTrip (r )
141
+ })
142
+ }
143
+
144
+ // Finally, if the user selected any query range middleware, stitch it in.
124
145
if len (queryRangeMiddleware ) > 0 {
125
146
roundTripper = queryrange .NewRoundTripper (
126
147
f ,
@@ -129,10 +150,17 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
129
150
)
130
151
}
131
152
f .roundTripper = roundTripper
132
- f .cond = sync .NewCond (& f .mtx )
133
153
return f , nil
134
154
}
135
155
156
+ // RoundTripFunc is to http.RoundTripper what http.HandlerFunc is to http.Handler.
157
+ type RoundTripFunc func (* http.Request ) (* http.Response , error )
158
+
159
+ // RoundTrip implements http.RoundTripper.
160
+ func (f RoundTripFunc ) RoundTrip (r * http.Request ) (* http.Response , error ) {
161
+ return f (r )
162
+ }
163
+
136
164
// Close stops new requests and errors out any pending requests.
137
165
func (f * Frontend ) Close () {
138
166
f .mtx .Lock ()
0 commit comments