@@ -438,7 +438,12 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
438
438
for {
439
439
select {
440
440
case e := <- wc .incomingEventChan :
441
- res := wc .transform (e )
441
+ res , err := wc .transform (e )
442
+ if err != nil {
443
+ wc .sendError (err )
444
+ return
445
+ }
446
+
442
447
if res == nil {
443
448
continue
444
449
}
@@ -461,10 +466,8 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
461
466
462
467
func (wc * watchChan ) concurrentProcessEvents (wg * sync.WaitGroup ) {
463
468
p := concurrentOrderedEventProcessing {
464
- input : wc .incomingEventChan ,
465
- processFunc : wc .transform ,
466
- output : wc .resultChan ,
467
- processingQueue : make (chan chan * watch.Event , processEventConcurrency - 1 ),
469
+ wc : wc ,
470
+ processingQueue : make (chan chan * processingResult , processEventConcurrency - 1 ),
468
471
469
472
objectType : wc .watcher .objectType ,
470
473
groupResource : wc .watcher .groupResource ,
@@ -481,12 +484,15 @@ func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
481
484
}()
482
485
}
483
486
487
+ type processingResult struct {
488
+ event * watch.Event
489
+ err error
490
+ }
491
+
484
492
type concurrentOrderedEventProcessing struct {
485
- input chan * event
486
- processFunc func (* event ) * watch.Event
487
- output chan watch.Event
493
+ wc * watchChan
488
494
489
- processingQueue chan chan * watch. Event
495
+ processingQueue chan chan * processingResult
490
496
// Metadata for logging
491
497
objectType string
492
498
groupResource schema.GroupResource
@@ -498,28 +504,29 @@ func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.C
498
504
select {
499
505
case <- ctx .Done ():
500
506
return
501
- case e = <- p .input :
507
+ case e = <- p .wc . incomingEventChan :
502
508
}
503
- processingResponse := make (chan * watch. Event , 1 )
509
+ processingResponse := make (chan * processingResult , 1 )
504
510
select {
505
511
case <- ctx .Done ():
506
512
return
507
513
case p .processingQueue <- processingResponse :
508
514
}
509
515
wg .Add (1 )
510
- go func (e * event , response chan <- * watch. Event ) {
516
+ go func (e * event , response chan <- * processingResult ) {
511
517
defer wg .Done ()
518
+ responseEvent , err := p .wc .transform (e )
512
519
select {
513
520
case <- ctx .Done ():
514
- case response <- p . processFunc ( e ) :
521
+ case response <- & processingResult { event : responseEvent , err : err } :
515
522
}
516
523
}(e , processingResponse )
517
524
}
518
525
}
519
526
520
527
func (p * concurrentOrderedEventProcessing ) collectEventProcessing (ctx context.Context ) {
521
- var processingResponse chan * watch. Event
522
- var e * watch. Event
528
+ var processingResponse chan * processingResult
529
+ var r * processingResult
523
530
for {
524
531
select {
525
532
case <- ctx .Done ():
@@ -529,21 +536,25 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co
529
536
select {
530
537
case <- ctx .Done ():
531
538
return
532
- case e = <- processingResponse :
539
+ case r = <- processingResponse :
533
540
}
534
- if e == nil {
541
+ if r .err != nil {
542
+ p .wc .sendError (r .err )
543
+ return
544
+ }
545
+ if r .event == nil {
535
546
continue
536
547
}
537
- if len (p .output ) == cap (p .output ) {
538
- klog .V (3 ).InfoS ("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers" , "outgoingEvents" , outgoingBufSize , "objectType" , p .objectType , "groupResource" , p .groupResource )
548
+ if len (p .wc . resultChan ) == cap (p .wc . resultChan ) {
549
+ klog .V (3 ).InfoS ("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers" , "outgoingEvents" , outgoingBufSize , "objectType" , p .wc . watcher . objectType , "groupResource" , p . wc . watcher .groupResource )
539
550
}
540
551
// If user couldn't receive results fast enough, we also block incoming events from watcher.
541
552
// Because storing events in local will cause more memory usage.
542
553
// The worst case would be closing the fast watcher.
543
554
select {
544
- case <- ctx .Done ():
555
+ case p .wc .resultChan <- * r .event :
556
+ case <- p .wc .ctx .Done ():
545
557
return
546
- case p .output <- * e :
547
558
}
548
559
}
549
560
}
@@ -561,25 +572,23 @@ func (wc *watchChan) acceptAll() bool {
561
572
}
562
573
563
574
// transform transforms an event into a result for user if not filtered.
564
- func (wc * watchChan ) transform (e * event ) (res * watch.Event ) {
575
+ func (wc * watchChan ) transform (e * event ) (res * watch.Event , err error ) {
565
576
curObj , oldObj , err := wc .prepareObjs (e )
566
577
if err != nil {
567
578
klog .Errorf ("failed to prepare current and previous objects: %v" , err )
568
- wc .sendError (err )
569
- return nil
579
+ return nil , err
570
580
}
571
581
572
582
switch {
573
583
case e .isProgressNotify :
574
584
object := wc .watcher .newFunc ()
575
585
if err := wc .watcher .versioner .UpdateObject (object , uint64 (e .rev )); err != nil {
576
586
klog .Errorf ("failed to propagate object version: %v" , err )
577
- return nil
587
+ return nil , fmt . Errorf ( "failed to propagate object resource version: %w" , err )
578
588
}
579
589
if e .isInitialEventsEndBookmark {
580
590
if err := storage .AnnotateInitialEventsEndBookmark (object ); err != nil {
581
- wc .sendError (fmt .Errorf ("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v" , wc .watcher .groupResource , wc .watcher .objectType , object , err ))
582
- return nil
591
+ return nil , fmt .Errorf ("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %w" , wc .watcher .groupResource , wc .watcher .objectType , object , err )
583
592
}
584
593
}
585
594
res = & watch.Event {
@@ -588,15 +597,15 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
588
597
}
589
598
case e .isDeleted :
590
599
if ! wc .filter (oldObj ) {
591
- return nil
600
+ return nil , nil
592
601
}
593
602
res = & watch.Event {
594
603
Type : watch .Deleted ,
595
604
Object : oldObj ,
596
605
}
597
606
case e .isCreated :
598
607
if ! wc .filter (curObj ) {
599
- return nil
608
+ return nil , nil
600
609
}
601
610
res = & watch.Event {
602
611
Type : watch .Added ,
@@ -608,7 +617,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
608
617
Type : watch .Modified ,
609
618
Object : curObj ,
610
619
}
611
- return res
620
+ return res , nil
612
621
}
613
622
curObjPasses := wc .filter (curObj )
614
623
oldObjPasses := wc .filter (oldObj )
@@ -630,7 +639,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
630
639
}
631
640
}
632
641
}
633
- return res
642
+ return res , nil
634
643
}
635
644
636
645
func transformErrorToEvent (err error ) * watch.Event {
0 commit comments