@@ -63,37 +63,63 @@ func (g *Gatherer) GatherWorkloadInfo(ctx context.Context) ([]record.Record, []e
63
63
return gatherWorkloadInfo (ctx , gatherKubeClient .CoreV1 (), gatherOpenShiftClient )
64
64
}
65
65
66
- // nolint: funlen, gocyclo, gocritic
67
66
func gatherWorkloadInfo (
68
67
ctx context.Context ,
69
68
coreClient corev1client.CoreV1Interface ,
70
69
imageClient imageclient.ImageV1Interface ,
71
70
) ([]record.Record , []error ) {
72
- // load images as we find them
73
- imageCh := make (chan string , workloadGatherPageSize )
74
- imagesDoneCh := gatherWorkloadImageInfo (ctx , imageClient .Images (), imageCh )
71
+ imageCh , imagesDoneCh := gatherWorkloadImageInfo (ctx , imageClient .Images ())
75
72
76
- // load pods in order
77
73
start := time .Now ()
74
+ limitReached , info , err := workloadInfo (ctx , coreClient , imageCh )
75
+ if err != nil {
76
+ return nil , []error {err }
77
+ }
78
+
79
+ workloadImageResize (info .PodCount )
80
+
81
+ records := []record.Record {
82
+ {
83
+ Name : "config/workload_info" ,
84
+ Item : record.JSONMarshaller {Object : & info },
85
+ },
86
+ }
87
+
88
+ // wait for as many images as we can find to load
89
+ handleWorkloadImageInfo (ctx , & info , start , imagesDoneCh )
90
+
91
+ if limitReached {
92
+ return records , []error {fmt .Errorf ("the %d limit for number of pods gathered was reached" , podsLimit )}
93
+ }
94
+ return records , nil
95
+ }
96
+
97
+ // nolint: funlen, gocritic, gocyclo
98
+ func workloadInfo (
99
+ ctx context.Context ,
100
+ coreClient corev1client.CoreV1Interface ,
101
+ imageCh chan string ,
102
+ ) (bool , workloadPods , error ) {
103
+ defer close (imageCh )
78
104
limitReached := false
79
105
80
106
var info workloadPods
81
- var namespace string
82
- var namespaceHash string
107
+ var namespace , namespaceHash , continueValue string
83
108
var namespacePods workloadNamespacePods
84
109
h := sha256 .New ()
85
110
86
111
// Use the Limit and Continue fields to request the pod information in chunks.
87
- var continueValue string
88
112
for {
89
113
pods , err := coreClient .Pods ("" ).List (ctx , metav1.ListOptions {
90
114
Limit : workloadGatherPageSize ,
91
115
Continue : continueValue ,
92
116
})
93
117
if err != nil {
94
- return nil , [] error { err }
118
+ return false , workloadPods {}, err
95
119
}
96
- for _ , pod := range pods .Items {
120
+
121
+ for podIdx := range pods .Items {
122
+ pod := pods .Items [podIdx ]
97
123
// initialize the running state, including the namespace hash
98
124
if pod .Namespace != namespace {
99
125
if len (namespace ) != 0 {
@@ -117,45 +143,35 @@ func gatherWorkloadInfo(
117
143
namespacePods .Count ++
118
144
119
145
switch {
120
- case pod . Status . Phase == corev1 . PodSucceeded , pod . Status . Phase == corev1 . PodFailed :
146
+ case isPodTerminated ( & pod ) :
121
147
// track terminal pods but do not report their data
122
148
namespacePods .TerminalCount ++
123
149
continue
124
- case pod .Status .Phase != corev1 .PodRunning && pod .Status .Phase != corev1 .PodPending ,
125
- len (pod .Status .InitContainerStatuses ) != len (pod .Spec .InitContainers ),
126
- len (pod .Status .ContainerStatuses ) != len (pod .Spec .Containers ):
150
+ case podCanBeIgnored (& pod ):
127
151
// consider pods that are in a known state
128
152
// or pods without filled out status are invalid
129
153
namespacePods .IgnoredCount ++
130
154
continue
131
155
}
132
156
133
- var podShape workloadPodShape
134
- var ok bool
135
- podShape .InitContainers , ok = calculateWorkloadContainerShapes (h , pod .Spec .InitContainers , pod .Status .InitContainerStatuses )
157
+ podShape , ok := calculatePodShape (h , & pod )
136
158
if ! ok {
137
159
namespacePods .InvalidCount ++
138
160
continue
139
161
}
140
- podShape .Containers , ok = calculateWorkloadContainerShapes (h , pod .Spec .Containers , pod .Status .ContainerStatuses )
141
- if ! ok {
142
- namespacePods .InvalidCount ++
143
- continue
144
- }
145
-
146
- podShape .RestartsAlways = pod .Spec .RestartPolicy == corev1 .RestartPolicyAlways
147
162
148
163
if index := workloadPodShapeIndex (namespacePods .Shapes , podShape ); index != - 1 {
149
164
namespacePods .Shapes [index ].Duplicates ++
150
- } else {
151
- namespacePods .Shapes = append (namespacePods .Shapes , podShape )
165
+ continue
166
+ }
167
+ namespacePods .Shapes = append (namespacePods .Shapes , podShape )
152
168
153
- for _ , container := range podShape .InitContainers {
154
- imageCh <- container .ImageID
155
- }
156
- for _ , container := range podShape . Containers {
157
- imageCh <- container . ImageID
158
- }
169
+ for _ , container := range podShape .InitContainers {
170
+ imageCh <- container .ImageID
171
+ }
172
+
173
+ for _ , container := range podShape . Containers {
174
+ imageCh <- container . ImageID
159
175
}
160
176
}
161
177
@@ -166,6 +182,7 @@ func gatherWorkloadInfo(
166
182
}
167
183
continueValue = pods .Continue
168
184
}
185
+
169
186
// add the last set of pods
170
187
if len (namespace ) != 0 {
171
188
if info .Namespaces == nil {
@@ -175,23 +192,53 @@ func gatherWorkloadInfo(
175
192
info .PodCount += namespacePods .Count
176
193
}
177
194
178
- workloadImageResize (info .PodCount )
195
+ return limitReached , info , nil
196
+ }
179
197
180
- records := []record.Record {
181
- {
182
- Name : "config/workload_info" ,
183
- Item : record.JSONMarshaller {Object : & info },
184
- },
198
+ func isPodTerminated (pod * corev1.Pod ) bool {
199
+ return pod .Status .Phase == corev1 .PodSucceeded ||
200
+ pod .Status .Phase == corev1 .PodFailed
201
+ }
202
+
203
+ func podCanBeIgnored (pod * corev1.Pod ) bool {
204
+ return pod .Status .Phase != corev1 .PodRunning && pod .Status .Phase != corev1 .PodPending ||
205
+ len (pod .Status .InitContainerStatuses ) != len (pod .Spec .InitContainers ) ||
206
+ len (pod .Status .ContainerStatuses ) != len (pod .Spec .Containers )
207
+ }
208
+
209
+ func calculatePodShape (h hash.Hash , pod * corev1.Pod ) (workloadPodShape , bool ) {
210
+ var podShape workloadPodShape
211
+ var ok bool
212
+ podShape .InitContainers , ok = calculateWorkloadContainerShapes (h , pod .Spec .InitContainers , pod .Status .InitContainerStatuses )
213
+ if ! ok {
214
+ return workloadPodShape {}, false
185
215
}
186
216
187
- // wait for as many images as we can find to load
217
+ podShape .Containers , ok = calculateWorkloadContainerShapes (h , pod .Spec .Containers , pod .Status .ContainerStatuses )
218
+ if ! ok {
219
+ return workloadPodShape {}, false
220
+ }
221
+
222
+ podShape .RestartsAlways = pod .Spec .RestartPolicy == corev1 .RestartPolicyAlways
223
+
224
+ return podShape , true
225
+ }
226
+
227
+ func handleWorkloadImageInfo (
228
+ ctx context.Context ,
229
+ info * workloadPods ,
230
+ start time.Time ,
231
+ imagesDoneCh <- chan workloadImageInfo ,
232
+ ) {
188
233
var imageInfo workloadImageInfo
234
+
189
235
// wait proportional to the number of pods + a floor
190
236
waitDuration := time .Second * time .Duration (info .PodCount )/ 10 + 15 * time .Second
237
+
191
238
klog .V (2 ).Infof ("Loaded pods in %s, will wait %s for image data" ,
192
239
time .Since (start ).Round (time .Second ).String (),
193
240
waitDuration .Round (time .Second ).String ())
194
- close ( imageCh )
241
+
195
242
select {
196
243
case <- ctx .Done ():
197
244
select {
@@ -208,19 +255,15 @@ func gatherWorkloadInfo(
208
255
209
256
info .Images = imageInfo .images
210
257
info .ImageCount = imageInfo .count
211
- if limitReached {
212
- return records , []error {fmt .Errorf ("the %d limit for number of pods gathered was reached" , podsLimit )}
213
- }
214
- return records , nil
215
258
}
216
259
217
- // nolint: gocyclo
260
+ // nolint: gocritic
218
261
func gatherWorkloadImageInfo (
219
262
ctx context.Context ,
220
263
imageClient imageclient.ImageInterface ,
221
- imageCh <- chan string ,
222
- ) <- chan workloadImageInfo {
264
+ ) (chan string , <- chan workloadImageInfo ) {
223
265
images := make (map [string ]workloadImage )
266
+ imageCh := make (chan string , workloadGatherPageSize )
224
267
imagesDoneCh := make (chan workloadImageInfo )
225
268
226
269
go func () {
@@ -254,17 +297,8 @@ func gatherWorkloadImageInfo(
254
297
for k := range pendingIDs {
255
298
delete (pendingIDs , k )
256
299
}
257
- if _ , ok := images [imageID ]; ! ok {
258
- pendingIDs [imageID ] = struct {}{}
259
- }
260
- for l := len (imageCh ); l > 0 ; l = len (imageCh ) {
261
- for i := 0 ; i < l ; i ++ {
262
- imageID := <- imageCh
263
- if _ , ok := images [imageID ]; ! ok {
264
- pendingIDs [imageID ] = struct {}{}
265
- }
266
- }
267
- }
300
+
301
+ readImageSHAs (pendingIDs , images , imageID , imageCh )
268
302
269
303
for imageID := range pendingIDs {
270
304
if _ , ok := images [imageID ]; ok {
@@ -274,29 +308,55 @@ func gatherWorkloadImageInfo(
274
308
images [imageID ] = image
275
309
continue
276
310
}
311
+
277
312
images [imageID ] = workloadImage {}
278
- start := time .Now ()
279
- image , err := imageClient .Get (ctx , imageID , metav1.GetOptions {})
280
- if errors .IsNotFound (err ) {
281
- klog .V (4 ).Infof ("No image %s (%s)" , imageID , time .Since (start ).Round (time .Millisecond ).String ())
313
+ image := imageFromID (ctx , imageClient , imageID )
314
+ if image == nil {
282
315
continue
283
316
}
284
- if err == context .Canceled {
285
- return
286
- }
287
- if err != nil {
288
- klog .Errorf ("Unable to retrieve image %s" , imageID )
289
- continue
290
- }
291
- klog .V (4 ).Infof ("Found image %s (%s)" , imageID , time .Since (start ).Round (time .Millisecond ).String ())
292
317
info := calculateWorkloadInfo (h , image )
293
318
images [imageID ] = info
294
319
workloadImageAdd (imageID , info )
295
320
}
296
321
}
297
322
}
298
323
}()
299
- return imagesDoneCh
324
+ return imageCh , imagesDoneCh
325
+ }
326
+
327
+ // readImageSHAs drains the channel of any image IDs
328
+ func readImageSHAs (pendingIDs map [string ]struct {}, images map [string ]workloadImage , id string , imageCh <- chan string ) {
329
+ if _ , ok := images [id ]; ! ok {
330
+ pendingIDs [id ] = struct {}{}
331
+ }
332
+ for l := len (imageCh ); l > 0 ; l = len (imageCh ) {
333
+ for i := 0 ; i < l ; i ++ {
334
+ imageID := <- imageCh
335
+ if _ , ok := images [imageID ]; ! ok {
336
+ pendingIDs [imageID ] = struct {}{}
337
+ }
338
+ }
339
+ }
340
+ }
341
+
342
+ // imageFromID gets the container image from given ID
343
+ func imageFromID (ctx context.Context , client imageclient.ImageInterface , id string ) * imagev1.Image {
344
+ start := time .Now ()
345
+ image , err := client .Get (ctx , id , metav1.GetOptions {})
346
+ if errors .IsNotFound (err ) {
347
+ klog .V (4 ).Infof ("No image %s (%s)" , id , time .Since (start ).Round (time .Millisecond ).String ())
348
+ return nil
349
+ }
350
+ if err == context .Canceled {
351
+ return nil
352
+ }
353
+ if err != nil {
354
+ klog .Errorf ("Unable to retrieve image %s" , id )
355
+ return nil
356
+ }
357
+
358
+ klog .V (4 ).Infof ("Found image %s (%s)" , id , time .Since (start ).Round (time .Millisecond ).String ())
359
+ return image
300
360
}
301
361
302
362
// workloadPodShapeIndex attempts to find an equivalent shape within the current
0 commit comments