Skip to content

Commit 47efca3

Browse files
authored
Improve inherited annotations (zalando#2657)
* Annotate PVC on Sync/Update, not only change PVC template * Don't rotate pods when only annotations changed * Annotate Logical Backup's and Pooler's pods * Annotate PDB, Endpoints created by the Operator, Secrets, Logical Backup jobs Inherited annotations are only added/updated, not removed
1 parent 2ef7d58 commit 47efca3

12 files changed

+643
-490
lines changed

e2e/tests/test_e2e.py

+19-14
Original file line numberDiff line numberDiff line change
@@ -909,22 +909,8 @@ def test_ignored_annotations(self):
909909
'''
910910
k8s = self.k8s
911911

912-
annotation_patch = {
913-
"metadata": {
914-
"annotations": {
915-
"k8s-status": "healthy"
916-
},
917-
}
918-
}
919912

920913
try:
921-
sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
922-
old_sts_creation_timestamp = sts.metadata.creation_timestamp
923-
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
924-
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')
925-
old_svc_creation_timestamp = svc.metadata.creation_timestamp
926-
k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch)
927-
928914
patch_config_ignored_annotations = {
929915
"data": {
930916
"ignored_annotations": "k8s-status",
@@ -933,6 +919,25 @@ def test_ignored_annotations(self):
933919
k8s.update_config(patch_config_ignored_annotations)
934920
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
935921

922+
sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
923+
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')
924+
925+
annotation_patch = {
926+
"metadata": {
927+
"annotations": {
928+
"k8s-status": "healthy"
929+
},
930+
}
931+
}
932+
933+
old_sts_creation_timestamp = sts.metadata.creation_timestamp
934+
k8s.api.apps_v1.patch_namespaced_stateful_set(sts.metadata.name, sts.metadata.namespace, annotation_patch)
935+
old_svc_creation_timestamp = svc.metadata.creation_timestamp
936+
k8s.api.core_v1.patch_namespaced_service(svc.metadata.name, svc.metadata.namespace, annotation_patch)
937+
938+
k8s.delete_operator_pod()
939+
self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
940+
936941
sts = k8s.api.apps_v1.read_namespaced_stateful_set('acid-minimal-cluster', 'default')
937942
new_sts_creation_timestamp = sts.metadata.creation_timestamp
938943
svc = k8s.api.core_v1.read_namespaced_service('acid-minimal-cluster', 'default')

manifests/operator-service-account-rbac.yaml

+1
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ rules:
102102
- delete
103103
- get
104104
- update
105+
- patch
105106
# to check nodes for node readiness label
106107
- apiGroups:
107108
- ""

pkg/cluster/cluster.go

+41-50
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
appsv1 "k8s.io/api/apps/v1"
3131
batchv1 "k8s.io/api/batch/v1"
3232
v1 "k8s.io/api/core/v1"
33+
apipolicyv1 "k8s.io/api/policy/v1"
3334
policyv1 "k8s.io/api/policy/v1"
3435
rbacv1 "k8s.io/api/rbac/v1"
3536
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -433,6 +434,12 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
433434
reasons = append(reasons, "new statefulset's pod management policy do not match")
434435
}
435436

437+
if c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy == nil {
438+
c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy = &appsv1.StatefulSetPersistentVolumeClaimRetentionPolicy{
439+
WhenDeleted: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
440+
WhenScaled: appsv1.RetainPersistentVolumeClaimRetentionPolicyType,
441+
}
442+
}
436443
if !reflect.DeepEqual(c.Statefulset.Spec.PersistentVolumeClaimRetentionPolicy, statefulSet.Spec.PersistentVolumeClaimRetentionPolicy) {
437444
match = false
438445
needsReplace = true
@@ -493,7 +500,6 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
493500
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.Template.Annotations, statefulSet.Spec.Template.Annotations); changed {
494501
match = false
495502
needsReplace = true
496-
needsRollUpdate = true
497503
reasons = append(reasons, "new statefulset's pod template metadata annotations does not match "+reason)
498504
}
499505
if !reflect.DeepEqual(c.Statefulset.Spec.Template.Spec.SecurityContext, statefulSet.Spec.Template.Spec.SecurityContext) {
@@ -513,9 +519,9 @@ func (c *Cluster) compareStatefulSetWith(statefulSet *appsv1.StatefulSet) *compa
513519
reasons = append(reasons, fmt.Sprintf("new statefulset's name for volume %d does not match the current one", i))
514520
continue
515521
}
516-
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations) {
522+
if changed, reason := c.compareAnnotations(c.Statefulset.Spec.VolumeClaimTemplates[i].Annotations, statefulSet.Spec.VolumeClaimTemplates[i].Annotations); changed {
517523
needsReplace = true
518-
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one", name))
524+
reasons = append(reasons, fmt.Sprintf("new statefulset's annotations for volume %q does not match the current one: %s", name, reason))
519525
}
520526
if !reflect.DeepEqual(c.Statefulset.Spec.VolumeClaimTemplates[i].Spec, statefulSet.Spec.VolumeClaimTemplates[i].Spec) {
521527
name := c.Statefulset.Spec.VolumeClaimTemplates[i].Name
@@ -780,10 +786,6 @@ func (c *Cluster) compareServices(old, new *v1.Service) (bool, string) {
780786
}
781787
}
782788

783-
if changed, reason := c.compareAnnotations(old.Annotations, new.Annotations); changed {
784-
return !changed, "new service's annotations does not match the current one:" + reason
785-
}
786-
787789
return true, ""
788790
}
789791

@@ -801,6 +803,12 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
801803
newImage, curImage)
802804
}
803805

806+
newPodAnnotation := new.Spec.JobTemplate.Spec.Template.Annotations
807+
curPodAnnotation := cur.Spec.JobTemplate.Spec.Template.Annotations
808+
if changed, reason := c.compareAnnotations(curPodAnnotation, newPodAnnotation); changed {
809+
return false, fmt.Sprintf("new job's pod template metadata annotations does not match " + reason)
810+
}
811+
804812
newPgVersion := getPgVersion(new)
805813
curPgVersion := getPgVersion(cur)
806814
if newPgVersion != curPgVersion {
@@ -818,6 +826,17 @@ func (c *Cluster) compareLogicalBackupJob(cur, new *batchv1.CronJob) (match bool
818826
return true, ""
819827
}
820828

829+
func (c *Cluster) comparePodDisruptionBudget(cur, new *apipolicyv1.PodDisruptionBudget) (bool, string) {
830+
//TODO: improve comparison
831+
if match := reflect.DeepEqual(new.Spec, cur.Spec); !match {
832+
return false, "new PDB spec does not match the current one"
833+
}
834+
if changed, reason := c.compareAnnotations(cur.Annotations, new.Annotations); changed {
835+
return false, "new PDB's annotations does not match the current one:" + reason
836+
}
837+
return true, ""
838+
}
839+
821840
func getPgVersion(cronJob *batchv1.CronJob) string {
822841
envs := cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Env
823842
for _, env := range envs {
@@ -883,7 +902,6 @@ func (c *Cluster) hasFinalizer() bool {
883902
func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
884903
updateFailed := false
885904
userInitFailed := false
886-
syncStatefulSet := false
887905

888906
c.mu.Lock()
889907
defer c.mu.Unlock()
@@ -914,20 +932,16 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
914932
if IsBiggerPostgresVersion(oldSpec.Spec.PostgresqlParam.PgVersion, c.GetDesiredMajorVersion()) {
915933
c.logger.Infof("postgresql version increased (%s -> %s), depending on config manual upgrade needed",
916934
oldSpec.Spec.PostgresqlParam.PgVersion, newSpec.Spec.PostgresqlParam.PgVersion)
917-
syncStatefulSet = true
918935
} else {
919936
c.logger.Infof("postgresql major version unchanged or smaller, no changes needed")
920937
// sticking with old version, this will also advance GetDesiredVersion next time.
921938
newSpec.Spec.PostgresqlParam.PgVersion = oldSpec.Spec.PostgresqlParam.PgVersion
922939
}
923940

924941
// Service
925-
if !reflect.DeepEqual(c.generateService(Master, &oldSpec.Spec), c.generateService(Master, &newSpec.Spec)) ||
926-
!reflect.DeepEqual(c.generateService(Replica, &oldSpec.Spec), c.generateService(Replica, &newSpec.Spec)) {
927-
if err := c.syncServices(); err != nil {
928-
c.logger.Errorf("could not sync services: %v", err)
929-
updateFailed = true
930-
}
942+
if err := c.syncServices(); err != nil {
943+
c.logger.Errorf("could not sync services: %v", err)
944+
updateFailed = true
931945
}
932946

933947
// Users
@@ -946,15 +960,19 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
946960
// only when streams were not specified in oldSpec but in newSpec
947961
needStreamUser := len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0
948962

949-
if !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser {
963+
annotationsChanged, _ := c.compareAnnotations(oldSpec.Annotations, newSpec.Annotations)
964+
965+
initUsers := !sameUsers || !sameRotatedUsers || needPoolerUser || needStreamUser
966+
if initUsers {
950967
c.logger.Debugf("initialize users")
951968
if err := c.initUsers(); err != nil {
952969
c.logger.Errorf("could not init users - skipping sync of secrets and databases: %v", err)
953970
userInitFailed = true
954971
updateFailed = true
955972
return
956973
}
957-
974+
}
975+
if initUsers || annotationsChanged {
958976
c.logger.Debugf("syncing secrets")
959977
//TODO: mind the secrets of the deleted/new users
960978
if err := c.syncSecrets(); err != nil {
@@ -968,38 +986,14 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
968986
if c.OpConfig.StorageResizeMode != "off" {
969987
c.syncVolumes()
970988
} else {
971-
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume sync.")
972-
}
973-
974-
// streams configuration
975-
if len(oldSpec.Spec.Streams) == 0 && len(newSpec.Spec.Streams) > 0 {
976-
syncStatefulSet = true
989+
c.logger.Infof("Storage resize is disabled (storage_resize_mode is off). Skipping volume size sync.")
977990
}
978991

979992
// Statefulset
980993
func() {
981-
oldSs, err := c.generateStatefulSet(&oldSpec.Spec)
982-
if err != nil {
983-
c.logger.Errorf("could not generate old statefulset spec: %v", err)
994+
if err := c.syncStatefulSet(); err != nil {
995+
c.logger.Errorf("could not sync statefulsets: %v", err)
984996
updateFailed = true
985-
return
986-
}
987-
988-
newSs, err := c.generateStatefulSet(&newSpec.Spec)
989-
if err != nil {
990-
c.logger.Errorf("could not generate new statefulset spec: %v", err)
991-
updateFailed = true
992-
return
993-
}
994-
995-
if syncStatefulSet || !reflect.DeepEqual(oldSs, newSs) {
996-
c.logger.Debugf("syncing statefulsets")
997-
syncStatefulSet = false
998-
// TODO: avoid generating the StatefulSet object twice by passing it to syncStatefulSet
999-
if err := c.syncStatefulSet(); err != nil {
1000-
c.logger.Errorf("could not sync statefulsets: %v", err)
1001-
updateFailed = true
1002-
}
1003997
}
1004998
}()
1005999

@@ -1011,12 +1005,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error {
10111005
}
10121006

10131007
// pod disruption budget
1014-
if oldSpec.Spec.NumberOfInstances != newSpec.Spec.NumberOfInstances {
1015-
c.logger.Debug("syncing pod disruption budgets")
1016-
if err := c.syncPodDisruptionBudget(true); err != nil {
1017-
c.logger.Errorf("could not sync pod disruption budget: %v", err)
1018-
updateFailed = true
1019-
}
1008+
if err := c.syncPodDisruptionBudget(true); err != nil {
1009+
c.logger.Errorf("could not sync pod disruption budget: %v", err)
1010+
updateFailed = true
10201011
}
10211012

10221013
// logical backup job

0 commit comments

Comments
 (0)