@@ -9,22 +9,15 @@ import (
9
9
"github.com/docker/distribution"
10
10
"github.com/docker/distribution/context"
11
11
"github.com/docker/distribution/digest"
12
-
13
- "k8s.io/kubernetes/pkg/api/errors"
14
-
15
- imageapi "github.com/openshift/origin/pkg/image/api"
16
- "github.com/openshift/origin/pkg/image/importer"
17
12
)
18
13
19
14
// pullthroughBlobStore wraps a distribution.BlobStore and allows remote repositories to serve blobs from remote
20
15
// repositories.
21
16
type pullthroughBlobStore struct {
22
17
distribution.BlobStore
23
18
24
- repo * repository
25
- digestToStore map [string ]distribution.BlobStore
26
- pullFromInsecureRegistries bool
27
- mirror bool
19
+ repo * repository
20
+ mirror bool
28
21
}
29
22
30
23
var _ distribution.BlobStore = & pullthroughBlobStore {}
@@ -45,76 +38,13 @@ func (r *pullthroughBlobStore) Stat(ctx context.Context, dgst digest.Digest) (di
45
38
return desc , err
46
39
}
47
40
48
- return r .remoteStat (ctx , dgst )
49
- }
50
-
51
- // remoteStat attempts to find requested blob in candidate remote repositories and if found, it updates
52
- // digestToRepository store. ErrBlobUnknown will be returned if not found.
53
- func (r * pullthroughBlobStore ) remoteStat (ctx context.Context , dgst digest.Digest ) (distribution.Descriptor , error ) {
54
- // look up the potential remote repositories that this blob could be part of (at this time,
55
- // we don't know which image in the image stream surfaced the content).
56
- is , err := r .repo .getImageStream ()
57
- if err != nil {
58
- if errors .IsNotFound (err ) || errors .IsForbidden (err ) {
59
- return distribution.Descriptor {}, distribution .ErrBlobUnknown
60
- }
61
- context .GetLogger (ctx ).Errorf ("Error retrieving image stream for blob: %v" , err )
62
- return distribution.Descriptor {}, err
63
- }
64
-
65
- r .pullFromInsecureRegistries = false
66
-
67
- if insecure , ok := is .Annotations [imageapi .InsecureRepositoryAnnotation ]; ok {
68
- r .pullFromInsecureRegistries = insecure == "true"
69
- }
70
-
71
- var localRegistry string
72
- if local , err := imageapi .ParseDockerImageReference (is .Status .DockerImageRepository ); err == nil {
73
- // TODO: normalize further?
74
- localRegistry = local .Registry
75
- }
76
-
77
- retriever := r .repo .importContext ()
78
- cached := r .repo .cachedLayers .RepositoriesForDigest (dgst )
79
-
80
- // look at the first level of tagged repositories first
81
- search := identifyCandidateRepositories (is , localRegistry , true )
82
- if desc , err := r .findCandidateRepository (ctx , search , cached , dgst , retriever ); err == nil {
83
- return desc , nil
84
- }
85
-
86
- // look at all other repositories tagged by the server
87
- secondary := identifyCandidateRepositories (is , localRegistry , false )
88
- for k := range search {
89
- delete (secondary , k )
90
- }
91
- if desc , err := r .findCandidateRepository (ctx , secondary , cached , dgst , retriever ); err == nil {
92
- return desc , nil
93
- }
94
-
95
- return distribution.Descriptor {}, distribution .ErrBlobUnknown
96
- }
97
-
98
- // proxyStat attempts to locate the digest in the provided remote repository or returns an error. If the digest is found,
99
- // r.digestToStore saves the store.
100
- func (r * pullthroughBlobStore ) proxyStat (ctx context.Context , retriever importer.RepositoryRetriever , ref imageapi.DockerImageReference , dgst digest.Digest ) (distribution.Descriptor , error ) {
101
- context .GetLogger (ctx ).Infof ("Trying to stat %q from %q" , dgst , ref .Exact ())
102
- repo , err := retriever .Repository (ctx , ref .RegistryURL (), ref .RepositoryName (), r .pullFromInsecureRegistries )
103
- if err != nil {
104
- context .GetLogger (ctx ).Errorf ("Error getting remote repository for image %q: %v" , ref .Exact (), err )
105
- return distribution.Descriptor {}, err
106
- }
107
- pullthroughBlobStore := repo .Blobs (ctx )
108
- desc , err := pullthroughBlobStore .Stat (ctx , dgst )
109
- if err != nil {
110
- if err != distribution .ErrBlobUnknown {
111
- context .GetLogger (ctx ).Errorf ("Error getting pullthroughBlobStore for image %q: %v" , ref .Exact (), err )
112
- }
113
- return distribution.Descriptor {}, err
41
+ remoteGetter , found := RemoteBlobGetterFrom (r .repo .ctx )
42
+ if ! found {
43
+ context .GetLogger (ctx ).Errorf ("pullthroughBlobStore.Stat: failed to retrieve remote getter from context" )
44
+ return distribution.Descriptor {}, distribution .ErrBlobUnknown
114
45
}
115
46
116
- r .digestToStore [dgst .String ()] = pullthroughBlobStore
117
- return desc , nil
47
+ return remoteGetter .Stat (ctx , dgst )
118
48
}
119
49
120
50
// ServeBlob attempts to serve the requested digest onto w, using a remote proxy store if necessary.
@@ -123,130 +53,65 @@ func (r *pullthroughBlobStore) proxyStat(ctx context.Context, retriever importer
123
53
// success response with no actual body content.
124
54
// [1] https://docs.docker.com/registry/spec/api/#existing-layers
125
55
func (pbs * pullthroughBlobStore ) ServeBlob (ctx context.Context , w http.ResponseWriter , req * http.Request , dgst digest.Digest ) error {
126
- store , ok := pbs .digestToStore [dgst .String ()]
127
- if ! ok {
128
- return pbs .BlobStore .ServeBlob (ctx , w , req , dgst )
56
+ // This call should be done without BlobGetterService in the context.
57
+ err := pbs .BlobStore .ServeBlob (ctx , w , req , dgst )
58
+ switch {
59
+ case err == distribution .ErrBlobUnknown :
60
+ // continue on to the code below and look up the blob in a remote store since it is not in
61
+ // the local store
62
+ case err != nil :
63
+ context .GetLogger (ctx ).Errorf ("Failed to find blob %q: %#v" , dgst .String (), err )
64
+ fallthrough
65
+ default :
66
+ return err
67
+ }
68
+
69
+ remoteGetter , found := RemoteBlobGetterFrom (pbs .repo .ctx )
70
+ if ! found {
71
+ context .GetLogger (ctx ).Errorf ("pullthroughBlobStore.ServeBlob: failed to retrieve remote getter from context" )
72
+ return distribution .ErrBlobUnknown
129
73
}
130
74
131
75
// store the content locally if requested, but ensure only one instance at a time
132
76
// is storing to avoid excessive local writes
133
77
if pbs .mirror {
134
78
mu .Lock ()
135
- if _ , ok = inflight [dgst ]; ok {
79
+ if _ , ok : = inflight [dgst ]; ok {
136
80
mu .Unlock ()
137
81
context .GetLogger (ctx ).Infof ("Serving %q while mirroring in background" , dgst )
138
- _ , err := pbs .copyContent (store , ctx , dgst , w , req )
82
+ _ , err := pbs .copyContent (remoteGetter , ctx , dgst , w , req )
139
83
return err
140
84
}
141
85
inflight [dgst ] = struct {}{}
142
86
mu .Unlock ()
143
87
144
88
go func (dgst digest.Digest ) {
145
89
context .GetLogger (ctx ).Infof ("Start background mirroring of %q" , dgst )
146
- if err := pbs .storeLocal (store , ctx , dgst ); err != nil {
90
+ if err := pbs .storeLocal (remoteGetter , ctx , dgst ); err != nil {
147
91
context .GetLogger (ctx ).Errorf ("Error committing to storage: %s" , err .Error ())
148
92
}
149
93
context .GetLogger (ctx ).Infof ("Completed mirroring of %q" , dgst )
150
94
}(dgst )
151
95
}
152
96
153
- _ , err : = pbs .copyContent (store , ctx , dgst , w , req )
97
+ _ , err = pbs .copyContent (remoteGetter , ctx , dgst , w , req )
154
98
return err
155
99
}
156
100
157
101
// Get attempts to fetch the requested blob by digest using a remote proxy store if necessary.
158
102
func (r * pullthroughBlobStore ) Get (ctx context.Context , dgst digest.Digest ) ([]byte , error ) {
159
- store , ok := r .digestToStore [dgst .String ()]
160
- if ok {
161
- return store .Get (ctx , dgst )
162
- }
163
-
164
103
data , originalErr := r .BlobStore .Get (ctx , dgst )
165
104
if originalErr == nil {
166
105
return data , nil
167
106
}
168
107
169
- desc , err := r .remoteStat (ctx , dgst )
170
- if err != nil {
171
- context .GetLogger (ctx ).Errorf ("failed to stat blob %q in remote repositories: %v" , dgst .String (), err )
172
- return nil , originalErr
173
- }
174
- store , ok = r .digestToStore [desc .Digest .String ()]
175
- if ! ok {
108
+ remoteGetter , found := RemoteBlobGetterFrom (r .repo .ctx )
109
+ if ! found {
110
+ context .GetLogger (ctx ).Errorf ("pullthroughBlobStore.Get: failed to retrieve remote getter from context" )
176
111
return nil , originalErr
177
112
}
178
- return store .Get (ctx , desc .Digest )
179
- }
180
-
181
- // findCandidateRepository looks in search for a particular blob, referring to previously cached items
182
- func (r * pullthroughBlobStore ) findCandidateRepository (ctx context.Context , search map [string ]* imageapi.DockerImageReference , cachedLayers []string , dgst digest.Digest , retriever importer.RepositoryRetriever ) (distribution.Descriptor , error ) {
183
- // no possible remote locations to search, exit early
184
- if len (search ) == 0 {
185
- return distribution.Descriptor {}, distribution .ErrBlobUnknown
186
- }
187
113
188
- // see if any of the previously located repositories containing this digest are in this
189
- // image stream
190
- for _ , repo := range cachedLayers {
191
- ref , ok := search [repo ]
192
- if ! ok {
193
- continue
194
- }
195
- desc , err := r .proxyStat (ctx , retriever , * ref , dgst )
196
- if err != nil {
197
- delete (search , repo )
198
- continue
199
- }
200
- context .GetLogger (ctx ).Infof ("Found digest location from cache %q in %q" , dgst , repo )
201
- return desc , nil
202
- }
203
-
204
- // search the remaining registries for this digest
205
- for repo , ref := range search {
206
- desc , err := r .proxyStat (ctx , retriever , * ref , dgst )
207
- if err != nil {
208
- continue
209
- }
210
- r .repo .cachedLayers .RememberDigest (dgst , r .repo .blobrepositorycachettl , repo )
211
- context .GetLogger (ctx ).Infof ("Found digest location by search %q in %q" , dgst , repo )
212
- return desc , nil
213
- }
214
-
215
- return distribution.Descriptor {}, distribution .ErrBlobUnknown
216
- }
217
-
218
- // identifyCandidateRepositories returns a map of remote repositories referenced by this image stream.
219
- func identifyCandidateRepositories (is * imageapi.ImageStream , localRegistry string , primary bool ) map [string ]* imageapi.DockerImageReference {
220
- // identify the canonical location of referenced registries to search
221
- search := make (map [string ]* imageapi.DockerImageReference )
222
- for _ , tagEvent := range is .Status .Tags {
223
- var candidates []imageapi.TagEvent
224
- if primary {
225
- if len (tagEvent .Items ) == 0 {
226
- continue
227
- }
228
- candidates = tagEvent .Items [:1 ]
229
- } else {
230
- if len (tagEvent .Items ) <= 1 {
231
- continue
232
- }
233
- candidates = tagEvent .Items [1 :]
234
- }
235
- for _ , event := range candidates {
236
- ref , err := imageapi .ParseDockerImageReference (event .DockerImageReference )
237
- if err != nil {
238
- continue
239
- }
240
- // skip anything that matches the innate registry
241
- // TODO: there may be a better way to make this determination
242
- if len (localRegistry ) != 0 && localRegistry == ref .Registry {
243
- continue
244
- }
245
- ref = ref .DockerClientDefaults ()
246
- search [ref .AsRepository ().Exact ()] = & ref
247
- }
248
- }
249
- return search
114
+ return remoteGetter .Get (ctx , dgst )
250
115
}
251
116
252
117
// setResponseHeaders sets the appropriate content serving headers
@@ -264,7 +129,7 @@ var mu sync.Mutex
264
129
265
130
// copyContent attempts to load and serve the provided blob. If req != nil and writer is an instance of http.ResponseWriter,
266
131
// response headers will be set and range requests honored.
267
- func (pbs * pullthroughBlobStore ) copyContent (store distribution. BlobStore , ctx context.Context , dgst digest.Digest , writer io.Writer , req * http.Request ) (distribution.Descriptor , error ) {
132
+ func (pbs * pullthroughBlobStore ) copyContent (store BlobGetterService , ctx context.Context , dgst digest.Digest , writer io.Writer , req * http.Request ) (distribution.Descriptor , error ) {
268
133
desc , err := store .Stat (ctx , dgst )
269
134
if err != nil {
270
135
return distribution.Descriptor {}, err
@@ -292,7 +157,7 @@ func (pbs *pullthroughBlobStore) copyContent(store distribution.BlobStore, ctx c
292
157
}
293
158
294
159
// storeLocal retrieves the named blob from the provided store and writes it into the local store.
295
- func (pbs * pullthroughBlobStore ) storeLocal (store distribution. BlobStore , ctx context.Context , dgst digest.Digest ) error {
160
+ func (pbs * pullthroughBlobStore ) storeLocal (remoteGetter BlobGetterService , ctx context.Context , dgst digest.Digest ) error {
296
161
defer func () {
297
162
mu .Lock ()
298
163
delete (inflight , dgst )
@@ -308,7 +173,7 @@ func (pbs *pullthroughBlobStore) storeLocal(store distribution.BlobStore, ctx co
308
173
return err
309
174
}
310
175
311
- desc , err = pbs .copyContent (store , ctx , dgst , bw , nil )
176
+ desc , err = pbs .copyContent (remoteGetter , ctx , dgst , bw , nil )
312
177
if err != nil {
313
178
return err
314
179
}
0 commit comments