Skip to content

Commit 0b8a79c

Browse files
author
Ricardo Lüders
authored
NO-JIRA: (refactor) job completion uses events instead of polling (#888)
* refactor: job completion uses events instead of polling * chore: add watch permission to batch * fix: handle event job as nil * fix: listen only for modified events * fix: missing job name log * fix: clean up debug stuff * fix: handle event job casting failure * refactor: code review feedback
1 parent 82cb56f commit 0b8a79c

File tree

3 files changed

+36
-18
lines changed

3 files changed

+36
-18
lines changed

manifests/03-clusterrole.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ rules:
438438
- get
439439
- list
440440
- delete
441+
- watch
441442
- apiGroups:
442443
- apps
443444
resources:

pkg/controller/periodic/job.go

+34-17
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,12 @@ package periodic
33
import (
44
"context"
55
"fmt"
6-
"time"
76

87
batchv1 "k8s.io/api/batch/v1"
98
corev1 "k8s.io/api/core/v1"
10-
"k8s.io/apimachinery/pkg/api/errors"
119
"k8s.io/apimachinery/pkg/api/resource"
1210
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13-
"k8s.io/apimachinery/pkg/util/wait"
11+
"k8s.io/apimachinery/pkg/watch"
1412
"k8s.io/client-go/kubernetes"
1513
)
1614

@@ -108,19 +106,38 @@ func (j *JobController) CreateGathererJob(ctx context.Context, dataGatherName, i
108106
return j.kubeClient.BatchV1().Jobs(insightsNamespace).Create(ctx, gj, metav1.CreateOptions{})
109107
}
110108

111-
// WaitForJobCompletion polls the Kubernetes API every 20 seconds and checks if the job finished.
112-
func (j *JobController) WaitForJobCompletion(ctx context.Context, job *batchv1.Job) error {
113-
return wait.PollUntilContextCancel(ctx, 20*time.Second, true, func(ctx context.Context) (done bool, err error) {
114-
j, err := j.kubeClient.BatchV1().Jobs(insightsNamespace).Get(ctx, job.Name, metav1.GetOptions{})
115-
if errors.IsNotFound(err) {
116-
return false, err
117-
}
118-
if j.Status.Succeeded > 0 {
119-
return true, nil
120-
}
121-
if j.Status.Failed > 0 {
122-
return true, fmt.Errorf("job %s failed", job.Name)
109+
// WaitForJobCompletion listens to Kubernetes watch events to check whether the job has finished.
110+
func (j *JobController) WaitForJobCompletion(ctx context.Context, jobName string) error {
111+
watcher, err := j.kubeClient.BatchV1().Jobs(insightsNamespace).
112+
Watch(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("metadata.name=%s", jobName)})
113+
if err != nil {
114+
return err
115+
}
116+
defer watcher.Stop()
117+
118+
for {
119+
select {
120+
case <-ctx.Done():
121+
return ctx.Err()
122+
case event, ok := <-watcher.ResultChan():
123+
if !ok {
124+
return fmt.Errorf("watcher channel was closed unexpectedly")
125+
}
126+
127+
if event.Type != watch.Modified {
128+
continue
129+
}
130+
131+
job, ok := event.Object.(*batchv1.Job)
132+
if !ok {
133+
return fmt.Errorf("failed to cast job event: %v", event.Object)
134+
}
135+
if job.Status.Succeeded > 0 {
136+
return nil
137+
}
138+
if job.Status.Failed > 0 {
139+
return fmt.Errorf("job %s failed", job.Name)
140+
}
123141
}
124-
return false, nil
125-
})
142+
}
126143
}

pkg/controller/periodic/periodic.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ func (c *Controller) runJobAndCheckResults(ctx context.Context, dataGather *insi
346346
}
347347

348348
klog.Infof("Created new gathering job %v", gj.Name)
349-
err = c.jobController.WaitForJobCompletion(ctx, gj)
349+
err = c.jobController.WaitForJobCompletion(ctx, gj.Name)
350350
if err != nil {
351351
if errors.Is(err, context.DeadlineExceeded) {
352352
klog.Errorf("Failed to read job status: %v", err)

0 commit comments

Comments
 (0)