
Commit cc6193b

Author: Kubernetes Submit Queue

Merge pull request kubernetes#34266 from jessfraz/automated-cherry-pick-of-#32914-kubernetes#33163-kubernetes#33227-kubernetes#33359-kubernetes#33605-kubernetes#33967-kubernetes#33977-kubernetes#34158-origin-release-1.4

Automatic merge from submit-queue

Automated cherry pick of kubernetes#32914 kubernetes#33163 kubernetes#33227 kubernetes#33359 kubernetes#33605 kubernetes#33967 kubernetes#33977 kubernetes#34158 origin release 1.4

Cherry pick of kubernetes#32914 kubernetes#33163 kubernetes#33227 kubernetes#33359 kubernetes#33605 kubernetes#33967 kubernetes#33977 kubernetes#34158 on release-1.4:

- kubernetes#32914: Limit the number of names per image reported in the node
- kubernetes#33163: fix the appending bug
- kubernetes#33227: remove cpu limits for dns pod. The current limits are not
- kubernetes#33359: Fix goroutine leak in federation service controller
- kubernetes#33605: Add periodic ingress reconciliations.
- kubernetes#33967: scheduler: cache.delete deletes the pod from node specified
- kubernetes#33977: Heal the namespaceless ingresses in federation e2e.
- kubernetes#34158: Add missing argument to log message in federated ingress

2 parents 645c959 + 64ab233; commit cc6193b

File tree

12 files changed: +117 -28 lines

cluster/ubuntu/reconfDocker.sh

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ function restart_docker {
 
   source /run/flannel/subnet.env
   source /etc/default/docker
-  echo DOCKER_OPTS=\"${DOCKER_OPTS} -H tcp://127.0.0.1:4243 -H unix:///var/run/docker.sock \
+  echo DOCKER_OPTS=\" -H tcp://127.0.0.1:4243 -H unix:///var/run/docker.sock \
   --bip=${FLANNEL_SUBNET} --mtu=${FLANNEL_MTU}\" > /etc/default/docker
   sudo service docker restart
 }

federation/pkg/federation-controller/ingress/ingress_controller.go

Lines changed: 27 additions & 11 deletions
@@ -666,7 +666,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 					ClusterName: cluster.Name,
 				})
 			} else {
-				glog.V(4).Infof("No annotation %q exists on ingress %q in federation, and index of cluster %q is %d and not zero. Not queueing create operation for ingress %q until annotation exists", staticIPNameKeyWritable, ingress, cluster.Name, clusterIndex)
+				glog.V(4).Infof("No annotation %q exists on ingress %q in federation, and index of cluster %q is %d and not zero. Not queueing create operation for ingress %q until annotation exists", staticIPNameKeyWritable, ingress, cluster.Name, clusterIndex, ingress)
 			}
 		} else {
 			clusterIngress := clusterIngressObj.(*extensions_v1beta1.Ingress)
@@ -679,6 +679,16 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 			glog.V(4).Infof(logStr, "Transferring")
 			if !baseIPAnnotationExists && clusterIPNameExists {
 				baseIngress.ObjectMeta.Annotations[staticIPNameKeyWritable] = clusterIPName
+				glog.V(4).Infof("Attempting to update base federated ingress annotations: %v", baseIngress)
+				if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil {
+					glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err)
+					ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
+					return
+				} else {
+					glog.V(4).Infof("Successfully updated federated ingress %q (added IP annotation), after update: %q", ingress, updatedFedIngress)
+					ic.deliverIngress(ingress, ic.smallDelay, false)
+					return
+				}
 			}
 			if !baseLBStatusExists && clusterLBStatusExists {
 				lbstatusObj, lbErr := conversion.NewCloner().DeepCopy(&clusterIngress.Status.LoadBalancer)
@@ -689,16 +699,16 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 					return
 				}
 				baseIngress.Status.LoadBalancer = *lbstatus
-			}
-			glog.V(4).Infof("Attempting to update base federated ingress: %v", baseIngress)
-			if _, err = ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).Update(baseIngress); err != nil {
-				glog.Errorf("Failed to add static IP annotation to federated ingress %q, will try again later: %v", ingress, err)
-				ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
-				return
-			} else {
-				glog.V(4).Infof("Successfully added static IP annotation to federated ingress: %q", ingress)
-				ic.deliverIngress(ingress, ic.smallDelay, false)
-				return
+				glog.V(4).Infof("Attempting to update base federated ingress status: %v", baseIngress)
+				if updatedFedIngress, err := ic.federatedApiClient.Extensions().Ingresses(baseIngress.Namespace).UpdateStatus(baseIngress); err != nil {
+					glog.Errorf("Failed to update federated ingress status of %q (loadbalancer status), will try again later: %v", ingress, err)
+					ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
+					return
+				} else {
+					glog.V(4).Infof("Successfully updated federated ingress status of %q (added loadbalancer status), after update: %q", ingress, updatedFedIngress)
+					ic.deliverIngress(ingress, ic.smallDelay, false)
+					return
+				}
 			}
 		} else {
 			glog.V(4).Infof(logStr, "Not transferring")
@@ -712,10 +722,13 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 		objMeta, err := conversion.NewCloner().DeepCopy(clusterIngress.ObjectMeta)
 		if err != nil {
 			glog.Errorf("Error deep copying ObjectMeta: %v", err)
+			ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
+
 		}
 		desiredIngress.ObjectMeta, ok = objMeta.(v1.ObjectMeta)
 		if !ok {
 			glog.Errorf("Internal error: Failed to cast to v1.ObjectMeta: %v", objMeta)
+			ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
 		}
 		// Merge any annotations and labels on the federated ingress onto the underlying cluster ingress,
 		// overwriting duplicates.
@@ -748,6 +761,7 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 	if len(operations) == 0 {
 		// Everything is in order
 		glog.V(4).Infof("Ingress %q is up-to-date in all clusters - no propagation to clusters required.", ingress)
+		ic.deliverIngress(ingress, ic.ingressReviewDelay, false)
 		return
 	}
 	glog.V(4).Infof("Calling federatedUpdater.Update() - operations: %v", operations)
@@ -760,4 +774,6 @@ func (ic *IngressController) reconcileIngress(ingress types.NamespacedName) {
 		ic.deliverIngress(ingress, ic.ingressReviewDelay, true)
 		return
 	}
+	// Schedule another periodic reconciliation, only to account for possible bugs in watch processing.
+	ic.deliverIngress(ingress, ic.ingressReviewDelay, false)
 }
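Taken together, these hunks split the single federated-ingress write into two calls (Update for the copied IP annotation, UpdateStatus for the copied load-balancer status), requeue the ingress on every error path, and schedule another periodic reconciliation at the end of a pass. The sketch below shows that requeue-with-delay control flow in isolation; it is a minimal illustration, not the controller's real API: reconcile, deliver, the raw channel standing in for the delaying work queue, and the delay values are all assumptions made for the example.

package main

import (
	"fmt"
	"time"
)

// deliver re-queues a key after a delay; failed lets callers pick a harsher
// back-off. It only mimics the shape of deliverIngress above; the real
// controller uses a delaying work queue, not a raw channel.
func deliver(queue chan<- string, key string, delay time.Duration, failed bool) {
	if failed {
		delay *= 2 // illustrative back-off policy
	}
	time.AfterFunc(delay, func() { queue <- key })
}

// reconcile sketches the control flow the hunks add: requeue with the review
// delay on a failed write, requeue with a small delay after a successful one,
// and schedule a periodic re-check when nothing needed changing, to cover
// missed watch events.
func reconcile(queue chan<- string, key string, needsUpdate bool, update func() error) {
	const (
		reviewDelay = 50 * time.Millisecond // shortened for the demo
		smallDelay  = 10 * time.Millisecond
	)
	if needsUpdate {
		if err := update(); err != nil {
			fmt.Printf("update of %q failed, retrying later: %v\n", key, err)
			deliver(queue, key, reviewDelay, true)
			return
		}
		deliver(queue, key, smallDelay, false) // re-check the successful write soon
		return
	}
	// Nothing to do: still schedule another periodic reconciliation.
	deliver(queue, key, reviewDelay, false)
}

func main() {
	queue := make(chan string, 8)
	reconcile(queue, "mynamespace/test-ingress", true, func() error { return nil })
	fmt.Println("requeued:", <-queue)
}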

federation/pkg/federation-controller/ingress/ingress_controller_test.go

Lines changed: 0 additions & 1 deletion
@@ -99,7 +99,6 @@ func TestIngressController(t *testing.T) {
 			Name:      "test-ingress",
 			Namespace: "mynamespace",
 			SelfLink:  "/api/v1/namespaces/mynamespace/ingress/test-ingress",
-			// TODO: Remove: Annotations: map[string]string{},
 		},
 		Status: extensions_v1beta1.IngressStatus{
 			LoadBalancer: api_v1.LoadBalancerStatus{

federation/pkg/federation-controller/service/endpoint_helper.go

Lines changed: 22 additions & 1 deletion
@@ -31,14 +31,34 @@ import (
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (sc *ServiceController) clusterEndpointWorker() {
-	fedClient := sc.federationClient
+	// process all pending events in endpointWorkerDoneChan
+ForLoop:
+	for {
+		select {
+		case clusterName := <-sc.endpointWorkerDoneChan:
+			sc.endpointWorkerMap[clusterName] = false
+		default:
+			// non-blocking, comes here if all existing events are processed
+			break ForLoop
+		}
+	}
+
 	for clusterName, cache := range sc.clusterCache.clientMap {
+		workerExist, found := sc.endpointWorkerMap[clusterName]
+		if found && workerExist {
+			continue
+		}
+
+		// create a worker only if the previous worker has finished and gone out of scope
 		go func(cache *clusterCache, clusterName string) {
+			fedClient := sc.federationClient
 			for {
 				func() {
 					key, quit := cache.endpointQueue.Get()
 					// update endpoint cache
 					if quit {
+						// send signal that current worker has finished tasks and is going out of scope
+						sc.endpointWorkerDoneChan <- clusterName
 						return
 					}
 					defer cache.endpointQueue.Done(key)
@@ -49,6 +69,7 @@ func (sc *ServiceController) clusterEndpointWorker() {
 				}()
 			}
 		}(cache, clusterName)
+		sc.endpointWorkerMap[clusterName] = true
 	}
 }
 
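This change (together with the matching one in service_helper.go below) addresses the goroutine leak from kubernetes#33359: instead of unconditionally starting one goroutine per cluster on every call, the controller now remembers which clusters already have a live worker and lets each worker report its exit on a buffered channel. Here is a minimal, self-contained sketch of that bookkeeping pattern, assuming a toy queue type; controller, workerAlive, workerDone, startWorkers, and maxClusters are illustrative names, not the controller's actual fields.

package main

import (
	"fmt"
	"sync"
)

// toy per-cluster queue: Get blocks until an item arrives, and reports
// quit=true once the channel is closed and drained.
type queue struct{ items chan string }

func (q *queue) Get() (string, bool) {
	item, ok := <-q.items
	return item, !ok
}

// cap used to size the buffered done channel, like maxNoOfClusters below.
const maxClusters = 100

type controller struct {
	queues      map[string]*queue
	workerAlive map[string]bool // clusterName -> a worker goroutine is running
	workerDone  chan string     // workers report their cluster name on exit
}

// startWorkers mirrors the pattern in the diff: drain exit notifications
// without blocking, skip clusters that still have a live worker, and start at
// most one new worker per cluster instead of one per call.
func (c *controller) startWorkers(wg *sync.WaitGroup) {
DrainLoop:
	for {
		select {
		case name := <-c.workerDone:
			c.workerAlive[name] = false
		default:
			break DrainLoop // all pending exit events processed
		}
	}

	for name, q := range c.queues {
		if c.workerAlive[name] {
			continue // previous worker for this cluster has not finished yet
		}
		wg.Add(1)
		go func(name string, q *queue) {
			defer wg.Done()
			for {
				key, quit := q.Get()
				if quit {
					c.workerDone <- name // let a later call restart this worker
					return
				}
				fmt.Printf("cluster %s: synced %s\n", name, key)
			}
		}(name, q)
		c.workerAlive[name] = true
	}
}

func main() {
	c := &controller{
		queues:      map[string]*queue{"cluster-a": {items: make(chan string, 4)}},
		workerAlive: make(map[string]bool),
		workerDone:  make(chan string, maxClusters),
	}
	var wg sync.WaitGroup
	c.startWorkers(&wg)
	c.startWorkers(&wg) // no-op: the worker for cluster-a is still alive
	c.queues["cluster-a"].items <- "endpoints/default/my-svc"
	close(c.queues["cluster-a"].items) // shut the queue down; worker signals done
	wg.Wait()
}

Buffering the done channel to a cluster cap, as servicecontroller.go does with maxNoOfClusters, appears to be what keeps an exiting worker from ever blocking on its notification.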

federation/pkg/federation-controller/service/service_helper.go

Lines changed: 23 additions & 2 deletions
@@ -35,23 +35,44 @@ import (
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (sc *ServiceController) clusterServiceWorker() {
-	fedClient := sc.federationClient
+	// process all pending events in serviceWorkerDoneChan
+ForLoop:
+	for {
+		select {
+		case clusterName := <-sc.serviceWorkerDoneChan:
+			sc.serviceWorkerMap[clusterName] = false
+		default:
+			// non-blocking, comes here if all existing events are processed
+			break ForLoop
+		}
+	}
+
 	for clusterName, cache := range sc.clusterCache.clientMap {
+		workerExist, found := sc.serviceWorkerMap[clusterName]
+		if found && workerExist {
+			continue
+		}
+
+		// create a worker only if the previous worker has finished and gone out of scope
 		go func(cache *clusterCache, clusterName string) {
+			fedClient := sc.federationClient
 			for {
				func() {
 					key, quit := cache.serviceQueue.Get()
-					defer cache.serviceQueue.Done(key)
 					if quit {
+						// send signal that current worker has finished tasks and is going out of scope
+						sc.serviceWorkerDoneChan <- clusterName
 						return
 					}
+					defer cache.serviceQueue.Done(key)
 					err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
 					if err != nil {
 						glog.Errorf("Failed to sync service: %+v", err)
 					}
 				}()
 			}
 		}(cache, clusterName)
+		sc.serviceWorkerMap[clusterName] = true
 	}
 }
 
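Besides the same worker-tracking change, this hunk moves defer cache.serviceQueue.Done(key) below the quit check, so Done is no longer deferred for the placeholder value that Get returns once the queue has shut down. A short sketch of that consumption order, assuming the standalone client-go workqueue package (the release-1.4 tree carried an equivalent package inside the main repo):

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.New()
	q.Add("services/default/my-svc")
	q.ShutDown() // items already queued are still handed out before Get reports shutdown

	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			key, quit := q.Get()
			if quit {
				// Queue is shut down and drained: key carries no real item,
				// so do not defer Done on it (the ordering the hunk fixes).
				return
			}
			func() {
				defer q.Done(key) // safe: Get returned a real item
				fmt.Println("synced", key)
			}()
		}
	}()
	<-done
}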

federation/pkg/federation-controller/service/servicecontroller.go

Lines changed: 17 additions & 0 deletions
@@ -68,6 +68,8 @@ const (
 	UserAgentName = "federation-service-controller"
 	KubeAPIQPS    = 20.0
 	KubeAPIBurst  = 30
+
+	maxNoOfClusters = 100
 )
 
 type cachedService struct {
@@ -119,6 +121,16 @@ type ServiceController struct {
 	// services that need to be synced
 	queue *workqueue.Type
 	knownClusterSet sets.String
+	// endpoint worker map contains all the clusters registered with an indication that worker exist
+	// key clusterName
+	endpointWorkerMap map[string]bool
+	// channel for worker to signal that it is going out of existence
+	endpointWorkerDoneChan chan string
+	// service worker map contains all the clusters registered with an indication that worker exist
+	// key clusterName
+	serviceWorkerMap map[string]bool
+	// channel for worker to signal that it is going out of existence
+	serviceWorkerDoneChan chan string
 }
 
 // New returns a new service controller to keep DNS provider service resources
@@ -205,6 +217,11 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
 			},
 		},
 	)
+
+	s.endpointWorkerMap = make(map[string]bool)
+	s.serviceWorkerMap = make(map[string]bool)
+	s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters)
+	s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters)
 	return s
 }
 

pkg/kubelet/kubelet.go

Lines changed: 0 additions & 3 deletions
@@ -158,9 +158,6 @@ const (
 	// Period for performing image garbage collection.
 	ImageGCPeriod = 5 * time.Minute
 
-	// maxImagesInStatus is the number of max images we store in image status.
-	maxImagesInNodeStatus = 50
-
 	// Minimum number of dead containers to keep in a pod
 	minDeadContainerInPod = 1
 )

pkg/kubelet/kubelet_node_status.go

Lines changed: 15 additions & 1 deletion
@@ -39,6 +39,15 @@ import (
 	"k8s.io/kubernetes/pkg/volume/util/volumehelper"
 )
 
+const (
+	// maxImagesInNodeStatus is the number of max images we store in image status.
+	maxImagesInNodeStatus = 50
+
+	// maxNamesPerImageInNodeStatus is max number of names per image stored in
+	// the node status.
+	maxNamesPerImageInNodeStatus = 5
+)
+
 // registerWithApiServer registers the node with the cluster master. It is safe
 // to call multiple times, but not concurrently (kl.registrationCompleted is
 // not locked).
@@ -501,8 +510,13 @@ func (kl *Kubelet) setNodeStatusImages(node *api.Node) {
 	}
 
 	for _, image := range containerImages {
+		names := append(image.RepoDigests, image.RepoTags...)
+		// Report up to maxNamesPerImageInNodeStatus names per image.
+		if len(names) > maxNamesPerImageInNodeStatus {
+			names = names[0:maxNamesPerImageInNodeStatus]
+		}
 		imagesOnNode = append(imagesOnNode, api.ContainerImage{
-			Names:     append(image.RepoTags, image.RepoDigests...),
+			Names:     names,
 			SizeBytes: image.Size,
 		})
 	}
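The kubelet hunk (kubernetes#32914) builds the reported name list as digests followed by tags and then truncates it, so RepoDigests survive the cap when an image carries many tags. A standalone sketch of the same truncation, with made-up image names; unlike the diff it copies the input slices before appending, to avoid aliasing in a self-contained program.

package main

import "fmt"

// maxNamesPerImage mirrors maxNamesPerImageInNodeStatus from the diff above.
const maxNamesPerImage = 5

// imageNames combines digests and tags (digests first, so they survive
// truncation) and caps the result, like setNodeStatusImages does.
func imageNames(repoDigests, repoTags []string) []string {
	names := append(append([]string{}, repoDigests...), repoTags...)
	if len(names) > maxNamesPerImage {
		names = names[:maxNamesPerImage]
	}
	return names
}

func main() {
	digests := []string{"example.com/app@sha256:0000000000000000000000000000000000000000000000000000000000000000"}
	tags := []string{"example.com/app:v1", "example.com/app:v2", "example.com/app:v3", "example.com/app:v4", "example.com/app:v5"}
	fmt.Println(imageNames(digests, tags)) // 6 names in, 5 reported
}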

pkg/kubelet/kubelet_node_status_test.go

Lines changed: 8 additions & 2 deletions
@@ -44,6 +44,10 @@ import (
 	"k8s.io/kubernetes/pkg/volume/util/volumehelper"
 )
 
+const (
+	maxImageTagsForTest = 20
+)
+
 // generateTestingImageList generate randomly generated image list and corresponding expectedImageList.
 func generateTestingImageList(count int) ([]kubecontainer.Image, []api.ContainerImage) {
 	// imageList is randomly generated image list
@@ -64,7 +68,7 @@ func generateTestingImageList(count int) ([]kubecontainer.Image, []api.Container
 	var expectedImageList []api.ContainerImage
 	for _, kubeImage := range imageList {
 		apiImage := api.ContainerImage{
-			Names:     kubeImage.RepoTags,
+			Names:     kubeImage.RepoTags[0:maxNamesPerImageInNodeStatus],
 			SizeBytes: kubeImage.Size,
 		}
 
@@ -76,7 +80,9 @@ func generateTestingImageList(count int) ([]kubecontainer.Image, []api.Container
 
 func generateImageTags() []string {
 	var tagList []string
-	count := rand.IntnRange(1, maxImageTagsForTest+1)
+	// Generate > maxNamesPerImageInNodeStatus tags so that the test can verify
+	// that kubelet report up to maxNamesPerImageInNodeStatus tags.
+	count := rand.IntnRange(maxNamesPerImageInNodeStatus+1, maxImageTagsForTest+1)
 	for ; count > 0; count-- {
 		tagList = append(tagList, "gcr.io/google_containers:v"+strconv.Itoa(count))
 	}

pkg/kubelet/kubelet_test.go

Lines changed: 0 additions & 2 deletions
@@ -86,8 +86,6 @@ const (
 	testReservationCPU    = "200m"
 	testReservationMemory = "100M"
 
-	maxImageTagsForTest = 3
-
 	// TODO(harry) any global place for these two?
 	// Reasonable size range of all container images. 90%ile of images on dockerhub drops into this range.
 	minImgSize int64 = 23 * 1024 * 1024

plugin/pkg/scheduler/schedulercache/cache.go

Lines changed: 2 additions & 2 deletions
@@ -244,12 +244,12 @@ func (cache *schedulerCache) RemovePod(pod *api.Pod) error {
 	cache.mu.Lock()
 	defer cache.mu.Unlock()
 
-	_, ok := cache.podStates[key]
+	cachedstate, ok := cache.podStates[key]
 	switch {
 	// An assumed pod won't have Delete/Remove event. It needs to have Add event
 	// before Remove event, in which case the state would change from Assumed to Added.
 	case ok && !cache.assumedPods[key]:
-		err := cache.removePod(pod)
+		err := cache.removePod(cachedstate.pod)
 		if err != nil {
 			return err
 		}
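The scheduler fix (kubernetes#33967) removes the pod recorded in the cache rather than the pod object passed to RemovePod, presumably because the incoming object can name a different node than the one the cache charged the pod to, leaving stale per-node aggregates behind. A toy sketch of that idea, with hypothetical cache and pod types that only track a per-node pod count; it is not the real schedulerCache.

package main

import "fmt"

// Minimal stand-ins for the scheduler cache types; the real schedulerCache
// tracks far more than a pod count per node.
type pod struct {
	name     string
	nodeName string
}

type cache struct {
	podStates  map[string]*pod // key -> the pod as the cache last saw it
	podsOnNode map[string]int  // nodeName -> number of cached pods
}

// removePod deletes by the *cached* copy, as the fixed RemovePod does, so the
// count is decremented on the node the cache actually charged the pod to,
// even if the incoming delete event names a different node.
func (c *cache) removePod(key string, eventPod *pod) {
	cached, ok := c.podStates[key]
	if !ok {
		return
	}
	c.podsOnNode[cached.nodeName]--
	delete(c.podStates, key)
	_ = eventPod // the event's copy is deliberately not trusted for the node
}

func main() {
	c := &cache{
		podStates:  map[string]*pod{"default/web": {name: "web", nodeName: "node-a"}},
		podsOnNode: map[string]int{"node-a": 1},
	}
	// Delete event arrives carrying a stale node name.
	c.removePod("default/web", &pod{name: "web", nodeName: "node-b"})
	fmt.Println(c.podsOnNode["node-a"]) // 0: the correct node was decremented
}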

test/e2e/federated-ingress.go

Lines changed: 2 additions & 2 deletions
@@ -284,10 +284,10 @@ func createIngressOrFail(clientset *federation_release_1_4.Clientset, namespace
 		},
 	}
 
-	_, err := clientset.Extensions().Ingresses(namespace).Create(ingress)
+	newIng, err := clientset.Extensions().Ingresses(namespace).Create(ingress)
 	framework.ExpectNoError(err, "Creating ingress %q in namespace %q", ingress.Name, namespace)
 	By(fmt.Sprintf("Successfully created federated ingress %q in namespace %q", FederatedIngressName, namespace))
-	return ingress
+	return newIng
 }
 
 func updateIngressOrFail(clientset *federation_release_1_4.Clientset, namespace string) (newIng *v1beta1.Ingress) {
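The e2e helper now returns the object that Create handed back instead of the input it was built from; only the returned copy carries server-populated fields such as the namespace, which appears to be what the namespaceless-ingress healing in kubernetes#33977 needed. A hedged illustration with a toy create function, not the real federation clientset:

package main

import "fmt"

type ingress struct {
	Name      string
	Namespace string
	UID       string
}

// create mimics an API server: it returns a new copy with server-side fields
// filled in, leaving the caller's object untouched.
func create(in *ingress, namespace string) (*ingress, error) {
	out := *in
	out.Namespace = namespace
	out.UID = "7b1e-illustrative-uid"
	return &out, nil
}

func main() {
	in := &ingress{Name: "test-ingress"}
	out, err := create(in, "mynamespace")
	if err != nil {
		panic(err)
	}
	// Returning in here would hand callers an object with no namespace set,
	// which is the mistake the hunk above fixes by returning out instead.
	fmt.Printf("input namespace: %q, returned namespace: %q\n", in.Namespace, out.Namespace)
}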
