Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit f2f9660

Browse files
author
Michal Minář
committedMay 30, 2018
Registry catalog
Implements the upstream repository catalog API call: GET /v2/_catalog Returned is a list of repository names as json: 200 OK Content-Type: application/json { "repositories": [ <name>, ... ] } Pagination is supported by mapping the last returned repository name to the opaque continue token received from master API and using it on the next invocation. Signed-off-by: Michal Minář <[email protected]>
1 parent 8a7fc2c commit f2f9660

File tree

7 files changed

+440
-8
lines changed

7 files changed

+440
-8
lines changed
 

‎pkg/dockerregistry/server/app.go

+18-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/docker/distribution/configuration"
99
"github.com/docker/distribution/context"
1010
storagedriver "github.com/docker/distribution/registry/storage/driver"
11+
kubecache "k8s.io/apimachinery/pkg/util/cache"
1112

1213
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
1314
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
@@ -20,6 +21,7 @@ const (
2021
// Default values
2122
defaultDescriptorCacheSize = 4096
2223
defaultDigestToRepositoryCacheSize = 2048
24+
defaultPaginationCacheSize = 1024
2325
)
2426

2527
// appMiddleware should be used only in tests.
@@ -55,16 +57,23 @@ type App struct {
5557

5658
// cache is a shared cache of digests and descriptors.
5759
cache cache.DigestCache
60+
61+
// paginationCache maps repository names to opaque continue tokens received from master API for subsequent
62+
// list imagestreams requests
63+
paginationCache *kubecache.LRUExpireCache
5864
}
5965

6066
func (app *App) Storage(driver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
6167
app.driver = driver
6268
return driver, nil
6369
}
6470

