Skip to content

Commit 61c9fd1

Browse files
authoredJun 19, 2018
Merge pull request #79 from dmage/metrics-pullthrough
Add pullthrough metrics
2 parents ea521a8 + c96fa96 commit 61c9fd1

17 files changed

+821
-153
lines changed
 

Diff for: ‎pkg/dockerregistry/server/app.go

+10
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
1414
registryconfig "github.com/openshift/image-registry/pkg/dockerregistry/server/configuration"
1515
"github.com/openshift/image-registry/pkg/dockerregistry/server/maxconnections"
16+
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
1617
"github.com/openshift/image-registry/pkg/dockerregistry/server/supermiddleware"
1718
)
1819

@@ -55,6 +56,9 @@ type App struct {
5556

5657
// cache is a shared cache of digests and descriptors.
5758
cache cache.DigestCache
59+
60+
// metrics provide methods to collect statistics.
61+
metrics metrics.Metrics
5862
}
5963

6064
func (app *App) Storage(driver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
@@ -85,6 +89,12 @@ func NewApp(ctx context.Context, registryClient client.RegistryClient, dockerCon
8589
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
8690
}
8791

92+
if app.config.Metrics.Enabled {
93+
app.metrics = metrics.NewMetrics(metrics.NewPrometheusSink())
94+
} else {
95+
app.metrics = metrics.NewNoopMetrics()
96+
}
97+
8898
cacheTTL := time.Duration(0)
8999
if !app.config.Cache.Disabled {
90100
cacheTTL = app.config.Cache.BlobRepositoryTTL

Diff for: ‎pkg/dockerregistry/server/metrichandler.go

-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
)
1212

1313
func RegisterMetricHandler(app *handlers.App) {
14-
metrics.Register()
1514
getMetricsAccess := func(r *http.Request) []auth.Access {
1615
return []auth.Access{
1716
{

Diff for: ‎pkg/dockerregistry/server/metrics/cache.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package metrics
2+
3+
// Cache provides generic metrics for caches.
4+
type Cache interface {
5+
Request(hit bool)
6+
}
7+
8+
type cache struct {
9+
hitCounter Counter
10+
missCounter Counter
11+
}
12+
13+
func (c *cache) Request(hit bool) {
14+
if hit {
15+
c.hitCounter.Inc()
16+
} else {
17+
c.missCounter.Inc()
18+
}
19+
}
20+
21+
type noopCache struct{}
22+
23+
func (c noopCache) Request(hit bool) {
24+
}

Diff for: ‎pkg/dockerregistry/server/metrics/metrics.go

+118-53
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,148 @@
11
package metrics
22

33
import (
4+
"net/url"
45
"strings"
5-
"time"
66

7-
"github.com/prometheus/client_golang/prometheus"
7+
gocontext "golang.org/x/net/context"
88

99
"github.com/docker/distribution"
1010
"github.com/docker/distribution/context"
11+
"github.com/docker/distribution/registry/api/errcode"
1112

1213
"github.com/openshift/image-registry/pkg/dockerregistry/server/wrapped"
14+
"github.com/openshift/image-registry/pkg/origin-common/image/registryclient"
1315
)
1416

15-
const (
16-
registryNamespace = "openshift"
17-
registrySubsystem = "registry"
18-
)
17+
// Observer captures individual observations.
18+
type Observer interface {
19+
Observe(float64)
20+
}
1921

20-
var (
21-
registryAPIRequests *prometheus.HistogramVec
22-
)
22+
// Counter represents a single numerical value that only goes up.
23+
type Counter interface {
24+
Inc()
25+
}
2326

24-
// Register the metrics.
25-
func Register() {
26-
registryAPIRequests = prometheus.NewHistogramVec(
27-
prometheus.HistogramOpts{
28-
Namespace: registryNamespace,
29-
Subsystem: registrySubsystem,
30-
Name: "request_duration_seconds",
31-
Help: "Request latency summary in microseconds for each operation",
32-
},
33-
[]string{"operation", "name"},
34-
)
35-
prometheus.MustRegister(registryAPIRequests)
36-
}
37-
38-
// Timer is a helper type to time functions.
39-
type Timer interface {
40-
// Stop records the duration passed since the Timer was created with NewTimer.
41-
Stop()
42-
}
43-
44-
// NewTimer wraps the HistogramVec and used to track amount of time passed since the Timer was created.
45-
func NewTimer(collector *prometheus.HistogramVec, labels []string) Timer {
46-
return &metricTimer{
47-
collector: collector,
48-
labels: labels,
49-
startTime: time.Now(),
27+
// Sink provides an interface for exposing metrics.
28+
type Sink interface {
29+
RequestDuration(funcname, reponame string) Observer
30+
PullthroughBlobstoreCacheRequests(resultType string) Counter
31+
PullthroughRepositoryDuration(registry, funcname string) Observer
32+
PullthroughRepositoryErrors(registry, funcname, errcode string) Counter
33+
}
34+
35+
// Metrics is a set of all metrics that can be provided.
36+
type Metrics interface {
37+
Core
38+
Pullthrough
39+
}
40+
41+
// Core is a set of metrics for the core functionality.
42+
type Core interface {
43+
// Repository wraps a distribution.Repository to collect statistics.
44+
Repository(r distribution.Repository, reponame string) distribution.Repository
45+
}
46+
47+
// Pullthrough is a set of metrics for the pullthrough subsystem.
48+
type Pullthrough interface {
49+
// RepositoryRetriever wraps RepositoryRetriever to collect statistics.
50+
RepositoryRetriever(retriever registryclient.RepositoryRetriever) registryclient.RepositoryRetriever
51+
52+
// DigestBlobStoreCache() returns an interface to count cache hits/misses
53+
// for pullthrough blobstores.
54+
DigestBlobStoreCache() Cache
55+
}
56+
57+
func dockerErrorCode(err error) string {
58+
if e, ok := err.(errcode.Error); ok {
59+
return e.ErrorCode().String()
5060
}
61+
return "UNKNOWN"
5162
}
5263

53-
type metricTimer struct {
54-
collector *prometheus.HistogramVec
55-
labels []string
56-
startTime time.Time
64+
func pullthroughRepositoryWrapper(ctx context.Context, sink Sink, registry string, funcname string, f func(ctx context.Context) error) error {
65+
registry = strings.ToLower(registry)
66+
defer NewTimer(sink.PullthroughRepositoryDuration(registry, funcname)).Stop()
67+
err := f(ctx)
68+
if err != nil {
69+
sink.PullthroughRepositoryErrors(registry, funcname, dockerErrorCode(err)).Inc()
70+
}
71+
return err
72+
}
73+
74+
type repositoryRetriever struct {
75+
retriever registryclient.RepositoryRetriever
76+
sink Sink
5777
}
5878

59-
func (m *metricTimer) Stop() {
60-
m.collector.WithLabelValues(m.labels...).Observe(float64(time.Since(m.startTime)) / float64(time.Second))
79+
func (rr repositoryRetriever) Repository(ctx gocontext.Context, registry *url.URL, repoName string, insecure bool) (repo distribution.Repository, err error) {
80+
err = pullthroughRepositoryWrapper(ctx, rr.sink, registry.Host, "Init", func(ctx context.Context) error {
81+
repo, err = rr.retriever.Repository(ctx, registry, repoName, insecure)
82+
return err
83+
})
84+
if err != nil {
85+
return repo, err
86+
}
87+
return wrapped.NewRepository(repo, func(ctx context.Context, funcname string, f func(ctx context.Context) error) error {
88+
return pullthroughRepositoryWrapper(ctx, rr.sink, registry.Host, funcname, f)
89+
}), nil
6190
}
6291

63-
func newWrapper(reponame string) wrapped.Wrapper {
64-
return func(ctx context.Context, funcname string, f func(ctx context.Context) error) error {
65-
defer NewTimer(registryAPIRequests, []string{strings.ToLower(funcname), reponame}).Stop()
92+
type metrics struct {
93+
sink Sink
94+
}
95+
96+
var _ Metrics = &metrics{}
97+
98+
// NewMetrics returns a helper that exposes the metrics through sink to
99+
// instrument the application.
100+
func NewMetrics(sink Sink) Metrics {
101+
return &metrics{
102+
sink: sink,
103+
}
104+
}
105+
106+
func (m *metrics) Repository(r distribution.Repository, reponame string) distribution.Repository {
107+
return wrapped.NewRepository(r, func(ctx context.Context, funcname string, f func(ctx context.Context) error) error {
108+
defer NewTimer(m.sink.RequestDuration(funcname, reponame)).Stop()
66109
return f(ctx)
110+
})
111+
}
112+
113+
func (m *metrics) RepositoryRetriever(retriever registryclient.RepositoryRetriever) registryclient.RepositoryRetriever {
114+
return repositoryRetriever{
115+
retriever: retriever,
116+
sink: m.sink,
67117
}
68118
}
69119

70-
// NewBlobStore wraps a distribution.BlobStore to collect statistics.
71-
func NewBlobStore(bs distribution.BlobStore, reponame string) distribution.BlobStore {
72-
return wrapped.NewBlobStore(bs, newWrapper(reponame))
120+
func (m *metrics) DigestBlobStoreCache() Cache {
121+
return &cache{
122+
hitCounter: m.sink.PullthroughBlobstoreCacheRequests("Hit"),
123+
missCounter: m.sink.PullthroughBlobstoreCacheRequests("Miss"),
124+
}
125+
}
126+
127+
type noopMetrics struct {
128+
}
129+
130+
var _ Metrics = noopMetrics{}
131+
132+
// NewNoopMetrics return a helper that implements the Metrics interface, but
133+
// does nothing.
134+
func NewNoopMetrics() Metrics {
135+
return noopMetrics{}
136+
}
137+
138+
func (m noopMetrics) Repository(r distribution.Repository, reponame string) distribution.Repository {
139+
return r
73140
}
74141

75-
// NewManifestService wraps a distribution.ManifestService to collect statistics
76-
func NewManifestService(ms distribution.ManifestService, reponame string) distribution.ManifestService {
77-
return wrapped.NewManifestService(ms, newWrapper(reponame))
142+
func (m noopMetrics) RepositoryRetriever(retriever registryclient.RepositoryRetriever) registryclient.RepositoryRetriever {
143+
return retriever
78144
}
79145

80-
// NewTagService wraps a distribution.TagService to collect statistics
81-
func NewTagService(ts distribution.TagService, reponame string) distribution.TagService {
82-
return wrapped.NewTagService(ts, newWrapper(reponame))
146+
func (m noopMetrics) DigestBlobStoreCache() Cache {
147+
return noopCache{}
83148
}

Diff for: ‎pkg/dockerregistry/server/metrics/prometheus.go

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
package metrics
2+
3+
import (
4+
"sync"
5+
6+
"github.com/prometheus/client_golang/prometheus"
7+
)
8+
9+
const (
10+
namespace = "imageregistry"
11+
12+
pullthroughSubsystem = "pullthrough"
13+
)
14+
15+
var (
16+
requestDurationSeconds = prometheus.NewHistogramVec(
17+
prometheus.HistogramOpts{
18+
Namespace: namespace,
19+
Name: "request_duration_seconds",
20+
Help: "Request latency in seconds for each operation.",
21+
},
22+
[]string{"operation", "name"},
23+
)
24+
25+
pullthroughBlobstoreCacheRequestsTotal = prometheus.NewCounterVec(
26+
prometheus.CounterOpts{
27+
Namespace: namespace,
28+
Subsystem: pullthroughSubsystem,
29+
Name: "blobstore_cache_requests_total",
30+
Help: "Total number of requests to the BlobStore cache.",
31+
},
32+
[]string{"type"},
33+
)
34+
pullthroughRepositoryDurationSeconds = prometheus.NewHistogramVec(
35+
prometheus.HistogramOpts{
36+
Namespace: namespace,
37+
Subsystem: pullthroughSubsystem,
38+
Name: "repository_duration_seconds",
39+
Help: "Latency of operations with remote registries in seconds.",
40+
},
41+
[]string{"registry", "operation"},
42+
)
43+
pullthroughRepositoryErrorsTotal = prometheus.NewCounterVec(
44+
prometheus.CounterOpts{
45+
Namespace: namespace,
46+
Subsystem: pullthroughSubsystem,
47+
Name: "repository_errors_total",
48+
Help: "Cumulative number of failed operations with remote registries.",
49+
},
50+
[]string{"registry", "operation", "code"},
51+
)
52+
)
53+
54+
var prometheusOnce sync.Once
55+
56+
type prometheusSink struct{}
57+
58+
// NewPrometheusSink returns a sink for exposing Prometheus metrics.
59+
func NewPrometheusSink() Sink {
60+
prometheusOnce.Do(func() {
61+
prometheus.MustRegister(requestDurationSeconds)
62+
prometheus.MustRegister(pullthroughBlobstoreCacheRequestsTotal)
63+
prometheus.MustRegister(pullthroughRepositoryDurationSeconds)
64+
prometheus.MustRegister(pullthroughRepositoryErrorsTotal)
65+
})
66+
return prometheusSink{}
67+
}
68+
69+
func (s prometheusSink) RequestDuration(funcname, reponame string) Observer {
70+
return requestDurationSeconds.WithLabelValues(funcname, reponame)
71+
}
72+
73+
func (s prometheusSink) PullthroughBlobstoreCacheRequests(resultType string) Counter {
74+
return pullthroughBlobstoreCacheRequestsTotal.WithLabelValues(resultType)
75+
}
76+
77+
func (s prometheusSink) PullthroughRepositoryDuration(registry, funcname string) Observer {
78+
return pullthroughRepositoryDurationSeconds.WithLabelValues(registry, funcname)
79+
}
80+
81+
func (s prometheusSink) PullthroughRepositoryErrors(registry, funcname, errcode string) Counter {
82+
return pullthroughRepositoryErrorsTotal.WithLabelValues(registry, funcname, errcode)
83+
}

Diff for: ‎pkg/dockerregistry/server/metrics/testing/counter.go

+55
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package testing
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
7+
"github.com/openshift/image-registry/pkg/testutil/counter"
8+
)
9+
10+
type callbackObserver func(float64)
11+
12+
func (f callbackObserver) Observe(value float64) {
13+
f(value)
14+
}
15+
16+
type callbackCounter func()
17+
18+
func (f callbackCounter) Inc() {
19+
f()
20+
}
21+
22+
type counterSink struct {
23+
c counter.Counter
24+
}
25+
26+
var _ metrics.Sink = &counterSink{}
27+
28+
func (s counterSink) RequestDuration(funcname, reponame string) metrics.Observer {
29+
return callbackObserver(func(float64) {
30+
s.c.Add(fmt.Sprintf("request:%s:%s", funcname, reponame), 1)
31+
})
32+
}
33+
34+
func (s counterSink) PullthroughBlobstoreCacheRequests(resultType string) metrics.Counter {
35+
return callbackCounter(func() {
36+
s.c.Add(fmt.Sprintf("pullthrough_blobstore_cache_requests:%s", resultType), 1)
37+
})
38+
}
39+
40+
func (s counterSink) PullthroughRepositoryDuration(registry, funcname string) metrics.Observer {
41+
return callbackObserver(func(float64) {
42+
s.c.Add(fmt.Sprintf("pullthrough_repository:%s:%s", registry, funcname), 1)
43+
})
44+
}
45+
46+
func (s counterSink) PullthroughRepositoryErrors(registry, funcname, errcode string) metrics.Counter {
47+
return callbackCounter(func() {
48+
s.c.Add(fmt.Sprintf("pullthrough_repository_errors:%s:%s:%s", registry, funcname, errcode), 1)
49+
})
50+
}
51+
52+
func NewCounterSink() (counter.Counter, metrics.Sink) {
53+
c := counter.New()
54+
return c, counterSink{c: c}
55+
}

Diff for: ‎pkg/dockerregistry/server/metrics/timer.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package metrics
2+
3+
import (
4+
"time"
5+
)
6+
7+
// Timer is a helper type to time functions.
8+
type Timer interface {
9+
// Stop records the duration passed since the Timer was created with NewTimer.
10+
Stop()
11+
}
12+
13+
// NewTimer wraps the HistogramVec and used to track amount of time passed since the Timer was created.
14+
func NewTimer(observer Observer) Timer {
15+
return &timer{
16+
observer: observer,
17+
startTime: time.Now(),
18+
}
19+
}
20+
21+
type timer struct {
22+
observer Observer
23+
startTime time.Time
24+
}
25+
26+
func (t *timer) Stop() {
27+
t.observer.Observe(time.Since(t.startTime).Seconds())
28+
}

Diff for: ‎pkg/dockerregistry/server/pullthroughblobstore_test.go

+101-2
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@ import (
2222

2323
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
2424
dockerregistryclient "github.com/openshift/image-registry/pkg/dockerregistry/server/client"
25+
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
26+
metricstesting "github.com/openshift/image-registry/pkg/dockerregistry/server/metrics/testing"
2527
"github.com/openshift/image-registry/pkg/imagestream"
2628
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
2729
originregistryclient "github.com/openshift/image-registry/pkg/origin-common/image/registryclient"
2830
"github.com/openshift/image-registry/pkg/testutil"
31+
"github.com/openshift/image-registry/pkg/testutil/counter"
2932
)
3033

3134
func TestPullthroughServeBlob(t *testing.T) {
@@ -157,7 +160,9 @@ func TestPullthroughServeBlob(t *testing.T) {
157160
remoteBlobGetter := NewBlobGetterService(
158161
imageStream,
159162
imageStream.GetSecrets,
160-
cache)
163+
cache,
164+
metrics.NewNoopMetrics(),
165+
)
161166

162167
ptbs := &pullthroughBlobStore{
163168
BlobStore: localBlobStore,
@@ -591,7 +596,9 @@ func TestPullthroughServeBlobInsecure(t *testing.T) {
591596
remoteBlobGetter := NewBlobGetterService(
592597
imageStream,
593598
imageStream.GetSecrets,
594-
cache)
599+
cache,
600+
metrics.NewNoopMetrics(),
601+
)
595602

596603
ptbs := &pullthroughBlobStore{
597604
BlobStore: localBlobStore,
@@ -655,6 +662,98 @@ func TestPullthroughServeBlobInsecure(t *testing.T) {
655662
}
656663
}
657664

665+
func TestPullthroughMetrics(t *testing.T) {
666+
ctx := context.Background()
667+
ctx = testutil.WithTestLogger(ctx, t)
668+
669+
namespace, name := "user", "app"
670+
repoName := fmt.Sprintf("%s/%s", namespace, name)
671+
ctx = withAppMiddleware(ctx, &fakeAccessControllerMiddleware{t: t})
672+
673+
testImage, err := testutil.NewImageForManifest(repoName, testutil.SampleImageManifestSchema1, "", false)
674+
if err != nil {
675+
t.Fatal(err)
676+
}
677+
678+
remoteRegistryServer := createTestRegistryServer(t, ctx)
679+
defer remoteRegistryServer.Close()
680+
681+
serverURL, err := url.Parse(remoteRegistryServer.URL)
682+
if err != nil {
683+
t.Fatalf("error parsing server url: %v", err)
684+
}
685+
testImage.DockerImageReference = fmt.Sprintf("%s/%s@%s", serverURL.Host, repoName, testImage.Name)
686+
687+
fos, imageClient := testutil.NewFakeOpenShiftWithClient(ctx)
688+
testutil.AddImageStream(t, fos, namespace, name, map[string]string{
689+
imageapi.InsecureRepositoryAnnotation: "true",
690+
})
691+
testutil.AddImage(t, fos, testImage, namespace, name, "latest")
692+
693+
blobDesc, _, err := testutil.UploadRandomTestBlob(ctx, serverURL.String(), nil, repoName)
694+
if err != nil {
695+
t.Fatal(err)
696+
}
697+
698+
localBlobStore := newTestBlobStore(nil, nil)
699+
700+
imageStream := imagestream.New(ctx, namespace, name, dockerregistryclient.NewFakeRegistryAPIClient(nil, imageClient))
701+
702+
digestCache, err := cache.NewBlobDigest(
703+
defaultDescriptorCacheSize,
704+
defaultDigestToRepositoryCacheSize,
705+
24*time.Hour, // for tests it's virtually forever
706+
)
707+
if err != nil {
708+
t.Fatalf("unable to create cache: %v", err)
709+
}
710+
711+
cache := &cache.RepoDigest{
712+
Cache: digestCache,
713+
}
714+
c, sink := metricstesting.NewCounterSink()
715+
remoteBlobGetter := NewBlobGetterService(
716+
imageStream,
717+
imageStream.GetSecrets,
718+
cache,
719+
metrics.NewMetrics(sink),
720+
)
721+
722+
ptbs := &pullthroughBlobStore{
723+
BlobStore: localBlobStore,
724+
remoteBlobGetter: remoteBlobGetter,
725+
}
726+
727+
dgst := digest.Digest(blobDesc.Digest)
728+
729+
_, err = ptbs.Stat(ctx, dgst)
730+
if err != nil {
731+
t.Fatalf("Stat returned unexpected error: %#+v", err)
732+
}
733+
734+
if diff := c.Diff(counter.M{
735+
"pullthrough_blobstore_cache_requests:Miss": 1,
736+
fmt.Sprintf("pullthrough_repository:%s:Init", serverURL.Host): 1,
737+
fmt.Sprintf("pullthrough_repository:%s:BlobStore.Stat", serverURL.Host): 1,
738+
}); diff != nil {
739+
t.Fatal(diff)
740+
}
741+
742+
_, err = ptbs.Stat(ctx, dgst)
743+
if err != nil {
744+
t.Fatalf("Stat returned unexpected error: %#+v", err)
745+
}
746+
747+
if diff := c.Diff(counter.M{
748+
"pullthrough_blobstore_cache_requests:Miss": 1,
749+
"pullthrough_blobstore_cache_requests:Hit": 1,
750+
fmt.Sprintf("pullthrough_repository:%s:Init", serverURL.Host): 1,
751+
fmt.Sprintf("pullthrough_repository:%s:BlobStore.Stat", serverURL.Host): 2,
752+
}); diff != nil {
753+
t.Fatal(diff)
754+
}
755+
}
756+
658757
const (
659758
unknownBlobDigest = "sha256:bef57ec7f53a6d40beb640a780a639c83bc29ac8a9816f1fc6c5c6dcd93c4721"
660759
)

Diff for: ‎pkg/dockerregistry/server/pullthroughmanifestservice.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/docker/distribution/digest"
99

1010
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
11+
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
1112
"github.com/openshift/image-registry/pkg/errors"
1213
"github.com/openshift/image-registry/pkg/imagestream"
1314
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
@@ -24,6 +25,7 @@ type pullthroughManifestService struct {
2425
cache cache.RepositoryDigest
2526
mirror bool
2627
registryAddr string
28+
metrics metrics.Pullthrough
2729
}
2830

2931
var _ distribution.ManifestService = &pullthroughManifestService{}
@@ -98,7 +100,7 @@ func (m *pullthroughManifestService) mirrorManifest(ctx context.Context, manifes
98100
}
99101

100102
func (m *pullthroughManifestService) getRemoteRepositoryClient(ctx context.Context, ref *imageapi.DockerImageReference, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Repository, error) {
101-
retriever := getImportContext(ctx, m.imageStream.GetSecrets)
103+
retriever := getImportContext(ctx, m.imageStream.GetSecrets, m.metrics)
102104

103105
// determine, whether to fall-back to insecure transport based on a specification of image's tag
104106
// if the client pulls by tag, use that

Diff for: ‎pkg/dockerregistry/server/pullthroughmanifestservice_test.go

+78
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,12 @@ import (
2020

2121
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
2222
registryclient "github.com/openshift/image-registry/pkg/dockerregistry/server/client"
23+
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
24+
metricstesting "github.com/openshift/image-registry/pkg/dockerregistry/server/metrics/testing"
2325
"github.com/openshift/image-registry/pkg/imagestream"
2426
imageapi "github.com/openshift/image-registry/pkg/origin-common/image/apis/image"
2527
"github.com/openshift/image-registry/pkg/testutil"
28+
"github.com/openshift/image-registry/pkg/testutil/counter"
2629
)
2730

2831
func createTestRegistryServer(t *testing.T, ctx context.Context) *httptest.Server {
@@ -174,6 +177,7 @@ func TestPullthroughManifests(t *testing.T) {
174177
imageStream: imageStream,
175178
cache: cache,
176179
registryAddr: "localhost:5000",
180+
metrics: metrics.NewNoopMetrics(),
177181
}
178182

179183
manifestResult, err := ptms.Get(ctx, tc.manifestDigest)
@@ -408,6 +412,7 @@ func TestPullthroughManifestInsecure(t *testing.T) {
408412
ManifestService: localManifestService,
409413
imageStream: imageStream,
410414
cache: cache,
415+
metrics: metrics.NewNoopMetrics(),
411416
}
412417

413418
manifestResult, err := ptms.Get(ctx, tc.manifestDigest)
@@ -540,6 +545,7 @@ func TestPullthroughManifestDockerReference(t *testing.T) {
540545
ptms := &pullthroughManifestService{
541546
ManifestService: newTestManifestService(tc.repoName, nil),
542547
imageStream: imageStream,
548+
metrics: metrics.NewNoopMetrics(),
543549
}
544550

545551
ptms.Get(ctx, digest.Digest(img.Name))
@@ -691,6 +697,7 @@ func TestPullthroughManifestMirroring(t *testing.T) {
691697
newLocalManifestService: func(ctx context.Context) (distribution.ManifestService, error) { return ms, nil },
692698
imageStream: imageStream,
693699
mirror: true,
700+
metrics: metrics.NewNoopMetrics(),
694701
}
695702

696703
_, err = ptms.Get(ctx, digest.Digest(img.Name))
@@ -704,3 +711,74 @@ func TestPullthroughManifestMirroring(t *testing.T) {
704711
t.Fatal("timeout while waiting for manifest to be mirrored")
705712
}
706713
}
714+
715+
func TestPullthroughManifestMetrics(t *testing.T) {
716+
namespace := "myproject"
717+
repo := "myapp"
718+
repoName := fmt.Sprintf("%s/%s", namespace, repo)
719+
720+
mediaType := "application/vnd.docker.distribution.manifest.v2+json"
721+
manifest := `{"schemaVersion":2,"mediaType":"` + mediaType + `"}`
722+
config := `{}`
723+
724+
manifestDigest := digest.FromBytes([]byte(manifest))
725+
726+
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
727+
switch r.URL.Path {
728+
case "/v2/":
729+
fmt.Fprint(w, "{}")
730+
case "/v2/remoteimage/manifests/" + manifestDigest.String():
731+
w.Header().Set("Content-Type", mediaType)
732+
fmt.Fprint(w, manifest)
733+
default:
734+
t.Logf("unhandled request: %s %v", r.Method, r.URL)
735+
http.Error(w, "404 not found", http.StatusNotFound)
736+
}
737+
}))
738+
defer ts.Close()
739+
740+
tsURL, err := url.Parse(ts.URL)
741+
if err != nil {
742+
t.Fatalf("failed to parse test server URL: %v", err)
743+
}
744+
745+
img, err := testutil.NewImageForManifest("unused", manifest, config, false)
746+
if err != nil {
747+
t.Fatal(err)
748+
}
749+
img.DockerImageReference = fmt.Sprintf("%s/remoteimage", tsURL.Host)
750+
img.DockerImageManifest = ""
751+
img.DockerImageConfig = ""
752+
753+
ctx := context.Background()
754+
ctx = testutil.WithTestLogger(ctx, t)
755+
756+
fos, imageClient := testutil.NewFakeOpenShiftWithClient(ctx)
757+
testutil.AddImageStream(t, fos, namespace, repo, map[string]string{
758+
imageapi.InsecureRepositoryAnnotation: "true",
759+
})
760+
testutil.AddImage(t, fos, img, namespace, repo, "latest")
761+
762+
imageStream := imagestream.New(ctx, namespace, repo, registryclient.NewFakeRegistryAPIClient(nil, imageClient))
763+
764+
c, sink := metricstesting.NewCounterSink()
765+
ms := newTestManifestService(repoName, nil)
766+
ptms := &pullthroughManifestService{
767+
ManifestService: ms,
768+
newLocalManifestService: func(ctx context.Context) (distribution.ManifestService, error) { return ms, nil },
769+
imageStream: imageStream,
770+
metrics: metrics.NewMetrics(sink),
771+
}
772+
773+
_, err = ptms.Get(ctx, digest.Digest(img.Name))
774+
if err != nil {
775+
t.Fatalf("failed to get manifest: %v", err)
776+
}
777+
778+
if diff := c.Diff(counter.M{
779+
fmt.Sprintf("pullthrough_repository:%s:Init", tsURL.Host): 1,
780+
fmt.Sprintf("pullthrough_repository:%s:ManifestService.Get", tsURL.Host): 1,
781+
}); diff != nil {
782+
t.Fatalf("unexpected metrics: %v", diff)
783+
}
784+
}

Diff for: ‎pkg/dockerregistry/server/registry_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
1717
registryclient "github.com/openshift/image-registry/pkg/dockerregistry/server/client"
1818
"github.com/openshift/image-registry/pkg/dockerregistry/server/configuration"
19+
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
1920
"github.com/openshift/image-registry/pkg/dockerregistry/server/supermiddleware"
2021
)
2122

@@ -68,6 +69,7 @@ func newTestRegistry(
6869
quotaEnforcing: &quotaEnforcingConfig{
6970
enforcementEnabled: false,
7071
},
72+
metrics: metrics.NewNoopMetrics(),
7173
}
7274

7375
if storageDriver == nil {

Diff for: ‎pkg/dockerregistry/server/remoteblobgetter.go

+95-75
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
corev1 "k8s.io/api/core/v1"
1212

1313
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
14+
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
1415
"github.com/openshift/image-registry/pkg/imagestream"
1516
"github.com/openshift/image-registry/pkg/origin-common/image/registryclient"
1617
)
@@ -28,21 +29,26 @@ type secretsGetter func() ([]corev1.Secret, error)
2829
// concurrently from different goroutines (from an HTTP handler and background
2930
// mirroring, for example).
3031
type digestBlobStoreCache struct {
31-
mu sync.RWMutex
32-
data map[string]distribution.BlobStore
32+
mu sync.RWMutex
33+
data map[string]distribution.BlobStore
34+
metrics metrics.Cache
3335
}
3436

35-
func newDigestBlobStoreCache() *digestBlobStoreCache {
37+
func newDigestBlobStoreCache(m metrics.Pullthrough) *digestBlobStoreCache {
3638
return &digestBlobStoreCache{
37-
data: make(map[string]distribution.BlobStore),
39+
data: make(map[string]distribution.BlobStore),
40+
metrics: m.DigestBlobStoreCache(),
3841
}
3942
}
4043

41-
func (c *digestBlobStoreCache) Get(dgst digest.Digest) (distribution.BlobStore, bool) {
42-
c.mu.RLock()
43-
defer c.mu.RUnlock()
44-
bs, ok := c.data[dgst.String()]
45-
return bs, ok
44+
func (c *digestBlobStoreCache) Get(dgst digest.Digest) (bs distribution.BlobStore, ok bool) {
45+
func() {
46+
c.mu.RLock()
47+
defer c.mu.RUnlock()
48+
bs, ok = c.data[dgst.String()]
49+
}()
50+
c.metrics.Request(ok)
51+
return
4652
}
4753

4854
func (c *digestBlobStoreCache) Put(dgst digest.Digest, bs distribution.BlobStore) {
@@ -58,6 +64,7 @@ type remoteBlobGetterService struct {
5864
getSecrets secretsGetter
5965
cache cache.RepositoryDigest
6066
digestToStore *digestBlobStoreCache
67+
metrics metrics.Pullthrough
6168
}
6269

6370
var _ BlobGetterService = &remoteBlobGetterService{}
@@ -68,97 +75,117 @@ func NewBlobGetterService(
6875
imageStream imagestream.ImageStream,
6976
secretsGetter secretsGetter,
7077
cache cache.RepositoryDigest,
78+
m metrics.Pullthrough,
7179
) BlobGetterService {
7280
return &remoteBlobGetterService{
7381
imageStream: imageStream,
7482
getSecrets: secretsGetter,
7583
cache: cache,
76-
digestToStore: newDigestBlobStoreCache(),
84+
digestToStore: newDigestBlobStoreCache(m),
85+
metrics: m,
7786
}
7887
}
7988

80-
// Stat provides metadata about a blob identified by the digest. If the
81-
// blob is unknown to the describer, ErrBlobUnknown will be returned.
82-
func (rbgs *remoteBlobGetterService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
83-
context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).Stat: starting with dgst=%s", dgst.String())
89+
func (rbgs *remoteBlobGetterService) findBlobStore(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, distribution.BlobStore, error) {
8490
// look up the potential remote repositories that this blob could be part of (at this time,
8591
// we don't know which image in the image stream surfaced the content).
8692
ok, err := rbgs.imageStream.Exists()
8793
if err != nil {
88-
return distribution.Descriptor{}, err
94+
return distribution.Descriptor{}, nil, err
8995
}
9096
if !ok {
91-
return distribution.Descriptor{}, distribution.ErrBlobUnknown
97+
return distribution.Descriptor{}, nil, distribution.ErrBlobUnknown
9298
}
9399

94100
cached, _ := rbgs.cache.Repositories(dgst)
95101

96-
retriever := getImportContext(ctx, rbgs.getSecrets)
102+
retriever := getImportContext(ctx, rbgs.getSecrets, rbgs.metrics)
97103

98104
// look at the first level of tagged repositories first
99105
repositoryCandidates, search, err := rbgs.imageStream.IdentifyCandidateRepositories(true)
100106
if err != nil {
101-
return distribution.Descriptor{}, err
107+
return distribution.Descriptor{}, nil, err
102108
}
103-
if desc, err := rbgs.findCandidateRepository(ctx, repositoryCandidates, search, cached, dgst, retriever); err == nil {
104-
return desc, nil
109+
if desc, bs, err := rbgs.findCandidateRepository(ctx, repositoryCandidates, search, cached, dgst, retriever); err == nil {
110+
return desc, bs, nil
105111
}
106112

107113
// look at all other repositories tagged by the server
108114
repositoryCandidates, secondary, err := rbgs.imageStream.IdentifyCandidateRepositories(false)
109115
if err != nil {
110-
return distribution.Descriptor{}, err
116+
return distribution.Descriptor{}, nil, err
111117
}
112118
for k := range search {
113119
delete(secondary, k)
114120
}
115-
if desc, err := rbgs.findCandidateRepository(ctx, repositoryCandidates, secondary, cached, dgst, retriever); err == nil {
116-
return desc, nil
121+
if desc, bs, err := rbgs.findCandidateRepository(ctx, repositoryCandidates, secondary, cached, dgst, retriever); err == nil {
122+
return desc, bs, nil
117123
}
118124

119-
return distribution.Descriptor{}, distribution.ErrBlobUnknown
125+
return distribution.Descriptor{}, nil, distribution.ErrBlobUnknown
120126
}
121127

122-
func (rbgs *remoteBlobGetterService) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
123-
context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).Open: starting with dgst=%s", dgst.String())
124-
store, ok := rbgs.digestToStore.Get(dgst)
128+
// Stat provides metadata about a blob identified by the digest. If the
129+
// blob is unknown to the describer, ErrBlobUnknown will be returned.
130+
func (rbgs *remoteBlobGetterService) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) {
131+
context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).Stat: starting with dgst=%s", dgst)
132+
133+
bs, ok := rbgs.digestToStore.Get(dgst)
125134
if ok {
126-
return store.Open(ctx, dgst)
135+
desc, err := bs.Stat(ctx, dgst)
136+
if err == nil {
137+
return desc, nil
138+
}
139+
140+
context.GetLogger(ctx).Warnf("Stat: failed to stat blob %s in cached remote repository: %v", dgst, err)
141+
142+
// There are two possible scenarios:
143+
//
144+
// * the blob is no longer available on the remote server,
145+
// * the registry isn't available at the moment.
146+
//
147+
// In both cases we can move on and hopefully we'll find another
148+
// registry.
127149
}
128150

129-
desc, err := rbgs.Stat(ctx, dgst)
151+
desc, bs, err := rbgs.findBlobStore(ctx, dgst)
130152
if err != nil {
131-
context.GetLogger(ctx).Errorf("Open: failed to stat blob %q in remote repositories: %v", dgst.String(), err)
132-
return nil, err
153+
return desc, err
133154
}
134155

135-
store, ok = rbgs.digestToStore.Get(desc.Digest)
136-
if !ok {
137-
return nil, distribution.ErrBlobUnknown
138-
}
156+
rbgs.digestToStore.Put(dgst, bs)
139157

140-
return store.Open(ctx, desc.Digest)
158+
return desc, nil
141159
}
142160

143-
func (rbgs *remoteBlobGetterService) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error {
144-
context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).ServeBlob: starting with dgst=%s", dgst.String())
145-
store, ok := rbgs.digestToStore.Get(dgst)
146-
if ok {
147-
return store.ServeBlob(ctx, w, req, dgst)
161+
func (rbgs *remoteBlobGetterService) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) {
162+
context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).Open: starting with dgst=%s", dgst)
163+
bs, ok := rbgs.digestToStore.Get(dgst)
164+
if !ok {
165+
var err error
166+
_, bs, err = rbgs.findBlobStore(ctx, dgst)
167+
if err != nil {
168+
return nil, err
169+
}
170+
rbgs.digestToStore.Put(dgst, bs)
148171
}
149172

150-
desc, err := rbgs.Stat(ctx, dgst)
151-
if err != nil {
152-
context.GetLogger(ctx).Errorf("ServeBlob: failed to stat blob %q in remote repositories: %v", dgst.String(), err)
153-
return err
154-
}
173+
return bs.Open(ctx, dgst)
174+
}
155175

156-
store, ok = rbgs.digestToStore.Get(desc.Digest)
176+
func (rbgs *remoteBlobGetterService) ServeBlob(ctx context.Context, w http.ResponseWriter, req *http.Request, dgst digest.Digest) error {
177+
context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).ServeBlob: starting with dgst=%s", dgst)
178+
bs, ok := rbgs.digestToStore.Get(dgst)
157179
if !ok {
158-
return distribution.ErrBlobUnknown
180+
var err error
181+
_, bs, err = rbgs.findBlobStore(ctx, dgst)
182+
if err != nil {
183+
return err
184+
}
185+
rbgs.digestToStore.Put(dgst, bs)
159186
}
160187

161-
return store.ServeBlob(ctx, w, req, desc.Digest)
188+
return bs.ServeBlob(ctx, w, req, dgst)
162189
}
163190

164191
// proxyStat attempts to locate the digest in the provided remote repository or returns an error. If the digest is found,
@@ -168,7 +195,7 @@ func (rbgs *remoteBlobGetterService) proxyStat(
168195
retriever registryclient.RepositoryRetriever,
169196
spec *imagestream.ImagePullthroughSpec,
170197
dgst digest.Digest,
171-
) (distribution.Descriptor, error) {
198+
) (distribution.Descriptor, distribution.BlobStore, error) {
172199
ref := spec.DockerImageReference
173200
insecureNote := ""
174201
if spec.Insecure {
@@ -178,7 +205,7 @@ func (rbgs *remoteBlobGetterService) proxyStat(
178205
repo, err := retriever.Repository(ctx, ref.RegistryURL(), ref.RepositoryName(), spec.Insecure)
179206
if err != nil {
180207
context.GetLogger(ctx).Errorf("Error getting remote repository for image %q: %v", ref.AsRepository().Exact(), err)
181-
return distribution.Descriptor{}, err
208+
return distribution.Descriptor{}, nil, err
182209
}
183210

184211
pullthroughBlobStore := repo.Blobs(ctx)
@@ -187,33 +214,26 @@ func (rbgs *remoteBlobGetterService) proxyStat(
187214
if err != distribution.ErrBlobUnknown {
188215
context.GetLogger(ctx).Errorf("Error statting blob %s in remote repository %q: %v", dgst, ref.AsRepository().Exact(), err)
189216
}
190-
return distribution.Descriptor{}, err
217+
return distribution.Descriptor{}, nil, err
191218
}
192219

193-
rbgs.digestToStore.Put(dgst, pullthroughBlobStore)
194-
return desc, nil
220+
return desc, pullthroughBlobStore, nil
195221
}
196222

197223
// Get attempts to fetch the requested blob by digest using a remote proxy store if necessary.
198224
func (rbgs *remoteBlobGetterService) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
199225
context.GetLogger(ctx).Debugf("(*remoteBlobGetterService).Get: starting with dgst=%s", dgst.String())
200-
store, ok := rbgs.digestToStore.Get(dgst)
201-
if ok {
202-
return store.Get(ctx, dgst)
203-
}
204-
205-
desc, err := rbgs.Stat(ctx, dgst)
206-
if err != nil {
207-
context.GetLogger(ctx).Errorf("Get: failed to stat blob %q in remote repositories: %v", dgst.String(), err)
208-
return nil, err
209-
}
210-
211-
store, ok = rbgs.digestToStore.Get(desc.Digest)
226+
bs, ok := rbgs.digestToStore.Get(dgst)
212227
if !ok {
213-
return nil, distribution.ErrBlobUnknown
228+
var err error
229+
_, bs, err = rbgs.findBlobStore(ctx, dgst)
230+
if err != nil {
231+
return nil, err
232+
}
233+
rbgs.digestToStore.Put(dgst, bs)
214234
}
215235

216-
return store.Get(ctx, desc.Digest)
236+
return bs.Get(ctx, dgst)
217237
}
218238

219239
// findCandidateRepository looks in search for a particular blob, referring to previously cached items
@@ -224,10 +244,10 @@ func (rbgs *remoteBlobGetterService) findCandidateRepository(
224244
cachedRepos []string,
225245
dgst digest.Digest,
226246
retriever registryclient.RepositoryRetriever,
227-
) (distribution.Descriptor, error) {
247+
) (distribution.Descriptor, distribution.BlobStore, error) {
228248
// no possible remote locations to search, exit early
229249
if len(search) == 0 {
230-
return distribution.Descriptor{}, distribution.ErrBlobUnknown
250+
return distribution.Descriptor{}, nil, distribution.ErrBlobUnknown
231251
}
232252

233253
// see if any of the previously located repositories containing this digest are in this
@@ -237,13 +257,13 @@ func (rbgs *remoteBlobGetterService) findCandidateRepository(
237257
if !ok {
238258
continue
239259
}
240-
desc, err := rbgs.proxyStat(ctx, retriever, &spec, dgst)
260+
desc, bs, err := rbgs.proxyStat(ctx, retriever, &spec, dgst)
241261
if err != nil {
242262
delete(search, repo)
243263
continue
244264
}
245265
context.GetLogger(ctx).Infof("Found digest location from cache %q in %q", dgst, repo)
246-
return desc, nil
266+
return desc, bs, nil
247267
}
248268

249269
// search the remaining registries for this digest
@@ -252,14 +272,14 @@ func (rbgs *remoteBlobGetterService) findCandidateRepository(
252272
if !ok {
253273
continue
254274
}
255-
desc, err := rbgs.proxyStat(ctx, retriever, &spec, dgst)
275+
desc, bs, err := rbgs.proxyStat(ctx, retriever, &spec, dgst)
256276
if err != nil {
257277
continue
258278
}
259279
_ = rbgs.cache.AddDigest(dgst, repo)
260280
context.GetLogger(ctx).Infof("Found digest location by search %q in %q", dgst, repo)
261-
return desc, nil
281+
return desc, bs, nil
262282
}
263283

264-
return distribution.Descriptor{}, distribution.ErrBlobUnknown
284+
return distribution.Descriptor{}, nil, distribution.ErrBlobUnknown
265285
}

Diff for: ‎pkg/dockerregistry/server/repository.go

+6-14
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
"github.com/openshift/image-registry/pkg/dockerregistry/server/audit"
1414
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
15-
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
1615
"github.com/openshift/image-registry/pkg/imagestream"
1716
)
1817

@@ -82,12 +81,16 @@ func (app *App) Repository(ctx context.Context, repo distribution.Repository, cr
8281
r.imageStream,
8382
r.imageStream.GetSecrets,
8483
r.cache,
84+
r.app.metrics,
8585
)
8686
}
8787

88+
repo = distribution.Repository(r)
89+
repo = r.app.metrics.Repository(repo, repo.Named().Name())
90+
8891
bdsf := blobDescriptorServiceFactoryFunc(r.BlobDescriptorService)
8992

90-
return r, bdsf, nil
93+
return repo, bdsf, nil
9194
}
9295

9396
// Manifests returns r, which implements distribution.ManifestService.
@@ -119,6 +122,7 @@ func (r *repository) Manifests(ctx context.Context, options ...distribution.Mani
119122
cache: r.cache,
120123
mirror: r.app.config.Pullthrough.Mirror,
121124
registryAddr: r.app.config.Server.Addr,
125+
metrics: r.app.metrics,
122126
}
123127
}
124128

@@ -128,10 +132,6 @@ func (r *repository) Manifests(ctx context.Context, options ...distribution.Mani
128132
ms = audit.NewManifestService(ctx, ms)
129133
}
130134

131-
if r.app.config.Metrics.Enabled {
132-
ms = metrics.NewManifestService(ms, r.Named().Name())
133-
}
134-
135135
return ms, nil
136136
}
137137

@@ -164,10 +164,6 @@ func (r *repository) Blobs(ctx context.Context) distribution.BlobStore {
164164
bs = audit.NewBlobStore(ctx, bs)
165165
}
166166

167-
if r.app.config.Metrics.Enabled {
168-
bs = metrics.NewBlobStore(bs, r.Named().Name())
169-
}
170-
171167
return bs
172168
}
173169

@@ -187,10 +183,6 @@ func (r *repository) Tags(ctx context.Context) distribution.TagService {
187183
ts = audit.NewTagService(ctx, ts)
188184
}
189185

190-
if r.app.config.Metrics.Enabled {
191-
ts = metrics.NewTagService(ts, r.Named().Name())
192-
}
193-
194186
return ts
195187
}
196188

Diff for: ‎pkg/dockerregistry/server/util.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
1616
registrymanifest "github.com/openshift/image-registry/pkg/dockerregistry/server/manifest"
17+
"github.com/openshift/image-registry/pkg/dockerregistry/server/metrics"
1718
"github.com/openshift/image-registry/pkg/origin-common/image/registryclient"
1819
)
1920

@@ -38,13 +39,16 @@ func getNamespaceName(resourceName string) (string, string, error) {
3839

3940
// getImportContext loads secrets and returns a context for getting
4041
// distribution clients to remote repositories.
41-
func getImportContext(ctx context.Context, secretsGetter secretsGetter) registryclient.RepositoryRetriever {
42+
func getImportContext(ctx context.Context, secretsGetter secretsGetter, m metrics.Pullthrough) registryclient.RepositoryRetriever {
4243
secrets, err := secretsGetter()
4344
if err != nil {
4445
context.GetLogger(ctx).Errorf("error getting secrets: %v", err)
4546
}
4647
credentials := registryclient.NewCredentialsForSecrets(secrets)
47-
return registryclient.NewContext(secureTransport, insecureTransport).WithCredentials(credentials)
48+
var retriever registryclient.RepositoryRetriever
49+
retriever = registryclient.NewContext(secureTransport, insecureTransport).WithCredentials(credentials)
50+
retriever = m.RepositoryRetriever(retriever)
51+
return retriever
4852
}
4953

5054
// RememberLayersOfImage caches the layer digests of given image.

Diff for: ‎pkg/testframework/registry.go

+9
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,15 @@ func (o DisableMirroring) Apply(dockerConfig *configuration.Configuration, extra
2727
extraConfig.Pullthrough.Mirror = false
2828
}
2929

30+
type EnableMetrics struct {
31+
Secret string
32+
}
33+
34+
func (o EnableMetrics) Apply(dockerConfig *configuration.Configuration, extraConfig *registryconfig.Configuration) {
35+
extraConfig.Metrics.Enabled = true
36+
extraConfig.Metrics.Secret = o.Secret
37+
}
38+
3039
func StartTestRegistry(t *testing.T, kubeConfigPath string, options ...RegistryOption) (net.Listener, CloseFunc) {
3140
localIPv4, err := DefaultLocalIP4()
3241
if err != nil {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
package integration
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"strings"
11+
"testing"
12+
13+
"github.com/docker/distribution"
14+
digest "github.com/docker/distribution/digest"
15+
16+
corev1 "k8s.io/api/core/v1"
17+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
19+
imageapiv1 "github.com/openshift/api/image/v1"
20+
imageclientv1 "github.com/openshift/client-go/image/clientset/versioned/typed/image/v1"
21+
22+
"github.com/openshift/image-registry/pkg/testframework"
23+
)
24+
25+
func TestPullthroughBlob(t *testing.T) {
26+
config := "{}"
27+
configDigest := digest.FromBytes([]byte(config))
28+
29+
foo := "foo"
30+
fooDigest := digest.FromBytes([]byte(foo))
31+
32+
master := testframework.NewMaster(t)
33+
defer master.Close()
34+
35+
testuser := master.CreateUser("testuser", "testp@ssw0rd")
36+
testproject := master.CreateProject("testproject", testuser.Name)
37+
teststreamName := "pullthrough"
38+
39+
// TODO(dmage): use atomic variable
40+
remoteRegistryRequiresAuth := false
41+
ts := testframework.NewHTTPServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
42+
req := fmt.Sprintf("%s %s", r.Method, r.URL.Path)
43+
44+
t.Logf("remote registry: %s", req)
45+
46+
w.Header().Set("Docker-Distribution-API-Version", "registry/2.0")
47+
48+
if remoteRegistryRequiresAuth {
49+
w.WriteHeader(http.StatusUnauthorized)
50+
return
51+
}
52+
53+
switch req {
54+
case "GET /v2/":
55+
w.Write([]byte(`{}`))
56+
case "GET /v2/remoteimage/manifests/latest":
57+
mediaType := "application/vnd.docker.distribution.manifest.v2+json"
58+
w.Header().Set("Content-Type", mediaType)
59+
_ = json.NewEncoder(w).Encode(map[string]interface{}{
60+
"schemaVersion": 2,
61+
"mediaType": mediaType,
62+
"config": map[string]interface{}{
63+
"mediaType": "application/vnd.docker.container.image.v1+json",
64+
"size": len(config),
65+
"digest": configDigest.String(),
66+
},
67+
"layers": []map[string]interface{}{
68+
{
69+
"mediaType": "application/vnd.docker.image.rootfs.diff.tar.gzip",
70+
"size": len(foo),
71+
"digest": fooDigest.String(),
72+
},
73+
},
74+
})
75+
case "GET /v2/remoteimage/blobs/" + configDigest.String():
76+
w.Write([]byte(config))
77+
case "HEAD /v2/remoteimage/blobs/" + fooDigest.String():
78+
w.Header().Set("Content-Length", fmt.Sprintf("%d", len(foo)))
79+
w.WriteHeader(http.StatusOK)
80+
case "GET /v2/remoteimage/blobs/" + fooDigest.String():
81+
w.Write([]byte(foo))
82+
default:
83+
t.Errorf("error: remote registry got unexpected request %s: %#+v", req, r)
84+
}
85+
}))
86+
defer ts.Close()
87+
88+
imageClient := imageclientv1.NewForConfigOrDie(master.AdminKubeConfig())
89+
90+
isi, err := imageClient.ImageStreamImports(testproject.Name).Create(&imageapiv1.ImageStreamImport{
91+
ObjectMeta: metav1.ObjectMeta{
92+
Name: teststreamName,
93+
},
94+
Spec: imageapiv1.ImageStreamImportSpec{
95+
Import: true,
96+
Images: []imageapiv1.ImageImportSpec{
97+
{
98+
From: corev1.ObjectReference{
99+
Kind: "DockerImage",
100+
Name: fmt.Sprintf("%s/remoteimage:latest", ts.URL.Host),
101+
},
102+
ImportPolicy: imageapiv1.TagImportPolicy{
103+
Insecure: true,
104+
},
105+
},
106+
},
107+
},
108+
})
109+
if err != nil {
110+
t.Fatal(err)
111+
}
112+
113+
teststream, err := imageClient.ImageStreams(testproject.Name).Get(teststreamName, metav1.GetOptions{})
114+
if err != nil {
115+
t.Fatal(err)
116+
}
117+
118+
if len(teststream.Status.Tags) == 0 {
119+
t.Fatalf("failed to import image: %#+v %#+v", isi, teststream)
120+
}
121+
122+
remoteRegistryRequiresAuth = true
123+
124+
registry := master.StartRegistry(t, testframework.DisableMirroring{}, testframework.EnableMetrics{Secret: "MetricsSecret"})
125+
defer registry.Close()
126+
127+
repo := registry.Repository(testproject.Name, teststream.Name, testuser)
128+
129+
ctx := context.Background()
130+
131+
_, err = repo.Blobs(ctx).Get(ctx, fooDigest)
132+
if err != distribution.ErrBlobUnknown {
133+
t.Fatal(err)
134+
}
135+
136+
req, err := http.NewRequest("GET", registry.BaseURL()+"/extensions/v2/metrics", nil)
137+
if err != nil {
138+
t.Fatal(err)
139+
}
140+
req.Header.Set("Authorization", "Bearer MetricsSecret")
141+
142+
resp, err := http.DefaultClient.Do(req)
143+
if err != nil {
144+
t.Fatal(err)
145+
}
146+
defer resp.Body.Close()
147+
148+
metrics := []struct {
149+
name string
150+
values []string
151+
}{
152+
{
153+
name: "imageregistry_pullthrough_repository_errors_total",
154+
values: []string{`operation="BlobStore.Stat"`, `code="UNAUTHORIZED"`},
155+
},
156+
{
157+
name: "imageregistry_pullthrough_blobstore_cache_requests_total",
158+
values: []string{`type="Miss"`},
159+
},
160+
{
161+
name: "imageregistry_pullthrough_repository_duration_seconds_bucket",
162+
values: []string{`operation="Init"`},
163+
},
164+
}
165+
166+
r := bufio.NewReader(resp.Body)
167+
for {
168+
line, err := r.ReadString('\n')
169+
line = strings.TrimRight(line, "\n")
170+
t.Log(line)
171+
172+
metric:
173+
for i, m := range metrics {
174+
if !strings.HasPrefix(line, m.name+"{") {
175+
continue
176+
}
177+
for _, v := range m.values {
178+
if !strings.Contains(line, v) {
179+
continue metric
180+
}
181+
}
182+
183+
// metric found, delete it
184+
metrics[i] = metrics[len(metrics)-1]
185+
metrics = metrics[:len(metrics)-1]
186+
break
187+
}
188+
189+
if err == io.EOF {
190+
break
191+
} else if err != nil {
192+
t.Fatal(err)
193+
}
194+
}
195+
if len(metrics) != 0 {
196+
t.Fatalf("unable to find metrics: %v", metrics)
197+
}
198+
}

Diff for: ‎test/integration/pullthroughblob/pullthroughblob_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func TestPullthroughBlob(t *testing.T) {
116116
"GET /v2/remoteimage/manifests/latest": 1,
117117
"GET /v2/remoteimage/blobs/" + configDigest.String(): 1,
118118
}); diff != nil {
119-
t.Fatalf("unexpected count of requests: %q", diff)
119+
t.Fatalf("unexpected number of requests: %q", diff)
120120
}
121121

122122
// Reset counter
@@ -138,12 +138,12 @@ func TestPullthroughBlob(t *testing.T) {
138138
t.Fatalf("got %q, want %q", string(data), foo)
139139
}
140140

141-
// TODO(dmage): reduce number of requests
141+
// TODO(dmage): remove the HEAD request
142142
if diff := requestCounter.Diff(counter.M{
143-
"GET /v2/": 2,
144-
"HEAD /v2/remoteimage/blobs/" + fooDigest.String(): 2,
143+
"GET /v2/": 1,
144+
"HEAD /v2/remoteimage/blobs/" + fooDigest.String(): 1,
145145
"GET /v2/remoteimage/blobs/" + fooDigest.String(): 1,
146146
}); diff != nil {
147-
t.Fatalf("unexpected count of requests: %q", diff)
147+
t.Fatalf("unexpected number of requests: %q", diff)
148148
}
149149
}

0 commit comments

Comments
 (0)
Please sign in to comment.