Skip to content

Commit af98ede

Browse files
Merge pull request #2278 from atiratree/bump-1.31.8
OCPBUGS-55267: Update to Kubernetes v1.31.8
2 parents 0b8a681 + b9e1dc6 commit af98ede

File tree

8 files changed

+419
-121
lines changed

8 files changed

+419
-121
lines changed

CHANGELOG/CHANGELOG-1.31.md

Lines changed: 191 additions & 83 deletions
Large diffs are not rendered by default.

openshift-hack/e2e/annotate/generated/zz_generated.annotations.go

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

openshift-hack/images/hyperkube/Dockerfile.rhel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,4 @@ COPY --from=builder /tmp/build/* /usr/bin/
1414
LABEL io.k8s.display-name="OpenShift Kubernetes Server Commands" \
1515
io.k8s.description="OpenShift is a platform for developing, building, and deploying containerized applications." \
1616
io.openshift.tags="openshift,hyperkube" \
17-
io.openshift.build.versions="kubernetes=1.31.7"
17+
io.openshift.build.versions="kubernetes=1.31.8"

pkg/kubelet/volumemanager/populator/desired_state_of_world_populator.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"context"
2525
"errors"
2626
"fmt"
27+
"slices"
2728
"sync"
2829
"time"
2930

@@ -558,15 +559,21 @@ func (dswp *desiredStateOfWorldPopulator) getPVCExtractPV(
558559
return nil, fmt.Errorf("failed to fetch PVC from API server: %v", err)
559560
}
560561

561-
// Pods that use a PVC that is being deleted must not be started.
562+
// Pods that use a PVC that is being deleted and not protected by
563+
// kubernetes.io/pvc-protection must not be started.
562564
//
563-
// In case an old kubelet is running without this check or some kubelets
564-
// have this feature disabled, the worst that can happen is that such
565-
// pod is scheduled. This was the default behavior in 1.8 and earlier
566-
// and users should not be that surprised.
565+
// 1) In case an old kubelet is running without this check, the worst
566+
// that can happen is that such pod is scheduled. This was the default
567+
// behavior in 1.8 and earlier and users should not be that surprised.
567568
// It should happen only in a very rare case when the scheduler schedules
568569
// a pod and user deletes a PVC that's used by it at the same time.
569-
if pvc.ObjectMeta.DeletionTimestamp != nil {
570+
//
571+
// 2) Adding a check for kubernetes.io/pvc-protection here to prevent
572+
// the existing running pods from being affected during the rebuild of
573+
// the desired state of the world cache when the kubelet is restarted.
574+
// It is safe for kubelet to add this check here because the PVC will
575+
// be stuck in Terminating state until the pod is deleted.
576+
if pvc.ObjectMeta.DeletionTimestamp != nil && !slices.Contains(pvc.Finalizers, util.PVCProtectionFinalizer) {
570577
return nil, errors.New("PVC is being deleted")
571578
}
572579

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher.go

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,12 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
438438
for {
439439
select {
440440
case e := <-wc.incomingEventChan:
441-
res := wc.transform(e)
441+
res, err := wc.transform(e)
442+
if err != nil {
443+
wc.sendError(err)
444+
return
445+
}
446+
442447
if res == nil {
443448
continue
444449
}
@@ -461,10 +466,8 @@ func (wc *watchChan) serialProcessEvents(wg *sync.WaitGroup) {
461466

462467
func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
463468
p := concurrentOrderedEventProcessing{
464-
input: wc.incomingEventChan,
465-
processFunc: wc.transform,
466-
output: wc.resultChan,
467-
processingQueue: make(chan chan *watch.Event, processEventConcurrency-1),
469+
wc: wc,
470+
processingQueue: make(chan chan *processingResult, processEventConcurrency-1),
468471

469472
objectType: wc.watcher.objectType,
470473
groupResource: wc.watcher.groupResource,
@@ -481,12 +484,15 @@ func (wc *watchChan) concurrentProcessEvents(wg *sync.WaitGroup) {
481484
}()
482485
}
483486

487+
type processingResult struct {
488+
event *watch.Event
489+
err error
490+
}
491+
484492
type concurrentOrderedEventProcessing struct {
485-
input chan *event
486-
processFunc func(*event) *watch.Event
487-
output chan watch.Event
493+
wc *watchChan
488494

489-
processingQueue chan chan *watch.Event
495+
processingQueue chan chan *processingResult
490496
// Metadata for logging
491497
objectType string
492498
groupResource schema.GroupResource
@@ -498,28 +504,29 @@ func (p *concurrentOrderedEventProcessing) scheduleEventProcessing(ctx context.C
498504
select {
499505
case <-ctx.Done():
500506
return
501-
case e = <-p.input:
507+
case e = <-p.wc.incomingEventChan:
502508
}
503-
processingResponse := make(chan *watch.Event, 1)
509+
processingResponse := make(chan *processingResult, 1)
504510
select {
505511
case <-ctx.Done():
506512
return
507513
case p.processingQueue <- processingResponse:
508514
}
509515
wg.Add(1)
510-
go func(e *event, response chan<- *watch.Event) {
516+
go func(e *event, response chan<- *processingResult) {
511517
defer wg.Done()
518+
responseEvent, err := p.wc.transform(e)
512519
select {
513520
case <-ctx.Done():
514-
case response <- p.processFunc(e):
521+
case response <- &processingResult{event: responseEvent, err: err}:
515522
}
516523
}(e, processingResponse)
517524
}
518525
}
519526

520527
func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Context) {
521-
var processingResponse chan *watch.Event
522-
var e *watch.Event
528+
var processingResponse chan *processingResult
529+
var r *processingResult
523530
for {
524531
select {
525532
case <-ctx.Done():
@@ -529,21 +536,25 @@ func (p *concurrentOrderedEventProcessing) collectEventProcessing(ctx context.Co
529536
select {
530537
case <-ctx.Done():
531538
return
532-
case e = <-processingResponse:
539+
case r = <-processingResponse:
533540
}
534-
if e == nil {
541+
if r.err != nil {
542+
p.wc.sendError(r.err)
543+
return
544+
}
545+
if r.event == nil {
535546
continue
536547
}
537-
if len(p.output) == cap(p.output) {
538-
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.objectType, "groupResource", p.groupResource)
548+
if len(p.wc.resultChan) == cap(p.wc.resultChan) {
549+
klog.V(3).InfoS("Fast watcher, slow processing. Probably caused by slow dispatching events to watchers", "outgoingEvents", outgoingBufSize, "objectType", p.wc.watcher.objectType, "groupResource", p.wc.watcher.groupResource)
539550
}
540551
// If user couldn't receive results fast enough, we also block incoming events from watcher.
541552
// Because storing events in local will cause more memory usage.
542553
// The worst case would be closing the fast watcher.
543554
select {
544-
case <-ctx.Done():
555+
case p.wc.resultChan <- *r.event:
556+
case <-p.wc.ctx.Done():
545557
return
546-
case p.output <- *e:
547558
}
548559
}
549560
}
@@ -561,25 +572,23 @@ func (wc *watchChan) acceptAll() bool {
561572
}
562573

563574
// transform transforms an event into a result for user if not filtered.
564-
func (wc *watchChan) transform(e *event) (res *watch.Event) {
575+
func (wc *watchChan) transform(e *event) (res *watch.Event, err error) {
565576
curObj, oldObj, err := wc.prepareObjs(e)
566577
if err != nil {
567578
klog.Errorf("failed to prepare current and previous objects: %v", err)
568-
wc.sendError(err)
569-
return nil
579+
return nil, err
570580
}
571581

572582
switch {
573583
case e.isProgressNotify:
574584
object := wc.watcher.newFunc()
575585
if err := wc.watcher.versioner.UpdateObject(object, uint64(e.rev)); err != nil {
576586
klog.Errorf("failed to propagate object version: %v", err)
577-
return nil
587+
return nil, fmt.Errorf("failed to propagate object resource version: %w", err)
578588
}
579589
if e.isInitialEventsEndBookmark {
580590
if err := storage.AnnotateInitialEventsEndBookmark(object); err != nil {
581-
wc.sendError(fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %v", wc.watcher.groupResource, wc.watcher.objectType, object, err))
582-
return nil
591+
return nil, fmt.Errorf("error while accessing object's metadata gr: %v, type: %v, obj: %#v, err: %w", wc.watcher.groupResource, wc.watcher.objectType, object, err)
583592
}
584593
}
585594
res = &watch.Event{
@@ -588,15 +597,15 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
588597
}
589598
case e.isDeleted:
590599
if !wc.filter(oldObj) {
591-
return nil
600+
return nil, nil
592601
}
593602
res = &watch.Event{
594603
Type: watch.Deleted,
595604
Object: oldObj,
596605
}
597606
case e.isCreated:
598607
if !wc.filter(curObj) {
599-
return nil
608+
return nil, nil
600609
}
601610
res = &watch.Event{
602611
Type: watch.Added,
@@ -608,7 +617,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
608617
Type: watch.Modified,
609618
Object: curObj,
610619
}
611-
return res
620+
return res, nil
612621
}
613622
curObjPasses := wc.filter(curObj)
614623
oldObjPasses := wc.filter(oldObj)
@@ -630,7 +639,7 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
630639
}
631640
}
632641
}
633-
return res
642+
return res, nil
634643
}
635644

636645
func transformErrorToEvent(err error) *watch.Event {

staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ func TestEtcdWatchSemanticInitialEventsExtended(t *testing.T) {
144144
storagetesting.RunWatchSemanticInitialEventsExtended(ctx, t, store)
145145
}
146146

147+
func TestWatchErrorEventIsBlockingFurtherEvent(t *testing.T) {
148+
ctx, store, _ := testSetup(t)
149+
storagetesting.RunWatchErrorIsBlockingFurtherEvents(ctx, t, &storeWithPrefixTransformer{store})
150+
}
151+
147152
// =======================================================================
148153
// Implementation-specific tests are following.
149154
// The following tests are exercising the details of the implementation

staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1586,6 +1586,73 @@ func RunWatchSemanticInitialEventsExtended(ctx context.Context, t *testing.T, st
15861586
testCheckNoMoreResults(t, w)
15871587
}
15881588

1589+
func RunWatchErrorIsBlockingFurtherEvents(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
1590+
foo := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "foo"}}
1591+
fooKey := fmt.Sprintf("/pods/%s/%s", foo.Namespace, foo.Name)
1592+
fooCreated := &example.Pod{}
1593+
if err := store.Create(context.Background(), fooKey, foo, fooCreated, 0); err != nil {
1594+
t.Errorf("failed to create object: %v", err)
1595+
}
1596+
bar := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "bar"}}
1597+
barKey := fmt.Sprintf("/pods/%s/%s", bar.Namespace, bar.Name)
1598+
barCreated := &example.Pod{}
1599+
if err := store.Create(context.Background(), barKey, bar, barCreated, 0); err != nil {
1600+
t.Errorf("failed to create object: %v", err)
1601+
}
1602+
1603+
// Update transformer to ensure that foo will become effectively corrupted.
1604+
revertTransformer := store.UpdatePrefixTransformer(
1605+
func(transformer *PrefixTransformer) value.Transformer {
1606+
transformer.prefix = []byte("other-prefix")
1607+
return transformer
1608+
})
1609+
defer revertTransformer()
1610+
1611+
baz := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "baz"}}
1612+
bazKey := fmt.Sprintf("/pods/%s/%s", baz.Namespace, baz.Name)
1613+
bazCreated := &example.Pod{}
1614+
if err := store.Create(context.Background(), bazKey, baz, bazCreated, 0); err != nil {
1615+
t.Errorf("failed to create object: %v", err)
1616+
}
1617+
1618+
opts := storage.ListOptions{
1619+
ResourceVersion: fooCreated.ResourceVersion,
1620+
Predicate: storage.Everything,
1621+
Recursive: true,
1622+
}
1623+
1624+
// Run N concurrent watches. Given the asynchronous nature, we increase the
1625+
// probability of hitting the race in at least one of those watches.
1626+
concurrentWatches := 10
1627+
wg := sync.WaitGroup{}
1628+
for i := 0; i < concurrentWatches; i++ {
1629+
wg.Add(1)
1630+
go func() {
1631+
defer wg.Done()
1632+
w, err := store.Watch(ctx, "/pods", opts)
1633+
if err != nil {
1634+
t.Errorf("failed to create watch: %v", err)
1635+
return
1636+
}
1637+
1638+
// We issue the watch starting from object bar.
1639+
// The object fails TransformFromStorage and generates an ERROR watch event.
1640+
// The further events (i.e. ADDED event for baz object) should not be
1641+
// emitted, so we verify no events other than ERROR type are emitted.
1642+
for {
1643+
event, ok := <-w.ResultChan()
1644+
if !ok {
1645+
break
1646+
}
1647+
if event.Type != watch.Error {
1648+
t.Errorf("unexpected event: %#v", event)
1649+
}
1650+
}
1651+
}()
1652+
}
1653+
wg.Wait()
1654+
}
1655+
15891656
func makePod(namePrefix string) *example.Pod {
15901657
return &example.Pod{
15911658
ObjectMeta: metav1.ObjectMeta{

0 commit comments

Comments (0)