Skip to content

Commit ef613c0

Browse files
author
Michal Minář
committed
Registry catalog
Implements the upstream repository catalog API call: GET /v2/_catalog Returned is a list of repository names as json: 200 OK Content-Type: application/json { "repositories": [ <name>, ... ] } Pagination is supported by mapping the last returned repository name to the opaque continue token received from master API and using it on the next invocation. Signed-off-by: Michal Minář <[email protected]>
1 parent 8a7fc2c commit ef613c0

File tree

7 files changed

+424
-8
lines changed

7 files changed

+424
-8
lines changed

pkg/dockerregistry/server/app.go

+18-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/docker/distribution/configuration"
99
"github.com/docker/distribution/context"
1010
storagedriver "github.com/docker/distribution/registry/storage/driver"
11+
kubecache "k8s.io/apimachinery/pkg/util/cache"
1112

1213
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
1314
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
@@ -20,6 +21,7 @@ const (
2021
// Default values
2122
defaultDescriptorCacheSize = 4096
2223
defaultDigestToRepositoryCacheSize = 2048
24+
defaultPaginationCacheSize = 1024
2325
)
2426

2527
// appMiddleware should be used only in tests.
@@ -55,16 +57,23 @@ type App struct {
5557

5658
// cache is a shared cache of digests and descriptors.
5759
cache cache.DigestCache
60+
61+
// paginationCache maps repository names to opaque continue tokens received from master API for subsequent
62+
// list imagestreams requests
63+
paginationCache *kubecache.LRUExpireCache
5864
}
5965

6066
func (app *App) Storage(driver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
6167
app.driver = driver
6268
return driver, nil
6369
}
6470

