Skip to content

Commit 548fb43

Browse files
authored
Merge pull request #101292 from AliceZhang2016/job_controller_metrics
Graduate indexed job to beta
2 parents c9bd08a + 0c99f29 commit 548fb43

File tree

14 files changed

+143
-23
lines changed

14 files changed

+143
-23
lines changed

api/openapi-spec/swagger.json

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/apis/batch/types.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,11 @@ type JobSpec struct {
183183
// for each index.
184184
// When value is `Indexed`, .spec.completions must be specified and
185185
// `.spec.parallelism` must be less than or equal to 10^5.
186+
// In addition, The Pod name takes the form
187+
// `$(job-name)-$(index)-$(random-string)`,
188+
// the Pod hostname takes the form `$(job-name)-$(index)`.
186189
//
187-
// This field is alpha-level and is only honored by servers that enable the
188-
// IndexedJob feature gate. More completion modes can be added in the future.
190+
// This field is beta-level. More completion modes can be added in the future.
189191
// If the Job controller observes a mode that it doesn't recognize, the
190192
// controller skips updates for the Job.
191193
// +optional

pkg/apis/batch/validation/validation.go

+10
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,16 @@ func ValidateJob(job *batch.Job, opts apivalidation.PodValidationOptions) field.
8888
allErrs := apivalidation.ValidateObjectMeta(&job.ObjectMeta, true, apivalidation.ValidateReplicationControllerName, field.NewPath("metadata"))
8989
allErrs = append(allErrs, ValidateGeneratedSelector(job)...)
9090
allErrs = append(allErrs, ValidateJobSpec(&job.Spec, field.NewPath("spec"), opts)...)
91+
if job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion && job.Spec.Completions != nil && *job.Spec.Completions > 0 {
92+
// For indexed job, the job controller appends a suffix (`-$INDEX`)
93+
// to the pod hostname when indexed job create pods.
94+
// The index could be maximum `.spec.completions-1`
95+
// If we don't validate this here, the indexed job will fail to create pods later.
96+
maximumPodHostname := fmt.Sprintf("%s-%d", job.ObjectMeta.Name, *job.Spec.Completions-1)
97+
if errs := apimachineryvalidation.IsDNS1123Label(maximumPodHostname); len(errs) > 0 {
98+
allErrs = append(allErrs, field.Invalid(field.NewPath("metadata").Child("name"), job.ObjectMeta.Name, fmt.Sprintf("will not able to create pod with invalid DNS label: %s", maximumPodHostname)))
99+
}
100+
}
91101
return allErrs
92102
}
93103

pkg/controller/job/indexed_job_utils.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func getCompletionIndex(annotations map[string]string) int {
170170
if annotations == nil {
171171
return unknownCompletionIndex
172172
}
173-
v, ok := annotations[batch.JobCompletionIndexAnnotationAlpha]
173+
v, ok := annotations[batch.JobCompletionIndexAnnotation]
174174
if !ok {
175175
return unknownCompletionIndex
176176
}
@@ -203,7 +203,7 @@ func addCompletionIndexEnvVariable(container *v1.Container) {
203203
Name: completionIndexEnvName,
204204
ValueFrom: &v1.EnvVarSource{
205205
FieldRef: &v1.ObjectFieldSelector{
206-
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotationAlpha),
206+
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation),
207207
},
208208
},
209209
})
@@ -213,7 +213,7 @@ func addCompletionIndexAnnotation(template *v1.PodTemplateSpec, index int) {
213213
if template.Annotations == nil {
214214
template.Annotations = make(map[string]string, 1)
215215
}
216-
template.Annotations[batch.JobCompletionIndexAnnotationAlpha] = strconv.Itoa(index)
216+
template.Annotations[batch.JobCompletionIndexAnnotation] = strconv.Itoa(index)
217217
}
218218

219219
type byCompletionIndex []*v1.Pod

