Skip to content

Commit 494c339

Browse files
Merge pull request #19248 from miminar/imageimportmetrics
Prometheus metrics for image import controllers
2 parents d9085dd + ec5839b commit 494c339

File tree

6 files changed

+402
-15
lines changed

6 files changed

+402
-15
lines changed

pkg/image/controller/factory.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -63,11 +63,13 @@ func (opts ScheduledImageStreamControllerOptions) GetRateLimiter() flowcontrol.R
6363
// NewImageStreamController returns a new image stream import controller.
6464
func NewImageStreamController(client imageclient.Interface, informer imageinformer.ImageStreamInformer) *ImageStreamController {
6565
controller := &ImageStreamController{
66-
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
66+
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ImageStreamController"),
6767

6868
client: client.Image(),
6969
lister: informer.Lister(),
7070
listerSynced: informer.Informer().HasSynced,
71+
72+
importCounter: NewImportMetricCounter(),
7173
}
7274
controller.syncHandler = controller.syncImageStream
7375

@@ -85,11 +87,12 @@ func NewScheduledImageStreamController(client imageclient.Interface, informer im
8587
bucketLimiter := flowcontrol.NewTokenBucketRateLimiter(opts.BucketsToQPS(), 1)
8688

8789
controller := &ScheduledImageStreamController{
88-
enabled: opts.Enabled,
89-
rateLimiter: opts.GetRateLimiter(),
90-
client: client.Image(),
91-
lister: informer.Lister(),
92-
listerSynced: informer.Informer().HasSynced,
90+
enabled: opts.Enabled,
91+
rateLimiter: opts.GetRateLimiter(),
92+
client: client.Image(),
93+
lister: informer.Lister(),
94+
listerSynced: informer.Informer().HasSynced,
95+
importCounter: NewImportMetricCounter(),
9396
}
9497

9598
controller.scheduler = newScheduler(opts.Buckets(), bucketLimiter, controller.syncTimed)

pkg/image/controller/imagestream_controller.go

+20-7
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
imageapi "github.com/openshift/origin/pkg/image/apis/image"
2020
imageclient "github.com/openshift/origin/pkg/image/generated/internalclientset/typed/image/internalversion"
2121
imageinformer "github.com/openshift/origin/pkg/image/generated/listers/image/internalversion"
22+
metrics "github.com/openshift/origin/pkg/image/metrics/prometheus"
2223
)
2324

2425
var ErrNotImportable = errors.New("requested image cannot be imported")
@@ -45,6 +46,9 @@ type ImageStreamController struct {
4546

4647
// notifier informs other controllers that an import is being performed
4748
notifier Notifier
49+
50+
// importCounter counts successful and failed imports for metric collection
51+
importCounter *ImportMetricCounter
4852
}
4953

5054
func (c *ImageStreamController) SetNotifier(n Notifier) {
@@ -68,6 +72,8 @@ func (c *ImageStreamController) Run(workers int, stopCh <-chan struct{}) {
6872
go wait.Until(c.worker, time.Second, stopCh)
6973
}
7074

75+
metrics.InitializeImportCollector(false, c.importCounter.Collect)
76+
7177
<-stopCh
7278
glog.Infof("Shutting down image stream controller")
7379
}
@@ -150,7 +156,9 @@ func (c *ImageStreamController) syncImageStream(key string) error {
150156
}
151157

152158
glog.V(3).Infof("Queued import of stream %s/%s...", stream.Namespace, stream.Name)
153-
return handleImageStream(stream, c.client, c.notifier)
159+
result, err := handleImageStream(stream, c.client, c.notifier)
160+
c.importCounter.Increment(result, err)
161+
return err
154162
}
155163

