diff --git a/pkg/dockerregistry/server/pullthroughblobstore.go b/pkg/dockerregistry/server/pullthroughblobstore.go index 87e7381802f7..e12a3fba94dc 100644 --- a/pkg/dockerregistry/server/pullthroughblobstore.go +++ b/pkg/dockerregistry/server/pullthroughblobstore.go @@ -1,8 +1,10 @@ package server import ( + "fmt" "io" "net/http" + "os" "sync" "time" @@ -26,6 +28,7 @@ var _ distribution.BlobStore = &pullthroughBlobStore{} // the image stream and looks for those that have the layer. func (pbs *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { context.GetLogger(ctx).Debugf("(*pullthroughBlobStore).Stat: starting with dgst=%s", dgst.String()) + // check the local store for the blob desc, err := pbs.BlobStore.Stat(ctx, dgst) switch { @@ -108,6 +111,35 @@ func setResponseHeaders(w http.ResponseWriter, length int64, mediaType string, d w.Header().Set("Etag", digest.String()) } +// serveRemoteContent tries to use http.ServeContent for remote content. +func serveRemoteContent(rw http.ResponseWriter, req *http.Request, desc distribution.Descriptor, remoteReader io.ReadSeeker) (bool, error) { + // Set the appropriate content serving headers. + setResponseHeaders(rw, desc.Size, desc.MediaType, desc.Digest) + + // Fallback to Copy if request wasn't given. + if req == nil { + return false, nil + } + + // Check whether remoteReader is seekable. The remoteReader' Seek method must work: ServeContent uses + // a seek to the end of the content to determine its size. + if _, err := remoteReader.Seek(0, os.SEEK_END); err != nil { + // The remoteReader isn't seekable. It means that the remote response under the hood of remoteReader + // doesn't contain any Content-Range or Content-Length headers. In this case we need to rollback to + // simple Copy. + return false, nil + } + + // Move pointer back to begin. + if _, err := remoteReader.Seek(0, os.SEEK_SET); err != nil { + return false, err + } + + http.ServeContent(rw, req, desc.Digest.String(), time.Time{}, remoteReader) + + return true, nil +} + // inflight tracks currently downloading blobs var inflight = make(map[digest.Digest]struct{}) @@ -129,12 +161,16 @@ func (pbs *pullthroughBlobStore) copyContent(store BlobGetterService, ctx contex rw, ok := writer.(http.ResponseWriter) if ok { - setResponseHeaders(rw, desc.Size, desc.MediaType, dgst) - // serve range requests - if req != nil { - http.ServeContent(rw, req, desc.Digest.String(), time.Time{}, remoteReader) + contentHandled, err := serveRemoteContent(rw, req, desc, remoteReader) + if err != nil { + return distribution.Descriptor{}, err + } + + if contentHandled { return desc, nil } + + rw.Header().Set("Content-Length", fmt.Sprintf("%d", desc.Size)) } if _, err = io.CopyN(writer, remoteReader, desc.Size); err != nil { diff --git a/pkg/dockerregistry/server/pullthroughblobstore_test.go b/pkg/dockerregistry/server/pullthroughblobstore_test.go index 8940726be9f8..fe7f7635997f 100644 --- a/pkg/dockerregistry/server/pullthroughblobstore_test.go +++ b/pkg/dockerregistry/server/pullthroughblobstore_test.go @@ -4,11 +4,13 @@ import ( "bytes" "crypto/sha256" "fmt" + "io/ioutil" "net/http" "net/http/httptest" "net/url" "os" "strconv" + "strings" "testing" "time" @@ -214,6 +216,168 @@ func TestPullthroughServeBlob(t *testing.T) { } } +func TestPullthroughServeNotSeekableBlob(t *testing.T) { + namespace, name := "user", "app" + repoName := fmt.Sprintf("%s/%s", namespace, name) + log.SetLevel(log.DebugLevel) + installFakeAccessController(t) + setPassthroughBlobDescriptorServiceFactory() + + testImage, err := registrytest.NewImageForManifest(repoName, registrytest.SampleImageManifestSchema1, "", false) + if err != nil { + t.Fatal(err) + } + client := &testclient.Fake{} + client.AddReactor("get", "images", registrytest.GetFakeImageGetHandler(t, *testImage)) + + // TODO: get rid of those nasty global vars + backupRegistryClient := DefaultRegistryClient + DefaultRegistryClient = makeFakeRegistryClient(client, fake.NewSimpleClientset()) + defer func() { + // set it back once this test finishes to make other unit tests working again + DefaultRegistryClient = backupRegistryClient + }() + + reader, dgst, err := registrytest.CreateRandomTarFile() + if err != nil { + t.Fatalf("unexpected error generating test layer file: %v", err) + } + + blob1Content, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatalf("failed to read blob content: %v", err) + } + + blob1Storage := map[digest.Digest][]byte{dgst: blob1Content} + + // start regular HTTP server + remoteRegistryServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("External registry got %s %s", r.Method, r.URL.Path) + + w.Header().Set("Docker-Distribution-API-Version", "registry/2.0") + + switch r.URL.Path { + case "/v2/": + w.Write([]byte(`{}`)) + case "/v2/" + repoName + "/tags/list": + w.Write([]byte("{\"name\": \"" + repoName + "\", \"tags\": [\"latest\"]}")) + case "/v2/" + repoName + "/manifests/latest", "/v2/" + repoName + "/manifests/" + etcdDigest: + if r.Method == "HEAD" { + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(etcdManifest))) + w.Header().Set("Docker-Content-Digest", etcdDigest) + w.WriteHeader(http.StatusOK) + } else { + w.Write([]byte(etcdManifest)) + } + default: + if strings.HasPrefix(r.URL.Path, "/v2/"+repoName+"/blobs/") { + for dgst, payload := range blob1Storage { + if r.URL.Path != "/v2/"+repoName+"/blobs/"+dgst.String() { + continue + } + w.Header().Set("Content-Length", fmt.Sprintf("%d", len(payload))) + if r.Method == "HEAD" { + w.Header().Set("Docker-Content-Digest", dgst.String()) + w.WriteHeader(http.StatusOK) + return + } else { + // Important! + // + // We need to return any return code between 200 and 399, expept 200 and 206. + // https://github.com/docker/distribution/blob/master/registry/client/transport/http_reader.go#L192 + // + // In this case the docker client library will make a not truly + // seekable response. + // https://github.com/docker/distribution/blob/master/registry/client/transport/http_reader.go#L239 + w.WriteHeader(http.StatusAccepted) + } + w.Write(payload) + return + } + } + t.Fatalf("unexpected request %s: %#v", r.URL.Path, r) + } + })) + + serverURL, err := url.Parse(remoteRegistryServer.URL) + if err != nil { + t.Fatalf("error parsing server url: %v", err) + } + os.Setenv("DOCKER_REGISTRY_URL", serverURL.Host) + testImage.DockerImageReference = fmt.Sprintf("%s/%s@%s", serverURL.Host, repoName, testImage.Name) + + testImageStream := registrytest.TestNewImageStreamObject(namespace, name, "latest", testImage.Name, testImage.DockerImageReference) + if testImageStream.Annotations == nil { + testImageStream.Annotations = make(map[string]string) + } + testImageStream.Annotations[imageapi.InsecureRepositoryAnnotation] = "true" + + client.AddReactor("get", "imagestreams", imagetest.GetFakeImageStreamGetHandler(t, *testImageStream)) + + localBlobStore := newTestBlobStore(nil) + + ctx := WithTestPassthroughToUpstream(context.Background(), false) + repo := newTestRepositoryForPullthrough(t, ctx, nil, namespace, name, client, true) + ptbs := &pullthroughBlobStore{ + BlobStore: localBlobStore, + repo: repo, + } + + req, err := http.NewRequest("GET", fmt.Sprintf("http://example.org/v2/user/app/blobs/%s", dgst), nil) + if err != nil { + t.Fatalf("failed to create http request: %v", err) + } + w := httptest.NewRecorder() + + if _, err = ptbs.Stat(ctx, dgst); err != nil { + t.Fatalf("Stat returned unexpected error: %#+v", err) + } + + if err = ptbs.ServeBlob(ctx, w, req, dgst); err != nil { + t.Fatalf("ServeBlob returned unexpected error: %#+v", err) + } + + if w.Code != http.StatusOK { + t.Fatalf(`unexpected StatusCode: %d (expected %d)`, w.Code, http.StatusOK) + } + + clstr := w.Header().Get("Content-Length") + if cl, err := strconv.ParseInt(clstr, 10, 64); err != nil { + t.Fatalf(`unexpected Content-Length: %q (expected "%d")`, clstr, int64(len(blob1Content))) + } else { + if cl != int64(len(blob1Content)) { + t.Fatalf("Content-Length does not match expected size: %d != %d", cl, int64(len(blob1Content))) + } + } + + body := w.Body.Bytes() + if int64(len(body)) != int64(len(blob1Content)) { + t.Errorf("unexpected size of body: %d != %d", len(body), int64(len(blob1Content))) + } + + if localBlobStore.bytesServed != 0 { + t.Fatalf("remote blob served locally") + } + + expectedLocalCalls := map[string]int{ + "Stat": 1, + "ServeBlob": 1, + } + + for name, expCount := range expectedLocalCalls { + count := localBlobStore.calls[name] + if count != expCount { + t.Errorf("expected %d calls to method %s of local blob store, not %d", expCount, name, count) + } + } + + for name, count := range localBlobStore.calls { + if _, exists := expectedLocalCalls[name]; !exists { + t.Errorf("expected no calls to method %s of local blob store, got %d", name, count) + } + } +} + func TestPullthroughServeBlobInsecure(t *testing.T) { namespace := "user" repo1 := "app1"