Skip to content

switch to batch API v1 for (Cron)Jobs #2019

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions e2e/tests/k8s_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(self):

self.core_v1 = client.CoreV1Api()
self.apps_v1 = client.AppsV1Api()
self.batch_v1_beta1 = client.BatchV1beta1Api()
self.batch_v1 = client.BatchV1Api()
self.custom_objects_api = client.CustomObjectsApi()
self.policy_v1 = client.PolicyV1Api()
self.storage_v1_api = client.StorageV1Api()
Expand Down Expand Up @@ -217,7 +217,7 @@ def wait_for_namespace_creation(self, namespace='default'):
time.sleep(self.RETRY_TIMEOUT_SEC)

def get_logical_backup_job(self, namespace='default'):
return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
return self.api.batch_v1.list_namespaced_cron_job(namespace, label_selector="application=spilo")

def wait_for_logical_backup_job(self, expected_num_of_jobs):
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
Expand Down Expand Up @@ -499,7 +499,7 @@ def wait_for_pod_failover(self, failover_targets, labels, namespace='default'):
time.sleep(self.RETRY_TIMEOUT_SEC)

def get_logical_backup_job(self, namespace='default'):
return self.api.batch_v1_beta1.list_namespaced_cron_job(namespace, label_selector="application=spilo")
return self.api.batch_v1.list_namespaced_cron_job(namespace, label_selector="application=spilo")

def wait_for_logical_backup_job(self, expected_num_of_jobs):
while (len(self.get_logical_backup_job().items) != expected_num_of_jobs):
Expand Down
11 changes: 5 additions & 6 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/zalando/postgres-operator/pkg/util/patroni"
"github.com/zalando/postgres-operator/pkg/util/retryutil"
batchv1 "k8s.io/api/batch/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
"k8s.io/apimachinery/pkg/labels"
)

Expand Down Expand Up @@ -2019,7 +2018,7 @@ func (c *Cluster) getClusterServiceConnectionParameters(clusterName string) (hos
return
}

func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
func (c *Cluster) generateLogicalBackupJob() (*batchv1.CronJob, error) {

var (
err error
Expand Down Expand Up @@ -2110,7 +2109,7 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {

// configure a cron job

jobTemplateSpec := batchv1beta1.JobTemplateSpec{
jobTemplateSpec := batchv1.JobTemplateSpec{
Spec: jobSpec,
}

Expand All @@ -2119,17 +2118,17 @@ func (c *Cluster) generateLogicalBackupJob() (*batchv1beta1.CronJob, error) {
schedule = c.OpConfig.LogicalBackupSchedule
}

cronJob := &batchv1beta1.CronJob{
cronJob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: c.getLogicalBackupJobName(),
Namespace: c.Namespace,
Labels: c.labelsSet(true),
Annotations: c.annotationsSet(nil),
},
Spec: batchv1beta1.CronJobSpec{
Spec: batchv1.CronJobSpec{
Schedule: schedule,
JobTemplate: jobTemplateSpec,
ConcurrencyPolicy: batchv1beta1.ForbidConcurrent,
ConcurrencyPolicy: batchv1.ForbidConcurrent,
},
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/cluster/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"strings"

appsv1 "k8s.io/api/apps/v1"
batchv1beta1 "k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -546,7 +546,7 @@ func (c *Cluster) createLogicalBackupJob() (err error) {
return nil
}

func (c *Cluster) patchLogicalBackupJob(newJob *batchv1beta1.CronJob) error {
func (c *Cluster) patchLogicalBackupJob(newJob *batchv1.CronJob) error {
c.setProcessName("patching logical backup job")

patchData, err := specPatch(newJob.Spec)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/zalando/postgres-operator/pkg/util"
"github.com/zalando/postgres-operator/pkg/util/constants"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
batchv1beta1 "k8s.io/api/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -1128,8 +1128,8 @@ func (c *Cluster) syncExtensions(extensions map[string]string) error {

func (c *Cluster) syncLogicalBackupJob() error {
var (
job *batchv1beta1.CronJob
desiredJob *batchv1beta1.CronJob
job *batchv1.CronJob
desiredJob *batchv1.CronJob
err error
)
c.setProcessName("syncing the logical backup job")
Expand Down
12 changes: 6 additions & 6 deletions pkg/util/k8sutil/k8sutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
b64 "encoding/base64"
"encoding/json"

batchv1beta1 "k8s.io/api/batch/v1beta1"
clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1"
batchv1 "k8s.io/api/batch/v1"
clientbatchv1 "k8s.io/client-go/kubernetes/typed/batch/v1"

apiacidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
zalandoclient "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned"
Expand Down Expand Up @@ -63,7 +63,7 @@ type KubernetesClient struct {
rbacv1.RoleBindingsGetter
policyv1.PodDisruptionBudgetsGetter
apiextv1.CustomResourceDefinitionsGetter
clientbatchv1beta1.CronJobsGetter
clientbatchv1.CronJobsGetter
acidv1.OperatorConfigurationsGetter
acidv1.PostgresTeamsGetter
acidv1.PostgresqlsGetter
Expand Down Expand Up @@ -159,7 +159,7 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) {
kubeClient.PodDisruptionBudgetsGetter = client.PolicyV1()
kubeClient.RESTClient = client.CoreV1().RESTClient()
kubeClient.RoleBindingsGetter = client.RbacV1()
kubeClient.CronJobsGetter = client.BatchV1beta1()
kubeClient.CronJobsGetter = client.BatchV1()
kubeClient.EventsGetter = client.CoreV1()

apiextClient, err := apiextclient.NewForConfig(cfg)
Expand Down Expand Up @@ -224,12 +224,12 @@ func SamePDB(cur, new *apipolicyv1.PodDisruptionBudget) (match bool, reason stri
return
}

func getJobImage(cronJob *batchv1beta1.CronJob) string {
func getJobImage(cronJob *batchv1.CronJob) string {
return cronJob.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Image
}

// SameLogicalBackupJob compares Specs of logical backup cron jobs
func SameLogicalBackupJob(cur, new *batchv1beta1.CronJob) (match bool, reason string) {
func SameLogicalBackupJob(cur, new *batchv1.CronJob) (match bool, reason string) {

if cur.Spec.Schedule != new.Spec.Schedule {
return false, fmt.Sprintf("new job's schedule %q does not match the current one %q",
Expand Down