156164
// tagImportable is true if the given TagReference is importable by this controller
@@ -215,10 +223,14 @@ func needsImport(stream *imageapi.ImageStream) (ok bool, partial bool) {
215223
// 3. spec.DockerImageRepository not defined - import tags per each definition.
216224
//
217225
// Notifier, if passed, will be invoked if the stream is going to be imported.
218-
func handleImageStream(stream *imageapi.ImageStream, client imageclient.ImageInterface, notifier Notifier) error {
226+
func handleImageStream(
227+
stream *imageapi.ImageStream,
228+
client imageclient.ImageInterface,
229+
notifier Notifier,
230+
) (*imageapi.ImageStreamImport, error) {
219231
ok, partial := needsImport(stream)
220232
if !ok {
221-
return nil
233+
return nil, nil
222234
}
223235
glog.V(3).Infof("Importing stream %s/%s partial=%t...", stream.Namespace, stream.Name, partial)
224236

@@ -256,13 +268,14 @@ func handleImageStream(stream *imageapi.ImageStream, client imageclient.ImageInt
256268
result, err := client.ImageStreamImports(stream.Namespace).Create(isi)
257269
if err != nil {
258270
if apierrs.IsNotFound(err) && isStatusErrorKind(err, "imageStream") {
259-
return ErrNotImportable
271+
return result, ErrNotImportable
260272
}
261273
glog.V(4).Infof("Import stream %s/%s partial=%t error: %v", stream.Namespace, stream.Name, partial, err)
262-
} else {
263-
glog.V(5).Infof("Import stream %s/%s partial=%t import: %#v", stream.Namespace, stream.Name, partial, result.Status.Import)
274+
return result, err
264275
}
265-
return err
276+
277+
glog.V(5).Infof("Import stream %s/%s partial=%t import: %#v", stream.Namespace, stream.Name, partial, result.Status.Import)
278+
return result, nil
266279
}
267280

268281
// isStatusErrorKind returns true if this error describes the provided kind.

pkg/image/controller/imagestream_controller_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ func TestHandleImageStream(t *testing.T) {
334334
fake := imageclient.NewSimpleClientset()
335335
other := test.stream.DeepCopy()
336336

337-
if err := handleImageStream(test.stream, fake.Image(), nil); err != nil {
337+
if _, err := handleImageStream(test.stream, fake.Image(), nil); err != nil {
338338
t.Errorf("%d: unexpected error: %v", i, err)
339339
}
340340
if test.expected != nil {

pkg/image/controller/metrics.go

+224
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
package controller
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
7+
apierrs "k8s.io/apimachinery/pkg/api/errors"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
10+
11+
imageapi "github.com/openshift/origin/pkg/image/apis/image"
12+
metrics "github.com/openshift/origin/pkg/image/metrics/prometheus"
13+
)
14+
15+
const reasonUnknown = "Unknown"
16+
const reasonInvalidImageReference = "InvalidImageReference"
17+
18+
// ImportMetricCounter counts numbers of successful and failed imports for the purpose of metrics collection.
19+
type ImportMetricCounter struct {
20+
counterMutex sync.Mutex
21+
importSuccessCounts metrics.ImportSuccessCounts
22+
importErrorCounts metrics.ImportErrorCounts
23+
}
24+
25+
// NewImportMetricCounter returns a new ImportMetricCounter
26+
func NewImportMetricCounter() *ImportMetricCounter {
27+
return &ImportMetricCounter{
28+
importSuccessCounts: make(metrics.ImportSuccessCounts),
29+
importErrorCounts: make(metrics.ImportErrorCounts),
30+
}
31+
}
32+
33+
// Increment processes the given image stream import object as a result of successful or failed import and
34+
// increments the counters. The given error will be used to construct reason of the error_count metric unless
35+
// any reason is found in the image stream import object. It's safe to call this method with any of the
36+
// parameters nil.
37+
func (c *ImportMetricCounter) Increment(isi *imageapi.ImageStreamImport, err error) {
38+
if isi == nil {
39+
if err == nil {
40+
return
41+
}
42+
43+
c.counterMutex.Lock()
44+
defer c.counterMutex.Unlock()
45+
info := defaultErrorInfoReason(&metrics.ImportErrorInfo{}, err)
46+
c.importErrorCounts[*info]++
47+
return
48+
}
49+
50+
c.countRepositoryImport(isi, err)
51+
52+
if len(isi.Status.Images) == 0 {
53+
return
54+
}
55+
56+
c.counterMutex.Lock()
57+
defer c.counterMutex.Unlock()
58+
59+
enumerateIsImportStatuses(isi, func(info *metrics.ImportErrorInfo) {
60+
if len(info.Reason) == 0 {
61+
c.importSuccessCounts[info.Registry]++
62+
} else {
63+
c.importErrorCounts[*defaultErrorInfoReason(info, err)]++
64+
}
65+
})
66+
}
67+
68+
// countRepositoryImport increments either success or error counter if the isimport contains repository
69+
// request.
70+
func (c *ImportMetricCounter) countRepositoryImport(isi *imageapi.ImageStreamImport, err error) {
71+
errInfo := getIsImportRepositoryInfo(isi)
72+
if errInfo == nil {
73+
return
74+
}
75+
76+
c.counterMutex.Lock()
77+
defer c.counterMutex.Unlock()
78+
79+
if len(errInfo.Reason) == 0 {
80+
c.importSuccessCounts[errInfo.Registry]++
81+
} else {
82+
c.importErrorCounts[*defaultErrorInfoReason(errInfo, err)]++
83+
}
84+
}
85+
86+
// Collect is supposed to be called by the metrics collector. It returns the actual state of counters.
87+
func (c *ImportMetricCounter) Collect() (metrics.ImportSuccessCounts, metrics.ImportErrorCounts, error) {
88+
c.counterMutex.Lock()
89+
defer c.counterMutex.Unlock()
90+
91+
success := metrics.ImportSuccessCounts{}
92+
for registry, count := range c.importSuccessCounts {
93+
success[registry] = count
94+
}
95+
96+
failures := metrics.ImportErrorCounts{}
97+
for info, count := range c.importErrorCounts {
98+
failures[info] = count
99+
}
100+
101+
return success, failures, nil
102+
}
103+
104+
// getIsImportRepositoryInfo returns an import error info if the given isi contains repository request.
105+
// If the request succeeded, its Reason will be empty.
106+
func getIsImportRepositoryInfo(isi *imageapi.ImageStreamImport) *metrics.ImportErrorInfo {
107+
if isi.Status.Repository == nil || isi.Spec.Repository == nil {
108+
return nil
109+
}
110+
ref := isi.Spec.Repository.From
111+
if ref.Kind != "DockerImage" {
112+
return nil
113+
}
114+
imgRef, err := imageapi.ParseDockerImageReference(ref.Name)
115+
if err != nil {
116+
utilruntime.HandleError(fmt.Errorf(
117+
"failed to parse isi.spec.repository.from.name %q: %v",
118+
ref.Name, err))
119+
return nil
120+
}
121+
122+
info := mkImportInfo(imgRef.DockerClientDefaults().Registry, &isi.Status.Repository.Status)
123+
return &info
124+
}
125+
126+
// enumerateIsImportStatuses iterates over images of the given image stream import. For any valid recorded
127+
// import the cb callback will be colled with the obtains information.
128+
// If the image import is successful, the object passed to the cb will contain empty Reason.
129+
func enumerateIsImportStatuses(isi *imageapi.ImageStreamImport, cb func(*metrics.ImportErrorInfo)) {
130+
if len(isi.Status.Images) == 0 {
131+
return
132+
}
133+
134+
for i, status := range isi.Status.Images {
135+
var registry string
136+
137+
imgRef, err := getImageDockerReferenceForImage(isi, i)
138+
if err != nil {
139+
utilruntime.HandleError(err)
140+
} else {
141+
if imgRef == nil {
142+
continue
143+
}
144+
registry = imgRef.DockerClientDefaults().Registry
145+
}
146+
147+
info := mkImportInfo(registry, &status.Status)
148+
if err != nil {
149+
info.Reason = reasonInvalidImageReference
150+
}
151+
cb(&info)
152+
}
153+
}
154+
155+
func getImageDockerReferenceForImage(
156+
isi *imageapi.ImageStreamImport,
157+
index int,
158+
) (*imageapi.DockerImageReference, error) {
159+
var (
160+
imgRef imageapi.DockerImageReference
161+
err error
162+
)
163+
164+
// prefer the specification as the source of truth because the reference in status may belong to an
165+
// older image imported from somewhere else
166+
if index >= 0 && index < len(isi.Spec.Images) {
167+
imgSpec := &isi.Spec.Images[index]
168+
if imgSpec.From.Kind == "DockerImage" {
169+
imgRef, err = imageapi.ParseDockerImageReference(imgSpec.From.Name)
170+
if err == nil {
171+
return &imgRef, nil
172+
}
173+
err = fmt.Errorf("failed to parse isi.spec.images[%d].from.name %q: %v",
174+
index, imgSpec.From.Name, err)
175+
}
176+
}
177+
178+
// fall-back to the image in status
179+
if index < 0 || index >= len(isi.Status.Images) {
180+
return nil, err
181+
}
182+
183+
img := isi.Status.Images[index].Image
184+
if img == nil {
185+
return nil, err
186+
}
187+
188+
imgRef, err = imageapi.ParseDockerImageReference(img.DockerImageReference)
189+
if err != nil {
190+
return nil, fmt.Errorf(
191+
"failed to parse isi.status.images[%d].image.dockerImageReference %q: %v",
192+
index, img.DockerImageReference, err)
193+
}
194+
195+
return &imgRef, nil
196+
}
197+
198+
// mkImportInfo returns an import error info for the given status. If the import succeeded, the Reason field
199+
// will be empty.
200+
func mkImportInfo(registry string, status *metav1.Status) metrics.ImportErrorInfo {
201+
var reason string
202+
if status.Status != metav1.StatusSuccess {
203+
reason = string(status.Reason)
204+
if len(reason) == 0 {
205+
reason = reasonUnknown
206+
}
207+
}
208+
return metrics.ImportErrorInfo{
209+
Registry: registry,
210+
Reason: reason,
211+
}
212+
}
213+
214+
// defaultErrorInfoReason fills the Reason field of the import error info from the given error unless already
215+
// set.
216+
func defaultErrorInfoReason(info *metrics.ImportErrorInfo, err error) *metrics.ImportErrorInfo {
217+
if len(info.Reason) == 0 && err != nil {
218+
info.Reason = string(apierrs.ReasonForError(err))
219+
if len(info.Reason) == 0 {
220+
info.Reason = reasonUnknown
221+
}
222+
}
223+
return info
224+
}

pkg/image/controller/scheduled_image_controller.go

+9-1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
imageapi "github.com/openshift/origin/pkg/image/apis/image"
1414
imageclient "github.com/openshift/origin/pkg/image/generated/internalclientset/typed/image/internalversion"
1515
imageinformer "github.com/openshift/origin/pkg/image/generated/listers/image/internalversion"
16+
metrics "github.com/openshift/origin/pkg/image/metrics/prometheus"
1617
)
1718

1819
type uniqueItem struct {
@@ -37,6 +38,9 @@ type ScheduledImageStreamController struct {
3738

3839
// scheduler for timely image re-imports
3940
scheduler *scheduler
41+
42+
// importCounter counts successful and failed imports for metric collection
43+
importCounter *ImportMetricCounter
4044
}
4145

4246
// Importing is invoked when the controller decides to import a stream in order to push back
@@ -69,6 +73,8 @@ func (s *ScheduledImageStreamController) Run(stopCh <-chan struct{}) {
6973

7074
go s.scheduler.RunUntil(stopCh)
7175

76+
metrics.InitializeImportCollector(true, s.importCounter.Collect)
77+
7278
<-stopCh
7379
glog.Infof("Shutting down image stream controller")
7480
}
@@ -163,7 +169,9 @@ func (s *ScheduledImageStreamController) syncTimedByName(namespace, name string)
163169
resetScheduledTags(stream)
164170

165171
glog.V(3).Infof("Scheduled import of stream %s/%s...", stream.Namespace, stream.Name)
166-
return handleImageStream(stream, s.client, nil)
172+
result, err := handleImageStream(stream, s.client, nil)
173+
s.importCounter.Increment(result, err)
174+
return err
167175
}
168176

169177
// resetScheduledTags artificially increments the generation on the tags that should be imported.

0 commit comments

Comments
 (0)