From 4b6de6cc40073cc493c8674bb2841ccf19adb742 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Mon, 30 Dec 2024 10:20:37 +0100 Subject: [PATCH 1/9] PDB code refactoring to enable a second one --- pkg/cluster/cluster.go | 62 +++++++++++++-------------- pkg/cluster/k8sres.go | 6 +-- pkg/cluster/k8sres_test.go | 2 +- pkg/cluster/resources.go | 86 ++++++++++++++++++++++++++------------ pkg/cluster/sync.go | 39 +++++++++++------ pkg/cluster/types.go | 18 ++++---- pkg/cluster/util_test.go | 2 +- 7 files changed, 129 insertions(+), 86 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 469eff2ea..2b00e2bf2 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -59,16 +59,16 @@ type Config struct { } type kubeResources struct { - Services map[PostgresRole]*v1.Service - Endpoints map[PostgresRole]*v1.Endpoints - PatroniEndpoints map[string]*v1.Endpoints - PatroniConfigMaps map[string]*v1.ConfigMap - Secrets map[types.UID]*v1.Secret - Statefulset *appsv1.StatefulSet - VolumeClaims map[types.UID]*v1.PersistentVolumeClaim - PodDisruptionBudget *policyv1.PodDisruptionBudget - LogicalBackupJob *batchv1.CronJob - Streams map[string]*zalandov1.FabricEventStream + Services map[PostgresRole]*v1.Service + Endpoints map[PostgresRole]*v1.Endpoints + PatroniEndpoints map[string]*v1.Endpoints + PatroniConfigMaps map[string]*v1.ConfigMap + Secrets map[types.UID]*v1.Secret + Statefulset *appsv1.StatefulSet + VolumeClaims map[types.UID]*v1.PersistentVolumeClaim + MasterPodDisruptionBudget *policyv1.PodDisruptionBudget + LogicalBackupJob *batchv1.CronJob + Streams map[string]*zalandov1.FabricEventStream //Pods are treated separately } @@ -343,14 +343,10 @@ func (c *Cluster) Create() (err error) { c.logger.Infof("secrets have been successfully created") c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Secrets", "The secrets have been successfully created") - if c.PodDisruptionBudget != nil { - return fmt.Errorf("pod disruption budget 
already exists in the cluster") + if err = c.createPodDisruptionBudgets(); err != nil { + return fmt.Errorf("could not create pod disruption budgets: %v", err) } - pdb, err := c.createPodDisruptionBudget() - if err != nil { - return fmt.Errorf("could not create pod disruption budget: %v", err) - } - c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(pdb.ObjectMeta)) + c.logger.Info("pod disruption budgets have been successfully created") if c.Statefulset != nil { return fmt.Errorf("statefulset already exists in the cluster") @@ -1081,9 +1077,9 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { } } - // pod disruption budget - if err := c.syncPodDisruptionBudget(true); err != nil { - c.logger.Errorf("could not sync pod disruption budget: %v", err) + // pod disruption budgets + if err := c.syncPodDisruptionBudgets(true); err != nil { + c.logger.Errorf("could not sync pod disruption budgets: %v", err) updateFailed = true } @@ -1228,10 +1224,10 @@ func (c *Cluster) Delete() error { c.logger.Info("not deleting secrets because disabled in configuration") } - if err := c.deletePodDisruptionBudget(); err != nil { + if err := c.deletePodDisruptionBudgets(); err != nil { anyErrors = true - c.logger.Warningf("could not delete pod disruption budget: %v", err) - c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budget: %v", err) + c.logger.Warningf("could not delete pod disruption budgets: %v", err) + c.eventRecorder.Eventf(c.GetReference(), v1.EventTypeWarning, "Delete", "could not delete pod disruption budgets: %v", err) } for _, role := range []PostgresRole{Master, Replica} { @@ -1730,16 +1726,16 @@ func (c *Cluster) GetCurrentProcess() Process { // GetStatus provides status of the cluster func (c *Cluster) GetStatus() *ClusterStatus { status := &ClusterStatus{ - Cluster: c.Name, - Namespace: c.Namespace, - Team: c.Spec.TeamID, - Status: c.Status, - Spec: 
c.Spec, - MasterService: c.GetServiceMaster(), - ReplicaService: c.GetServiceReplica(), - StatefulSet: c.GetStatefulSet(), - PodDisruptionBudget: c.GetPodDisruptionBudget(), - CurrentProcess: c.GetCurrentProcess(), + Cluster: c.Name, + Namespace: c.Namespace, + Team: c.Spec.TeamID, + Status: c.Status, + Spec: c.Spec, + MasterService: c.GetServiceMaster(), + ReplicaService: c.GetServiceReplica(), + StatefulSet: c.GetStatefulSet(), + MasterPodDisruptionBudget: c.GetMasterPodDisruptionBudget(), + CurrentProcess: c.GetCurrentProcess(), Error: fmt.Errorf("error: %s", c.Error), } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index ff5536303..d1ea02d78 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -109,7 +109,7 @@ func (c *Cluster) servicePort(role PostgresRole) int32 { return pgPort } -func (c *Cluster) podDisruptionBudgetName() string { +func (c *Cluster) masterPodDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } @@ -2207,7 +2207,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript return result } -func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { +func (c *Cluster) generateMasterPodDisruptionBudget() *policyv1.PodDisruptionBudget { minAvailable := intstr.FromInt(1) pdbEnabled := c.OpConfig.EnablePodDisruptionBudget pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector @@ -2225,7 +2225,7 @@ func (c *Cluster) generatePodDisruptionBudget() *policyv1.PodDisruptionBudget { return &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ - Name: c.podDisruptionBudgetName(), + Name: c.masterPodDisruptionBudgetName(), Namespace: c.Namespace, Labels: c.labelsSet(true), Annotations: c.annotationsSet(nil), diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 612e4525a..564ee0a24 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2491,7 +2491,7 @@ func TestGeneratePodDisruptionBudget(t 
*testing.T) { } for _, tt := range tests { - result := tt.spec.generatePodDisruptionBudget() + result := tt.spec.generateMasterPodDisruptionBudget() for _, check := range tt.check { err := check(tt.spec, result) if err != nil { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 43b8dfaae..316071491 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -23,8 +23,8 @@ const ( ) func (c *Cluster) listResources() error { - if c.PodDisruptionBudget != nil { - c.logger.Infof("found pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta), c.PodDisruptionBudget.UID) + if c.MasterPodDisruptionBudget != nil { + c.logger.Infof("found master pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.MasterPodDisruptionBudget.ObjectMeta), c.MasterPodDisruptionBudget.UID) } if c.Statefulset != nil { @@ -417,26 +417,47 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset return result } -func (c *Cluster) createPodDisruptionBudget() (*policyv1.PodDisruptionBudget, error) { - podDisruptionBudgetSpec := c.generatePodDisruptionBudget() +func (c *Cluster) createMasterPodDisruptionBudget() error { + c.logger.Debug("creating master pod disruption budget") + if c.MasterPodDisruptionBudget != nil { + return fmt.Errorf("master pod disruption budget already exists in the cluster") + } + + podDisruptionBudgetSpec := c.generateMasterPodDisruptionBudget() podDisruptionBudget, err := c.KubeClient. PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). 
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) if err != nil { - return nil, err + return err + } + c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.MasterPodDisruptionBudget = podDisruptionBudget + + return nil +} + +func (c *Cluster) createPodDisruptionBudgets() error { + errors := make([]string, 0) + + err := c.createMasterPodDisruptionBudget() + if err != nil { + errors = append(errors, fmt.Sprintf("could not create master pod disruption budget: %v", err)) } - c.PodDisruptionBudget = podDisruptionBudget - return podDisruptionBudget, nil + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil } -func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { - if c.PodDisruptionBudget == nil { +func (c *Cluster) updateMasterPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating master pod disruption budget") + if c.MasterPodDisruptionBudget == nil { return fmt.Errorf("there is no pod disruption budget in the cluster") } - if err := c.deletePodDisruptionBudget(); err != nil { + if err := c.deleteMasterPodDisruptionBudget(); err != nil { return fmt.Errorf("could not delete pod disruption budget: %v", err) } @@ -446,30 +467,30 @@ func (c *Cluster) updatePodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) e if err != nil { return fmt.Errorf("could not create pod disruption budget: %v", err) } - c.PodDisruptionBudget = newPdb + c.MasterPodDisruptionBudget = newPdb return nil } -func (c *Cluster) deletePodDisruptionBudget() error { - c.logger.Debug("deleting pod disruption budget") - if c.PodDisruptionBudget == nil { - c.logger.Debug("there is no pod disruption budget in the cluster") +func (c *Cluster) deleteMasterPodDisruptionBudget() error { + c.logger.Debug("deleting master pod disruption budget") + if c.MasterPodDisruptionBudget == nil { + c.logger.Debug("there 
is no master pod disruption budget in the cluster") return nil } - pdbName := util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta) + pdbName := util.NameFromMeta(c.MasterPodDisruptionBudget.ObjectMeta) err := c.KubeClient. - PodDisruptionBudgets(c.PodDisruptionBudget.Namespace). - Delete(context.TODO(), c.PodDisruptionBudget.Name, c.deleteOptions) + PodDisruptionBudgets(c.MasterPodDisruptionBudget.Namespace). + Delete(context.TODO(), c.MasterPodDisruptionBudget.Name, c.deleteOptions) if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.MasterPodDisruptionBudget.ObjectMeta)) } else if err != nil { - return fmt.Errorf("could not delete PodDisruptionBudget: %v", err) + return fmt.Errorf("could not delete master pod disruption budget: %v", err) } - c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PodDisruptionBudget.ObjectMeta)) - c.PodDisruptionBudget = nil + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.MasterPodDisruptionBudget.ObjectMeta)) + c.MasterPodDisruptionBudget = nil err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { @@ -483,9 +504,22 @@ func (c *Cluster) deletePodDisruptionBudget() error { return false, err2 }) if err != nil { - return fmt.Errorf("could not delete pod disruption budget: %v", err) + return fmt.Errorf("could not delete master pod disruption budget: %v", err) + } + + return nil +} + +func (c *Cluster) deletePodDisruptionBudgets() error { + errors := make([]string, 0) + + if err := c.deleteMasterPodDisruptionBudget(); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) } + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } return nil } @@ -705,7 +739,7 @@ func (c *Cluster) GetStatefulSet() 
*appsv1.StatefulSet { return c.Statefulset } -// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget -func (c *Cluster) GetPodDisruptionBudget() *policyv1.PodDisruptionBudget { - return c.PodDisruptionBudget +// GetMasterPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for Master Pod +func (c *Cluster) GetMasterPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.MasterPodDisruptionBudget } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index f2248ba95..3440c2935 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -117,8 +117,8 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { } c.logger.Debug("syncing pod disruption budgets") - if err = c.syncPodDisruptionBudget(false); err != nil { - err = fmt.Errorf("could not sync pod disruption budget: %v", err) + if err = c.syncPodDisruptionBudgets(false); err != nil { + err = fmt.Errorf("could not sync pod disruption budgets: %v", err) return err } @@ -452,22 +452,22 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return nil } -func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { +func (c *Cluster) syncMasterPodDisruptionBudget(isUpdate bool) error { var ( pdb *policyv1.PodDisruptionBudget err error ) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err == nil { - c.PodDisruptionBudget = pdb - newPDB := c.generatePodDisruptionBudget() + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.masterPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.MasterPodDisruptionBudget = pdb + newPDB := c.generateMasterPodDisruptionBudget() match, reason := c.comparePodDisruptionBudget(pdb, newPDB) if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) - if err = c.updatePodDisruptionBudget(newPDB); err != nil { + if err = c.updateMasterPodDisruptionBudget(newPDB); err != nil { return err } } else { - 
c.PodDisruptionBudget = pdb + c.MasterPodDisruptionBudget = pdb } return nil @@ -476,24 +476,37 @@ func (c *Cluster) syncPodDisruptionBudget(isUpdate bool) error { return fmt.Errorf("could not get pod disruption budget: %v", err) } // no existing pod disruption budget, create new one - c.logger.Infof("could not find the cluster's pod disruption budget") + c.logger.Infof("could not find the master pod disruption budget") - if pdb, err = c.createPodDisruptionBudget(); err != nil { + if err = c.createMasterPodDisruptionBudget(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { - return fmt.Errorf("could not create pod disruption budget: %v", err) + return fmt.Errorf("could not create master pod disruption budget: %v", err) } c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.podDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.masterPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) } } c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - c.PodDisruptionBudget = pdb + c.MasterPodDisruptionBudget = pdb return nil } +func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error { + errors := make([]string, 0) + + if err := c.syncMasterPodDisruptionBudget(isUpdate); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + + if len(errors) > 0 { + return fmt.Errorf("%v", strings.Join(errors, `', '`)) + } + return nil +} + func (c *Cluster) syncStatefulSet() error { var ( restartWait uint32 diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 8e9263d49..2e64aef96 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -58,15 +58,15 @@ type WorkerStatus struct { 
// ClusterStatus describes status of the cluster type ClusterStatus struct { - Team string - Cluster string - Namespace string - MasterService *v1.Service - ReplicaService *v1.Service - MasterEndpoint *v1.Endpoints - ReplicaEndpoint *v1.Endpoints - StatefulSet *appsv1.StatefulSet - PodDisruptionBudget *policyv1.PodDisruptionBudget + Team string + Cluster string + Namespace string + MasterService *v1.Service + ReplicaService *v1.Service + MasterEndpoint *v1.Endpoints + ReplicaEndpoint *v1.Endpoints + StatefulSet *appsv1.StatefulSet + MasterPodDisruptionBudget *policyv1.PodDisruptionBudget CurrentProcess Process Worker uint32 diff --git a/pkg/cluster/util_test.go b/pkg/cluster/util_test.go index 12dfaf8e5..b66b22f04 100644 --- a/pkg/cluster/util_test.go +++ b/pkg/cluster/util_test.go @@ -329,7 +329,7 @@ func newInheritedAnnotationsCluster(client k8sutil.KubernetesClient) (*Cluster, if err != nil { return nil, err } - _, err = cluster.createPodDisruptionBudget() + err = cluster.createPodDisruptionBudgets() if err != nil { return nil, err } From 53b82cae14a398f02c3935b00f35c66e5b7e5948 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Mon, 30 Dec 2024 15:09:42 +0100 Subject: [PATCH 2/9] Better naming --- pkg/cluster/cluster.go | 40 ++++++++++++------------ pkg/cluster/k8sres.go | 6 ++-- pkg/cluster/k8sres_test.go | 2 +- pkg/cluster/resources.go | 62 +++++++++++++++++++------------------- pkg/cluster/sync.go | 24 +++++++-------- pkg/cluster/types.go | 18 +++++------ 6 files changed, 76 insertions(+), 76 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 2b00e2bf2..2f6e9a5e0 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -59,16 +59,16 @@ type Config struct { } type kubeResources struct { - Services map[PostgresRole]*v1.Service - Endpoints map[PostgresRole]*v1.Endpoints - PatroniEndpoints map[string]*v1.Endpoints - PatroniConfigMaps map[string]*v1.ConfigMap - Secrets map[types.UID]*v1.Secret - Statefulset 
*appsv1.StatefulSet - VolumeClaims map[types.UID]*v1.PersistentVolumeClaim - MasterPodDisruptionBudget *policyv1.PodDisruptionBudget - LogicalBackupJob *batchv1.CronJob - Streams map[string]*zalandov1.FabricEventStream + Services map[PostgresRole]*v1.Service + Endpoints map[PostgresRole]*v1.Endpoints + PatroniEndpoints map[string]*v1.Endpoints + PatroniConfigMaps map[string]*v1.ConfigMap + Secrets map[types.UID]*v1.Secret + Statefulset *appsv1.StatefulSet + VolumeClaims map[types.UID]*v1.PersistentVolumeClaim + GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget + LogicalBackupJob *batchv1.CronJob + Streams map[string]*zalandov1.FabricEventStream //Pods are treated separately } @@ -1726,16 +1726,16 @@ func (c *Cluster) GetCurrentProcess() Process { // GetStatus provides status of the cluster func (c *Cluster) GetStatus() *ClusterStatus { status := &ClusterStatus{ - Cluster: c.Name, - Namespace: c.Namespace, - Team: c.Spec.TeamID, - Status: c.Status, - Spec: c.Spec, - MasterService: c.GetServiceMaster(), - ReplicaService: c.GetServiceReplica(), - StatefulSet: c.GetStatefulSet(), - MasterPodDisruptionBudget: c.GetMasterPodDisruptionBudget(), - CurrentProcess: c.GetCurrentProcess(), + Cluster: c.Name, + Namespace: c.Namespace, + Team: c.Spec.TeamID, + Status: c.Status, + Spec: c.Spec, + MasterService: c.GetServiceMaster(), + ReplicaService: c.GetServiceReplica(), + StatefulSet: c.GetStatefulSet(), + GeneralPodDisruptionBudget: c.GetGeneralPodDisruptionBudget(), + CurrentProcess: c.GetCurrentProcess(), Error: fmt.Errorf("error: %s", c.Error), } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index d1ea02d78..e91423fe5 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -109,7 +109,7 @@ func (c *Cluster) servicePort(role PostgresRole) int32 { return pgPort } -func (c *Cluster) masterPodDisruptionBudgetName() string { +func (c *Cluster) generalPodDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } @@ 
-2207,7 +2207,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript return result } -func (c *Cluster) generateMasterPodDisruptionBudget() *policyv1.PodDisruptionBudget { +func (c *Cluster) generateGeneralPodDisruptionBudget() *policyv1.PodDisruptionBudget { minAvailable := intstr.FromInt(1) pdbEnabled := c.OpConfig.EnablePodDisruptionBudget pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector @@ -2225,7 +2225,7 @@ func (c *Cluster) generateMasterPodDisruptionBudget() *policyv1.PodDisruptionBud return &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ - Name: c.masterPodDisruptionBudgetName(), + Name: c.generalPodDisruptionBudgetName(), Namespace: c.Namespace, Labels: c.labelsSet(true), Annotations: c.annotationsSet(nil), diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 564ee0a24..b2b4c1845 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2491,7 +2491,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { } for _, tt := range tests { - result := tt.spec.generateMasterPodDisruptionBudget() + result := tt.spec.generateGeneralPodDisruptionBudget() for _, check := range tt.check { err := check(tt.spec, result) if err != nil { diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 316071491..83f830296 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -23,8 +23,8 @@ const ( ) func (c *Cluster) listResources() error { - if c.MasterPodDisruptionBudget != nil { - c.logger.Infof("found master pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.MasterPodDisruptionBudget.ObjectMeta), c.MasterPodDisruptionBudget.UID) + if c.GeneralPodDisruptionBudget != nil { + c.logger.Infof("found general pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta), c.GeneralPodDisruptionBudget.UID) } if c.Statefulset != nil { @@ -417,13 +417,13 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) 
[]v1.EndpointSubset return result } -func (c *Cluster) createMasterPodDisruptionBudget() error { - c.logger.Debug("creating master pod disruption budget") - if c.MasterPodDisruptionBudget != nil { - return fmt.Errorf("master pod disruption budget already exists in the cluster") +func (c *Cluster) createGeneralPodDisruptionBudget() error { + c.logger.Debug("creating general pod disruption budget") + if c.GeneralPodDisruptionBudget != nil { + return fmt.Errorf("general pod disruption budget already exists in the cluster") } - podDisruptionBudgetSpec := c.generateMasterPodDisruptionBudget() + podDisruptionBudgetSpec := c.generateGeneralPodDisruptionBudget() podDisruptionBudget, err := c.KubeClient. PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) @@ -432,7 +432,7 @@ func (c *Cluster) createMasterPodDisruptionBudget() error { return err } c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) - c.MasterPodDisruptionBudget = podDisruptionBudget + c.GeneralPodDisruptionBudget = podDisruptionBudget return nil } @@ -440,9 +440,9 @@ func (c *Cluster) createMasterPodDisruptionBudget() error { func (c *Cluster) createPodDisruptionBudgets() error { errors := make([]string, 0) - err := c.createMasterPodDisruptionBudget() + err := c.createGeneralPodDisruptionBudget() if err != nil { - errors = append(errors, fmt.Sprintf("could not create master pod disruption budget: %v", err)) + errors = append(errors, fmt.Sprintf("could not create general pod disruption budget: %v", err)) } if len(errors) > 0 { @@ -451,13 +451,13 @@ func (c *Cluster) createPodDisruptionBudgets() error { return nil } -func (c *Cluster) updateMasterPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { - c.logger.Debug("updating master pod disruption budget") - if c.MasterPodDisruptionBudget == nil { +func (c *Cluster) updateGeneralPodDisruptionBudget(pdb 
*policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating general pod disruption budget") + if c.GeneralPodDisruptionBudget == nil { return fmt.Errorf("there is no pod disruption budget in the cluster") } - if err := c.deleteMasterPodDisruptionBudget(); err != nil { + if err := c.deleteGeneralPodDisruptionBudget(); err != nil { return fmt.Errorf("could not delete pod disruption budget: %v", err) } @@ -467,30 +467,30 @@ func (c *Cluster) updateMasterPodDisruptionBudget(pdb *policyv1.PodDisruptionBud if err != nil { return fmt.Errorf("could not create pod disruption budget: %v", err) } - c.MasterPodDisruptionBudget = newPdb + c.GeneralPodDisruptionBudget = newPdb return nil } -func (c *Cluster) deleteMasterPodDisruptionBudget() error { - c.logger.Debug("deleting master pod disruption budget") - if c.MasterPodDisruptionBudget == nil { - c.logger.Debug("there is no master pod disruption budget in the cluster") +func (c *Cluster) deleteGeneralPodDisruptionBudget() error { + c.logger.Debug("deleting general pod disruption budget") + if c.GeneralPodDisruptionBudget == nil { + c.logger.Debug("there is no general pod disruption budget in the cluster") return nil } - pdbName := util.NameFromMeta(c.MasterPodDisruptionBudget.ObjectMeta) + pdbName := util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta) err := c.KubeClient. - PodDisruptionBudgets(c.MasterPodDisruptionBudget.Namespace). - Delete(context.TODO(), c.MasterPodDisruptionBudget.Name, c.deleteOptions) + PodDisruptionBudgets(c.GeneralPodDisruptionBudget.Namespace). 
+ Delete(context.TODO(), c.GeneralPodDisruptionBudget.Name, c.deleteOptions) if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.MasterPodDisruptionBudget.ObjectMeta)) + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta)) } else if err != nil { - return fmt.Errorf("could not delete master pod disruption budget: %v", err) + return fmt.Errorf("could not delete general pod disruption budget: %v", err) } - c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.MasterPodDisruptionBudget.ObjectMeta)) - c.MasterPodDisruptionBudget = nil + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta)) + c.GeneralPodDisruptionBudget = nil err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { @@ -504,7 +504,7 @@ func (c *Cluster) deleteMasterPodDisruptionBudget() error { return false, err2 }) if err != nil { - return fmt.Errorf("could not delete master pod disruption budget: %v", err) + return fmt.Errorf("could not delete general pod disruption budget: %v", err) } return nil @@ -513,7 +513,7 @@ func (c *Cluster) deleteMasterPodDisruptionBudget() error { func (c *Cluster) deletePodDisruptionBudgets() error { errors := make([]string, 0) - if err := c.deleteMasterPodDisruptionBudget(); err != nil { + if err := c.deleteGeneralPodDisruptionBudget(); err != nil { errors = append(errors, fmt.Sprintf("%v", err)) } @@ -739,7 +739,7 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { return c.Statefulset } -// GetMasterPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for Master Pod -func (c *Cluster) GetMasterPodDisruptionBudget() *policyv1.PodDisruptionBudget { - return c.MasterPodDisruptionBudget +// GetPodDisruptionBudget returns cluster's general kubernetes 
PodDisruptionBudget +func (c *Cluster) GetGeneralPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.GeneralPodDisruptionBudget } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 3440c2935..0603d91a9 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -452,22 +452,22 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return nil } -func (c *Cluster) syncMasterPodDisruptionBudget(isUpdate bool) error { +func (c *Cluster) syncGeneralPodDisruptionBudget(isUpdate bool) error { var ( pdb *policyv1.PodDisruptionBudget err error ) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.masterPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { - c.MasterPodDisruptionBudget = pdb - newPDB := c.generateMasterPodDisruptionBudget() + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.generalPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.GeneralPodDisruptionBudget = pdb + newPDB := c.generateGeneralPodDisruptionBudget() match, reason := c.comparePodDisruptionBudget(pdb, newPDB) if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) - if err = c.updateMasterPodDisruptionBudget(newPDB); err != nil { + if err = c.updateGeneralPodDisruptionBudget(newPDB); err != nil { return err } } else { - c.MasterPodDisruptionBudget = pdb + c.GeneralPodDisruptionBudget = pdb } return nil @@ -476,20 +476,20 @@ func (c *Cluster) syncMasterPodDisruptionBudget(isUpdate bool) error { return fmt.Errorf("could not get pod disruption budget: %v", err) } // no existing pod disruption budget, create new one - c.logger.Infof("could not find the master pod disruption budget") + c.logger.Infof("could not find the general pod disruption budget") - if err = c.createMasterPodDisruptionBudget(); err != nil { + if err = c.createGeneralPodDisruptionBudget(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { - return fmt.Errorf("could not create master pod disruption budget: 
%v", err) + return fmt.Errorf("could not create general pod disruption budget: %v", err) } c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.masterPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.generalPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) } } c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - c.MasterPodDisruptionBudget = pdb + c.GeneralPodDisruptionBudget = pdb return nil } @@ -497,7 +497,7 @@ func (c *Cluster) syncMasterPodDisruptionBudget(isUpdate bool) error { func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error { errors := make([]string, 0) - if err := c.syncMasterPodDisruptionBudget(isUpdate); err != nil { + if err := c.syncGeneralPodDisruptionBudget(isUpdate); err != nil { errors = append(errors, fmt.Sprintf("%v", err)) } diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 2e64aef96..383018e2c 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -58,15 +58,15 @@ type WorkerStatus struct { // ClusterStatus describes status of the cluster type ClusterStatus struct { - Team string - Cluster string - Namespace string - MasterService *v1.Service - ReplicaService *v1.Service - MasterEndpoint *v1.Endpoints - ReplicaEndpoint *v1.Endpoints - StatefulSet *appsv1.StatefulSet - MasterPodDisruptionBudget *policyv1.PodDisruptionBudget + Team string + Cluster string + Namespace string + MasterService *v1.Service + ReplicaService *v1.Service + MasterEndpoint *v1.Endpoints + ReplicaEndpoint *v1.Endpoints + StatefulSet *appsv1.StatefulSet + GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget CurrentProcess Process Worker uint32 From 
0d8f679dc2961ecda302c5456ca1847612e8601c Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Tue, 31 Dec 2024 13:22:38 +0100 Subject: [PATCH 3/9] Add critical op pdb --- pkg/cluster/cluster.go | 21 ++++----- pkg/cluster/k8sres.go | 37 ++++++++++++++++ pkg/cluster/resources.go | 94 ++++++++++++++++++++++++++++++++++++++-- pkg/cluster/sync.go | 46 ++++++++++++++++++++ 4 files changed, 185 insertions(+), 13 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 2f6e9a5e0..d2442040e 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -59,16 +59,17 @@ type Config struct { } type kubeResources struct { - Services map[PostgresRole]*v1.Service - Endpoints map[PostgresRole]*v1.Endpoints - PatroniEndpoints map[string]*v1.Endpoints - PatroniConfigMaps map[string]*v1.ConfigMap - Secrets map[types.UID]*v1.Secret - Statefulset *appsv1.StatefulSet - VolumeClaims map[types.UID]*v1.PersistentVolumeClaim - GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget - LogicalBackupJob *batchv1.CronJob - Streams map[string]*zalandov1.FabricEventStream + Services map[PostgresRole]*v1.Service + Endpoints map[PostgresRole]*v1.Endpoints + PatroniEndpoints map[string]*v1.Endpoints + PatroniConfigMaps map[string]*v1.ConfigMap + Secrets map[types.UID]*v1.Secret + Statefulset *appsv1.StatefulSet + VolumeClaims map[types.UID]*v1.PersistentVolumeClaim + GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget + CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget + LogicalBackupJob *batchv1.CronJob + Streams map[string]*zalandov1.FabricEventStream //Pods are treated separately } diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index e91423fe5..64b04b542 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -113,6 +113,11 @@ func (c *Cluster) generalPodDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } +func (c *Cluster) criticalOpPodDisruptionBudgetName() string { + pdbTemplate := 
config.StringTemplate("postgres-{cluster}-critical-operation-pdb") + return pdbTemplate.Format("cluster", c.Name) +} + func makeDefaultResources(config *config.Config) acidv1.Resources { defaultRequests := acidv1.ResourceDescription{ @@ -2240,6 +2245,38 @@ func (c *Cluster) generateGeneralPodDisruptionBudget() *policyv1.PodDisruptionBu } } +func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { + minAvailable := intstr.FromInt32(c.Spec.NumberOfInstances) + pdbEnabled := c.OpConfig.EnablePodDisruptionBudget + pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector + + // if PodDisruptionBudget is disabled or if there are no DB pods, set the budget to 0. + if (pdbEnabled != nil && !(*pdbEnabled)) || c.Spec.NumberOfInstances <= 0 { + minAvailable = intstr.FromInt(0) + } + + labels := c.labelsSet(false) + if pdbMasterLabelSelector == nil || *c.OpConfig.PDBMasterLabelSelector { + labels["critical-operaton"] = "true" + } + + return &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.criticalOpPodDisruptionBudgetName(), + Namespace: c.Namespace, + Labels: c.labelsSet(true), + Annotations: c.annotationsSet(nil), + OwnerReferences: c.ownerReferences(), + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MinAvailable: &minAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + }, + } +} + // getClusterServiceConnectionParameters fetches cluster host name and port // TODO: perhaps we need to query the service (i.e. if non-standard port is used?) 
// TODO: handle clusters in different namespaces diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 83f830296..513beb838 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -437,6 +437,26 @@ func (c *Cluster) createGeneralPodDisruptionBudget() error { return nil } +func (c *Cluster) createCriticalOpPodDisruptionBudget() error { + c.logger.Debug("creating pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget != nil { + return fmt.Errorf("pod disruption budget for critical operations already exists in the cluster") + } + + podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget() + podDisruptionBudget, err := c.KubeClient. + PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). + Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) + + if err != nil { + return err + } + c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = podDisruptionBudget + + return nil +} + func (c *Cluster) createPodDisruptionBudgets() error { errors := make([]string, 0) @@ -445,6 +465,11 @@ func (c *Cluster) createPodDisruptionBudgets() error { errors = append(errors, fmt.Sprintf("could not create general pod disruption budget: %v", err)) } + err = c.createCriticalOpPodDisruptionBudget() + if err != nil { + errors = append(errors, fmt.Sprintf("could not create pod disruption budget for critical operations: %v", err)) + } + if len(errors) > 0 { return fmt.Errorf("%v", strings.Join(errors, `', '`)) } @@ -454,24 +479,45 @@ func (c *Cluster) createPodDisruptionBudgets() error { func (c *Cluster) updateGeneralPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { c.logger.Debug("updating general pod disruption budget") if c.GeneralPodDisruptionBudget == nil { - return fmt.Errorf("there is no pod disruption budget in the cluster") + return fmt.Errorf("there is no general pod disruption 
budget in the cluster") } if err := c.deleteGeneralPodDisruptionBudget(); err != nil { - return fmt.Errorf("could not delete pod disruption budget: %v", err) + return fmt.Errorf("could not delete general pod disruption budget: %v", err) } newPdb, err := c.KubeClient. PodDisruptionBudgets(pdb.Namespace). Create(context.TODO(), pdb, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("could not create pod disruption budget: %v", err) + return fmt.Errorf("could not create general pod disruption budget: %v", err) } c.GeneralPodDisruptionBudget = newPdb return nil } +func (c *Cluster) updateCriticalOpPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget == nil { + return fmt.Errorf("there is no pod disruption budget for critical operations in the cluster") + } + + if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) + } + + newPdb, err := c.KubeClient. + PodDisruptionBudgets(pdb.Namespace). 
+ Create(context.TODO(), pdb, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err) + } + c.CriticalOpPodDisruptionBudget = newPdb + + return nil +} + func (c *Cluster) deleteGeneralPodDisruptionBudget() error { c.logger.Debug("deleting general pod disruption budget") if c.GeneralPodDisruptionBudget == nil { @@ -510,6 +556,44 @@ func (c *Cluster) deleteGeneralPodDisruptionBudget() error { return nil } +func (c *Cluster) deleteCriticalOpPodDisruptionBudget() error { + c.logger.Debug("deleting pod disruption budget for critical operations") + if c.CriticalOpPodDisruptionBudget == nil { + c.logger.Debug("there is no pod disruption budget for critical operations in the cluster") + return nil + } + + pdbName := util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta) + err := c.KubeClient. + PodDisruptionBudgets(c.CriticalOpPodDisruptionBudget.Namespace). + Delete(context.TODO(), c.CriticalOpPodDisruptionBudget.Name, c.deleteOptions) + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)) + } else if err != nil { + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) + } + + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = nil + + err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, + func() (bool, error) { + _, err2 := c.KubeClient.PodDisruptionBudgets(pdbName.Namespace).Get(context.TODO(), pdbName.Name, metav1.GetOptions{}) + if err2 == nil { + return false, nil + } + if k8sutil.ResourceNotFound(err2) { + return true, nil + } + return false, err2 + }) + if err != nil { + return fmt.Errorf("could not delete pod disruption budget for critical operations: %v", err) + } + + return nil +} + 
func (c *Cluster) deletePodDisruptionBudgets() error { errors := make([]string, 0) @@ -517,6 +601,10 @@ func (c *Cluster) deletePodDisruptionBudgets() error { errors = append(errors, fmt.Sprintf("%v", err)) } + if err := c.deleteCriticalOpPodDisruptionBudget(); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + if len(errors) > 0 { return fmt.Errorf("%v", strings.Join(errors, `', '`)) } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 0603d91a9..666b0d2e1 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -494,6 +494,48 @@ func (c *Cluster) syncGeneralPodDisruptionBudget(isUpdate bool) error { return nil } +func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error { + var ( + pdb *policyv1.PodDisruptionBudget + err error + ) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.CriticalOpPodDisruptionBudget = pdb + newPDB := c.generateCriticalOpPodDisruptionBudget() + match, reason := c.comparePodDisruptionBudget(pdb, newPDB) + if !match { + c.logPDBChanges(pdb, newPDB, isUpdate, reason) + if err = c.updateCriticalOpPodDisruptionBudget(newPDB); err != nil { + return err + } + } else { + c.CriticalOpPodDisruptionBudget = pdb + } + return nil + + } + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("could not get pod disruption budget: %v", err) + } + // no existing pod disruption budget, create new one + c.logger.Infof("could not find pod disruption budget for critical operations") + + if err = c.createCriticalOpPodDisruptionBudget(); err != nil { + if !k8sutil.ResourceAlreadyExists(err) { + return fmt.Errorf("could not create pod disruption budget for critical operations: %v", err) + } + c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.criticalOpPodDisruptionBudgetName(), 
metav1.GetOptions{}); err != nil { + return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) + } + } + + c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) + c.CriticalOpPodDisruptionBudget = pdb + + return nil +} + func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error { errors := make([]string, 0) @@ -501,6 +543,10 @@ func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error { errors = append(errors, fmt.Sprintf("%v", err)) } + if err := c.syncCriticalOpPodDisruptionBudget(isUpdate); err != nil { + errors = append(errors, fmt.Sprintf("%v", err)) + } + if len(errors) > 0 { return fmt.Errorf("%v", strings.Join(errors, `', '`)) } From e7d862aada9105682a1a8ab75f8f9388237ebec0 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Tue, 31 Dec 2024 15:05:12 +0100 Subject: [PATCH 4/9] Extend tests + consequent code fix --- e2e/tests/test_e2e.py | 3 + pkg/cluster/k8sres.go | 5 +- pkg/cluster/k8sres_test.go | 138 +++++++++++++++++++++++++++++++------ 3 files changed, 122 insertions(+), 24 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index b29fdae7f..4633f5105 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2549,6 +2549,9 @@ def check_cluster_child_resources_owner_references(self, cluster_name, cluster_n pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed") + pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-operation-pdb".format(cluster_name), cluster_namespace) + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed") + pg_secret = 
k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed") standby_secret = k8s.api.core_v1.read_namespaced_secret("standby.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace) diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 64b04b542..452eb21bc 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -2248,7 +2248,6 @@ func (c *Cluster) generateGeneralPodDisruptionBudget() *policyv1.PodDisruptionBu func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { minAvailable := intstr.FromInt32(c.Spec.NumberOfInstances) pdbEnabled := c.OpConfig.EnablePodDisruptionBudget - pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector // if PodDisruptionBudget is disabled or if there are no DB pods, set the budget to 0. 
if (pdbEnabled != nil && !(*pdbEnabled)) || c.Spec.NumberOfInstances <= 0 { @@ -2256,9 +2255,7 @@ func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptio } labels := c.labelsSet(false) - if pdbMasterLabelSelector == nil || *c.OpConfig.PDBMasterLabelSelector { - labels["critical-operaton"] = "true" - } + labels["critical-operaton"] = "true" return &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index b2b4c1845..8f2e25d5d 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2349,22 +2349,34 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { } } - testLabelsAndSelectors := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { - masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector - if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { - return fmt.Errorf("Object Namespace incorrect.") - } - if !reflect.DeepEqual(podDisruptionBudget.Labels, map[string]string{"team": "myapp", "cluster-name": "myapp-database"}) { - return fmt.Errorf("Labels incorrect.") - } - if !masterLabelSelectorDisabled && - !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, &metav1.LabelSelector{ - MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}}) { + testLabelsAndSelectors := func(isGeneral bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { + return func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { + masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector + if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { + return fmt.Errorf("Object Namespace incorrect.") + } + expectedLabels := map[string]string{"team": "myapp", "cluster-name": "myapp-database"} + if 
!reflect.DeepEqual(podDisruptionBudget.Labels, expectedLabels) { + return fmt.Errorf("Labels incorrect, got %#v, expected %#v", podDisruptionBudget.Labels, expectedLabels) + } + if !masterLabelSelectorDisabled { + if isGeneral { + expectedLabels := &metav1.LabelSelector{ + MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}} + if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { + return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels) + } + } else { + expectedLabels := &metav1.LabelSelector{ + MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operaton": "true"}} + if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { + return fmt.Errorf("MatchLabels incorrect, got %#v, expected %#v", podDisruptionBudget.Spec.Selector, expectedLabels) + } + } + } - return fmt.Errorf("MatchLabels incorrect.") + return nil } - - return nil } testPodDisruptionBudgetOwnerReference := func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { @@ -2400,7 +2412,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2417,7 +2429,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(0), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2434,7 +2446,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(0), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2451,7 +2463,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, 
hasName("postgres-myapp-database-databass-budget"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2468,7 +2480,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, { @@ -2485,7 +2497,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { testPodDisruptionBudgetOwnerReference, hasName("postgres-myapp-database-pdb"), hasMinAvailable(1), - testLabelsAndSelectors, + testLabelsAndSelectors(true), }, }, } @@ -2500,6 +2512,92 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { } } } + + testCriticalOp := []struct { + scenario string + spec *Cluster + check []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error + }{ + { + scenario: "With multiple instances", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-operation-pdb"), + hasMinAvailable(3), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With zero instances", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb"}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 0}}, + logger, + 
eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-operation-pdb"), + hasMinAvailable(0), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With PodDisruptionBudget disabled", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role"}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.False()}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-operation-pdb"), + hasMinAvailable(0), + testLabelsAndSelectors(false), + }, + }, + { + scenario: "With OwnerReference enabled", + spec: New( + Config{OpConfig: config.Config{Resources: config.Resources{ClusterNameLabel: "cluster-name", PodRoleLabel: "spilo-role", EnableOwnerReferences: util.True()}, PDBNameFormat: "postgres-{cluster}-pdb", EnablePodDisruptionBudget: util.True()}}, + k8sutil.KubernetesClient{}, + acidv1.Postgresql{ + ObjectMeta: metav1.ObjectMeta{Name: "myapp-database", Namespace: "myapp"}, + Spec: acidv1.PostgresSpec{TeamID: "myapp", NumberOfInstances: 3}}, + logger, + eventRecorder), + check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ + testPodDisruptionBudgetOwnerReference, + hasName("postgres-myapp-database-critical-operation-pdb"), + hasMinAvailable(3), + testLabelsAndSelectors(false), + }, + }, + } + + for _, tt := range testCriticalOp { + result := tt.spec.generateCriticalOpPodDisruptionBudget() + for _, check := range tt.check { + err := check(tt.spec, result) + if err 
!= nil { + t.Errorf("%s [%s]: PodDisruptionBudget spec is incorrect, %+v", + testName, tt.scenario, err) + } + } + } } func TestGenerateService(t *testing.T) { From 6ce1987fdb3a44fa9ef0840edaddfd1802cfb368 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Tue, 31 Dec 2024 15:58:40 +0100 Subject: [PATCH 5/9] Add new pdb to ClusterStatus --- pkg/cluster/cluster.go | 21 +++++++++++---------- pkg/cluster/resources.go | 5 +++++ pkg/cluster/types.go | 19 ++++++++++--------- 3 files changed, 26 insertions(+), 19 deletions(-) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index d2442040e..8db9f5643 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -1727,16 +1727,17 @@ func (c *Cluster) GetCurrentProcess() Process { // GetStatus provides status of the cluster func (c *Cluster) GetStatus() *ClusterStatus { status := &ClusterStatus{ - Cluster: c.Name, - Namespace: c.Namespace, - Team: c.Spec.TeamID, - Status: c.Status, - Spec: c.Spec, - MasterService: c.GetServiceMaster(), - ReplicaService: c.GetServiceReplica(), - StatefulSet: c.GetStatefulSet(), - GeneralPodDisruptionBudget: c.GetGeneralPodDisruptionBudget(), - CurrentProcess: c.GetCurrentProcess(), + Cluster: c.Name, + Namespace: c.Namespace, + Team: c.Spec.TeamID, + Status: c.Status, + Spec: c.Spec, + MasterService: c.GetServiceMaster(), + ReplicaService: c.GetServiceReplica(), + StatefulSet: c.GetStatefulSet(), + GeneralPodDisruptionBudget: c.GetGeneralPodDisruptionBudget(), + CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(), + CurrentProcess: c.GetCurrentProcess(), Error: fmt.Errorf("error: %s", c.Error), } diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 513beb838..94e658d28 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -831,3 +831,8 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { func (c *Cluster) GetGeneralPodDisruptionBudget() *policyv1.PodDisruptionBudget { return c.GeneralPodDisruptionBudget } + +// 
GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations +func (c *Cluster) GetCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.CriticalOpPodDisruptionBudget +} diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 383018e2c..306f728e6 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -58,15 +58,16 @@ type WorkerStatus struct { // ClusterStatus describes status of the cluster type ClusterStatus struct { - Team string - Cluster string - Namespace string - MasterService *v1.Service - ReplicaService *v1.Service - MasterEndpoint *v1.Endpoints - ReplicaEndpoint *v1.Endpoints - StatefulSet *appsv1.StatefulSet - GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget + Team string + Cluster string + Namespace string + MasterService *v1.Service + ReplicaService *v1.Service + MasterEndpoint *v1.Endpoints + ReplicaEndpoint *v1.Endpoints + StatefulSet *appsv1.StatefulSet + GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget + CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget CurrentProcess Process Worker uint32 From 339784f91cac8f745a579f6079995d5ed0b6b473 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Tue, 7 Jan 2025 07:44:25 +0100 Subject: [PATCH 6/9] Address review comment --- e2e/tests/test_e2e.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 4633f5105..26231dd60 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2547,10 +2547,10 @@ def check_cluster_child_resources_owner_references(self, cluster_name, cluster_n self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed") pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace) - self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod 
disruption owner reference check failed") + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget owner reference check failed") pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-operation-pdb".format(cluster_name), cluster_namespace) - self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption owner reference check failed") + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget for critical operations owner reference check failed") pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(pg_secret.metadata.owner_references, inverse), "postgres secret owner reference check failed") From 940ed18be487171b75dd76c78f969bca4567bfee Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Tue, 28 Jan 2025 12:35:53 +0100 Subject: [PATCH 7/9] Address review --- docs/administrator.md | 30 +++++++--- docs/quickstart.md | 2 +- docs/reference/operator_parameters.md | 4 +- e2e/tests/test_e2e.py | 4 +- pkg/cluster/cluster.go | 4 +- pkg/cluster/k8sres.go | 10 ++-- pkg/cluster/k8sres_test.go | 16 +++--- pkg/cluster/resources.go | 81 ++++++++++++++------------- pkg/cluster/sync.go | 24 ++++---- pkg/cluster/types.go | 2 +- 10 files changed, 97 insertions(+), 80 deletions(-) diff --git a/docs/administrator.md b/docs/administrator.md index 55abebc8b..d0dd9956c 100644 --- a/docs/administrator.md +++ b/docs/administrator.md @@ -620,22 +620,34 @@ By default the topology key for the pod anti affinity is set to `kubernetes.io/hostname`, you can set another topology key e.g. `failure-domain.beta.kubernetes.io/zone`. 
See [built-in node labels](https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#interlude-built-in-node-labels) for available topology keys. -## Pod Disruption Budget +## Pod Disruption Budgets -By default the operator uses a PodDisruptionBudget (PDB) to protect the cluster -from voluntarily disruptions and hence unwanted DB downtime. The `MinAvailable` -parameter of the PDB is set to `1` which prevents killing masters in single-node -clusters and/or the last remaining running instance in a multi-node cluster. +By default the operator creates two PodDisruptionBudgets (PDB) to protect the cluster +from voluntary disruptions and hence unwanted DB downtime: the so-called primary PDB +and the PDB for critical operations. + +### Primary PDB +The `MinAvailable` parameter of this PDB is set to `1` and, if `pdb_master_label_selector` +is enabled, the label selector includes the `spilo-role=master` condition, which prevents killing +masters in single-node clusters and/or the last remaining running instance in a multi-node +cluster. + +### PDB for critical operations +The `MinAvailable` parameter of this PDB is equal to the `numberOfInstances` set in the +cluster manifest, while the label selector includes the `critical-operation=true` condition. This +allows protecting all pods of a cluster, given they are labeled accordingly. +For example, the operator labels all Spilo pods with `critical-operation=true` during the major +version upgrade run. You may want to protect cluster pods during other critical operations +by assigning the label to pods yourself or using other means of automation. The PDB is only relaxed in two scenarios: * If a cluster is scaled down to `0` instances (e.g. for draining nodes) * If the PDB is disabled in the configuration (`enable_pod_disruption_budget`) -The PDB is still in place having `MinAvailable` set to `0`. If enabled it will -be automatically set to `1` on scale up. 
Disabling PDBs helps avoiding blocking -Kubernetes upgrades in managed K8s environments at the cost of prolonged DB -downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384) +The PDBs are still in place having `MinAvailable` set to `0`. Disabling PDBs +helps avoid blocking Kubernetes upgrades in managed K8s environments at the +cost of prolonged DB downtime. See PR [#384](https://github.com/zalando/postgres-operator/pull/384) for the use case. ## Add cluster-specific labels diff --git a/docs/quickstart.md b/docs/quickstart.md index f080bd567..2d6742354 100644 --- a/docs/quickstart.md +++ b/docs/quickstart.md @@ -230,7 +230,7 @@ kubectl delete postgresql acid-minimal-cluster ``` This should remove the associated StatefulSet, database Pods, Services and -Endpoints. The PersistentVolumes are released and the PodDisruptionBudget is +Endpoints. The PersistentVolumes are released and the PodDisruptionBudgets are deleted. Secrets however are not deleted and backups will remain in place. When deleting a cluster while it is still starting up or got stuck during that diff --git a/docs/reference/operator_parameters.md b/docs/reference/operator_parameters.md index 3bd9e44f7..7a9cdc709 100644 --- a/docs/reference/operator_parameters.md +++ b/docs/reference/operator_parameters.md @@ -334,13 +334,13 @@ configuration they are grouped under the `kubernetes` key. pod namespace). * **pdb_name_format** - defines the template for PDB (Pod Disruption Budget) names created by the + defines the template for the primary PDB (Pod Disruption Budget) name created by the operator. The default is `postgres-{cluster}-pdb`, where `{cluster}` is replaced by the cluster name. Only the `{cluster}` placeholders is allowed in the template. * **pdb_master_label_selector** - By default the PDB will match the master role hence preventing nodes to be + By default the primary PDB will match the master role hence preventing nodes to be drained if the node_readiness_label is not used. 
If this option if set to `false` the `spilo-role=master` selector will not be added to the PDB. diff --git a/e2e/tests/test_e2e.py b/e2e/tests/test_e2e.py index 26231dd60..febf4a374 100644 --- a/e2e/tests/test_e2e.py +++ b/e2e/tests/test_e2e.py @@ -2547,9 +2547,9 @@ def check_cluster_child_resources_owner_references(self, cluster_name, cluster_n self.assertTrue(self.has_postgresql_owner_reference(config_ep.metadata.owner_references, inverse), "config endpoint owner reference check failed") pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-pdb".format(cluster_name), cluster_namespace) - self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget owner reference check failed") + self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "primary pod disruption budget owner reference check failed") - pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-operation-pdb".format(cluster_name), cluster_namespace) + pdb = k8s.api.policy_v1.read_namespaced_pod_disruption_budget("postgres-{}-critical-op-pdb".format(cluster_name), cluster_namespace) self.assertTrue(self.has_postgresql_owner_reference(pdb.metadata.owner_references, inverse), "pod disruption budget for critical operations owner reference check failed") pg_secret = k8s.api.core_v1.read_namespaced_secret("postgres.{}.credentials.postgresql.acid.zalan.do".format(cluster_name), cluster_namespace) diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 8db9f5643..e2b53a7ce 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -66,7 +66,7 @@ type kubeResources struct { Secrets map[types.UID]*v1.Secret Statefulset *appsv1.StatefulSet VolumeClaims map[types.UID]*v1.PersistentVolumeClaim - GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget + PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget CriticalOpPodDisruptionBudget 
*policyv1.PodDisruptionBudget LogicalBackupJob *batchv1.CronJob Streams map[string]*zalandov1.FabricEventStream @@ -1735,7 +1735,7 @@ func (c *Cluster) GetStatus() *ClusterStatus { MasterService: c.GetServiceMaster(), ReplicaService: c.GetServiceReplica(), StatefulSet: c.GetStatefulSet(), - GeneralPodDisruptionBudget: c.GetGeneralPodDisruptionBudget(), + PrimaryPodDisruptionBudget: c.GetPrimaryPodDisruptionBudget(), CriticalOpPodDisruptionBudget: c.GetCriticalOpPodDisruptionBudget(), CurrentProcess: c.GetCurrentProcess(), diff --git a/pkg/cluster/k8sres.go b/pkg/cluster/k8sres.go index 452eb21bc..c5a58ed5a 100644 --- a/pkg/cluster/k8sres.go +++ b/pkg/cluster/k8sres.go @@ -109,12 +109,12 @@ func (c *Cluster) servicePort(role PostgresRole) int32 { return pgPort } -func (c *Cluster) generalPodDisruptionBudgetName() string { +func (c *Cluster) PrimaryPodDisruptionBudgetName() string { return c.OpConfig.PDBNameFormat.Format("cluster", c.Name) } func (c *Cluster) criticalOpPodDisruptionBudgetName() string { - pdbTemplate := config.StringTemplate("postgres-{cluster}-critical-operation-pdb") + pdbTemplate := config.StringTemplate("postgres-{cluster}-critical-op-pdb") return pdbTemplate.Format("cluster", c.Name) } @@ -2212,7 +2212,7 @@ func (c *Cluster) generateStandbyEnvironment(description *acidv1.StandbyDescript return result } -func (c *Cluster) generateGeneralPodDisruptionBudget() *policyv1.PodDisruptionBudget { +func (c *Cluster) generatePrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget { minAvailable := intstr.FromInt(1) pdbEnabled := c.OpConfig.EnablePodDisruptionBudget pdbMasterLabelSelector := c.OpConfig.PDBMasterLabelSelector @@ -2230,7 +2230,7 @@ func (c *Cluster) generateGeneralPodDisruptionBudget() *policyv1.PodDisruptionBu return &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ - Name: c.generalPodDisruptionBudgetName(), + Name: c.PrimaryPodDisruptionBudgetName(), Namespace: c.Namespace, Labels: c.labelsSet(true), Annotations: 
c.annotationsSet(nil), @@ -2255,7 +2255,7 @@ func (c *Cluster) generateCriticalOpPodDisruptionBudget() *policyv1.PodDisruptio } labels := c.labelsSet(false) - labels["critical-operaton"] = "true" + labels["critical-operation"] = "true" return &policyv1.PodDisruptionBudget{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/cluster/k8sres_test.go b/pkg/cluster/k8sres_test.go index 8f2e25d5d..137c24081 100644 --- a/pkg/cluster/k8sres_test.go +++ b/pkg/cluster/k8sres_test.go @@ -2349,7 +2349,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { } } - testLabelsAndSelectors := func(isGeneral bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { + testLabelsAndSelectors := func(isPrimary bool) func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { return func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error { masterLabelSelectorDisabled := cluster.OpConfig.PDBMasterLabelSelector != nil && !*cluster.OpConfig.PDBMasterLabelSelector if podDisruptionBudget.ObjectMeta.Namespace != "myapp" { @@ -2360,7 +2360,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { return fmt.Errorf("Labels incorrect, got %#v, expected %#v", podDisruptionBudget.Labels, expectedLabels) } if !masterLabelSelectorDisabled { - if isGeneral { + if isPrimary { expectedLabels := &metav1.LabelSelector{ MatchLabels: map[string]string{"spilo-role": "master", "cluster-name": "myapp-database"}} if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { @@ -2368,7 +2368,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { } } else { expectedLabels := &metav1.LabelSelector{ - MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operaton": "true"}} + MatchLabels: map[string]string{"cluster-name": "myapp-database", "critical-operation": "true"}} if !reflect.DeepEqual(podDisruptionBudget.Spec.Selector, expectedLabels) { return fmt.Errorf("MatchLabels incorrect, got %#v, expected 
%#v", podDisruptionBudget.Spec.Selector, expectedLabels) } @@ -2503,7 +2503,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { } for _, tt := range tests { - result := tt.spec.generateGeneralPodDisruptionBudget() + result := tt.spec.generatePrimaryPodDisruptionBudget() for _, check := range tt.check { err := check(tt.spec, result) if err != nil { @@ -2530,7 +2530,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { eventRecorder), check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ testPodDisruptionBudgetOwnerReference, - hasName("postgres-myapp-database-critical-operation-pdb"), + hasName("postgres-myapp-database-critical-op-pdb"), hasMinAvailable(3), testLabelsAndSelectors(false), }, @@ -2547,7 +2547,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { eventRecorder), check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ testPodDisruptionBudgetOwnerReference, - hasName("postgres-myapp-database-critical-operation-pdb"), + hasName("postgres-myapp-database-critical-op-pdb"), hasMinAvailable(0), testLabelsAndSelectors(false), }, @@ -2564,7 +2564,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { eventRecorder), check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ testPodDisruptionBudgetOwnerReference, - hasName("postgres-myapp-database-critical-operation-pdb"), + hasName("postgres-myapp-database-critical-op-pdb"), hasMinAvailable(0), testLabelsAndSelectors(false), }, @@ -2581,7 +2581,7 @@ func TestGeneratePodDisruptionBudget(t *testing.T) { eventRecorder), check: []func(cluster *Cluster, podDisruptionBudget *policyv1.PodDisruptionBudget) error{ testPodDisruptionBudgetOwnerReference, - hasName("postgres-myapp-database-critical-operation-pdb"), + hasName("postgres-myapp-database-critical-op-pdb"), hasMinAvailable(3), testLabelsAndSelectors(false), }, diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index 
94e658d28..db3acacc6 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -23,8 +23,13 @@ const ( ) func (c *Cluster) listResources() error { - if c.GeneralPodDisruptionBudget != nil { - c.logger.Infof("found general pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta), c.GeneralPodDisruptionBudget.UID) + if c.PrimaryPodDisruptionBudget != nil { + c.logger.Infof("found primary pod disruption budget: %q (uid: %q)", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta), c.PrimaryPodDisruptionBudget.UID) + } + + if c.CriticalOpPodDisruptionBudget != nil { + c.logger.Infof("found pod disruption budget for critical operations: %q (uid: %q)", util.NameFromMeta(c.CriticalOpPodDisruptionBudget.ObjectMeta), c.CriticalOpPodDisruptionBudget.UID) + } if c.Statefulset != nil { @@ -417,13 +422,13 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset return result } -func (c *Cluster) createGeneralPodDisruptionBudget() error { - c.logger.Debug("creating general pod disruption budget") - if c.GeneralPodDisruptionBudget != nil { - return fmt.Errorf("general pod disruption budget already exists in the cluster") +func (c *Cluster) createPrimaryPodDisruptionBudget() error { + c.logger.Debug("creating primary pod disruption budget") + if c.PrimaryPodDisruptionBudget != nil { + return fmt.Errorf("primary pod disruption budget already exists in the cluster") } - podDisruptionBudgetSpec := c.generateGeneralPodDisruptionBudget() + podDisruptionBudgetSpec := c.generatePrimaryPodDisruptionBudget() podDisruptionBudget, err := c.KubeClient. PodDisruptionBudgets(podDisruptionBudgetSpec.Namespace). 
Create(context.TODO(), podDisruptionBudgetSpec, metav1.CreateOptions{}) @@ -431,8 +436,8 @@ func (c *Cluster) createGeneralPodDisruptionBudget() error { if err != nil { return err } - c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) - c.GeneralPodDisruptionBudget = podDisruptionBudget + c.logger.Infof("primary pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.PrimaryPodDisruptionBudget = podDisruptionBudget return nil } @@ -440,7 +445,7 @@ func (c *Cluster) createGeneralPodDisruptionBudget() error { func (c *Cluster) createCriticalOpPodDisruptionBudget() error { c.logger.Debug("creating pod disruption budget for critical operations") if c.CriticalOpPodDisruptionBudget != nil { - return fmt.Errorf("pod disruption budget for critical operations already exists in the cluster") + return fmt.Errorf("pod disruption budget for critical operations already exists in the cluster") } podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget() @@ -451,7 +456,7 @@ func (c *Cluster) createCriticalOpPodDisruptionBudget() error { if err != nil { return err } - c.logger.Infof("pod disruption budget %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) + c.logger.Infof("pod disruption budget for critical operations %q has been successfully created", util.NameFromMeta(podDisruptionBudget.ObjectMeta)) c.CriticalOpPodDisruptionBudget = podDisruptionBudget return nil @@ -460,9 +465,9 @@ func (c *Cluster) createCriticalOpPodDisruptionBudget() error { func (c *Cluster) createPodDisruptionBudgets() error { errors := make([]string, 0) - err := c.createGeneralPodDisruptionBudget() + err := c.createPrimaryPodDisruptionBudget() if err != nil { - errors = append(errors, fmt.Sprintf("could not create general pod disruption budget: %v", err)) + errors = append(errors, fmt.Sprintf("could not create primary pod disruption 
budget: %v", err)) } err = c.createCriticalOpPodDisruptionBudget() @@ -476,23 +481,23 @@ func (c *Cluster) createPodDisruptionBudgets() error { return nil } -func (c *Cluster) updateGeneralPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { - c.logger.Debug("updating general pod disruption budget") - if c.GeneralPodDisruptionBudget == nil { - return fmt.Errorf("there is no general pod disruption budget in the cluster") +func (c *Cluster) updatePrimaryPodDisruptionBudget(pdb *policyv1.PodDisruptionBudget) error { + c.logger.Debug("updating primary pod disruption budget") + if c.PrimaryPodDisruptionBudget == nil { + return fmt.Errorf("there is no primary pod disruption budget in the cluster") } - if err := c.deleteGeneralPodDisruptionBudget(); err != nil { - return fmt.Errorf("could not delete general pod disruption budget: %v", err) + if err := c.deletePrimaryPodDisruptionBudget(); err != nil { + return fmt.Errorf("could not delete primary pod disruption budget: %v", err) } newPdb, err := c.KubeClient. PodDisruptionBudgets(pdb.Namespace). 
Create(context.TODO(), pdb, metav1.CreateOptions{}) if err != nil { - return fmt.Errorf("could not create general pod disruption budget: %v", err) + return fmt.Errorf("could not create primary pod disruption budget: %v", err) } - c.GeneralPodDisruptionBudget = newPdb + c.PrimaryPodDisruptionBudget = newPdb return nil } @@ -518,25 +523,25 @@ func (c *Cluster) updateCriticalOpPodDisruptionBudget(pdb *policyv1.PodDisruptio return nil } -func (c *Cluster) deleteGeneralPodDisruptionBudget() error { - c.logger.Debug("deleting general pod disruption budget") - if c.GeneralPodDisruptionBudget == nil { - c.logger.Debug("there is no general pod disruption budget in the cluster") +func (c *Cluster) deletePrimaryPodDisruptionBudget() error { + c.logger.Debug("deleting primary pod disruption budget") + if c.PrimaryPodDisruptionBudget == nil { + c.logger.Debug("there is no primary pod disruption budget in the cluster") return nil } - pdbName := util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta) + pdbName := util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta) err := c.KubeClient. - PodDisruptionBudgets(c.GeneralPodDisruptionBudget.Namespace). - Delete(context.TODO(), c.GeneralPodDisruptionBudget.Name, c.deleteOptions) + PodDisruptionBudgets(c.PrimaryPodDisruptionBudget.Namespace). 
+ Delete(context.TODO(), c.PrimaryPodDisruptionBudget.Name, c.deleteOptions) if k8sutil.ResourceNotFound(err) { - c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta)) + c.logger.Debugf("PodDisruptionBudget %q has already been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)) } else if err != nil { - return fmt.Errorf("could not delete general pod disruption budget: %v", err) + return fmt.Errorf("could not delete primary pod disruption budget: %v", err) } - c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.GeneralPodDisruptionBudget.ObjectMeta)) - c.GeneralPodDisruptionBudget = nil + c.logger.Infof("pod disruption budget %q has been deleted", util.NameFromMeta(c.PrimaryPodDisruptionBudget.ObjectMeta)) + c.PrimaryPodDisruptionBudget = nil err = retryutil.Retry(c.OpConfig.ResourceCheckInterval, c.OpConfig.ResourceCheckTimeout, func() (bool, error) { @@ -550,7 +555,7 @@ func (c *Cluster) deleteGeneralPodDisruptionBudget() error { return false, err2 }) if err != nil { - return fmt.Errorf("could not delete general pod disruption budget: %v", err) + return fmt.Errorf("could not delete primary pod disruption budget: %v", err) } return nil @@ -597,7 +602,7 @@ func (c *Cluster) deleteCriticalOpPodDisruptionBudget() error { func (c *Cluster) deletePodDisruptionBudgets() error { errors := make([]string, 0) - if err := c.deleteGeneralPodDisruptionBudget(); err != nil { + if err := c.deletePrimaryPodDisruptionBudget(); err != nil { errors = append(errors, fmt.Sprintf("%v", err)) } @@ -827,12 +832,12 @@ func (c *Cluster) GetStatefulSet() *appsv1.StatefulSet { return c.Statefulset } -// GetPodDisruptionBudget returns cluster's general kubernetes PodDisruptionBudget -func (c *Cluster) GetGeneralPodDisruptionBudget() *policyv1.PodDisruptionBudget { - return c.GeneralPodDisruptionBudget +// GetPrimaryPodDisruptionBudget returns cluster's primary kubernetes 
PodDisruptionBudget +func (c *Cluster) GetPrimaryPodDisruptionBudget() *policyv1.PodDisruptionBudget { + return c.PrimaryPodDisruptionBudget } -// GetPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations +// GetCriticalOpPodDisruptionBudget returns cluster's kubernetes PodDisruptionBudget for critical operations func (c *Cluster) GetCriticalOpPodDisruptionBudget() *policyv1.PodDisruptionBudget { return c.CriticalOpPodDisruptionBudget } diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 666b0d2e1..709c2fc59 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -452,22 +452,22 @@ func (c *Cluster) syncEndpoint(role PostgresRole) error { return nil } -func (c *Cluster) syncGeneralPodDisruptionBudget(isUpdate bool) error { +func (c *Cluster) syncPrimaryPodDisruptionBudget(isUpdate bool) error { var ( pdb *policyv1.PodDisruptionBudget err error ) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.generalPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { - c.GeneralPodDisruptionBudget = pdb - newPDB := c.generateGeneralPodDisruptionBudget() + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err == nil { + c.PrimaryPodDisruptionBudget = pdb + newPDB := c.generatePrimaryPodDisruptionBudget() match, reason := c.comparePodDisruptionBudget(pdb, newPDB) if !match { c.logPDBChanges(pdb, newPDB, isUpdate, reason) - if err = c.updateGeneralPodDisruptionBudget(newPDB); err != nil { + if err = c.updatePrimaryPodDisruptionBudget(newPDB); err != nil { return err } } else { - c.GeneralPodDisruptionBudget = pdb + c.PrimaryPodDisruptionBudget = pdb } return nil @@ -476,20 +476,20 @@ func (c *Cluster) syncGeneralPodDisruptionBudget(isUpdate bool) error { return fmt.Errorf("could not get pod disruption budget: %v", err) } // no existing pod disruption budget, create new one - c.logger.Infof("could 
not find the general pod disruption budget") + c.logger.Infof("could not find the primary pod disruption budget") - if err = c.createGeneralPodDisruptionBudget(); err != nil { + if err = c.createPrimaryPodDisruptionBudget(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { - return fmt.Errorf("could not create general pod disruption budget: %v", err) + return fmt.Errorf("could not create primary pod disruption budget: %v", err) } c.logger.Infof("pod disruption budget %q already exists", util.NameFromMeta(pdb.ObjectMeta)) - if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.generalPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { + if pdb, err = c.KubeClient.PodDisruptionBudgets(c.Namespace).Get(context.TODO(), c.PrimaryPodDisruptionBudgetName(), metav1.GetOptions{}); err != nil { return fmt.Errorf("could not fetch existing %q pod disruption budget", util.NameFromMeta(pdb.ObjectMeta)) } } c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - c.GeneralPodDisruptionBudget = pdb + c.PrimaryPodDisruptionBudget = pdb return nil } @@ -539,7 +539,7 @@ func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error { func (c *Cluster) syncPodDisruptionBudgets(isUpdate bool) error { errors := make([]string, 0) - if err := c.syncGeneralPodDisruptionBudget(isUpdate); err != nil { + if err := c.syncPrimaryPodDisruptionBudget(isUpdate); err != nil { errors = append(errors, fmt.Sprintf("%v", err)) } diff --git a/pkg/cluster/types.go b/pkg/cluster/types.go index 306f728e6..17c4e705e 100644 --- a/pkg/cluster/types.go +++ b/pkg/cluster/types.go @@ -66,7 +66,7 @@ type ClusterStatus struct { MasterEndpoint *v1.Endpoints ReplicaEndpoint *v1.Endpoints StatefulSet *appsv1.StatefulSet - GeneralPodDisruptionBudget *policyv1.PodDisruptionBudget + PrimaryPodDisruptionBudget *policyv1.PodDisruptionBudget CriticalOpPodDisruptionBudget *policyv1.PodDisruptionBudget CurrentProcess Process From 
a87752469a86da4c99d41e62f62d8d8e2ed0b983 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Tue, 28 Jan 2025 13:16:20 +0100 Subject: [PATCH 8/9] Fix sigsegv --- pkg/cluster/sync.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index 709c2fc59..06f98e42f 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -488,9 +488,6 @@ func (c *Cluster) syncPrimaryPodDisruptionBudget(isUpdate bool) error { } } - c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - c.PrimaryPodDisruptionBudget = pdb - return nil } @@ -530,9 +527,6 @@ func (c *Cluster) syncCriticalOpPodDisruptionBudget(isUpdate bool) error { } } - c.logger.Infof("created missing pod disruption budget %q", util.NameFromMeta(pdb.ObjectMeta)) - c.CriticalOpPodDisruptionBudget = pdb - return nil } From f3246c33ff8ec5445665c7b2209ee532918d0b88 Mon Sep 17 00:00:00 2001 From: Polina Bungina Date: Tue, 28 Jan 2025 15:47:44 +0100 Subject: [PATCH 9/9] Address review feedback --- pkg/cluster/resources.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/cluster/resources.go b/pkg/cluster/resources.go index db3acacc6..2c87efe47 100644 --- a/pkg/cluster/resources.go +++ b/pkg/cluster/resources.go @@ -425,7 +425,8 @@ func (c *Cluster) generateEndpointSubsets(role PostgresRole) []v1.EndpointSubset func (c *Cluster) createPrimaryPodDisruptionBudget() error { c.logger.Debug("creating primary pod disruption budget") if c.PrimaryPodDisruptionBudget != nil { - return fmt.Errorf("primary pod disruption budget already exists in the cluster") + c.logger.Warning("primary pod disruption budget already exists in the cluster") + return nil } podDisruptionBudgetSpec := c.generatePrimaryPodDisruptionBudget() @@ -445,7 +446,8 @@ func (c *Cluster) createPrimaryPodDisruptionBudget() error { func (c *Cluster) createCriticalOpPodDisruptionBudget() error { c.logger.Debug("creating pod disruption budget for critical 
operations") if c.CriticalOpPodDisruptionBudget != nil { - return fmt.Errorf("pod disruption budget for critical operations already exists in the cluster") + c.logger.Warning("pod disruption budget for critical operations already exists in the cluster") + return nil } podDisruptionBudgetSpec := c.generateCriticalOpPodDisruptionBudget()