65-
func (app *App) Registry(registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
66-
app.registry = registry
67-
return registry, nil
71+
func (app *App) Registry(nm distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
72+
app.registry = nm
73+
return &registry{
74+
registry: nm,
75+
enumerator: NewCachingRepositoryEnumerator(app.registryClient, app.paginationCache),
76+
}, nil
6877
}
6978

7079
func (app *App) BlobStatter() distribution.BlobStatter {
@@ -78,11 +87,12 @@ func (app *App) BlobStatter() distribution.BlobStatter {
7887
// The program will be terminated if an error happens.
7988
func NewApp(ctx context.Context, registryClient client.RegistryClient, dockerConfig *configuration.Configuration, extraConfig *registryconfig.Configuration, writeLimiter maxconnections.Limiter) http.Handler {
8089
app := &App{
81-
ctx: ctx,
82-
registryClient: registryClient,
83-
config: extraConfig,
84-
writeLimiter: writeLimiter,
85-
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
90+
ctx: ctx,
91+
registryClient: registryClient,
92+
config: extraConfig,
93+
writeLimiter: writeLimiter,
94+
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
95+
paginationCache: kubecache.NewLRUExpireCache(defaultPaginationCacheSize),
8696
}
8797

8898
cacheTTL := time.Duration(0)

pkg/dockerregistry/server/auth.go

+18
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,20 @@ func (ac *AccessController) Authorized(ctx context.Context, accessRecords ...reg
301301
default:
302302
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
303303
}
304+
305+
case "registry":
306+
switch access.Resource.Name {
307+
case "catalog":
308+
if access.Action != "*" {
309+
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
310+
}
311+
if err := verifyCatalogAccess(ctx, osClient); err != nil {
312+
return nil, ac.wrapErr(ctx, err)
313+
}
314+
default:
315+
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
316+
}
317+
304318
default:
305319
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
306320
}
@@ -440,6 +454,10 @@ func verifyPruneAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNam
440454
return verifyWithGlobalSAR(ctx, "images", "", "delete", c)
441455
}
442456

457+
func verifyCatalogAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNamespacer) error {
458+
return verifyWithGlobalSAR(ctx, "imagestreams", "", "list", c)
459+
}
460+
443461
func verifyMetricsAccess(ctx context.Context, metrics configuration.Metrics, token string, c client.SelfSubjectAccessReviewsNamespacer) error {
444462
if !metrics.Enabled {
445463
return ErrOpenShiftAccessDenied

pkg/dockerregistry/server/catalog.go

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package server
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
"time"
8+
9+
apierrors "k8s.io/apimachinery/pkg/api/errors"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/util/cache"
12+
13+
"github.com/docker/distribution/context"
14+
15+
imageapiv1 "github.com/openshift/api/image/v1"
16+
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
17+
)
18+
19+
const paginationEntryTTL = time.Minute
20+
21+
// RepositoryEnumerator allows to enumerate repositories known to the registry.
22+
type RepositoryEnumerator interface {
23+
// EnumerateRepositories fills the given repos slice with image stream names. The slice's length
24+
// determines the maximum number of repositories returned. The repositories are lexicographically sorted.
25+
// The last argument allows for pagination. It is the offset in the catalog. Returned is a number of
26+
// repositories filled. If there are no more repositories to return, io.EOF is returned.
27+
EnumerateRepositories(ctx context.Context, repos []string, last string) (n int, err error)
28+
}
29+
30+
// cachingRepositoryEnumerator is an enumerator that supports chunking by caching associations between
31+
// repository names and opaque continuation tokens.
32+
type cachingRepositoryEnumerator struct {
33+
client client.RegistryClient
34+
// a cache of opaque continue tokens for repository enumeration
35+
cache *cache.LRUExpireCache
36+
}
37+
38+
var _ RepositoryEnumerator = &cachingRepositoryEnumerator{}
39+
40+
// NewCachingRepositoryEnumerator returns a new caching repository enumerator.
41+
func NewCachingRepositoryEnumerator(client client.RegistryClient, cache *cache.LRUExpireCache) RepositoryEnumerator {
42+
return &cachingRepositoryEnumerator{
43+
client: client,
44+
cache: cache,
45+
}
46+
}
47+
48+
type isHandlerFunc func(is *imageapiv1.ImageStream) error
49+
50+
var errNoSpaceInSlice = errors.New("no space in slice")
51+
var errEnumerationFinished = errors.New("enumeration finished")
52+
53+
func (re *cachingRepositoryEnumerator) EnumerateRepositories(
54+
ctx context.Context,
55+
repos []string,
56+
last string,
57+
) (n int, err error) {
58+
if len(repos) == 0 {
59+
// Client explicitly requested 0 results. Returning nil for error seems more appropriate but this is
60+
// more in line with upstream does. Returning nil actually makes the upstream code panic.
61+
// TODO: patch upstream? /_catalog?n=0 results in 500
62+
return 0, errNoSpaceInSlice
63+
}
64+
65+
err = re.enumerateImageStreams(ctx, int64(len(repos)), last, func(is *imageapiv1.ImageStream) error {
66+
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
67+
repos[n] = name
68+
n++
69+
70+
if n >= len(repos) {
71+
return errEnumerationFinished
72+
}
73+
74+
return nil
75+
})
76+
77+
switch err {
78+
case errEnumerationFinished:
79+
err = nil
80+
case nil:
81+
err = io.EOF
82+
}
83+
84+
return
85+
}
86+
87+
func (r *cachingRepositoryEnumerator) enumerateImageStreams(
88+
ctx context.Context,
89+
limit int64,
90+
last string,
91+
handler isHandlerFunc,
92+
) error {
93+
var (
94+
start string
95+
warned bool
96+
)
97+
98+
client, ok := userClientFrom(ctx)
99+
if !ok {
100+
context.GetLogger(ctx).Warnf("user token not set, falling back to registry client")
101+
osClient, err := r.client.Client()
102+
if err != nil {
103+
return err
104+
}
105+
client = osClient
106+
}
107+
108+
if len(last) > 0 {
109+
if c, ok := r.cache.Get(last); !ok {
110+
context.GetLogger(ctx).Warnf("failed to find opaque continue token for last repository=%q -> requesting the full image stream list instead of %d items", last, limit)
111+
warned = true
112+
limit = 0
113+
} else {
114+
start = c.(string)
115+
}
116+
}
117+
118+
iss, err := client.ImageStreams("").List(metav1.ListOptions{
119+
Limit: limit,
120+
Continue: start,
121+
})
122+
if apierrors.IsResourceExpired(err) {
123+
context.GetLogger(ctx).Warnf("continuation token expired (%v) -> requesting the full image stream list", err)
124+
iss, err = client.ImageStreams("").List(metav1.ListOptions{})
125+
warned = true
126+
}
127+
128+
if err != nil {
129+
return err
130+
}
131+
132+
warnBrokenPagination := func(msg string) {
133+
if !warned {
134+
context.GetLogger(ctx).Warnf("pagination not working: %s; the master API does not support chunking", msg)
135+
warned = true
136+
}
137+
}
138+
139+
if limit > 0 && limit < int64(len(iss.Items)) {
140+
warnBrokenPagination(fmt.Sprintf("received %d image streams instead of requested maximum %d", len(iss.Items), limit))
141+
}
142+
if len(iss.Items) > 0 && len(iss.ListMeta.Continue) > 0 {
143+
last := iss.Items[len(iss.Items)-1]
144+
r.cache.Add(fmt.Sprintf("%s/%s", last.Namespace, last.Name), iss.ListMeta.Continue, paginationEntryTTL)
145+
}
146+
147+
for _, is := range iss.Items {
148+
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
149+
if len(last) > 0 && name <= last {
150+
if !warned {
151+
warnBrokenPagination(fmt.Sprintf("received unexpected repository name %q -"+
152+
" lexicographically preceding the requested %q", name, last))
153+
}
154+
continue
155+
}
156+
err := handler(&is)
157+
if err != nil {
158+
return err
159+
}
160+
}
161+
162+
return nil
163+
}
+142
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package server
2+
3+
import (
4+
"io"
5+
"testing"
6+
"time"
7+
8+
"github.com/docker/distribution/context"
9+
10+
registryclient "github.com/openshift/image-registry/pkg/dockerregistry/server/client"
11+
"github.com/openshift/image-registry/pkg/testutil"
12+
)
13+
14+
func TestCatalog(t *testing.T) {
15+
const blobRepoCacheTTL = time.Millisecond * 500
16+
17+
type isMeta struct{ namespace, name string }
18+
19+
for _, tc := range []struct {
20+
name string
21+
isObjectMeta []isMeta
22+
buffer []string
23+
last string
24+
expectedRepos []string
25+
expectedError error
26+
}{
27+
{
28+
name: "no image streams",
29+
buffer: make([]string, 2),
30+
expectedError: io.EOF,
31+
},
32+
33+
{
34+
name: "one image stream",
35+
isObjectMeta: []isMeta{{"nm", "foo"}},
36+
buffer: make([]string, 2),
37+
expectedRepos: []string{"nm/foo"},
38+
expectedError: io.EOF,
39+
},
40+
41+
{
42+
name: "2 image streams in the same namespace",
43+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
44+
buffer: make([]string, 2),
45+
expectedRepos: []string{"nm/bar", "nm/foo"},
46+
expectedError: nil,
47+
},
48+
49+
{
50+
name: "3 image streams in different namespaces",
51+
isObjectMeta: []isMeta{{"fst", "is"}, {"snd", "is"}, {"trd", "name"}},
52+
buffer: make([]string, 4),
53+
expectedRepos: []string{"fst/is", "snd/is", "trd/name"},
54+
expectedError: io.EOF,
55+
},
56+
57+
{
58+
name: "repositories get sorted",
59+
isObjectMeta: []isMeta{
60+
{"nm", "ab"}, {"nmc", "aa"}, {"ab", "cd"}, {"ss", "is"}, {"ab", "aa"}, {"a", "nn"},
61+
},
62+
buffer: make([]string, 7),
63+
expectedRepos: []string{"a/nn", "ab/aa", "ab/cd", "nm/ab", "nmc/aa", "ss/is"},
64+
expectedError: io.EOF,
65+
},
66+
67+
{
68+
name: "short buffer",
69+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
70+
buffer: make([]string, 1),
71+
expectedRepos: []string{"nm/bar"},
72+
expectedError: nil,
73+
},
74+
75+
{
76+
name: "skip the first",
77+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
78+
buffer: make([]string, 2),
79+
last: "nm/bar",
80+
expectedRepos: []string{"nm/foo"},
81+
expectedError: io.EOF,
82+
},
83+
84+
{
85+
name: "skip the last",
86+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
87+
buffer: make([]string, 2),
88+
last: "nm/foo",
89+
expectedRepos: []string{},
90+
expectedError: io.EOF,
91+
},
92+
93+
{
94+
name: "bigger buffer capacity does not matter",
95+
isObjectMeta: []isMeta{{"nm", "foo"}, {"nm", "bar"}},
96+
buffer: make([]string, 1, 2),
97+
expectedRepos: []string{"nm/bar"},
98+
expectedError: nil,
99+
},
100+
} {
101+
t.Run(tc.name, func(t *testing.T) {
102+
ctx := context.Background()
103+
ctx = testutil.WithTestLogger(ctx, t)
104+
ctx = withAuthPerformed(ctx)
105+
106+
fos, imageClient := testutil.NewFakeOpenShiftWithClient(ctx)
107+
for _, is := range tc.isObjectMeta {
108+
testutil.AddImageStream(t, fos, is.namespace, is.name, nil)
109+
}
110+
111+
reg, err := newTestRegistry(
112+
ctx,
113+
registryclient.NewFakeRegistryAPIClient(nil, imageClient),
114+
nil,
115+
blobRepoCacheTTL,
116+
false,
117+
false)
118+
if err != nil {
119+
t.Fatalf("unexpected error: %v", err)
120+
}
121+
122+
numFilled, err := reg.Repositories(ctx, tc.buffer, tc.last)
123+
if a, e := err, tc.expectedError; a != e {
124+
t.Errorf("got unexpected error: %q != %q", a, e)
125+
}
126+
127+
if a, e := numFilled, len(tc.expectedRepos); a != e {
128+
t.Errorf("got unexpected number of repos: %d != %d", numFilled, e)
129+
}
130+
131+
for i := 0; i < numFilled || i < len(tc.expectedRepos); i++ {
132+
if i < numFilled && i >= len(tc.expectedRepos) {
133+
t.Errorf("got unexpected repository at position #%d: %q", i, tc.buffer[i])
134+
} else if i < len(tc.expectedRepos) && i >= numFilled {
135+
t.Errorf("expected repository %q not returned", tc.expectedRepos[i])
136+
} else if a, e := tc.buffer[i], tc.expectedRepos[i]; a != e {
137+
t.Errorf("got unexpected repository at position #%d: %q != %q", i, a, e)
138+
}
139+
}
140+
})
141+
}
142+
}

0 commit comments

Comments
 (0)