Skip to content

Commit 439bfe0

Browse files
author
Michal Minář
committed
Registry catalog
Implements the upstream repository catalog API call: GET /v2/_catalog Returned is a list of repository names as json: 200 OK Content-Type: application/json { "repositories": [ <name>, ... ] } Pagination is supported by mapping the last returned repository name to the opaque continue token received from master API and using it on the next invocation. Signed-off-by: Michal Minář <[email protected]>
1 parent 61c9fd1 commit 439bfe0

File tree

7 files changed

+441
-9
lines changed

7 files changed

+441
-9
lines changed

pkg/dockerregistry/server/app.go

+18-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/docker/distribution/configuration"
99
"github.com/docker/distribution/context"
1010
storagedriver "github.com/docker/distribution/registry/storage/driver"
11+
kubecache "k8s.io/apimachinery/pkg/util/cache"
1112

1213
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
1314
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
@@ -21,6 +22,7 @@ const (
2122
// Default values
2223
defaultDescriptorCacheSize = 4096
2324
defaultDigestToRepositoryCacheSize = 2048
25+
defaultPaginationCacheSize = 1024
2426
)
2527

2628
// appMiddleware should be used only in tests.
@@ -59,16 +61,23 @@ type App struct {
5961

6062
// metrics provide methods to collect statistics.
6163
metrics metrics.Metrics
64+
65+
// paginationCache maps repository names to opaque continue tokens received from master API for subsequent
66+
// list imagestreams requests
67+
paginationCache *kubecache.LRUExpireCache
6268
}
6369

6470
func (app *App) Storage(driver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
6571
app.driver = driver
6672
return driver, nil
6773
}
6874

