Skip to content

Commit 78f5f9e

Browse files
author
Michal Minář
committed
Registry catalog
Implements the upstream repository catalog API call: GET /v2/_catalog Returned is a list of repository names as json: 200 OK Content-Type: application/json { "repositories": [ <name>, ... ] } Pagination is supported by mapping the last returned repository name to the opaque continue token received from master API and using it on the next invocation. Signed-off-by: Michal Minář <[email protected]>
1 parent 8a7fc2c commit 78f5f9e

File tree

7 files changed

+440
-8
lines changed

7 files changed

+440
-8
lines changed

pkg/dockerregistry/server/app.go

+18-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/docker/distribution/configuration"
99
"github.com/docker/distribution/context"
1010
storagedriver "github.com/docker/distribution/registry/storage/driver"
11+
kubecache "k8s.io/apimachinery/pkg/util/cache"
1112

1213
"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
1314
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
@@ -20,6 +21,7 @@ const (
2021
// Default values
2122
defaultDescriptorCacheSize = 4096
2223
defaultDigestToRepositoryCacheSize = 2048
24+
defaultPaginationCacheSize = 1024
2325
)
2426

2527
// appMiddleware should be used only in tests.
@@ -55,16 +57,23 @@ type App struct {
5557

5658
// cache is a shared cache of digests and descriptors.
5759
cache cache.DigestCache
60+
61+
// paginationCache maps repository names to opaque continue tokens received from master API for subsequent
62+
// list imagestreams requests
63+
paginationCache *kubecache.LRUExpireCache
5864
}
5965

6066
func (app *App) Storage(driver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
6167
app.driver = driver
6268
return driver, nil
6369
}
6470

