Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Registry catalog #87

Merged
merged 1 commit into from
Jun 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions pkg/dockerregistry/server/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/docker/distribution/configuration"
"github.com/docker/distribution/context"
storagedriver "github.com/docker/distribution/registry/storage/driver"
kubecache "k8s.io/apimachinery/pkg/util/cache"

"github.com/openshift/image-registry/pkg/dockerregistry/server/cache"
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
Expand All @@ -21,6 +22,7 @@ const (
// Default values
defaultDescriptorCacheSize = 4096
defaultDigestToRepositoryCacheSize = 2048
defaultPaginationCacheSize = 1024
)

// appMiddleware should be used only in tests.
Expand Down Expand Up @@ -59,16 +61,23 @@ type App struct {

// metrics provide methods to collect statistics.
metrics metrics.Metrics

// paginationCache maps repository names to opaque continue tokens received from master API for subsequent
// list imagestreams requests
paginationCache *kubecache.LRUExpireCache
}

func (app *App) Storage(driver storagedriver.StorageDriver, options map[string]interface{}) (storagedriver.StorageDriver, error) {
app.driver = app.metrics.StorageDriver(driver)
return app.driver, nil
}

func (app *App) Registry(registry distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
app.registry = registry
return registry, nil
func (app *App) Registry(nm distribution.Namespace, options map[string]interface{}) (distribution.Namespace, error) {
app.registry = nm
return &registry{
registry: nm,
enumerator: NewCachingRepositoryEnumerator(app.registryClient, app.paginationCache),
}, nil
}

func (app *App) BlobStatter() distribution.BlobStatter {
Expand All @@ -82,11 +91,12 @@ func (app *App) BlobStatter() distribution.BlobStatter {
// The program will be terminated if an error happens.
func NewApp(ctx context.Context, registryClient client.RegistryClient, dockerConfig *configuration.Configuration, extraConfig *registryconfig.Configuration, writeLimiter maxconnections.Limiter) http.Handler {
app := &App{
ctx: ctx,
registryClient: registryClient,
config: extraConfig,
writeLimiter: writeLimiter,
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
ctx: ctx,
registryClient: registryClient,
config: extraConfig,
writeLimiter: writeLimiter,
quotaEnforcing: newQuotaEnforcingConfig(ctx, extraConfig.Quota),
paginationCache: kubecache.NewLRUExpireCache(defaultPaginationCacheSize),
}

if app.config.Metrics.Enabled {
Expand Down
18 changes: 18 additions & 0 deletions pkg/dockerregistry/server/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,20 @@ func (ac *AccessController) Authorized(ctx context.Context, accessRecords ...reg
default:
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
}

case "registry":
switch access.Resource.Name {
case "catalog":
if access.Action != "*" {
return nil, ac.wrapErr(ctx, ErrUnsupportedAction)
}
if err := verifyCatalogAccess(ctx, osClient); err != nil {
return nil, ac.wrapErr(ctx, err)
}
default:
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
}

default:
return nil, ac.wrapErr(ctx, ErrUnsupportedResource)
}
Expand Down Expand Up @@ -440,6 +454,10 @@ func verifyPruneAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNam
return verifyWithGlobalSAR(ctx, "images", "", "delete", c)
}

func verifyCatalogAccess(ctx context.Context, c client.SelfSubjectAccessReviewsNamespacer) error {
return verifyWithGlobalSAR(ctx, "imagestreams", "", "list", c)
}

func verifyMetricsAccess(ctx context.Context, metrics configuration.Metrics, token string, c client.SelfSubjectAccessReviewsNamespacer) error {
if !metrics.Enabled {
return ErrOpenShiftAccessDenied
Expand Down
163 changes: 163 additions & 0 deletions pkg/dockerregistry/server/catalog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package server

import (
"errors"
"fmt"
"io"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/cache"

"github.com/docker/distribution/context"

imageapiv1 "github.com/openshift/api/image/v1"
"github.com/openshift/image-registry/pkg/dockerregistry/server/client"
)

const paginationEntryTTL = time.Minute

// RepositoryEnumerator allows to enumerate repositories known to the registry.
type RepositoryEnumerator interface {
// EnumerateRepositories fills the given repos slice with image stream names. The slice's length
// determines the maximum number of repositories returned. The repositories are lexicographically sorted.
// The last argument allows for pagination. It is the offset in the catalog. Returned is a number of
// repositories filled. If there are no more repositories to return, io.EOF is returned.
EnumerateRepositories(ctx context.Context, repos []string, last string) (n int, err error)
}

// cachingRepositoryEnumerator is an enumerator that supports chunking by caching associations between
// repository names and opaque continuation tokens.
type cachingRepositoryEnumerator struct {
client client.RegistryClient
// a cache of opaque continue tokens for repository enumeration
cache *cache.LRUExpireCache
}

var _ RepositoryEnumerator = &cachingRepositoryEnumerator{}

// NewCachingRepositoryEnumerator returns a new caching repository enumerator.
func NewCachingRepositoryEnumerator(client client.RegistryClient, cache *cache.LRUExpireCache) RepositoryEnumerator {
return &cachingRepositoryEnumerator{
client: client,
cache: cache,
}
}

type isHandlerFunc func(is *imageapiv1.ImageStream) error

var errNoSpaceInSlice = errors.New("no space in slice")
var errEnumerationFinished = errors.New("enumeration finished")

func (re *cachingRepositoryEnumerator) EnumerateRepositories(
ctx context.Context,
repos []string,
last string,
) (n int, err error) {
if len(repos) == 0 {
// Client explicitly requested 0 results. Returning nil for error seems more appropriate but this is
// more in line with upstream does. Returning nil actually makes the upstream code panic.
// TODO: patch upstream? /_catalog?n=0 results in 500
return 0, errNoSpaceInSlice
}

err = re.enumerateImageStreams(ctx, int64(len(repos)), last, func(is *imageapiv1.ImageStream) error {
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
repos[n] = name
n++

if n >= len(repos) {
return errEnumerationFinished
}

return nil
})

switch err {
case errEnumerationFinished:
err = nil
case nil:
err = io.EOF
}

return
}

func (r *cachingRepositoryEnumerator) enumerateImageStreams(
ctx context.Context,
limit int64,
last string,
handler isHandlerFunc,
) error {
var (
start string
warned bool
)

client, ok := userClientFrom(ctx)
if !ok {
context.GetLogger(ctx).Warnf("user token not set, falling back to registry client")
osClient, err := r.client.Client()
if err != nil {
return err
}
client = osClient
}

if len(last) > 0 {
if c, ok := r.cache.Get(last); !ok {
context.GetLogger(ctx).Warnf("failed to find opaque continue token for last repository=%q -> requesting the full image stream list instead of %d items", last, limit)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should set warned=true?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessary, but it will add to legibility and robustness.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

warned = true
limit = 0
} else {
start = c.(string)
}
}

iss, err := client.ImageStreams("").List(metav1.ListOptions{
Limit: limit,
Continue: start,
})
if apierrors.IsResourceExpired(err) {
context.GetLogger(ctx).Warnf("continuation token expired (%v) -> requesting the full image stream list", err)
iss, err = client.ImageStreams("").List(metav1.ListOptions{})
warned = true
}

if err != nil {
return err
}

warnBrokenPagination := func(msg string) {
if !warned {
context.GetLogger(ctx).Warnf("pagination not working: %s; the master API does not support chunking", msg)
warned = true
}
}

if limit > 0 && limit < int64(len(iss.Items)) {
warnBrokenPagination(fmt.Sprintf("received %d image streams instead of requested maximum %d", len(iss.Items), limit))
}
if len(iss.Items) > 0 && len(iss.ListMeta.Continue) > 0 {
last := iss.Items[len(iss.Items)-1]
r.cache.Add(fmt.Sprintf("%s/%s", last.Namespace, last.Name), iss.ListMeta.Continue, paginationEntryTTL)
}

for _, is := range iss.Items {
name := fmt.Sprintf("%s/%s", is.Namespace, is.Name)
if len(last) > 0 && name <= last {
if !warned {
warnBrokenPagination(fmt.Sprintf("received unexpected repository name %q -"+
" lexicographically preceding the requested %q", name, last))
}
continue
}
err := handler(&is)
if err != nil {
return err
}
}

return nil
}
Loading