8
8
"io/ioutil"
9
9
"math/rand"
10
10
"net/http"
11
+ "net/url"
12
+ "path"
11
13
"sync"
12
14
"time"
13
15
@@ -57,6 +59,7 @@ type Config struct {
57
59
CacheResults bool `yaml:"cache_results"`
58
60
CompressResponses bool `yaml:"compress_responses"`
59
61
ResultsCacheConfig `yaml:"results_cache"`
62
+ DownstreamURL string `yaml:"downstream"`
60
63
}
61
64
62
65
// RegisterFlags adds the flags required to config this to the given FlagSet.
@@ -68,6 +71,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
68
71
f .BoolVar (& cfg .CacheResults , "querier.cache-results" , false , "Cache query results." )
69
72
f .BoolVar (& cfg .CompressResponses , "querier.compress-http-responses" , false , "Compress HTTP responses." )
70
73
cfg .ResultsCacheConfig .RegisterFlags (f )
74
+ f .StringVar (& cfg .DownstreamURL , "querier.downstream-url" , "" , "URL of downstream Prometheus." )
71
75
}
72
76
73
77
// Frontend queues HTTP requests, dispatches them to backends, and handles retries
@@ -99,6 +103,7 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
99
103
log : log ,
100
104
queues : map [string ]chan * request {},
101
105
}
106
+ f .cond = sync .NewCond (& f .mtx )
102
107
103
108
// Stack up the pipeline of various query range middlewares.
104
109
queryRangeMiddleware := []queryRangeMiddleware {}
@@ -116,8 +121,24 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
116
121
queryRangeMiddleware = append (queryRangeMiddleware , instrument ("results_cache" ), queryCacheMiddleware )
117
122
}
118
123
119
- // Finally, if the user selected any query range middleware, stitch it in.
124
+ // If the user has specified a downstream Prometheus, then we should
125
+ // forward requests to that. Otherwise we will wait for queries to
126
+ // contact us.
120
127
var roundTripper http.RoundTripper = f
128
+ if cfg .DownstreamURL != "" {
129
+ u , err := url .Parse (cfg .DownstreamURL )
130
+ if err != nil {
131
+ return nil , err
132
+ }
133
+
134
+ roundTripper = RoundTripFunc (func (r * http.Request ) (* http.Response , error ) {
135
+ r .URL .Host = u .Host
136
+ r .URL .Path = path .Join (u .Path , r .URL .Path )
137
+ return http .DefaultTransport .RoundTrip (r )
138
+ })
139
+ }
140
+
141
+ // Finally, if the user selected any query range middleware, stitch it in.
121
142
if len (queryRangeMiddleware ) > 0 {
122
143
roundTripper = & queryRangeRoundTripper {
123
144
next : roundTripper ,
@@ -128,10 +149,17 @@ func New(cfg Config, log log.Logger, limits *validation.Overrides) (*Frontend, e
128
149
}
129
150
}
130
151
f .roundTripper = roundTripper
131
- f .cond = sync .NewCond (& f .mtx )
132
152
return f , nil
133
153
}
134
154
155
+ // RoundTripFunc is to http.RoundTripper what http.HandlerFunc is to http.Handler.
156
+ type RoundTripFunc func (* http.Request ) (* http.Response , error )
157
+
158
+ // RoundTrip implements http.RoundTripper.
159
+ func (f RoundTripFunc ) RoundTrip (r * http.Request ) (* http.Response , error ) {
160
+ return f (r )
161
+ }
162
+
135
163
// Close stops new requests and errors out any pending requests.
136
164
func (f * Frontend ) Close () {
137
165
f .mtx .Lock ()
0 commit comments