From 35b6056f79e428b5aa13b7c16e33d87b6f332dd8 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 11 Aug 2023 14:19:11 -0400 Subject: [PATCH 1/2] deleted AWs should not be added to event queue --- .../queuejob/queuejob_controller_ex.go | 44 +++++++++---------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 2146a4ac..d3077fcf 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1613,6 +1613,12 @@ func larger(a, b string) bool { return a > b // Equal length, lexicographic order } +//When an AW is deleted, do not add such AWs to the event queue. +//AW can never be brought back when it is deleted by external client, so do not bother adding it to event queue. +//There will be a scenario, where an AW is in middle to dispatch and it is deleted. at that point when such an +//AW is added to etcd a conflict error will be raised. This is cause the current AW to be skipped. +//If there are large number of delete's will informer miss few delete events is under question for this simplification. 
+//For 1K AW all of them are delete from the system, and the next resubmitted AW begins processing after in less than 2 mins func (cc *XController) deleteQueueJob(obj interface{}) { qj, ok := obj.(*arbv1.AppWrapper) if !ok { @@ -1628,12 +1634,22 @@ func (cc *XController) deleteQueueJob(obj interface{}) { } else { accessor.SetDeletionTimestamp(&current_ts) } - klog.V(3).Infof("[Informer-deleteQJ] %s enqueue deletion, deletion ts = %v", qj.Name, qj.GetDeletionTimestamp()) - //Remove stale copy - cc.eventQueue.Delete(qj) - cc.qjqueue.Delete(qj) - //Add fresh copy - cc.eventQueue.Add(qj) + // validate that app wraper has not been marked for deletion by the infomer's delete handler + if qj.DeletionTimestamp != nil { + klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s set for deletion.", qj.Namespace, qj.Name) + // cleanup resources for running job, ignoring errors + if err00 := cc.Cleanup(context.Background(), qj); err00 != nil { + klog.Warningf("Failed to cleanup resources for app wrapper '%s/%s', err = %v", qj.Namespace, qj.Name, err00) + } + // empty finalizers and delete the queuejob again + if accessor, err00 := meta.Accessor(qj); err00 == nil { + accessor.SetFinalizers(nil) + } + // we delete the job from the queue if it is there, ignoring errors + cc.qjqueue.Delete(qj) + cc.eventQueue.Delete(qj) + klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s deleted.", qj.Namespace, qj.Name) + } } func (cc *XController) enqueue(obj interface{}) error { @@ -1767,22 +1783,6 @@ func (cc *XController) worker() { } func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) error { - // validate that app wraper has not been marked for deletion by the infomer's delete handler - if qj.DeletionTimestamp != nil { - klog.V(3).Infof("[syncQueueJob] AW job=%s/%s set for deletion.", qj.Namespace, qj.Name) - // cleanup resources for running job, ignoring errors - if err00 := cc.Cleanup(ctx, qj); err00 != nil { - klog.Warningf("Failed to cleanup resources for app wrapper 
'%s/%s', err = %v", qj.Namespace, qj.Name, err00) - } - // empty finalizers and delete the queuejob again - if accessor, err00 := meta.Accessor(qj); err00 == nil { - accessor.SetFinalizers(nil) - } - // we delete the job from the queue if it is there, ignoring errors - cc.qjqueue.Delete(qj) - klog.V(3).Infof("[syncQueueJob] AW job=%s/%s deleted.", qj.Namespace, qj.Name) - return nil - } cacheAWJob, err := cc.getAppWrapper(qj.Namespace, qj.Name, "[syncQueueJob] get fresh appwrapper ") if err != nil { if apierrors.IsNotFound(err) { From ee275aa5c0330ca8ec04c1d39eccaf1d0a34a06d Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 11 Aug 2023 15:57:01 -0400 Subject: [PATCH 2/2] fix typo in docs --- pkg/controller/queuejob/queuejob_controller_ex.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index d3077fcf..82fddd07 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1614,11 +1614,11 @@ func larger(a, b string) bool { } //When an AW is deleted, do not add such AWs to the event queue. -//AW can never be brought back when it is deleted by external client, so do not bother adding it to event queue. -//There will be a scenario, where an AW is in middle to dispatch and it is deleted. at that point when such an -//AW is added to etcd a conflict error will be raised. This is cause the current AW to be skipped. -//If there are large number of delete's will informer miss few delete events is under question for this simplification. -//For 1K AW all of them are delete from the system, and the next resubmitted AW begins processing after in less than 2 mins +//AW can never be brought back when it is deleted by an external client, so do not bother adding it to event queue. +//There will be a scenario, where an AW is in middle of dispatch cycle and it may be deleted. 
At that point when such an +//AW is added to etcd a conflict error will be raised. This will cause the current AW to be skipped. +//If there are large number of delete's may be informer misses few delete events for this simplification. +//For 1K AW all of them are deleted from the system, and the next batch of re-submitted AW begins processing in less than 2 mins func (cc *XController) deleteQueueJob(obj interface{}) { qj, ok := obj.(*arbv1.AppWrapper) if !ok {