pkg/controller/job/indexed_job_utils_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ func hollowPodsWithIndexPhase(descs []indexPhase) []*v1.Pod {
279279
}
280280
if desc.Index != noIndex {
281281
p.Annotations = map[string]string{
282-
batch.JobCompletionIndexAnnotationAlpha: desc.Index,
282+
batch.JobCompletionIndexAnnotation: desc.Index,
283283
}
284284
}
285285
pods = append(pods, p)
@@ -297,7 +297,7 @@ func toIndexPhases(pods []*v1.Pod) []indexPhase {
297297
for i, p := range pods {
298298
index := noIndex
299299
if p.Annotations != nil {
300-
index = p.Annotations[batch.JobCompletionIndexAnnotationAlpha]
300+
index = p.Annotations[batch.JobCompletionIndexAnnotation]
301301
}
302302
result[i] = indexPhase{index, p.Status.Phase}
303303
}

pkg/controller/job/job_controller.go

+31-3
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"k8s.io/component-base/metrics/prometheus/ratelimiter"
4848
"k8s.io/klog/v2"
4949
"k8s.io/kubernetes/pkg/controller"
50+
"k8s.io/kubernetes/pkg/controller/job/metrics"
5051
"k8s.io/kubernetes/pkg/features"
5152
"k8s.io/utils/integer"
5253
)
@@ -60,7 +61,8 @@ var (
6061
// DefaultJobBackOff is the default backoff period, exported for the e2e test
6162
DefaultJobBackOff = 10 * time.Second
6263
// MaxJobBackOff is the max backoff period, exported for the e2e test
63-
MaxJobBackOff = 360 * time.Second
64+
MaxJobBackOff = 360 * time.Second
65+
maxPodCreateDeletePerSync = 500
6466
)
6567

6668
// Controller ensures that all Job objects have corresponding pods to
@@ -139,6 +141,8 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
139141
jm.updateHandler = jm.updateJobStatus
140142
jm.syncHandler = jm.syncJob
141143

144+
metrics.Register()
145+
142146
return jm
143147
}
144148

@@ -440,7 +444,7 @@ func (jm *Controller) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
440444
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
441445
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
442446
// concurrently with the same key.
443-
func (jm *Controller) syncJob(key string) (bool, error) {
447+
func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
444448
startTime := time.Now()
445449
defer func() {
446450
klog.V(4).Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
@@ -480,6 +484,21 @@ func (jm *Controller) syncJob(key string) (bool, error) {
480484
return false, nil
481485
}
482486

487+
completionMode := string(batch.NonIndexedCompletion)
488+
if isIndexedJob(&job) {
489+
completionMode = string(batch.IndexedCompletion)
490+
}
491+
492+
defer func() {
493+
result := "success"
494+
if rErr != nil {
495+
result = "error"
496+
}
497+
498+
metrics.JobSyncDurationSeconds.WithLabelValues(completionMode, result).Observe(time.Since(startTime).Seconds())
499+
metrics.JobSyncNum.WithLabelValues(completionMode, result).Inc()
500+
}()
501+
483502
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
484503
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
485504
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
@@ -546,6 +565,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
546565
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, v1.ConditionTrue, failureReason, failureMessage))
547566
jobConditionsChanged = true
548567
jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
568+
metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc()
549569
} else {
550570
if jobNeedsSync && job.DeletionTimestamp == nil {
551571
active, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods)
@@ -581,6 +601,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
581601
now := metav1.Now()
582602
job.Status.CompletionTime = &now
583603
jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")
604+
metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
584605
} else if utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
585606
// Update the conditions / emit events only if manageJob was called in
586607
// this syncJob. Otherwise wait for the right syncJob call to make
@@ -613,7 +634,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
613634
}
614635
}
615636

616-
forget := false
637+
forget = false
617638
// Check if the number of jobs succeeded increased since the last check. If yes "forget" should be true
618639
// This logic is linked to the issue: https://github.com/kubernetes/kubernetes/issues/56853 that aims to
619640
// improve the Job backoff policy when parallelism > 1 and few Jobs failed but others succeed.
@@ -783,6 +804,9 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
783804
rmAtLeast = 0
784805
}
785806
podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
807+
if len(podsToDelete) > maxPodCreateDeletePerSync {
808+
podsToDelete = podsToDelete[:maxPodCreateDeletePerSync]
809+
}
786810
if len(podsToDelete) > 0 {
787811
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
788812
klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism)
@@ -803,6 +827,10 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
803827
return active, nil
804828
}
805829

830+
if diff > int32(maxPodCreateDeletePerSync) {
831+
diff = int32(maxPodCreateDeletePerSync)
832+
}
833+
806834
jm.expectations.ExpectCreations(jobKey, int(diff))
807835
errCh := make(chan error, diff)
808836
klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

pkg/controller/job/job_controller_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
149149
p.Status = v1.PodStatus{Phase: s.Phase}
150150
if s.Index != noIndex {
151151
p.Annotations = map[string]string{
152-
batch.JobCompletionIndexAnnotationAlpha: s.Index,
152+
batch.JobCompletionIndexAnnotation: s.Index,
153153
}
154154
}
155155
podIndexer.Add(p)
@@ -2176,7 +2176,7 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
21762176
Name: "JOB_COMPLETION_INDEX",
21772177
ValueFrom: &v1.EnvVarSource{
21782178
FieldRef: &v1.ObjectFieldSelector{
2179-
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotationAlpha),
2179+
FieldPath: fmt.Sprintf("metadata.annotations['%s']", batch.JobCompletionIndexAnnotation),
21802180
},
21812181
},
21822182
},