69-
func (app *App) Registry(registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
70-
app.registry = registry
71-
return registry, nil
75+
func (app *App) Registry(nm distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
76+
app.registry = nm
77+
return &registry{
78+
registry: nm,
79+
enumerator: NewCachingRepositoryEnumerator(app.registryClient, app.paginationCache),
80+
}, nil
7281
}
7382

7483
func (app *App) BlobStatter() distribution.BlobStatter {
@@ -82,11 +91,12 @@ func (app *App) BlobStatter() distribution.BlobStatter {
8291
// The program will be terminated if an error happens.
8392
func NewApp(ctx context.Context, registryClient client.RegistryClient, dockerConfig *configuration.Configuration, extraConfig *registryconfig.Configuration, writeLimiter maxconnections.Limiter) http.Handler {
8493
app := &App{
85-
ctx: ctx,
86-
registryClient: registryClient,
87-
config: extraConfig,
88-
writeLimiter: writeLimiter,
89-
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
94+
ctx: ctx,
95+
registryClient: registryClient,
96+
config: extraConfig,
97+
writeLimiter: writeLimiter,
98+
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
99+
paginationCache: kubecache.NewLRUExpireCache(defaultPaginationCacheSize),
90100
}
91101

92102
if app.config.Metrics.Enabled {

pkg/dockerregistry/server/auth.go

+18
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,20 @@ func (ac *AccessController) Authorized(ctx context.Context, accessRecords ...reg
301301
default:
302302
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
303303
}
304+
305+
case "registry":
306+
switch access.Resource.Name {
307+
case "catalog":
308+
if access.Action != "*" {
309+
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
310+
}
311+
if err := verifyCatalogAccess(ctx, osClient); err != nil {
312+
return nil, ac.wrapErr(ctx, err)
313+
}
314+
default:
315+
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
316+
}
317+
304318
default:
305319
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
306320
}
@@ -440,6 +454,10 @@ func verifyPruneAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNam
440454
return verifyWithGlobalSAR(ctx, "images", "", "delete", c)
441455
}
442456

457+
func verifyCatalogAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNamespacer) error {
458+
return verifyWithGlobalSAR(ctx, "imagestreams", "", "list", c)
459+
}
460+
443461
func verifyMetricsAccess(ctx context.Context, metrics configuration.Metrics, token string, c client.SelfSubjectAccessReviewsNamespacer) error {
444462
if !metrics.Enabled {
445463
return ErrOpenShiftAccessDenied

pkg/dockerregistry/server/catalog.go

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package server
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
"time"
8+
9+
apierrors "k8s.io/apimachinery/pkg/api/errors"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/util/cache"
12+
13+
"github.com/docker/distribution/context"
14+
15+
imageapiv1 "github.com/openshift/api/image/v1"
16+
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
17+
)
18+
19+
const paginationEntryTTL = time.Minute
20+
21+
// RepositoryEnumerator allows to enumerate repositories known to the registry.
22+
type RepositoryEnumerator interface {
23+
// EnumerateRepositories fills the given repos slice with image stream names. The slice's length
24+
// determines the maximum number of repositories returned. The repositories are lexicographically sorted.
25+
// The last argument allows for pagination. It is the offset in the catalog. Returned is a number of
26+
// repositories filled. If there are no more repositories to return, io.EOF is returned.
27+
EnumerateRepositories(ctx context.Context, repos []string, last string) (n int, err error)
28+
}
29+
30+
// cachingRepositoryEnumerator is an enumerator that supports chunking by caching associations between
31+
// repository names and opaque continuation tokens.
32+
type cachingRepositoryEnumerator struct {
33+
client client.RegistryClient
34+
// a cache of opaque continue tokens for repository enumeration
35+
cache *cache.LRUExpireCache
36+
}
37+
38+
var _ RepositoryEnumerator = &cachingRepositoryEnumerator{}
39+
40+
// NewCachingRepositoryEnumerator returns a new caching repository enumerator.
41+
func NewCachingRepositoryEnumerator(client client.RegistryClient, cache *cache.LRUExpireCache) RepositoryEnumerator {
42+
return &cachingRepositoryEnumerator{
43+
client: client,
44+
cache: cache,
45+
}
46+
}
47+
48+
type isHandlerFunc func(is *imageapiv1.ImageStream) error
49+
50+
var errNoSpaceInSlice = errors.New("no space in slice")
51+
var errEnumerationFinished = errors.New("enumeration finished")
52+
53+
func (re *cachingRepositoryEnumerator) EnumerateRepositories(
54+
ctx context.Context,
55+
repos []string,
56+
last string,
57+
) (n int, err error) {
58+
if len(repos) == 0 {
59+
// Client explicitly requested 0 results. Returning nil for error seems more appropriate but this is
60+
// more in line with upstream does. Returning nil actually makes the upstream code panic.
61+
// TODO: patch upstream? /_catalog?n=0 results in 500
62+
return 0, errNoSpaceInSlice
63+
}
64+
65+
err = re.enumerateImageStreams(ctx, int64(len(repos)), last, func(is *imageapiv1.ImageStream) error {
66+
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
67+
repos[n] = name
68+
n++
69+
70+
if n >= len(repos) {
71+
return errEnumerationFinished
72+
}
73+
74+
return nil
75+
})
76+
77+
switch err {
78+
case errEnumerationFinished:
79+
err = nil
80+
case nil:
81+
err = io.EOF
82+
}
83+
84+
return
85+
}
86+
87+
func (r *cachingRepositoryEnumerator) enumerateImageStreams(
88+
ctx context.Context,
89+
limit int64,
90+
last string,
91+
handler isHandlerFunc,
92+
) error {
93+
var (
94+
start string
95+
warned bool
96+
)
97+
98+
client, ok := userClientFrom(ctx)
99+
if !ok {
100+
context.GetLogger(ctx).Warnf("user token not set, falling back to registry client")
101+
osClient, err := r.client.Client()
102+
if err != nil {
103+
return err
104+
}
105+
client = osClient
106+
}
107+
108+
if len(last) > 0 {
109+
if c, ok := r.cache.Get(last); !ok {
110+
context.GetLogger(ctx).Warnf("failed to find opaque continue token for last repository=%q -> requesting the full image stream list instead of %d items", last, limit)
111+
warned = true
112+
limit = 0
113+
} else {
114+
start = c.(string)
115+
}
116+
}
117+
118+
iss, err := client.ImageStreams("").List(metav1.ListOptions{
119+
Limit: limit,
120+
Continue: start,
121+
})
122+
if apierrors.IsResourceExpired(err) {
123+
context.GetLogger(ctx).Warnf("continuation token expired (%v) -> requesting the full image stream list", err)
124+
iss, err = client.ImageStreams("").List(metav1.ListOptions{})
125+
warned = true
126+
}
127+
128+
if err != nil {
129+
return err
130+
}
131+
132+
warnBrokenPagination := func(msg string) {
133+
if !warned {
134+
context.GetLogger(ctx).Warnf("pagination not working: %s; the master API does not support chunking", msg)
135+
warned = true
136+
}
137+
}
138+
139+
if limit > 0 && limit < int64(len(iss.Items)) {
140+
warnBrokenPagination(fmt.Sprintf("received %d image streams instead of requested maximum %d", len(iss.Items), limit))
141+
}
142+
if len(iss.Items) > 0 && len(iss.ListMeta.Continue) > 0 {
143+
last := iss.Items[len(iss.Items)-1]
144+
r.cache.Add(fmt.Sprintf("%s/%s", last.Namespace, last.Name), iss.ListMeta.Continue, paginationEntryTTL)
145+
}
146+
147+
for _, is := range iss.Items {
148+
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
149+
if len(last) > 0 && name <= last {
150+
if !warned {
151+
warnBrokenPagination(fmt.Sprintf("received unexpected repository name %q -"+
152+
" lexicographically preceding the requested %q", name, last))
153+
}
154+
continue
155+
}
156+
err := handler(&is)
157+
if err != nil {
158+
return err
159+
}
160+
}
161+
162+
return nil
163+
}

0 commit comments

Comments
 (0)