scheduler.go
/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"context"
	"errors"
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
	"k8s.io/client-go/dynamic/dynamicinformer"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
	configv1 "k8s.io/kube-scheduler/config/v1"
	"k8s.io/kubernetes/pkg/features"
	schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
	internalcache "k8s.io/kubernetes/pkg/scheduler/backend/cache"
	cachedebugger "k8s.io/kubernetes/pkg/scheduler/backend/cache/debugger"
	internalqueue "k8s.io/kubernetes/pkg/scheduler/backend/queue"
	"k8s.io/kubernetes/pkg/scheduler/framework"
	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
	frameworkplugins "k8s.io/kubernetes/pkg/scheduler/framework/plugins"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/noderesources"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	"k8s.io/kubernetes/pkg/scheduler/metrics"
	"k8s.io/kubernetes/pkg/scheduler/profile"
	"k8s.io/kubernetes/pkg/scheduler/util/assumecache"
)

const (
	// Duration the scheduler will wait before expiring an assumed pod.
	// See issue #106361 for more details about this parameter and its value.
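	// Note (added comment, based on how the cache treats a zero TTL): a value
	// of 0 means the cache never expires assumed pods on its own; they are
	// removed only once their binding finishes or fails.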
	durationToExpireAssumedPod time.Duration = 0
)

// ErrNoNodesAvailable describes the error returned when no nodes are available to schedule pods.
var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods")

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
	// It is expected that changes made via Cache will be observed
	// by NodeLister and Algorithm.
	Cache internalcache.Cache

	Extenders []framework.Extender

	// NextPod should be a function that blocks until the next pod
	// is available. We don't use a channel for this, because scheduling
	// a pod may take some amount of time and we don't want pods to get
	// stale while they sit in a channel.
	NextPod func(logger klog.Logger) (*framework.QueuedPodInfo, error)

	// FailureHandler is called upon a scheduling failure.
	FailureHandler FailureHandlerFn

	// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
	// On success it returns a ScheduleResult with the name of the suggested host;
	// otherwise it returns a FitError with reasons.
	SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

	// SchedulingQueue holds pods to be scheduled.
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map

	client clientset.Interface

	nodeInfoSnapshot *internalcache.Snapshot

	percentageOfNodesToScore int32

	nextStartNodeIndex int

	// logger *must* be initialized when creating a Scheduler,
	// otherwise logging functions will access a nil sink and
	// panic.
	logger klog.Logger

	// registeredHandlers contains the registrations of all handlers. It's used to check
	// if all handlers have finished syncing before the scheduling cycles start.
	registeredHandlers []cache.ResourceEventHandlerRegistration
}

func (sched *Scheduler) applyDefaultHandlers() {
	sched.SchedulePod = sched.schedulePod
	sched.FailureHandler = sched.handleSchedulingFailure
}

type schedulerOptions struct {
	componentConfigVersion string
	kubeConfig             *restclient.Config
	// Overridden by profile level percentageOfNodesToScore if set in v1.
	percentageOfNodesToScore          int32
	podInitialBackoffSeconds          int64
	podMaxBackoffSeconds              int64
	podMaxInUnschedulablePodsDuration time.Duration
	// Contains out-of-tree plugins to be merged with the in-tree registry.
	frameworkOutOfTreeRegistry frameworkruntime.Registry
	profiles                   []schedulerapi.KubeSchedulerProfile
	extenders                  []schedulerapi.Extender
	frameworkCapturer          FrameworkCapturer
	parallelism                int32
	applyDefaultProfile        bool
}

// Option configures a Scheduler
type Option func(*schedulerOptions)
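
// The With* helpers below follow the functional-options pattern: each returns a
// closure that mutates schedulerOptions before New builds the Scheduler. A
// minimal usage sketch (ctx, client, and the factories are assumed to be set up
// by the caller; they are not defined in this file):
//
//	sched, err := scheduler.New(ctx, client, informerFactory, dynInformerFactory, recorderFactory,
//		scheduler.WithParallelism(32),
//		scheduler.WithPodMaxBackoffSeconds(20),
//	)
//	if err != nil {
//		// handle the configuration error
//	}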

// ScheduleResult represents the result of scheduling a pod.
type ScheduleResult struct {
	// Name of the selected node.
	SuggestedHost string
	// The number of nodes the scheduler evaluated the pod against in the filtering
	// phase and beyond.
	EvaluatedNodes int
	// The number of nodes out of the evaluated ones that fit the pod.
	FeasibleNodes int
	// The nominating info for the scheduling cycle.
	nominatingInfo *framework.NominatingInfo
}

// WithComponentConfigVersion sets the component config version to the
// KubeSchedulerConfiguration version used. The string should be the full
// scheme group/version of the external type we converted from (for example
// "kubescheduler.config.k8s.io/v1").
func WithComponentConfigVersion(apiVersion string) Option {
	return func(o *schedulerOptions) {
		o.componentConfigVersion = apiVersion
	}
}

// WithKubeConfig sets the kube config for Scheduler.
func WithKubeConfig(cfg *restclient.Config) Option {
	return func(o *schedulerOptions) {
		o.kubeConfig = cfg
	}
}

// WithProfiles sets profiles for Scheduler. By default, there is one profile
// with the name "default-scheduler".
func WithProfiles(p ...schedulerapi.KubeSchedulerProfile) Option {
	return func(o *schedulerOptions) {
		o.profiles = p
		o.applyDefaultProfile = false
	}
}

// WithParallelism sets the parallelism for all scheduler algorithms. Default is 16.
func WithParallelism(threads int32) Option {
	return func(o *schedulerOptions) {
		o.parallelism = threads
	}
}

// WithPercentageOfNodesToScore sets percentageOfNodesToScore for Scheduler.
// The default value of 0 will use an adaptive percentage: 50 - (num of nodes)/125.
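// For example (added worked note), in a 5,000-node cluster the adaptive value is
// 50 - 5000/125 = 10, i.e. roughly 10% of the nodes are scored; the result is
// clamped to a lower bound.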
func WithPercentageOfNodesToScore(percentageOfNodesToScore *int32) Option {
	return func(o *schedulerOptions) {
		if percentageOfNodesToScore != nil {
			o.percentageOfNodesToScore = *percentageOfNodesToScore
		}
	}
}

// WithFrameworkOutOfTreeRegistry sets the registry for out-of-tree plugins. Those plugins
// will be appended to the default registry.
func WithFrameworkOutOfTreeRegistry(registry frameworkruntime.Registry) Option {
	return func(o *schedulerOptions) {
		o.frameworkOutOfTreeRegistry = registry
	}
}
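
// A sketch of wiring in an out-of-tree plugin via this option (MyPlugin and
// mypkg.New are illustrative placeholders, not part of this file):
//
//	registry := frameworkruntime.Registry{
//		"MyPlugin": mypkg.New, // factory for the custom plugin
//	}
//	sched, err := scheduler.New(ctx, client, informerFactory, dynInformerFactory, recorderFactory,
//		scheduler.WithFrameworkOutOfTreeRegistry(registry),
//	)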

// WithPodInitialBackoffSeconds sets podInitialBackoffSeconds for the Scheduler; the default value is 1.
func WithPodInitialBackoffSeconds(podInitialBackoffSeconds int64) Option {
	return func(o *schedulerOptions) {
		o.podInitialBackoffSeconds = podInitialBackoffSeconds
	}
}

// WithPodMaxBackoffSeconds sets podMaxBackoffSeconds for the Scheduler; the default value is 10.
func WithPodMaxBackoffSeconds(podMaxBackoffSeconds int64) Option {
	return func(o *schedulerOptions) {
		o.podMaxBackoffSeconds = podMaxBackoffSeconds
	}
}

// WithPodMaxInUnschedulablePodsDuration sets podMaxInUnschedulablePodsDuration for PriorityQueue.
func WithPodMaxInUnschedulablePodsDuration(duration time.Duration) Option {
	return func(o *schedulerOptions) {
		o.podMaxInUnschedulablePodsDuration = duration
	}
}

// WithExtenders sets extenders for the Scheduler.
func WithExtenders(e ...schedulerapi.Extender) Option {
	return func(o *schedulerOptions) {
		o.extenders = e
	}
}

// FrameworkCapturer is used for registering a notify function while building a framework.
type FrameworkCapturer func(schedulerapi.KubeSchedulerProfile)

// WithBuildFrameworkCapturer sets a notify function for getting buildFramework details.
func WithBuildFrameworkCapturer(fc FrameworkCapturer) Option {
	return func(o *schedulerOptions) {
		o.frameworkCapturer = fc
	}
}

var defaultSchedulerOptions = schedulerOptions{
	percentageOfNodesToScore:          schedulerapi.DefaultPercentageOfNodesToScore,
	podInitialBackoffSeconds:          int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()),
	podMaxBackoffSeconds:              int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()),
	podMaxInUnschedulablePodsDuration: internalqueue.DefaultPodMaxInUnschedulablePodsDuration,
	parallelism:                       int32(parallelize.DefaultParallelism),
	// Ideally we would statically set the default profile here, but we can't because
	// creating the default profile may require testing feature gates, which may get
	// set dynamically in tests. Therefore, we delay creating it until New is actually
	// invoked.
	applyDefaultProfile: true,
}

// New returns a Scheduler.
func New(ctx context.Context,
	client clientset.Interface,
	informerFactory informers.SharedInformerFactory,
	dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
	recorderFactory profile.RecorderFactory,
	opts ...Option) (*Scheduler, error) {

	logger := klog.FromContext(ctx)
	stopEverything := ctx.Done()

	options := defaultSchedulerOptions
	for _, opt := range opts {
		opt(&options)
	}

	if options.applyDefaultProfile {
		var versionedCfg configv1.KubeSchedulerConfiguration
		scheme.Scheme.Default(&versionedCfg)
		cfg := schedulerapi.KubeSchedulerConfiguration{}
		if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
			return nil, err
		}
		options.profiles = cfg.Profiles
	}

	registry := frameworkplugins.NewInTreeRegistry()
	if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
		return nil, err
	}

	metrics.Register()

	extenders, err := buildExtenders(logger, options.extenders, options.profiles)
	if err != nil {
		return nil, fmt.Errorf("couldn't build extenders: %w", err)
	}

	podLister := informerFactory.Core().V1().Pods().Lister()
	nodeLister := informerFactory.Core().V1().Nodes().Lister()

	snapshot := internalcache.NewEmptySnapshot()
	metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopEverything)

	// waitingPods holds all the pods that are in the scheduler and waiting in the permit stage.
	waitingPods := frameworkruntime.NewWaitingPodsMap()

	var resourceClaimCache *assumecache.AssumeCache
	if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
		resourceClaimInformer := informerFactory.Resource().V1alpha3().ResourceClaims().Informer()
		resourceClaimCache = assumecache.NewAssumeCache(logger, resourceClaimInformer, "ResourceClaim", "", nil)
	}

	profiles, err := profile.NewMap(ctx, options.profiles, registry, recorderFactory,
		frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
		frameworkruntime.WithClientSet(client),
		frameworkruntime.WithKubeConfig(options.kubeConfig),
		frameworkruntime.WithInformerFactory(informerFactory),
		frameworkruntime.WithResourceClaimCache(resourceClaimCache),
		frameworkruntime.WithSnapshotSharedLister(snapshot),
		frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
		frameworkruntime.WithParallelism(int(options.parallelism)),
		frameworkruntime.WithExtenders(extenders),
		frameworkruntime.WithMetricsRecorder(metricsRecorder),
		frameworkruntime.WithWaitingPods(waitingPods),
	)
	if err != nil {
		return nil, fmt.Errorf("initializing profiles: %v", err)
	}

	if len(profiles) == 0 {
		return nil, errors.New("at least one profile is required")
	}

	preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
	queueingHintsPerProfile := make(internalqueue.QueueingHintMapPerProfile)
	var returnErr error
	for profileName, profile := range profiles {
		preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
		queueingHintsPerProfile[profileName], err = buildQueueingHintMap(ctx, profile.EnqueueExtensions())
		if err != nil {
			returnErr = errors.Join(returnErr, err)
		}
	}
	if returnErr != nil {
		return nil, returnErr
	}

	podQueue := internalqueue.NewSchedulingQueue(
		profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
		informerFactory,
		internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
		internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
		internalqueue.WithPodLister(podLister),
		internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
		internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
		internalqueue.WithQueueingHintMapPerProfile(queueingHintsPerProfile),
		internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
		internalqueue.WithMetricsRecorder(*metricsRecorder),
	)

	for _, fwk := range profiles {
		fwk.SetPodNominator(podQueue)
	}

	schedulerCache := internalcache.New(ctx, durationToExpireAssumedPod)

	// Setup cache debugger.
	debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
	debugger.ListenForSignal(ctx)

	sched := &Scheduler{
		Cache:                    schedulerCache,
		client:                   client,
		nodeInfoSnapshot:         snapshot,
		percentageOfNodesToScore: options.percentageOfNodesToScore,
		Extenders:                extenders,
		StopEverything:           stopEverything,
		SchedulingQueue:          podQueue,
		Profiles:                 profiles,
		logger:                   logger,
	}
	sched.NextPod = podQueue.Pop
	sched.applyDefaultHandlers()

	if err = addAllEventHandlers(sched, informerFactory, dynInformerFactory, resourceClaimCache, unionedGVKs(queueingHintsPerProfile)); err != nil {
		return nil, fmt.Errorf("adding event handlers: %w", err)
	}

	return sched, nil
}

// defaultQueueingHintFn is the default queueing hint function.
// It always returns Queue as the queueing hint.
var defaultQueueingHintFn = func(_ klog.Logger, _ *v1.Pod, _, _ interface{}) (framework.QueueingHint, error) {
	return framework.Queue, nil
}

func buildQueueingHintMap(ctx context.Context, es []framework.EnqueueExtensions) (internalqueue.QueueingHintMap, error) {
	queueingHintMap := make(internalqueue.QueueingHintMap)
	var returnErr error
	for _, e := range es {
		events, err := e.EventsToRegister(ctx)
		if err != nil {
			returnErr = errors.Join(returnErr, err)
		}

		// This happens when a plugin registers with empty events. Usually such a pod
		// becomes reschedulable only through a self-update, e.g. the schedulingGates
		// plugin, where the pod enters the activeQ via priorityQueue.Update().
		if len(events) == 0 {
			continue
		}

		// Note: Rarely, a plugin implements EnqueueExtensions but returns nil.
		// We treat it as: the plugin is not interested in any event, and hence a pod failed by that plugin
		// cannot be moved by any regular cluster event.
		// So, we can just ignore such EventsToRegister here.

		registerNodeAdded := false
		registerNodeTaintUpdated := false
		for _, event := range events {
			fn := event.QueueingHintFn
			if fn == nil || !utilfeature.DefaultFeatureGate.Enabled(features.SchedulerQueueingHints) {
				fn = defaultQueueingHintFn
			}

			if event.Event.Resource == framework.Node {
				if event.Event.ActionType&framework.Add != 0 {
					registerNodeAdded = true
				}
				if event.Event.ActionType&framework.UpdateNodeTaint != 0 {
					registerNodeTaintUpdated = true
				}
			}

			queueingHintMap[event.Event] = append(queueingHintMap[event.Event], &internalqueue.QueueingHintFunction{
				PluginName:     e.Name(),
				QueueingHintFn: fn,
			})
		}
		if registerNodeAdded && !registerNodeTaintUpdated {
			// Temporary fix for the issue https://github.com/kubernetes/kubernetes/issues/109437
			// The NodeAdded QueueingHint isn't always called because of preCheck.
			// It's definitely not something plugin developers expect,
			// and registering the UpdateNodeTaint event is the only mitigation for now.
			//
			// So, here we register the UpdateNodeTaint event for plugins that have the NodeAdded
			// event but don't have the UpdateNodeTaint event.
			// This hurts requeuing efficiency, but it is a lot better than
			// some Pods getting stuck in the unschedulable pod pool.
			// This behavior will be removed when we remove the preCheck feature.
			// See: https://github.com/kubernetes/kubernetes/issues/110175
			queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}] =
				append(queueingHintMap[framework.ClusterEvent{Resource: framework.Node, ActionType: framework.UpdateNodeTaint}],
					&internalqueue.QueueingHintFunction{
						PluginName:     e.Name(),
						QueueingHintFn: defaultQueueingHintFn,
					},
				)
		}
	}
	if returnErr != nil {
		return nil, returnErr
	}
	return queueingHintMap, nil
}
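
// Illustrative example (added note, not code from this file): a plugin whose
// EventsToRegister returns the Node/Add event with no QueueingHintFn ends up
// with defaultQueueingHintFn registered for Node/Add and, because of the
// preCheck workaround above, an additional default hint for Node/UpdateNodeTaint.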

// Run begins watching and scheduling. It starts scheduling and blocks until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
	logger := klog.FromContext(ctx)
	sched.SchedulingQueue.Run(logger)

	// We need to start the scheduleOne loop in a dedicated goroutine,
	// because scheduleOne hangs on getting the next item
	// from the SchedulingQueue.
	// If there are no new pods to schedule, it will hang there,
	// and if done in this goroutine it would block closing the
	// SchedulingQueue, in effect causing a deadlock on shutdown.
	go wait.UntilWithContext(ctx, sched.ScheduleOne, 0)

	<-ctx.Done()
	sched.SchedulingQueue.Close()

	// If the plugins satisfy the io.Closer interface, they are closed.
	err := sched.Profiles.Close()
	if err != nil {
		logger.Error(err, "Failed to close plugins")
	}
}
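
// A minimal calling sketch (ctx, informerFactory, and sched are assumed to be
// set up by the caller, e.g. via New above; this is not code from this file):
//
//	informerFactory.Start(ctx.Done())
//	informerFactory.WaitForCacheSync(ctx.Done())
//	sched.Run(ctx) // blocks until ctx is cancelled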

// NewInformerFactory creates a SharedInformerFactory and initializes a scheduler-specific
// in-place podInformer.
func NewInformerFactory(cs clientset.Interface, resyncPeriod time.Duration) informers.SharedInformerFactory {
	informerFactory := informers.NewSharedInformerFactory(cs, resyncPeriod)
	informerFactory.InformerFor(&v1.Pod{}, newPodInformer)
	return informerFactory
}

func buildExtenders(logger klog.Logger, extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
	var fExtenders []framework.Extender
	if len(extenders) == 0 {
		return nil, nil
	}

	var ignoredExtendedResources []string
	var ignorableExtenders []framework.Extender
	for i := range extenders {
		logger.V(2).Info("Creating extender", "extender", extenders[i])
		extender, err := NewHTTPExtender(&extenders[i])
		if err != nil {
			return nil, err
		}
		if !extender.IsIgnorable() {
			fExtenders = append(fExtenders, extender)
		} else {
			ignorableExtenders = append(ignorableExtenders, extender)
		}
		for _, r := range extenders[i].ManagedResources {
			if r.IgnoredByScheduler {
				ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
			}
		}
	}
	// Place ignorable extenders at the tail of the extenders list.
	fExtenders = append(fExtenders, ignorableExtenders...)

	// If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
	// This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
	// plugin args (and in which case the extender ignored resources take precedence).
	if len(ignoredExtendedResources) == 0 {
		return fExtenders, nil
	}

	for i := range profiles {
		prof := &profiles[i]
		var found = false
		for k := range prof.PluginConfig {
			if prof.PluginConfig[k].Name == noderesources.Name {
				// Update the existing args.
				pc := &prof.PluginConfig[k]
				args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
				if !ok {
					return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
				}
				args.IgnoredResources = ignoredExtendedResources
				found = true
				break
			}
		}
		if !found {
			return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
		}
	}
	return fExtenders, nil
}

// FailureHandlerFn is called by the scheduler when a pod cannot be scheduled;
// the default is (*Scheduler).handleSchedulingFailure.
type FailureHandlerFn func(ctx context.Context, fwk framework.Framework, podInfo *framework.QueuedPodInfo, status *framework.Status, nominatingInfo *framework.NominatingInfo, start time.Time)

func unionedGVKs(queueingHintsPerProfile internalqueue.QueueingHintMapPerProfile) map[framework.GVK]framework.ActionType {
	gvkMap := make(map[framework.GVK]framework.ActionType)
	for _, queueingHints := range queueingHintsPerProfile {
		for evt := range queueingHints {
			if _, ok := gvkMap[evt.Resource]; ok {
				gvkMap[evt.Resource] |= evt.ActionType
			} else {
				gvkMap[evt.Resource] = evt.ActionType
			}
		}
	}
	return gvkMap
}
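
// For example (added note, not code from this file): if one profile registers
// Node/Add and another registers Node/UpdateNodeTaint, the union for the Node
// GVK is Add|UpdateNodeTaint, so a single set of event handlers covers both profiles.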

// newPodInformer creates a shared index informer that returns only non-terminal pods.
// The PodInformer allows indexers to be added, but note that only non-conflicting indexers are allowed.
func newPodInformer(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	selector := fmt.Sprintf("status.phase!=%v,status.phase!=%v", v1.PodSucceeded, v1.PodFailed)
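	// The resulting field selector is "status.phase!=Succeeded,status.phase!=Failed".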
	tweakListOptions := func(options *metav1.ListOptions) {
		options.FieldSelector = selector
	}
	informer := coreinformers.NewFilteredPodInformer(cs, metav1.NamespaceAll, resyncPeriod, cache.Indexers{}, tweakListOptions)

	// Dropping `.metadata.managedFields` to improve memory usage.
	// The Extract workflow (i.e. `ExtractPod`) should be unused.
	trim := func(obj interface{}) (interface{}, error) {
		if accessor, err := meta.Accessor(obj); err == nil {
			if accessor.GetManagedFields() != nil {
				accessor.SetManagedFields(nil)
			}
		}
		return obj, nil
	}
	informer.SetTransform(trim)
	return informer
}