Skip to content

Commit 387485d

Browse files
Merge pull request #16126 from miminar/origin3.6-registry-prunes-orphanes
Automatic merge from submit-queue [3.6][Backport] Prune orphaned blobs on registry storage Resolves [bz#1479340](https://bugzilla.redhat.com/show_bug.cgi?id=1479340) Backports #14585
2 parents 6f6641c + 466f2c3 commit 387485d

File tree

12 files changed

+1347
-214
lines changed

12 files changed

+1347
-214
lines changed

pkg/cmd/dockerregistry/dockerregistry.go

+94-3
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,17 @@ package dockerregistry
33
import (
44
"crypto/tls"
55
"crypto/x509"
6+
"flag"
67
"fmt"
78
"io"
89
"io/ioutil"
910
"net/http"
1011
"os"
12+
"strings"
1113
"time"
1214

1315
log "github.com/Sirupsen/logrus"
16+
"github.com/docker/go-units"
1417
gorillahandlers "github.com/gorilla/handlers"
1518

1619
"github.com/Sirupsen/logrus/formatters/logstash"
@@ -19,8 +22,10 @@ import (
1922
"github.com/docker/distribution/health"
2023
"github.com/docker/distribution/registry/auth"
2124
"github.com/docker/distribution/registry/handlers"
25+
"github.com/docker/distribution/registry/storage"
26+
"github.com/docker/distribution/registry/storage/driver/factory"
2227
"github.com/docker/distribution/uuid"
23-
"github.com/docker/distribution/version"
28+
distversion "github.com/docker/distribution/version"
2429

2530
_ "github.com/docker/distribution/registry/auth/htpasswd"
2631
_ "github.com/docker/distribution/registry/auth/token"
@@ -35,18 +40,104 @@ import (
3540
_ "github.com/docker/distribution/registry/storage/driver/s3-aws"
3641
_ "github.com/docker/distribution/registry/storage/driver/swift"
3742

38-
"strings"
43+
kubeversion "k8s.io/kubernetes/pkg/version"
3944

4045
"github.com/openshift/origin/pkg/cmd/server/crypto"
4146
"github.com/openshift/origin/pkg/cmd/util/clientcmd"
4247
"github.com/openshift/origin/pkg/dockerregistry/server"
4348
"github.com/openshift/origin/pkg/dockerregistry/server/api"
4449
"github.com/openshift/origin/pkg/dockerregistry/server/audit"
4550
registryconfig "github.com/openshift/origin/pkg/dockerregistry/server/configuration"
51+
"github.com/openshift/origin/pkg/dockerregistry/server/prune"
52+
"github.com/openshift/origin/pkg/version"
4653
)
4754

55+
var pruneMode = flag.String("prune", "", "prune blobs from the storage and exit (check, delete)")
56+
57+
func versionFields() log.Fields {
58+
return log.Fields{
59+
"distribution_version": distversion.Version,
60+
"kubernetes_version": kubeversion.Get(),
61+
"openshift_version": version.Get(),
62+
}
63+
}
64+
65+
// ExecutePruner runs the pruner.
66+
func ExecutePruner(configFile io.Reader, dryRun bool) {
67+
config, _, err := registryconfig.Parse(configFile)
68+
if err != nil {
69+
log.Fatalf("error parsing configuration file: %s", err)
70+
}
71+
72+
// A lot of installations have the 'debug' log level in their config files,
73+
// but it's too verbose for pruning. Therefore we ignore it, but we still
74+
// respect overrides using environment variables.
75+
config.Loglevel = ""
76+
config.Log.Level = configuration.Loglevel(os.Getenv("REGISTRY_LOG_LEVEL"))
77+
if len(config.Log.Level) == 0 {
78+
config.Log.Level = "warning"
79+
}
80+
81+
ctx := context.Background()
82+
ctx, err = configureLogging(ctx, config)
83+
if err != nil {
84+
log.Fatalf("error configuring logging: %s", err)
85+
}
86+
87+
startPrune := "start prune"
88+
var registryOptions []storage.RegistryOption
89+
if dryRun {
90+
startPrune += " (dry-run mode)"
91+
} else {
92+
registryOptions = append(registryOptions, storage.EnableDelete)
93+
}
94+
log.WithFields(versionFields()).Info(startPrune)
95+
96+
registryClient := server.NewRegistryClient(clientcmd.NewConfig().BindToFile())
97+
98+
storageDriver, err := factory.Create(config.Storage.Type(), config.Storage.Parameters())
99+
if err != nil {
100+
log.Fatalf("error creating storage driver: %s", err)
101+
}
102+
103+
registry, err := storage.NewRegistry(ctx, storageDriver, registryOptions...)
104+
if err != nil {
105+
log.Fatalf("error creating registry: %s", err)
106+
}
107+
108+
stats, err := prune.Prune(ctx, storageDriver, registry, registryClient, dryRun)
109+
if err != nil {
110+
log.Error(err)
111+
}
112+
if dryRun {
113+
fmt.Printf("Would delete %d blobs\n", stats.Blobs)
114+
fmt.Printf("Would free up %s of disk space\n", units.BytesSize(float64(stats.DiskSpace)))
115+
fmt.Println("Use -prune=delete to actually delete the data")
116+
} else {
117+
fmt.Printf("Deleted %d blobs\n", stats.Blobs)
118+
fmt.Printf("Freed up %s of disk space\n", units.BytesSize(float64(stats.DiskSpace)))
119+
}
120+
if err != nil {
121+
os.Exit(1)
122+
}
123+
}
124+
48125
// Execute runs the Docker registry.
49126
func Execute(configFile io.Reader) {
127+
if len(*pruneMode) != 0 {
128+
var dryRun bool
129+
switch *pruneMode {
130+
case "delete":
131+
dryRun = false
132+
case "check":
133+
dryRun = true
134+
default:
135+
log.Fatal("invalid value for the -prune option")
136+
}
137+
ExecutePruner(configFile, dryRun)
138+
return
139+
}
140+
50141
dockerConfig, extraConfig, err := registryconfig.Parse(configFile)
51142
if err != nil {
52143
log.Fatalf("error parsing configuration file: %s", err)
@@ -64,7 +155,7 @@ func Execute(configFile io.Reader) {
64155
registryClient := server.NewRegistryClient(clientcmd.NewConfig().BindToFile())
65156
ctx = server.WithRegistryClient(ctx, registryClient)
66157

67-
log.Infof("version=%s", version.Version)
158+
log.WithFields(versionFields()).Info("start registry")
68159
// inject a logger into the uuid library. warns us if there is a problem
69160
// with uuid generation under low entropy.
70161
uuid.Loggerf = context.GetLogger(ctx).Warnf

pkg/dockerregistry/server/errorblobstore.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,8 @@ func (f statCrossMountCreateOptions) Apply(v interface{}) error {
151151
if err != nil {
152152
context.GetLogger(f.ctx).Infof("cannot mount blob %s from repository %s: %v - disabling cross-repo mount",
153153
opts.Mount.From.Digest().String(),
154-
opts.Mount.From.Name())
154+
opts.Mount.From.Name(),
155+
err)
155156
opts.Mount.ShouldMount = false
156157
return nil
157158
}
+200
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package prune
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/docker/distribution"
7+
"github.com/docker/distribution/context"
8+
"github.com/docker/distribution/digest"
9+
"github.com/docker/distribution/manifest/schema2"
10+
"github.com/docker/distribution/reference"
11+
"github.com/docker/distribution/registry/storage"
12+
"github.com/docker/distribution/registry/storage/driver"
13+
14+
kerrors "k8s.io/apimachinery/pkg/api/errors"
15+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
16+
17+
"github.com/openshift/origin/pkg/dockerregistry/server"
18+
imageapi "github.com/openshift/origin/pkg/image/apis/image"
19+
)
20+
21+
func imageStreamHasManifestDigest(is *imageapi.ImageStream, dgst digest.Digest) bool {
22+
for _, tagEventList := range is.Status.Tags {
23+
for _, tagEvent := range tagEventList.Items {
24+
if tagEvent.Image == string(dgst) {
25+
return true
26+
}
27+
}
28+
}
29+
return false
30+
}
31+
32+
// Summary is cumulative information about what was pruned.
33+
type Summary struct {
34+
Blobs int
35+
DiskSpace int64
36+
}
37+
38+
// Prune removes blobs which are not used by Images in OpenShift.
39+
//
40+
// On error, the Summary will contain what was deleted so far.
41+
//
42+
// TODO(dmage): remove layer links to a blob if the blob is removed or it doesn't belong to the ImageStream.
43+
// TODO(dmage): keep young blobs (docker/distribution#2297).
44+
func Prune(ctx context.Context, storageDriver driver.StorageDriver, registry distribution.Namespace, registryClient server.RegistryClient, dryRun bool) (Summary, error) {
45+
logger := context.GetLogger(ctx)
46+
47+
repositoryEnumerator, ok := registry.(distribution.RepositoryEnumerator)
48+
if !ok {
49+
return Summary{}, fmt.Errorf("unable to convert Namespace to RepositoryEnumerator")
50+
}
51+
52+
oc, _, err := registryClient.Clients()
53+
if err != nil {
54+
return Summary{}, fmt.Errorf("error getting clients: %v", err)
55+
}
56+
57+
imageList, err := oc.Images().List(metav1.ListOptions{})
58+
if err != nil {
59+
return Summary{}, fmt.Errorf("error listing images: %v", err)
60+
}
61+
62+
inuse := make(map[string]string)
63+
for _, image := range imageList.Items {
64+
// Keep the manifest.
65+
inuse[image.Name] = image.DockerImageReference
66+
67+
// Keep the config for a schema 2 manifest.
68+
if image.DockerImageManifestMediaType == schema2.MediaTypeManifest {
69+
inuse[image.DockerImageMetadata.ID] = image.DockerImageReference
70+
}
71+
72+
// Keep image layers.
73+
for _, layer := range image.DockerImageLayers {
74+
inuse[layer.Name] = image.DockerImageReference
75+
}
76+
}
77+
78+
var stats Summary
79+
80+
var reposToDelete []string
81+
err = repositoryEnumerator.Enumerate(ctx, func(repoName string) error {
82+
logger.Debugln("Processing repository", repoName)
83+
84+
named, err := reference.WithName(repoName)
85+
if err != nil {
86+
return fmt.Errorf("failed to parse the repo name %s: %v", repoName, err)
87+
}
88+
89+
ref, err := imageapi.ParseDockerImageReference(repoName)
90+
if err != nil {
91+
return fmt.Errorf("failed to parse the image reference %s: %v", repoName, err)
92+
}
93+
94+
is, err := oc.ImageStreams(ref.Namespace).Get(ref.Name, metav1.GetOptions{})
95+
if kerrors.IsNotFound(err) {
96+
logger.Printf("The image stream %s/%s is not found, will remove the whole repository", ref.Namespace, ref.Name)
97+
98+
// We cannot delete the repository at this point, because it would break Enumerate.
99+
reposToDelete = append(reposToDelete, repoName)
100+
101+
return nil
102+
} else if err != nil {
103+
return fmt.Errorf("failed to get the image stream %s: %v", repoName, err)
104+
}
105+
106+
repository, err := registry.Repository(ctx, named)
107+
if err != nil {
108+
return err
109+
}
110+
111+
manifestService, err := repository.Manifests(ctx)
112+
if err != nil {
113+
return err
114+
}
115+
116+
manifestEnumerator, ok := manifestService.(distribution.ManifestEnumerator)
117+
if !ok {
118+
return fmt.Errorf("unable to convert ManifestService into ManifestEnumerator")
119+
}
120+
121+
err = manifestEnumerator.Enumerate(ctx, func(dgst digest.Digest) error {
122+
if _, ok := inuse[string(dgst)]; ok && imageStreamHasManifestDigest(is, dgst) {
123+
logger.Debugf("Keeping the manifest link %s@%s", repoName, dgst)
124+
return nil
125+
}
126+
127+
if dryRun {
128+
logger.Printf("Would delete manifest link: %s@%s", repoName, dgst)
129+
return nil
130+
}
131+
132+
logger.Printf("Deleting manifest link: %s@%s", repoName, dgst)
133+
if err := manifestService.Delete(ctx, dgst); err != nil {
134+
return fmt.Errorf("failed to delete the manifest link %s@%s: %v", repoName, dgst, err)
135+
}
136+
137+
return nil
138+
})
139+
if e, ok := err.(driver.PathNotFoundError); ok {
140+
logger.Printf("Skipped manifest link pruning for the repository %s: %v", repoName, e)
141+
} else if err != nil {
142+
return fmt.Errorf("failed to prune manifest links in the repository %s: %v", repoName, err)
143+
}
144+
145+
return nil
146+
})
147+
if e, ok := err.(driver.PathNotFoundError); ok {
148+
logger.Warnf("No repositories found: %v", e)
149+
return stats, nil
150+
} else if err != nil {
151+
return stats, err
152+
}
153+
154+
vacuum := storage.NewVacuum(ctx, storageDriver)
155+
156+
logger.Debugln("Removing repositories")
157+
for _, repoName := range reposToDelete {
158+
if dryRun {
159+
logger.Printf("Would delete repository: %s", repoName)
160+
continue
161+
}
162+
163+
if err = vacuum.RemoveRepository(repoName); err != nil {
164+
return stats, fmt.Errorf("unable to remove the repository %s: %v", repoName, err)
165+
}
166+
}
167+
168+
logger.Debugln("Processing blobs")
169+
blobStatter := registry.BlobStatter()
170+
err = registry.Blobs().Enumerate(ctx, func(dgst digest.Digest) error {
171+
if imageReference, ok := inuse[string(dgst)]; ok {
172+
logger.Debugf("Keeping the blob %s (it belongs to the image %s)", dgst, imageReference)
173+
return nil
174+
}
175+
176+
desc, err := blobStatter.Stat(ctx, dgst)
177+
if err != nil {
178+
return err
179+
}
180+
181+
stats.Blobs++
182+
stats.DiskSpace += desc.Size
183+
184+
if dryRun {
185+
logger.Printf("Would delete blob: %s", dgst)
186+
return nil
187+
}
188+
189+
if err := vacuum.RemoveBlob(string(dgst)); err != nil {
190+
return fmt.Errorf("failed to delete the blob %s: %v", dgst, err)
191+
}
192+
193+
return nil
194+
})
195+
if e, ok := err.(driver.PathNotFoundError); ok {
196+
logger.Warnf("No repositories found: %v", e)
197+
return stats, nil
198+
}
199+
return stats, err
200+
}

test/extended/imageapis/limitrange_admission.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,14 @@ var _ = g.Describe("[Feature:ImageQuota] Image limit range", func() {
184184

185185
g.It(fmt.Sprintf("should deny an import of a repository exceeding limit on %s resource", imageapi.ResourceImageStreamTags), func() {
186186
oc.SetOutputDir(exutil.TestContext.OutputDir)
187-
defer tearDown(oc)
188187

189188
maxBulkImport, err := getMaxImagesBulkImportedPerRepository()
190-
o.Expect(err).NotTo(o.HaveOccurred())
189+
if err != nil {
190+
g.Skip(err.Error())
191+
return
192+
}
193+
194+
defer tearDown(oc)
191195

192196
s1tag2Image, err := buildAndPushTestImagesTo(oc, "src1st", "tag", maxBulkImport+1)
193197
s2tag2Image, err := buildAndPushTestImagesTo(oc, "src2nd", "t", 2)
@@ -235,7 +239,7 @@ func buildAndPushTestImagesTo(oc *exutil.CLI, isName string, tagPrefix string, n
235239

236240
for i := 1; i <= numberOfImages; i++ {
237241
tag := fmt.Sprintf("%s%d", tagPrefix, i)
238-
dgst, err := imagesutil.BuildAndPushImageOfSizeWithDocker(oc, dClient, isName, tag, imageSize, 2, g.GinkgoWriter, true)
242+
dgst, _, err := imagesutil.BuildAndPushImageOfSizeWithDocker(oc, dClient, isName, tag, imageSize, 2, g.GinkgoWriter, true, true)
239243
if err != nil {
240244
return nil, err
241245
}
@@ -309,7 +313,7 @@ func bumpLimit(oc *exutil.CLI, resourceName kapi.ResourceName, limit string) (ka
309313
func getMaxImagesBulkImportedPerRepository() (int, error) {
310314
max := os.Getenv("MAX_IMAGES_BULK_IMPORTED_PER_REPOSITORY")
311315
if len(max) == 0 {
312-
return 0, fmt.Errorf("MAX_IMAGES_BULK_IMAGES_IMPORTED_PER_REPOSITORY needs to be set")
316+
return 0, fmt.Errorf("MAX_IMAGES_BULK_IMAGES_IMPORTED_PER_REPOSITORY is not set")
313317
}
314318
return strconv.Atoi(max)
315319
}

0 commit comments

Comments
 (0)