pkg/controller/job/metrics/metrics.go

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
Copyright 2021 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package metrics
18+
19+
import (
20+
"sync"
21+
22+
"k8s.io/component-base/metrics"
23+
"k8s.io/component-base/metrics/legacyregistry"
24+
)
25+
26+
// JobControllerSubsystem - subsystem name used for this controller.
27+
const JobControllerSubsystem = "job_controller"
28+
29+
var (
30+
// JobSyncDurationSeconds tracks the latency of job syncs as
31+
// completion_mode = Indexed / NonIndexed and result = success / error.
32+
JobSyncDurationSeconds = metrics.NewHistogramVec(
33+
&metrics.HistogramOpts{
34+
Subsystem: JobControllerSubsystem,
35+
Name: "job_sync_duration_seconds",
36+
Help: "The time it took to sync a job",
37+
StabilityLevel: metrics.ALPHA,
38+
Buckets: metrics.ExponentialBuckets(0.001, 2, 15),
39+
},
40+
[]string{"completion_mode", "result"},
41+
)
42+
// JobSyncNum tracks the number of job syncs as
43+
// completion_mode = Indexed / NonIndexed and result = success / error.
44+
JobSyncNum = metrics.NewCounterVec(
45+
&metrics.CounterOpts{
46+
Subsystem: JobControllerSubsystem,
47+
Name: "job_sync_total",
48+
Help: "The number of job syncs",
49+
StabilityLevel: metrics.ALPHA,
50+
},
51+
[]string{"completion_mode", "result"},
52+
)
53+
// JobFinishedNum tracks the number of jobs that finish as
54+
// completion_mode = Indexed / NonIndexed and result = failed / succeeded.
55+
JobFinishedNum = metrics.NewCounterVec(
56+
&metrics.CounterOpts{
57+
Subsystem: JobControllerSubsystem,
58+
Name: "job_finished_total",
59+
Help: "The number of finished job",
60+
StabilityLevel: metrics.ALPHA,
61+
},
62+
[]string{"completion_mode", "result"},
63+
)
64+
)
65+
66+
var registerMetrics sync.Once
67+
68+
// Register registers Job controller metrics.
69+
func Register() {
70+
registerMetrics.Do(func() {
71+
legacyregistry.MustRegister(JobSyncDurationSeconds)
72+
legacyregistry.MustRegister(JobSyncNum)
73+
legacyregistry.MustRegister(JobFinishedNum)
74+
})
75+
}

pkg/features/kube_features.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,7 @@ const (
293293

294294
// owner: @alculquicondor
295295
// alpha: v1.21
296+
// beta: v1.22
296297
//
297298
// Allows Job controller to manage Pod completions per completion index.
298299
IndexedJob featuregate.Feature = "IndexedJob"
@@ -779,7 +780,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
779780
NetworkPolicyEndPort: {Default: false, PreRelease: featuregate.Alpha},
780781
ProcMountType: {Default: false, PreRelease: featuregate.Alpha},
781782
TTLAfterFinished: {Default: true, PreRelease: featuregate.Beta},
782-
IndexedJob: {Default: false, PreRelease: featuregate.Alpha},
783+
IndexedJob: {Default: true, PreRelease: featuregate.Beta},
783784
KubeletPodResources: {Default: true, PreRelease: featuregate.Beta},
784785
LocalStorageCapacityIsolationFSQuotaMonitoring: {Default: false, PreRelease: featuregate.Alpha},
785786
NonPreemptingPriority: {Default: true, PreRelease: featuregate.Beta},

staging/src/k8s.io/api/batch/v1/generated.proto

+4-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

staging/src/k8s.io/api/batch/v1/types.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2222
)
2323

24-
const JobCompletionIndexAnnotationAlpha = "batch.kubernetes.io/job-completion-index"
24+
const JobCompletionIndexAnnotation = "batch.kubernetes.io/job-completion-index"
2525

2626
// +genclient
2727
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -162,9 +162,11 @@ type JobSpec struct {
162162
// for each index.
163163
// When value is `Indexed`, .spec.completions must be specified and
164164
// `.spec.parallelism` must be less than or equal to 10^5.
165+
// In addition, The Pod name takes the form
166+
// `$(job-name)-$(index)-$(random-string)`,
167+
// the Pod hostname takes the form `$(job-name)-$(index)`.
165168
//
166-
// This field is alpha-level and is only honored by servers that enable the
167-
// IndexedJob feature gate. More completion modes can be added in the future.
169+
// This field is beta-level. More completion modes can be added in the future.
168170
// If the Job controller observes a mode that it doesn't recognize, the
169171
// controller skips updates for the Job.
170172
// +optional

0 commit comments

Comments
 (0)