65-
func (app *App) Registry(registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
66-
app.registry = registry
67-
return registry, nil
71+
func (app *App) Registry(nm distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
72+
app.registry = nm
73+
return &registry{
74+
registry: nm,
75+
enumerator: NewCachingRepositoryEnumerator(app.registryClient, app.paginationCache),
76+
}, nil
6877
}
6978

7079
func (app *App) BlobStatter() distribution.BlobStatter {
@@ -78,11 +87,12 @@ func (app *App) BlobStatter() distribution.BlobStatter {
7887
// The program will be terminated if an error happens.
7988
func NewApp(ctx context.Context, registryClient client.RegistryClient, dockerConfig *configuration.Configuration, extraConfig *registryconfig.Configuration, writeLimiter maxconnections.Limiter) http.Handler {
8089
app := &App{
81-
ctx: ctx,
82-
registryClient: registryClient,
83-
config: extraConfig,
84-
writeLimiter: writeLimiter,
85-
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
90+
ctx: ctx,
91+
registryClient: registryClient,
92+
config: extraConfig,
93+
writeLimiter: writeLimiter,
94+
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
95+
paginationCache: kubecache.NewLRUExpireCache(defaultPaginationCacheSize),
8696
}
8797

8898
cacheTTL := time.Duration(0)

‎pkg/dockerregistry/server/auth.go

+18
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,20 @@ func (ac *AccessController) Authorized(ctx context.Context, accessRecords ...reg
301301
default:
302302
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
303303
}
304+
305+
case "registry":
306+
switch access.Resource.Name {
307+
case "catalog":
308+
if access.Action != "*" {
309+
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
310+
}
311+
if err := verifyCatalogAccess(ctx, osClient); err != nil {
312+
return nil, ac.wrapErr(ctx, err)
313+
}
314+
default:
315+
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
316+
}
317+
304318
default:
305319
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
306320
}
@@ -440,6 +454,10 @@ func verifyPruneAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNam
440454
return verifyWithGlobalSAR(ctx, "images", "", "delete", c)
441455
}
442456

457+
func verifyCatalogAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNamespacer) error {
458+
return verifyWithGlobalSAR(ctx, "imagestreams", "", "list", c)
459+
}
460+
443461
func verifyMetricsAccess(ctx context.Context, metrics configuration.Metrics, token string, c client.SelfSubjectAccessReviewsNamespacer) error {
444462
if !metrics.Enabled {
445463
return ErrOpenShiftAccessDenied

‎pkg/dockerregistry/server/catalog.go

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package server
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
"time"
8+
9+
apierrors "k8s.io/apimachinery/pkg/api/errors"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/util/cache"
12+
13+
"github.com/docker/distribution/context"
14+
15+
imageapiv1 "github.com/openshift/api/image/v1"
16+
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
17+
)
18+
19+
const paginationEntryTTL = time.Minute
20+
21+
// RepositoryEnumerator allows to enumerate repositories known to the registry.
22+
type RepositoryEnumerator interface {
23+
// EnumerateRepositories fills the given repos slice with image stream names. The slice's length
24+
// determines the maximum number of repositories returned. The repositories are lexicographically sorted.
25+
// The last argument allows for pagination. It is the offset in the catalog. Returned is a number of
26+
// repositories filled. If there are no more repositories to return, io.EOF is returned.
27+
EnumerateRepositories(ctx context.Context, repos []string, last string) (n int, err error)
28+
}
29+
30+
// cachingRepositoryEnumerator is an enumerator that supports chunking by caching associations between
31+
// repository names and opaque continuation tokens.
32+
type cachingRepositoryEnumerator struct {
33+
client client.RegistryClient
34+
// a cache of opaque continue tokens for repository enumeration
35+
cache *cache.LRUExpireCache
36+
}
37+
38+
var _ RepositoryEnumerator = &cachingRepositoryEnumerator{}
39+
40+
// NewCachingRepositoryEnumerator returns a new caching repository enumerator.
41+
func NewCachingRepositoryEnumerator(client client.RegistryClient, cache *cache.LRUExpireCache) RepositoryEnumerator {
42+
return &cachingRepositoryEnumerator{
43+
client: client,
44+
cache: cache,
45+
}
46+
}
47+
48+
type isHandlerFunc func(is *imageapiv1.ImageStream) error
49+
50+
var errNoSpaceInSlice = errors.New("no space in slice")
51+
var errEnumerationFinished = errors.New("enumeration finished")
52+
53+
func (re *cachingRepositoryEnumerator) EnumerateRepositories(
54+
ctx context.Context,
55+
repos []string,
56+
last string,
57+
) (n int, err error) {
58+
if len(repos) == 0 {
59+
// Client explicitly requested 0 results. Returning nil for error seems more appropriate but this is
60+
// more in line with upstream does. Returning nil actually makes the upstream code panic.
61+
// TODO: patch upstream? /_catalog?n=0 results in 500
62+
return 0, errNoSpaceInSlice
63+
}
64+
65+
err = re.enumerateImageStreams(ctx, int64(len(repos)), last, func(is *imageapiv1.ImageStream) error {
66+
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
67+
repos[n] = name
68+
n++
69+
70+
if n >= len(repos) {
71+
return errEnumerationFinished
72+
}
73+
74+
return nil
75+
})
76+
77+
switch err {
78+
case errEnumerationFinished:
79+
err = nil
80+
case nil:
81+
err = io.EOF
82+
}
83+
84+
return
85+
}
86+
87+
func (r *cachingRepositoryEnumerator) enumerateImageStreams(
88+
ctx context.Context,
89+
limit int64,
90+
last string,
91+
handler isHandlerFunc,
92+
) error {
93+
var (
94+
start string
95+
warned bool
96+
)
97+
98+
client, ok := userClientFrom(ctx)
99+
if !ok {
100+
context.GetLogger(ctx).Warnf("user token not set, falling back to registry client")
101+
osClient, err := r.client.Client()
102+
if err != nil {
103+
return err
104+
}
105+
client = osClient
106+
}
107+
108+
if len(last) > 0 {
109+
if c, ok := r.cache.Get(last); !ok {
110+
context.GetLogger(ctx).Warnf("failed to find opaque continue token for last repository=%q -> requesting the full image stream list instead of %d items", last, limit)
111+
warned = true
112+
limit = 0
113+
} else {
114+
start = c.(string)
115+
}
116+
}
117+
118+
iss, err := client.ImageStreams("").List(metav1.ListOptions{
119+
Limit: limit,
120+
Continue: start,
121+
})
122+
if apierrors.IsResourceExpired(err) {
123+
context.GetLogger(ctx).Warnf("continuation token expired (%v) -> requesting the full image stream list", err)
124+
iss, err = client.ImageStreams("").List(metav1.ListOptions{})
125+
warned = true
126+
}
127+
128+
if err != nil {
129+
return err
130+
}
131+
132+
warnBrokenPagination := func(msg string) {
133+
if !warned {
134+
context.GetLogger(ctx).Warnf("pagination not working: %s; the master API does not support chunking", msg)
135+
warned = true
136+
}
137+
}
138+
139+
if limit > 0 && limit < int64(len(iss.Items)) {
140+
warnBrokenPagination(fmt.Sprintf("received %d image streams instead of requested maximum %d", len(iss.Items), limit))
141+
}
142+
if len(iss.Items) > 0 && len(iss.ListMeta.Continue) > 0 {
143+
last := iss.Items[len(iss.Items)-1]
144+
r.cache.Add(fmt.Sprintf("%s/%s", last.Namespace, last.Name), iss.ListMeta.Continue, paginationEntryTTL)
145+
}
146+
147+
for _, is := range iss.Items {
148+
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
149+
if len(last) > 0 && name <= last {
150+
if !warned {
151+
warnBrokenPagination(fmt.Sprintf("received unexpected repository name %q -"+
152+
" lexicographically preceding the requested %q", name, last))
153+
}
154+
continue
155+
}
156+
err := handler(&is)
157+
if err != nil {
158+
return err
159+
}
160+
}
161+
162+
return nil
163+
}
+151
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
package server
2+
3+
import (
4+
"io"
5+
"testing"
6+
"time"
7+
8+
"github.com/docker/distribution/context"
9+
10+
registryclient "github.com/openshift/image-registry/pkg/dockerregistry/server/client"
11+
"github.com/openshift/image-registry/pkg/testutil"
12+
)
13+
14+
func TestCatalog(t *testing.T) {
15+
const blobRepoCacheTTL = time.Millisecond * 500
16+
17+
type isMeta struct{ namespace, name string }
18+
19+
for _, tc := range []struct {
20+
name string
21+
isObjectMeta []isMeta
22+
buffer []string
23+
last string
24+
expectedRepos []string
25+
expectedError error
26+
}{
27+
{
28+
name: "no image streams",
29+
buffer: make([]string, 2),
30+
expectedError: io.EOF,
31+
},
32+
33+
{
34+
name: "one image stream",
35+
isObjectMeta: []isMeta{{"nm", "foo"}},
36+
buffer: make([]string, 2),
37+
expectedRepos: []string{"nm/foo"},
38+
expectedError: io.EOF,
39+
},
40+
41+
{
42+
name: "2 image streams in the same namespace",
43+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
44+
buffer: make([]string, 2),
45+
expectedRepos: []string{"nm/bar", "nm/foo"},
46+
expectedError: nil,
47+
},
48+
49+
{
50+
name: "3 image streams in different namespaces",
51+
isObjectMeta: []isMeta{{"fst", "is"}, {"snd", "is"}, {"trd", "name"}},
52+
buffer: make([]string, 4),
53+
expectedRepos: []string{"fst/is", "snd/is", "trd/name"},
54+
expectedError: io.EOF,
55+
},
56+
57+
{
58+
name: "repositories get sorted",
59+
isObjectMeta: []isMeta{
60+
{"nm", "ab"}, {"nmc", "aa"}, {"ab", "cd"}, {"ss", "is"}, {"ab", "aa"}, {"a", "nn"},
61+
},
62+
buffer: make([]string, 7),
63+
expectedRepos: []string{"a/nn", "ab/aa", "ab/cd", "nm/ab", "nmc/aa", "ss/is"},
64+
expectedError: io.EOF,
65+
},
66+
67+
{
68+
name: "short buffer",
69+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
70+
buffer: make([]string, 1),
71+
expectedRepos: []string{"nm/bar"},
72+
expectedError: nil,
73+
},
74+
75+
{
76+
name: "skip the first",
77+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
78+
buffer: make([]string, 2),
79+
last: "nm/bar",
80+
expectedRepos: []string{"nm/foo"},
81+
expectedError: io.EOF,
82+
},
83+
84+
{
85+
name: "skip the last",
86+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
87+
buffer: make([]string, 2),
88+
last: "nm/foo",
89+
expectedRepos: []string{},
90+
expectedError: io.EOF,
91+
},
92+
93+
{
94+
name: "bigger buffer capacity does not matter",
95+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
96+
buffer: make([]string, 1, 2),
97+
expectedRepos: []string{"nm/bar"},
98+
expectedError: nil,
99+
},
100+
101+
{
102+
name: "pick a repository in the middle",
103+
isObjectMeta: []isMeta{{"bar", "is"}, {"baz", "is"}, {"foo", "is"}},
104+
buffer: make([]string, 1),
105+
last: "bar/is",
106+
expectedRepos: []string{"baz/is"},
107+
expectedError: nil,
108+
},
109+
} {
110+
t.Run(tc.name, func(t *testing.T) {
111+
ctx := context.Background()
112+
ctx = testutil.WithTestLogger(ctx, t)
113+
ctx = withAuthPerformed(ctx)
114+
115+
fos, imageClient := testutil.NewFakeOpenShiftWithClient(ctx)
116+
for _, is := range tc.isObjectMeta {
117+
testutil.AddImageStream(t, fos, is.namespace, is.name, nil)
118+
}
119+
120+
reg, err := newTestRegistry(
121+
ctx,
122+
registryclient.NewFakeRegistryAPIClient(nil, imageClient),
123+
nil,
124+
blobRepoCacheTTL,
125+
false,
126+
false)
127+
if err != nil {
128+
t.Fatalf("unexpected error: %v", err)
129+
}
130+
131+
numFilled, err := reg.Repositories(ctx, tc.buffer, tc.last)
132+
if a, e := err, tc.expectedError; a != e {
133+
t.Errorf("got unexpected error: %q != %q", a, e)
134+
}
135+
136+
if a, e := numFilled, len(tc.expectedRepos); a != e {
137+
t.Errorf("got unexpected number of repos: %d != %d", numFilled, e)
138+
}
139+
140+
for i := 0; i < numFilled || i < len(tc.expectedRepos); i++ {
141+
if i < numFilled && i >= len(tc.expectedRepos) {
142+
t.Errorf("got unexpected repository at position #%d: %q", i, tc.buffer[i])
143+
} else if i < len(tc.expectedRepos) && i >= numFilled {
144+
t.Errorf("expected repository %q not returned", tc.expectedRepos[i])
145+
} else if a, e := tc.buffer[i], tc.expectedRepos[i]; a != e {
146+
t.Errorf("got unexpected repository at position #%d: %q != %q", i, a, e)
147+
}
148+
}
149+
})
150+
}
151+
}

‎pkg/dockerregistry/server/registry.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package server
2+
3+
import (
4+
"errors"
5+
"github.com/docker/distribution"
6+
"github.com/docker/distribution/context"
7+
"github.com/docker/distribution/reference"
8+
)
9+
10+
// registry wraps upstream registry object and overrides some of its methods
11+
// TODO: collect metrics for reimplemented methods
12+
type registry struct {
13+
registry distribution.Namespace
14+
enumerator RepositoryEnumerator
15+
}
16+
17+
var _ distribution.Namespace = &registry{}
18+
19+
func (r *registry) Scope() distribution.Scope {
20+
return r.registry.Scope()
21+
}
22+
23+
func (r *registry) Repository(ctx context.Context, name reference.Named) (distribution.Repository, error) {
24+
return r.registry.Repository(ctx, name)
25+
}
26+
27+
// Repositories lists repository names made out of image streams fetched from master API.
28+
func (r *registry) Repositories(ctx context.Context, repos []string, last string) (n int, err error) {
29+
n, err = r.enumerator.EnumerateRepositories(ctx, repos, last)
30+
if err == errNoSpaceInSlice {
31+
return n, errors.New("client requested zero entries")
32+
}
33+
return
34+
}
35+
36+
func (r *registry) Blobs() distribution.BlobEnumerator {
37+
return r.registry.Blobs()
38+
}
39+
40+
func (r *registry) BlobStatter() distribution.BlobStatter {
41+
return r.registry.BlobStatter()
42+
}

‎pkg/dockerregistry/server/registry_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package server
33
import (
44
"time"
55

6+
kubecache "k8s.io/apimachinery/pkg/util/cache"
7+
68
"github.com/docker/distribution"
79
dockercfg "github.com/docker/distribution/configuration"
810
"github.com/docker/distribution/context"
@@ -68,6 +70,7 @@ func newTestRegistry(
6870
quotaEnforcing: &quotaEnforcingConfig{
6971
enforcementEnabled: false,
7072
},
73+
paginationCache: kubecache.NewLRUExpireCache(128),
7174
}
7275

7376
if storageDriver == nil {

‎pkg/testutil/fakeopenshift.go

+45
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package testutil
22

33
import (
44
"fmt"
5+
"sort"
56
"sync"
67

78
"github.com/docker/distribution/context"
@@ -141,6 +142,27 @@ func (fos *FakeOpenShift) GetImageStream(namespace, repo string) (*imageapiv1.Im
141142
return &is, nil
142143
}
143144

145+
func (fos *FakeOpenShift) ListImageStreams(namespace string) (*imageapiv1.ImageStreamList, error) {
146+
fos.mu.Lock()
147+
defer fos.mu.Unlock()
148+
149+
iss := imageapiv1.ImageStreamList{
150+
ListMeta: metav1.ListMeta{},
151+
Items: []imageapiv1.ImageStream{},
152+
}
153+
154+
for _, is := range fos.imageStreams {
155+
if len(namespace) != 0 && namespace != is.Namespace {
156+
continue
157+
}
158+
iss.Items = append(iss.Items, is)
159+
}
160+
161+
sort.Sort(byRepositoryName(iss.Items))
162+
163+
return &iss, nil
164+
}
165+
144166
func (fos *FakeOpenShift) CreateImageStreamMapping(namespace string, ism *imageapiv1.ImageStreamMapping) (*imageapiv1.ImageStreamMapping, error) {
145167
is, err := fos.GetImageStream(namespace, ism.Name)
146168
if err != nil {
@@ -353,20 +375,31 @@ func (fos *FakeOpenShift) imageStreamsHandler(action clientgotesting.Action) (bo
353375
fmt.Sprintf("(*FakeOpenShift).imageStreamsHandler: %s %s/%s",
354376
action.GetVerb(), action.GetNamespace(), fos.getName(action)),
355377
func() (bool, runtime.Object, error) {
378+
if action.GetSubresource() != "" {
379+
return fos.todo(action)
380+
}
381+
356382
switch action := action.(type) {
357383
case clientgotesting.CreateActionImpl:
358384
is, err := fos.CreateImageStream(
359385
action.GetNamespace(),
360386
action.Object.(*imageapiv1.ImageStream),
361387
)
362388
return true, is, err
389+
363390
case clientgotesting.GetActionImpl:
364391
is, err := fos.GetImageStream(
365392
action.GetNamespace(),
366393
action.GetName(),
367394
)
368395
return true, is, err
396+
397+
case clientgotesting.ListActionImpl:
398+
fos.logger.Infof("(*FakeOpenShift).imageStreamsHandler: list kind=%s", action.Kind)
399+
iss, err := fos.ListImageStreams(action.GetNamespace())
400+
return true, iss, err
369401
}
402+
370403
return fos.todo(action)
371404
},
372405
)
@@ -415,3 +448,15 @@ func (fos *FakeOpenShift) AddReactorsTo(c *imagefakeclient.FakeImageV1) {
415448
c.AddReactor("*", "imagestreammappings", fos.imageStreamMappingsHandler)
416449
c.AddReactor("*", "imagestreamimages", fos.imageStreamImagesHandler)
417450
}
451+
452+
type byRepositoryName []imageapiv1.ImageStream
453+
454+
func (brn byRepositoryName) Len() int { return len(brn) }
455+
func (brn byRepositoryName) Swap(i, j int) { brn[i], brn[j] = brn[j], brn[i] }
456+
func (brn byRepositoryName) Less(i, j int) bool {
457+
a, b := brn[i], brn[j]
458+
if a.Namespace < b.Namespace {
459+
return true
460+
}
461+
return a.Namespace == b.Namespace && a.Name < b.Name
462+
}

0 commit comments

Comments
 (0)
Please sign in to comment.