65-
func (app *App) Registry(registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
66-
app.registry = registry
67-
return registry, nil
71+
func (app *App) Registry(nm distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
72+
app.registry = nm
73+
return &registry{
74+
registry: nm,
75+
enumerator: NewCachingRepositoryEnumerator(app.registryClient, app.paginationCache),
76+
}, nil
6877
}
6978

7079
func (app *App) BlobStatter() distribution.BlobStatter {
@@ -78,11 +87,12 @@ func (app *App) BlobStatter() distribution.BlobStatter {
7887
// The program will be terminated if an error happens.
7988
func NewApp(ctx context.Context, registryClient client.RegistryClient, dockerConfig *configuration.Configuration, extraConfig *registryconfig.Configuration, writeLimiter maxconnections.Limiter) http.Handler {
8089
app := &App{
81-
ctx: ctx,
82-
registryClient: registryClient,
83-
config: extraConfig,
84-
writeLimiter: writeLimiter,
85-
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
90+
ctx: ctx,
91+
registryClient: registryClient,
92+
config: extraConfig,
93+
writeLimiter: writeLimiter,
94+
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
95+
paginationCache: kubecache.NewLRUExpireCache(defaultPaginationCacheSize),
8696
}
8797

8898
cacheTTL := time.Duration(0)

pkg/dockerregistry/server/auth.go

+18
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,20 @@ func (ac *AccessController) Authorized(ctx context.Context, accessRecords ...reg
301301
default:
302302
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
303303
}
304+
305+
case "registry":
306+
switch access.Resource.Name {
307+
case "catalog":
308+
if access.Action != "*" {
309+
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
310+
}
311+
if err := verifyCatalogAccess(ctx, osClient); err != nil {
312+
return nil, ac.wrapErr(ctx, err)
313+
}
314+
default:
315+
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
316+
}
317+
304318
default:
305319
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
306320
}
@@ -440,6 +454,10 @@ func verifyPruneAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNam
440454
return verifyWithGlobalSAR(ctx, "images", "", "delete", c)
441455
}
442456

457+
func verifyCatalogAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNamespacer) error {
458+
return verifyWithGlobalSAR(ctx, "imagestreams", "", "list", c)
459+
}
460+
443461
func verifyMetricsAccess(ctx context.Context, metrics configuration.Metrics, token string, c client.SelfSubjectAccessReviewsNamespacer) error {
444462
if !metrics.Enabled {
445463
return ErrOpenShiftAccessDenied

pkg/dockerregistry/server/catalog.go

+163
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package server
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"io"
7+
"time"
8+
9+
apierrors "k8s.io/apimachinery/pkg/api/errors"
10+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11+
"k8s.io/apimachinery/pkg/util/cache"
12+
13+
"github.com/docker/distribution/context"
14+
15+
imageapiv1 "github.com/openshift/api/image/v1"
16+
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
17+
)
18+
19+
const paginationEntryTTL = time.Minute
20+
21+
// RepositoryEnumerator allows to enumerate repositories known to the registry.
22+
type RepositoryEnumerator interface {
23+
// EnumerateRepositories fills the given repos slice with image stream names. The slice's length
24+
// determines the maximum number of repositories returned. The repositories are lexicographically sorted.
25+
// The last argument allows for pagination. It is the offset in the catalog. Returned is a number of
26+
// repositories filled. If there are no more repositories to return, io.EOF is returned.
27+
EnumerateRepositories(ctx context.Context, repos []string, last string) (n int, err error)
28+
}
29+
30+
// cachingRepositoryEnumerator is an enumerator that supports chunking by caching associations between
31+
// repository names and opaque continuation tokens.
32+
type cachingRepositoryEnumerator struct {
33+
client client.RegistryClient
34+
// a cache of opaque continue tokens for repository enumeration
35+
cache *cache.LRUExpireCache
36+
}
37+
38+
var _ RepositoryEnumerator = &cachingRepositoryEnumerator{}
39+
40+
// NewCachingRepositoryEnumerator returns a new caching repository enumerator.
41+
func NewCachingRepositoryEnumerator(client client.RegistryClient, cache *cache.LRUExpireCache) RepositoryEnumerator {
42+
return &cachingRepositoryEnumerator{
43+
client: client,
44+
cache: cache,
45+
}
46+
}
47+
48+
type isHandlerFunc func(is *imageapiv1.ImageStream) error
49+
50+
var errNoSpaceInSlice = errors.New("no space in slice")
51+
var errEnumerationFinished = errors.New("enumeration finished")
52+
53+
func (re *cachingRepositoryEnumerator) EnumerateRepositories(
54+
ctx context.Context,
55+
repos []string,
56+
last string,
57+
) (n int, err error) {
58+
if len(repos) == 0 {
59+
// Client explicitly requested 0 results. Returning nil for error seems more appropriate but this is
60+
// more in line with upstream does. Returning nil actually makes the upstream code panic.
61+
// TODO: patch upstream? /_catalog?n=0 results in 500
62+
return 0, errNoSpaceInSlice
63+
}
64+
65+
err = re.enumerateImageStreams(ctx, int64(len(repos)), last, func(is *imageapiv1.ImageStream) error {
66+
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
67+
repos[n] = name
68+
n++
69+
70+
if n >= len(repos) {
71+
return errEnumerationFinished
72+
}
73+
74+
return nil
75+
})
76+
77+
switch err {
78+
case errEnumerationFinished:
79+
err = nil
80+
case nil:
81+
err = io.EOF
82+
}
83+
84+
return
85+
}
86+
87+
func (r *cachingRepositoryEnumerator) enumerateImageStreams(
88+
ctx context.Context,
89+
limit int64,
90+
last string,
91+
handler isHandlerFunc,
92+
) error {
93+
var (
94+
start string
95+
warned bool
96+
)
97+
98+
client, ok := userClientFrom(ctx)
99+
if !ok {
100+
context.GetLogger(ctx).Warnf("user token not set, falling back to registry client")
101+
osClient, err := r.client.Client()
102+
if err != nil {
103+
return err
104+
}
105+
client = osClient
106+
}
107+
108+
if len(last) > 0 {
109+
if c, ok := r.cache.Get(last); !ok {
110+
context.GetLogger(ctx).Warnf("failed to find opaque continue token for last repository=%q -> requesting the full image stream list instead of %d items", last, limit)
111+
warned = true
112+
limit = 0
113+
} else {
114+
start = c.(string)
115+
}
116+
}
117+
118+
iss, err := client.ImageStreams("").List(metav1.ListOptions{
119+
Limit: limit,
120+
Continue: start,
121+
})
122+
if apierrors.IsResourceExpired(err) {
123+
context.GetLogger(ctx).Warnf("continuation token expired (%v) -> requesting the full image stream list", err)
124+
iss, err = client.ImageStreams("").List(metav1.ListOptions{})
125+
warned = true
126+
}
127+
128+
if err != nil {
129+
return err
130+
}
131+
132+
warnBrokenPagination := func(msg string) {
133+
if !warned {
134+
context.GetLogger(ctx).Warnf("pagination not working: %s; the master API does not support chunking", msg)
135+
warned = true
136+
}
137+
}
138+
139+
if limit > 0 && limit < int64(len(iss.Items)) {
140+
warnBrokenPagination(fmt.Sprintf("received %d image streams instead of requested maximum %d", len(iss.Items), limit))
141+
}
142+
if len(iss.Items) > 0 && len(iss.ListMeta.Continue) > 0 {
143+
last := iss.Items[len(iss.Items)-1]
144+
r.cache.Add(fmt.Sprintf("%s/%s", last.Namespace, last.Name), iss.ListMeta.Continue, paginationEntryTTL)
145+
}
146+
147+
for _, is := range iss.Items {
148+
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
149+
if len(last) > 0 && name <= last {
150+
if !warned {
151+
warnBrokenPagination(fmt.Sprintf("received unexpected repository name %q -"+
152+
" lexicographically preceding the requested %q", name, last))
153+
}
154+
continue
155+
}
156+
err := handler(&is)
157+
if err != nil {
158+
return err
159+
}
160+
}
161+
162+
return nil
163+
}

0 commit comments

Comments
 (0)