diff --git a/charts/postgres-operator/crds/postgresqls.yaml b/charts/postgres-operator/crds/postgresqls.yaml index bcd212afd..a7072b8cc 100644 --- a/charts/postgres-operator/crds/postgresqls.yaml +++ b/charts/postgres-operator/crds/postgresqls.yaml @@ -474,6 +474,39 @@ spec: type: string gs_wal_path: type: string + streams: + type: array + nullable: true + items: + type: object + required: + - applicationId + - database + - tables + properties: + applicationId: + type: string + batchSize: + type: integer + database: + type: string + filter: + type: object + additionalProperties: + type: string + tables: + type: object + additionalProperties: + type: object + required: + - eventType + properties: + eventType: + type: string + idColumn: + type: string + payloadColumn: + type: string teamId: type: string tls: diff --git a/charts/postgres-operator/templates/clusterrole.yaml b/charts/postgres-operator/templates/clusterrole.yaml index 87fd38cd2..8b2e9136e 100644 --- a/charts/postgres-operator/templates/clusterrole.yaml +++ b/charts/postgres-operator/templates/clusterrole.yaml @@ -34,6 +34,22 @@ rules: - get - list - watch +# all verbs allowed for event streams +{{- if .Values.enableStreams }} +- apiGroups: + - zalando.org + resources: + - fabriceventstreams + verbs: + - create + - delete + - deletecollection + - get + - list + - patch + - update + - watch +{{- end }} # to create or get/update CRDs when starting up - apiGroups: - apiextensions.k8s.io diff --git a/charts/postgres-operator/values.yaml b/charts/postgres-operator/values.yaml index 8183894d3..288efe763 100644 --- a/charts/postgres-operator/values.yaml +++ b/charts/postgres-operator/values.yaml @@ -368,6 +368,9 @@ configConnectionPooler: connection_pooler_default_cpu_limit: "1" connection_pooler_default_memory_limit: 100Mi +# Zalando's internal CDC stream feature +enableStreams: false + rbac: # Specifies whether RBAC resources should be created create: true diff --git a/docs/reference/cluster_manifest.md b/docs/reference/cluster_manifest.md index ae72d6450..cff86333e 100644 --- a/docs/reference/cluster_manifest.md +++ b/docs/reference/cluster_manifest.md @@ -536,3 +536,54 @@ Those parameters are grouped under the `tls` top-level key. relative to the "/tls/", which is mount path of the tls secret. If `caSecretName` is defined, the ca.crt path is relative to "/tlsca/", otherwise to the same "/tls/". + +## Change data capture streams + +This sections enables change data capture (CDC) streams via Postgres' +[logical decoding](https://www.postgresql.org/docs/14/logicaldecoding.html) +feature and `pgoutput` plugin. While the Postgres operator takes responsibility +for providing the setup to publish change events, it relies on external tools +to consume them. At Zalando, we are using a workflow based on +[Debezium Connector](https://debezium.io/documentation/reference/stable/connectors/postgresql.html) +which can feed streams into Zalando’s distributed event broker [Nakadi](https://nakadi.io/) +among others. + +The Postgres Operator creates custom resources for Zalando's internal CDC +operator which will be used to set up the consumer part. Each stream object +can have the following properties: + +* **applicationId** + The application name to which the database and CDC belongs to. For each + set of streams with a distinct `applicationId` a separate stream CR as well + as a separate logical replication slot will be created. This means there can + be different streams in the same database and streams with the same + `applicationId` are bundled in one stream CR. The stream CR will be called + like the Postgres cluster plus "-" suffix. Required. + +* **database** + Name of the database from where events will be published via Postgres' + logical decoding feature. The operator will take care of updating the + database configuration (setting `wal_level: logical`, creating logical + replication slots, using output plugin `pgoutput` and creating a dedicated + replication user). Required. + +* **tables** + Defines a map of table names and their properties (`eventType`, `idColumn` + and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/). + The application is responsible for putting events into a (JSON/B or VARCHAR) + payload column of the outbox table in the structure of the specified target + event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/14/logical-replication-publication.html) + in Postgres for all tables specified for one `database` and `applicationId`. + The CDC operator will consume from it shortly after transactions are + committed to the outbox table. The `idColumn` will be used in telemetry for + the CDC operator. The names for `idColumn` and `payloadColumn` can be + configured. Defaults are `id` and `payload`. The target `eventType` has to + be defined. Required. + +* **filter** + Streamed events can be filtered by a jsonpath expression for each table. + Optional. + +* **batchSize** + Defines the size of batches in which events are consumed. Optional. + Defaults to 1. diff --git a/hack/update-codegen.sh b/hack/update-codegen.sh index d304004dc..d34db9c45 100755 --- a/hack/update-codegen.sh +++ b/hack/update-codegen.sh @@ -17,7 +17,7 @@ trap "cleanup" EXIT SIGINT bash "${CODEGEN_PKG}/generate-groups.sh" all \ "${OPERATOR_PACKAGE_ROOT}/pkg/generated" "${OPERATOR_PACKAGE_ROOT}/pkg/apis" \ - "acid.zalan.do:v1" \ + "acid.zalan.do:v1 zalando.org:v1" \ --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt cp -r "${OPERATOR_PACKAGE_ROOT}"/pkg/* "${TARGET_CODE_DIR}" diff --git a/manifests/complete-postgres-manifest.yaml b/manifests/complete-postgres-manifest.yaml index 890f5eed3..cb2a8ee1f 100644 --- a/manifests/complete-postgres-manifest.yaml +++ b/manifests/complete-postgres-manifest.yaml @@ -3,6 +3,7 @@ kind: postgresql metadata: name: acid-test-cluster # labels: +# application: test-app # environment: demo # annotations: # "acid.zalan.do/controller": "second-operator" @@ -17,7 +18,7 @@ spec: - superuser - createdb foo_user: [] -# flyway: [] +# flyway: [] # usersWithSecretRotation: # - foo_user # usersWithInPlaceSecretRotation: @@ -203,3 +204,21 @@ spec: # operator: In # values: # - enabled + +# Enables change data capture streams for defined database tables +# streams: +# - applicationId: test-app +# database: foo +# tables: +# data.state_pending_outbox: +# eventType: test-app.status-pending +# data.state_approved_outbox: +# eventType: test-app.status-approved +# data.orders_outbox: +# eventType: test-app.order-completed +# idColumn: o_id +# payloadColumn: o_payload +# # Optional. Filter ignores events before a certain txnId and lsn. Can be used to skip bad events +# filter: +# data.orders_outbox: "[?(@.source.txId > 500 && @.source.lsn > 123456)]" +# batchSize: 1000 diff --git a/manifests/operator-service-account-rbac.yaml b/manifests/operator-service-account-rbac.yaml index f0307f6a0..c10dc5fd7 100644 --- a/manifests/operator-service-account-rbac.yaml +++ b/manifests/operator-service-account-rbac.yaml @@ -35,6 +35,20 @@ rules: - get - list - watch +# all verbs allowed for event streams (Zalando-internal feature) +# - apiGroups: +# - zalando.org +# resources: +# - fabriceventstreams +# verbs: +# - create +# - delete +# - deletecollection +# - get +# - list +# - patch +# - update +# - watch # to create or get/update CRDs when starting up - apiGroups: - apiextensions.k8s.io diff --git a/manifests/postgresql.crd.yaml b/manifests/postgresql.crd.yaml index 1f1b064a7..7cf220eb7 100644 --- a/manifests/postgresql.crd.yaml +++ b/manifests/postgresql.crd.yaml @@ -472,6 +472,39 @@ spec: type: string gs_wal_path: type: string + streams: + type: array + nullable: true + items: + type: object + required: + - applicationId + - database + - tables + properties: + applicationId: + type: string + batchSize: + type: integer + database: + type: string + filter: + type: object + additionalProperties: + type: string + tables: + type: object + additionalProperties: + type: object + required: + - eventType + properties: + eventType: + type: string + idColumn: + type: string + payloadColumn: + type: string teamId: type: string tls: diff --git a/pkg/apis/acid.zalan.do/v1/crds.go b/pkg/apis/acid.zalan.do/v1/crds.go index 89a04b180..9dc3d167e 100644 --- a/pkg/apis/acid.zalan.do/v1/crds.go +++ b/pkg/apis/acid.zalan.do/v1/crds.go @@ -716,6 +716,54 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{ }, }, }, + "streams": { + Type: "array", + Items: &apiextv1.JSONSchemaPropsOrArray{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + Required: []string{"applicationId", "database", "tables"}, + Properties: map[string]apiextv1.JSONSchemaProps{ + "applicationId": { + Type: "string", + }, + "batchSize": { + Type: "integer", + }, + "database": { + Type: "string", + }, + "filter": { + Type: "object", + AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "string", + }, + }, + }, + "tables": { + Type: "object", + AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{ + Schema: &apiextv1.JSONSchemaProps{ + Type: "object", + Required: []string{"eventType"}, + Properties: map[string]apiextv1.JSONSchemaProps{ + "eventType": { + Type: "string", + }, + "idColumn": { + Type: "string", + }, + "payloadColumn": { + Type: "string", + }, + }, + }, + }, + }, + }, + }, + }, + }, "teamId": { Type: "string", }, diff --git a/pkg/apis/acid.zalan.do/v1/postgresql_type.go b/pkg/apis/acid.zalan.do/v1/postgresql_type.go index a97ad35a3..a1fc4fcf7 100644 --- a/pkg/apis/acid.zalan.do/v1/postgresql_type.go +++ b/pkg/apis/acid.zalan.do/v1/postgresql_type.go @@ -77,6 +77,7 @@ type PostgresSpec struct { ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"` TLS *TLSDescription `json:"tls,omitempty"` AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"` + Streams []Stream `json:"streams,omitempty"` // deprecated json tags InitContainersOld []v1.Container `json:"init_containers,omitempty"` @@ -231,3 +232,17 @@ type ConnectionPooler struct { Resources `json:"resources,omitempty"` } + +type Stream struct { + ApplicationId string `json:"applicationId"` + Database string `json:"database"` + Tables map[string]StreamTable `json:"tables"` + Filter map[string]string `json:"filter,omitempty"` + BatchSize uint32 `json:"batchSize,omitempty"` +} + +type StreamTable struct { + EventType string `json:"eventType"` + IdColumn string `json:"idColumn,omitempty" defaults:"id"` + PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"` +} diff --git a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go index d960cd102..c2298fada 100644 --- a/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go +++ b/pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go @@ -749,6 +749,13 @@ func (in *PostgresSpec) DeepCopyInto(out *PostgresSpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Streams != nil { + in, out := &in.Streams, &out.Streams + *out = make([]Stream, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.InitContainersOld != nil { in, out := &in.InitContainersOld, &out.InitContainersOld *out = make([]corev1.Container, len(*in)) @@ -1152,6 +1159,52 @@ func (in *StandbyDescription) DeepCopy() *StandbyDescription { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Stream) DeepCopyInto(out *Stream) { + *out = *in + if in.Tables != nil { + in, out := &in.Tables, &out.Tables + *out = make(map[string]StreamTable, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.Filter != nil { + in, out := &in.Filter, &out.Filter + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Stream. +func (in *Stream) DeepCopy() *Stream { + if in == nil { + return nil + } + out := new(Stream) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *StreamTable) DeepCopyInto(out *StreamTable) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new StreamTable. +func (in *StreamTable) DeepCopy() *StreamTable { + if in == nil { + return nil + } + out := new(StreamTable) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *TLSDescription) DeepCopyInto(out *TLSDescription) { *out = *in diff --git a/pkg/apis/zalando.org/register.go b/pkg/apis/zalando.org/register.go new file mode 100644 index 000000000..3dbd3f089 --- /dev/null +++ b/pkg/apis/zalando.org/register.go @@ -0,0 +1,6 @@ +package zalando + +const ( + // GroupName is the group name for the operator CRDs + GroupName = "zalando.org" +) diff --git a/pkg/apis/zalando.org/v1/fabriceventstream.go b/pkg/apis/zalando.org/v1/fabriceventstream.go new file mode 100644 index 000000000..65d082c1e --- /dev/null +++ b/pkg/apis/zalando.org/v1/fabriceventstream.go @@ -0,0 +1,84 @@ +package v1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// FabricEventStream defines FabricEventStream Custom Resource Definition Object. +type FabricEventStream struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec FabricEventStreamSpec `json:"spec"` +} + +// FabricEventStreamSpec defines the specification for the FabricEventStream TPR. +type FabricEventStreamSpec struct { + ApplicationId string `json:"applicationId"` + EventStreams []EventStream `json:"eventStreams"` +} + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +// FabricEventStreamList defines a list of FabricEventStreams . +type FabricEventStreamList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + + Items []FabricEventStream `json:"items"` +} + +// EventStream defines the source, flow and sink of the event stream +type EventStream struct { + EventStreamFlow EventStreamFlow `json:"flow"` + EventStreamSink EventStreamSink `json:"sink"` + EventStreamSource EventStreamSource `json:"source"` +} + +// EventStreamFlow defines the flow characteristics of the event stream +type EventStreamFlow struct { + Type string `json:"type"` + PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"` +} + +// EventStreamSink defines the target of the event stream +type EventStreamSink struct { + Type string `json:"type"` + EventType string `json:"eventType,omitempty"` + MaxBatchSize uint32 `json:"maxBatchSize,omitempty"` +} + +// EventStreamSource defines the source of the event stream and connection for FES operator +type EventStreamSource struct { + Type string `json:"type"` + Schema string `json:"schema,omitempty" defaults:"public"` + EventStreamTable EventStreamTable `json:"table"` + Filter string `json:"filter,omitempty"` + Connection Connection `json:"jdbcConnection"` +} + +// EventStreamTable defines the name and ID column to be used for streaming +type EventStreamTable struct { + Name string `json:"name"` + IDColumn string `json:"idColumn,omitempty" defaults:"id"` +} + +// Connection to be used for allowing the FES operator to connect to a database +type Connection struct { + Url string `json:"jdbcUrl"` + SlotName string `json:"slotName"` + PluginType string `json:"pluginType,omitempty" defaults:"wal2json"` + PublicationName string `json:"publicationName,omitempty"` + DBAuth DBAuth `json:"databaseAuthentication"` +} + +// DBAuth specifies the credentials to be used for connecting with the database +type DBAuth struct { + Type string `json:"type"` + Name string `json:"name,omitempty"` + UserKey string `json:"userKey,omitempty"` + PasswordKey string `json:"passwordKey,omitempty"` +} diff --git a/pkg/apis/zalando.org/v1/register.go b/pkg/apis/zalando.org/v1/register.go new file mode 100644 index 000000000..33a2c718b --- /dev/null +++ b/pkg/apis/zalando.org/v1/register.go @@ -0,0 +1,44 @@ +package v1 + +import ( + "github.com/zalando/postgres-operator/pkg/apis/zalando.org" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/scheme" +) + +// APIVersion of the `fabriceventstream` CRD +const ( + APIVersion = "v1" +) + +var ( + schemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) + AddToScheme = schemeBuilder.AddToScheme +) + +func init() { + err := AddToScheme(scheme.Scheme) + if err != nil { + panic(err) + } +} + +// SchemeGroupVersion is the group version used to register these objects. +var SchemeGroupVersion = schema.GroupVersion{Group: zalando.GroupName, Version: APIVersion} + +// Resource takes an unqualified resource and returns a Group-qualified GroupResource. +func Resource(resource string) schema.GroupResource { + return SchemeGroupVersion.WithResource(resource).GroupResource() +} + +// addKnownTypes adds the set of types defined in this package to the supplied scheme. +func addKnownTypes(scheme *runtime.Scheme) error { + scheme.AddKnownTypes(SchemeGroupVersion, + &FabricEventStream{}, + &FabricEventStreamList{}, + ) + metav1.AddToGroupVersion(scheme, SchemeGroupVersion) + return nil +} diff --git a/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go new file mode 100644 index 000000000..a44439a94 --- /dev/null +++ b/pkg/apis/zalando.org/v1/zz_generated.deepcopy.go @@ -0,0 +1,227 @@ +// +build !ignore_autogenerated + +/* +Copyright 2021 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by deepcopy-gen. DO NOT EDIT. + +package v1 + +import ( + runtime "k8s.io/apimachinery/pkg/runtime" +) + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Connection) DeepCopyInto(out *Connection) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Connection) DeepCopy() *Connection { + if in == nil { + return nil + } + out := new(Connection) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DBAuth) DeepCopyInto(out *DBAuth) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DBAuth) DeepCopy() *DBAuth { + if in == nil { + return nil + } + out := new(DBAuth) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStream) DeepCopyInto(out *EventStream) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStream) DeepCopy() *EventStream { + if in == nil { + return nil + } + out := new(EventStream) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamFlow) DeepCopyInto(out *EventStreamFlow) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamFlow) DeepCopy() *EventStreamFlow { + if in == nil { + return nil + } + out := new(EventStreamFlow) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamSink) DeepCopyInto(out *EventStreamSink) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamSink) DeepCopy() *EventStreamSink { + if in == nil { + return nil + } + out := new(EventStreamSink) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamSource) DeepCopyInto(out *EventStreamSource) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamSource) DeepCopy() *EventStreamSource { + if in == nil { + return nil + } + out := new(EventStreamSource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamTable) DeepCopyInto(out *EventStreamTable) { + *out = *in + return +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EventStreamTable) DeepCopy() *EventStreamTable { + if in == nil { + return nil + } + out := new(EventStreamTable) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FabricEventStreamSpec) DeepCopyInto(out *FabricEventStreamSpec) { + *out = *in + if in.EventStreams != nil { + in, out := &in.EventStreams, &out.EventStreams + *out = make([]EventStream, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FabricEventStreamSpec. +func (in *FabricEventStreamSpec) DeepCopy() *FabricEventStreamSpec { + if in == nil { + return nil + } + out := new(FabricEventStreamSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FabricEventStream) DeepCopyInto(out *FabricEventStream) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FabricEventStream. +func (in *FabricEventStream) DeepCopy() *FabricEventStream { + if in == nil { + return nil + } + out := new(FabricEventStream) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FabricEventStream) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FabricEventStreamList) DeepCopyInto(out *FabricEventStreamList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]FabricEventStream, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FabricEventStreamList. +func (in *FabricEventStreamList) DeepCopy() *FabricEventStreamList { + if in == nil { + return nil + } + out := new(FabricEventStreamList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *FabricEventStreamList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 3e7f708a5..7dbfa6b88 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -361,6 +361,12 @@ func (c *Cluster) Create() error { // something fails, report warning c.createConnectionPooler(c.installLookupFunction) + if len(c.Spec.Streams) > 0 { + if err = c.syncStreams(); err != nil { + c.logger.Errorf("could not create streams: %v", err) + } + } + return nil } @@ -861,6 +867,13 @@ func (c *Cluster) Update(oldSpec, newSpec *acidv1.Postgresql) error { updateFailed = true } + if len(c.Spec.Streams) > 0 { + if err := c.syncStreams(); err != nil { + c.logger.Errorf("could not sync streams: %v", err) + updateFailed = true + } + } + if !updateFailed { // Major version upgrade must only fire after success of earlier operations and should stay last if err := c.majorVersionUpgrade(); err != nil { @@ -896,6 +909,10 @@ func (c *Cluster) Delete() { defer c.mu.Unlock() c.eventRecorder.Event(c.GetReference(), v1.EventTypeNormal, "Delete", "Started deletion of new cluster resources") + if err := c.deleteStreams(); err != nil { + c.logger.Warningf("could not delete event streams: %v", err) + } + // delete the backup job before the stateful set of the cluster to prevent connections to non-existing pods // deleting the cron job also removes pods and batch jobs it created if err := c.deleteLogicalBackupJob(); err != nil { @@ -1050,6 +1067,23 @@ func (c *Cluster) initSystemUsers() { c.systemUsers[constants.ConnectionPoolerUserKeyName] = connectionPoolerUser } } + + // replication users for event streams are another exception + // the operator will create one replication user for all streams + if len(c.Spec.Streams) > 0 { + username := constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix + streamUser := spec.PgUser{ + Origin: spec.RoleConnectionPooler, + Name: username, + Namespace: c.Namespace, + Flags: []string{constants.RoleFlagLogin, constants.RoleFlagReplication}, + Password: util.RandomPassword(constants.PasswordLength), + } + + if _, exists := c.pgUsers[username]; !exists { + c.pgUsers[username] = streamUser + } + } } func (c *Cluster) initPreparedDatabaseRoles() error { diff --git a/pkg/cluster/database.go b/pkg/cluster/database.go index 279cf828f..652f0d0ae 100644 --- a/pkg/cluster/database.go +++ b/pkg/cluster/database.go @@ -46,6 +46,13 @@ const ( createExtensionSQL = `CREATE EXTENSION IF NOT EXISTS "%s" SCHEMA "%s"` alterExtensionSQL = `ALTER EXTENSION "%s" SET SCHEMA "%s"` + getPublicationsSQL = `SELECT p.pubname, string_agg(pt.schemaname || '.' || pt.tablename, ', ' ORDER BY pt.schemaname, pt.tablename) + FROM pg_publication p + JOIN pg_publication_tables pt ON pt.pubname = p.pubname + GROUP BY p.pubname;` + createPublicationSQL = `CREATE PUBLICATION "%s" FOR TABLE %s WITH (publish = 'insert, update');` + alterPublicationSQL = `ALTER PUBLICATION "%s" SET TABLE %s;` + globalDefaultPrivilegesSQL = `SET ROLE TO "%s"; ALTER DEFAULT PRIVILEGES GRANT USAGE ON SCHEMAS TO "%s","%s"; ALTER DEFAULT PRIVILEGES GRANT SELECT ON TABLES TO "%s"; @@ -572,6 +579,68 @@ func (c *Cluster) execCreateOrAlterExtension(extName, schemaName, statement, doi return nil } +// getPublications returns the list of current database publications with tables +// The caller is responsible for opening and closing the database connection +func (c *Cluster) getPublications() (publications map[string]string, err error) { + var ( + rows *sql.Rows + ) + + if rows, err = c.pgDb.Query(getPublicationsSQL); err != nil { + return nil, fmt.Errorf("could not query database publications: %v", err) + } + + defer func() { + if err2 := rows.Close(); err2 != nil { + if err != nil { + err = fmt.Errorf("error when closing query cursor: %v, previous error: %v", err2, err) + } else { + err = fmt.Errorf("error when closing query cursor: %v", err2) + } + } + }() + + dbPublications := make(map[string]string) + + for rows.Next() { + var ( + dbPublication string + dbPublicationTables string + ) + + if err = rows.Scan(&dbPublication, &dbPublicationTables); err != nil { + return nil, fmt.Errorf("error when processing row: %v", err) + } + dbPublications[dbPublication] = dbPublicationTables + } + + return dbPublications, err +} + +// executeCreatePublication creates new publication for given tables +// The caller is responsible for opening and closing the database connection. +func (c *Cluster) executeCreatePublication(pubName, tableList string) error { + return c.execCreateOrAlterPublication(pubName, tableList, createPublicationSQL, + "creating publication", "create publication") +} + +// executeAlterExtension changes the table list of the given publication. +// The caller is responsible for opening and closing the database connection. +func (c *Cluster) executeAlterPublication(pubName, tableList string) error { + return c.execCreateOrAlterPublication(pubName, tableList, alterPublicationSQL, + "changing publication", "alter publication tables") +} + +func (c *Cluster) execCreateOrAlterPublication(pubName, tableList, statement, doing, operation string) error { + + c.logger.Debugf("%s %q with table list %q", doing, pubName, tableList) + if _, err := c.pgDb.Exec(fmt.Sprintf(statement, pubName, tableList)); err != nil { + return fmt.Errorf("could not execute %s: %v", operation, err) + } + + return nil +} + // Creates a connection pool credentials lookup function in every database to // perform remote authentication. func (c *Cluster) installLookupFunction(poolerSchema, poolerUser string) error { diff --git a/pkg/cluster/streams.go b/pkg/cluster/streams.go new file mode 100644 index 000000000..a1a1b4d4a --- /dev/null +++ b/pkg/cluster/streams.go @@ -0,0 +1,363 @@ +package cluster + +import ( + "context" + "fmt" + "reflect" + "sort" + "strings" + + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" + "github.com/zalando/postgres-operator/pkg/util" + "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func (c *Cluster) createStreams(appId string) error { + c.setProcessName("creating streams") + + fes := c.generateFabricEventStream(appId) + if _, err := c.KubeClient.FabricEventStreams(c.Namespace).Create(context.TODO(), fes, metav1.CreateOptions{}); err != nil { + return err + } + + return nil +} + +func (c *Cluster) updateStreams(newEventStreams *zalandov1.FabricEventStream) error { + c.setProcessName("updating event streams") + + if _, err := c.KubeClient.FabricEventStreams(newEventStreams.Namespace).Update(context.TODO(), newEventStreams, metav1.UpdateOptions{}); err != nil { + return err + } + + return nil +} + +func (c *Cluster) deleteStreams() error { + c.setProcessName("deleting event streams") + + // check if stream CRD is installed before trying a delete + _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{}) + if k8sutil.ResourceNotFound(err) { + return nil + } + + err = c.KubeClient.FabricEventStreams(c.Namespace).Delete(context.TODO(), c.Name, metav1.DeleteOptions{}) + if err != nil { + return fmt.Errorf("could not delete event stream custom resource: %v", err) + } + + return nil +} + +func gatherApplicationIds(streams []acidv1.Stream) []string { + appIds := make([]string, 0) + for _, stream := range streams { + if !util.SliceContains(appIds, stream.ApplicationId) { + appIds = append(appIds, stream.ApplicationId) + } + } + + return appIds +} + +func (c *Cluster) syncPostgresConfig(requiredPatroniConfig acidv1.Patroni) error { + errorMsg := "no pods found to update config" + + // if streams are defined wal_level must be switched to logical + requiredPgParameters := map[string]string{"wal_level": "logical"} + + // apply config changes in pods + pods, err := c.listPods() + if err != nil { + errorMsg = fmt.Sprintf("could not list pods of the statefulset: %v", err) + } + for i, pod := range pods { + podName := util.NameFromMeta(pods[i].ObjectMeta) + effectivePatroniConfig, effectivePgParameters, err := c.patroni.GetConfig(&pod) + if err != nil { + errorMsg = fmt.Sprintf("could not get Postgres config from pod %s: %v", podName, err) + continue + } + + _, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, effectivePatroniConfig, requiredPatroniConfig, effectivePgParameters, requiredPgParameters) + if err != nil { + errorMsg = fmt.Sprintf("could not set PostgreSQL configuration options for pod %s: %v", podName, err) + continue + } + + // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used + return nil + } + + return fmt.Errorf(errorMsg) +} + +func (c *Cluster) syncPublication(publication, dbName string, tables map[string]acidv1.StreamTable) error { + createPublications := make(map[string]string) + alterPublications := make(map[string]string) + + defer func() { + if err := c.closeDbConn(); err != nil { + c.logger.Errorf("could not close database connection: %v", err) + } + }() + + // check for existing publications + if err := c.initDbConnWithName(dbName); err != nil { + return fmt.Errorf("could not init database connection") + } + + currentPublications, err := c.getPublications() + if err != nil { + return fmt.Errorf("could not get current publications: %v", err) + } + + tableNames := make([]string, len(tables)) + i := 0 + for t := range tables { + tableName, schemaName := getTableSchema(t) + tableNames[i] = fmt.Sprintf("%s.%s", schemaName, tableName) + i++ + } + sort.Strings(tableNames) + tableList := strings.Join(tableNames, ", ") + + currentTables, exists := currentPublications[publication] + if !exists { + createPublications[publication] = tableList + } else if currentTables != tableList { + alterPublications[publication] = tableList + } + + if len(createPublications)+len(alterPublications) == 0 { + return nil + } + + for publicationName, tables := range createPublications { + if err = c.executeCreatePublication(publicationName, tables); err != nil { + return fmt.Errorf("creation of publication %q failed: %v", publicationName, err) + } + } + for publicationName, tables := range alterPublications { + if err = c.executeAlterPublication(publicationName, tables); err != nil { + return fmt.Errorf("update of publication %q failed: %v", publicationName, err) + } + } + + return nil +} + +func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream { + eventStreams := make([]zalandov1.EventStream, 0) + + for _, stream := range c.Spec.Streams { + if stream.ApplicationId != appId { + continue + } + for tableName, table := range stream.Tables { + streamSource := c.getEventStreamSource(stream, tableName, table.IdColumn) + streamFlow := getEventStreamFlow(stream, table.PayloadColumn) + streamSink := getEventStreamSink(stream, table.EventType) + + eventStreams = append(eventStreams, zalandov1.EventStream{ + EventStreamFlow: streamFlow, + EventStreamSink: streamSink, + EventStreamSource: streamSource}) + } + } + + return &zalandov1.FabricEventStream{ + TypeMeta: metav1.TypeMeta{ + APIVersion: constants.EventStreamCRDApiVersion, + Kind: constants.EventStreamCRDKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", c.Name, appId), + Namespace: c.Namespace, + Annotations: c.AnnotationsToPropagate(c.annotationsSet(nil)), + // make cluster StatefulSet the owner (like with connection pooler objects) + OwnerReferences: c.ownerReferences(), + }, + Spec: zalandov1.FabricEventStreamSpec{ + ApplicationId: appId, + EventStreams: eventStreams, + }, + } +} + +func (c *Cluster) getEventStreamSource(stream acidv1.Stream, tableName, idColumn string) zalandov1.EventStreamSource { + table, schema := getTableSchema(tableName) + streamFilter := stream.Filter[tableName] + return zalandov1.EventStreamSource{ + Type: constants.EventStreamSourcePGType, + Schema: schema, + EventStreamTable: getOutboxTable(table, idColumn), + Filter: streamFilter, + Connection: c.getStreamConnection( + stream.Database, + constants.EventStreamSourceSlotPrefix+constants.UserRoleNameSuffix, + stream.ApplicationId), + } +} + +func getEventStreamFlow(stream acidv1.Stream, payloadColumn string) zalandov1.EventStreamFlow { + return zalandov1.EventStreamFlow{ + Type: constants.EventStreamFlowPgGenericType, + PayloadColumn: payloadColumn, + } +} + +func getEventStreamSink(stream acidv1.Stream, eventType string) zalandov1.EventStreamSink { + return zalandov1.EventStreamSink{ + Type: constants.EventStreamSinkNakadiType, + EventType: eventType, + MaxBatchSize: stream.BatchSize, + } +} + +func getTableSchema(fullTableName string) (tableName, schemaName string) { + schemaName = "public" + tableName = fullTableName + if strings.Contains(fullTableName, ".") { + schemaName = strings.Split(fullTableName, ".")[0] + tableName = strings.Split(fullTableName, ".")[1] + } + + return tableName, schemaName +} + +func getOutboxTable(tableName, idColumn string) zalandov1.EventStreamTable { + return zalandov1.EventStreamTable{ + Name: tableName, + IDColumn: idColumn, + } +} + +func getSlotName(dbName, appId string) string { + return fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) +} + +func (c *Cluster) getStreamConnection(database, user, appId string) zalandov1.Connection { + return zalandov1.Connection{ + Url: fmt.Sprintf("jdbc:postgresql://%s.%s/%s?user=%s&ssl=true&sslmode=require", c.Name, c.Namespace, database, user), + SlotName: getSlotName(database, appId), + PluginType: constants.EventStreamSourcePluginType, + DBAuth: zalandov1.DBAuth{ + Type: constants.EventStreamSourceAuthType, + Name: c.credentialSecretNameForCluster(user, c.Name), + UserKey: "username", + PasswordKey: "password", + }, + } +} + +func (c *Cluster) syncStreams() error { + + c.setProcessName("syncing streams") + + _, err := c.KubeClient.CustomResourceDefinitions().Get(context.TODO(), constants.EventStreamCRDName, metav1.GetOptions{}) + if k8sutil.ResourceNotFound(err) { + c.logger.Debugf("event stream CRD not installed, skipping") + return nil + } + + slots := make(map[string]map[string]string) + publications := make(map[string]map[string]acidv1.StreamTable) + + requiredPatroniConfig := c.Spec.Patroni + if len(requiredPatroniConfig.Slots) > 0 { + slots = requiredPatroniConfig.Slots + } + + // gather list of required slots and publications + for _, stream := range c.Spec.Streams { + slot := map[string]string{ + "database": stream.Database, + "plugin": constants.EventStreamSourcePluginType, + "type": "logical", + } + slotName := getSlotName(stream.Database, stream.ApplicationId) + if _, exists := slots[slotName]; !exists { + slots[slotName] = slot + publications[slotName] = stream.Tables + } else { + streamTables := publications[slotName] + for tableName, table := range stream.Tables { + if _, exists := streamTables[tableName]; !exists { + streamTables[tableName] = table + } + } + publications[slotName] = streamTables + } + } + + // no slots = no streams defined + if len(slots) > 0 { + requiredPatroniConfig.Slots = slots + } else { + return nil + } + + // add extra logical slots to Patroni config + c.logger.Debug("syncing Postgres config for logical decoding") + err = c.syncPostgresConfig(requiredPatroniConfig) + if err != nil { + return fmt.Errorf("failed to snyc Postgres config for event streaming: %v", err) + } + + // next, create publications to each created slot + c.logger.Debug("syncing database publications") + for publication, tables := range publications { + // but first check for existing publications + dbName := slots[publication]["database"] + err = c.syncPublication(publication, dbName, tables) + if err != nil { + c.logger.Warningf("could not sync publication %q in database %q: %v", publication, dbName, err) + } + } + + err = c.createOrUpdateStreams() + if err != nil { + return err + } + + return nil +} + +func (c *Cluster) createOrUpdateStreams() error { + + appIds := gatherApplicationIds(c.Spec.Streams) + for _, appId := range appIds { + fesName := fmt.Sprintf("%s-%s", c.Name, appId) + effectiveStreams, err := c.KubeClient.FabricEventStreams(c.Namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) + if err != nil { + if !k8sutil.ResourceNotFound(err) { + return fmt.Errorf("failed reading event stream %s: %v", fesName, err) + } + + c.logger.Infof("event streams do not exist, create it") + err = c.createStreams(appId) + if err != nil { + return fmt.Errorf("failed creating event stream %s: %v", fesName, err) + } + c.logger.Infof("event stream %q has been successfully created", fesName) + } else { + desiredStreams := c.generateFabricEventStream(appId) + if !reflect.DeepEqual(effectiveStreams.Spec, desiredStreams.Spec) { + c.logger.Debug("updating event streams") + desiredStreams.ObjectMeta.ResourceVersion = effectiveStreams.ObjectMeta.ResourceVersion + err = c.updateStreams(desiredStreams) + if err != nil { + return fmt.Errorf("failed updating event stream %s: %v", fesName, err) + } + c.logger.Infof("event stream %q has been successfully updated", fesName) + } + } + } + + return nil +} diff --git a/pkg/cluster/streams_test.go b/pkg/cluster/streams_test.go new file mode 100644 index 000000000..89dd294ca --- /dev/null +++ b/pkg/cluster/streams_test.go @@ -0,0 +1,244 @@ +package cluster + +import ( + "encoding/json" + "fmt" + "reflect" + "strings" + + "context" + "testing" + + "github.com/stretchr/testify/assert" + acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + v1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" + fakezalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/fake" + "github.com/zalando/postgres-operator/pkg/util" + "github.com/zalando/postgres-operator/pkg/util/config" + "github.com/zalando/postgres-operator/pkg/util/constants" + "github.com/zalando/postgres-operator/pkg/util/k8sutil" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/fake" +) + +func newFakeK8sStreamClient() (k8sutil.KubernetesClient, *fake.Clientset) { + zalandoClientSet := fakezalandov1.NewSimpleClientset() + clientSet := fake.NewSimpleClientset() + + return k8sutil.KubernetesClient{ + FabricEventStreamsGetter: zalandoClientSet.ZalandoV1(), + PostgresqlsGetter: zalandoClientSet.AcidV1(), + PodsGetter: clientSet.CoreV1(), + StatefulSetsGetter: clientSet.AppsV1(), + }, clientSet +} + +var ( + clusterName string = "acid-test-cluster" + namespace string = "default" + appId string = "test-app" + dbName string = "foo" + fesUser string = constants.EventStreamSourceSlotPrefix + constants.UserRoleNameSuffix + fesName string = fmt.Sprintf("%s-%s", clusterName, appId) + slotName string = fmt.Sprintf("%s_%s_%s", constants.EventStreamSourceSlotPrefix, dbName, strings.Replace(appId, "-", "_", -1)) + + pg = acidv1.Postgresql{ + TypeMeta: metav1.TypeMeta{ + Kind: "Postgresql", + APIVersion: "acid.zalan.do/v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: clusterName, + Namespace: namespace, + }, + Spec: acidv1.PostgresSpec{ + Databases: map[string]string{ + dbName: dbName + constants.UserRoleNameSuffix, + }, + Streams: []acidv1.Stream{ + { + ApplicationId: appId, + Database: "foo", + Tables: map[string]acidv1.StreamTable{ + "data.bar": acidv1.StreamTable{ + EventType: "stream_type_a", + IdColumn: "b_id", + PayloadColumn: "b_payload", + }, + }, + Filter: map[string]string{ + "data.bar": "[?(@.source.txId > 500 && @.source.lsn > 123456)]", + }, + BatchSize: uint32(100), + }, + }, + Volume: acidv1.Volume{ + Size: "1Gi", + }, + }, + } + + fes = &v1.FabricEventStream{ + TypeMeta: metav1.TypeMeta{ + APIVersion: constants.EventStreamCRDApiVersion, + Kind: constants.EventStreamCRDKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fesName, + Namespace: namespace, + OwnerReferences: []metav1.OwnerReference{ + metav1.OwnerReference{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + Name: "acid-test-cluster", + Controller: util.True(), + }, + }, + }, + Spec: v1.FabricEventStreamSpec{ + ApplicationId: appId, + EventStreams: []v1.EventStream{ + { + EventStreamFlow: v1.EventStreamFlow{ + PayloadColumn: "b_payload", + Type: constants.EventStreamFlowPgGenericType, + }, + EventStreamSink: v1.EventStreamSink{ + EventType: "stream_type_a", + MaxBatchSize: uint32(100), + Type: constants.EventStreamSinkNakadiType, + }, + EventStreamSource: v1.EventStreamSource{ + Filter: "[?(@.source.txId > 500 && @.source.lsn > 123456)]", + Connection: v1.Connection{ + DBAuth: v1.DBAuth{ + Name: fmt.Sprintf("fes-user.%s.credentials.postgresql.acid.zalan.do", clusterName), + PasswordKey: "password", + Type: constants.EventStreamSourceAuthType, + UserKey: "username", + }, + Url: fmt.Sprintf("jdbc:postgresql://%s.%s/foo?user=%s&ssl=true&sslmode=require", clusterName, namespace, fesUser), + SlotName: slotName, + PluginType: constants.EventStreamSourcePluginType, + }, + Schema: "data", + EventStreamTable: v1.EventStreamTable{ + IDColumn: "b_id", + Name: "bar", + }, + Type: constants.EventStreamSourcePGType, + }, + }, + }, + }, + } +) + +func TestGenerateFabricEventStream(t *testing.T) { + client, _ := newFakeK8sStreamClient() + + var cluster = New( + Config{ + OpConfig: config.Config{ + Auth: config.Auth{ + SecretNameTemplate: "{username}.{cluster}.credentials.{tprkind}.{tprgroup}", + }, + PodManagementPolicy: "ordered_ready", + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: "spilo-role", + }, + }, + }, client, pg, logger, eventRecorder) + + cluster.Name = clusterName + cluster.Namespace = namespace + + _, err := cluster.createStatefulSet() + assert.NoError(t, err) + + err = cluster.createOrUpdateStreams() + assert.NoError(t, err) + + result := cluster.generateFabricEventStream(appId) + + if !reflect.DeepEqual(result, fes) { + t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, result) + } + + streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) + assert.NoError(t, err) + + if !reflect.DeepEqual(streamCRD, fes) { + t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", fes, streamCRD) + } +} + +func TestUpdateFabricEventStream(t *testing.T) { + client, _ := newFakeK8sStreamClient() + + var cluster = New( + Config{ + OpConfig: config.Config{ + PodManagementPolicy: "ordered_ready", + Resources: config.Resources{ + ClusterLabels: map[string]string{"application": "spilo"}, + ClusterNameLabel: "cluster-name", + DefaultCPURequest: "300m", + DefaultCPULimit: "300m", + DefaultMemoryRequest: "300Mi", + DefaultMemoryLimit: "300Mi", + PodRoleLabel: "spilo-role", + }, + }, + }, client, pg, logger, eventRecorder) + + _, err := cluster.KubeClient.Postgresqls(namespace).Create( + context.TODO(), &pg, metav1.CreateOptions{}) + assert.NoError(t, err) + err = cluster.createOrUpdateStreams() + assert.NoError(t, err) + + var pgSpec acidv1.PostgresSpec + pgSpec.Streams = []acidv1.Stream{ + { + ApplicationId: appId, + Database: dbName, + Tables: map[string]acidv1.StreamTable{ + "data.bar": acidv1.StreamTable{ + EventType: "stream_type_b", + IdColumn: "b_id", + PayloadColumn: "b_payload", + }, + }, + BatchSize: uint32(250), + }, + } + patch, err := json.Marshal(struct { + PostgresSpec interface{} `json:"spec"` + }{&pgSpec}) + assert.NoError(t, err) + + pgPatched, err := cluster.KubeClient.Postgresqls(namespace).Patch( + context.TODO(), cluster.Name, types.MergePatchType, patch, metav1.PatchOptions{}, "spec") + assert.NoError(t, err) + + cluster.Postgresql.Spec = pgPatched.Spec + err = cluster.createOrUpdateStreams() + assert.NoError(t, err) + + streamCRD, err := cluster.KubeClient.FabricEventStreams(namespace).Get(context.TODO(), fesName, metav1.GetOptions{}) + assert.NoError(t, err) + + result := cluster.generateFabricEventStream(appId) + if !reflect.DeepEqual(result, streamCRD) { + t.Errorf("Malformed FabricEventStream, expected %#v, got %#v", streamCRD, result) + } +} diff --git a/pkg/cluster/sync.go b/pkg/cluster/sync.go index bbf023764..a897ff318 100644 --- a/pkg/cluster/sync.go +++ b/pkg/cluster/sync.go @@ -81,7 +81,7 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return err } - c.logger.Debugf("syncing statefulsets") + c.logger.Debug("syncing statefulsets") if err = c.syncStatefulSet(); err != nil { if !k8sutil.ResourceAlreadyExists(err) { err = fmt.Errorf("could not sync statefulsets: %v", err) @@ -107,17 +107,17 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { // create database objects unless we are running without pods or disabled that feature explicitly if !(c.databaseAccessDisabled() || c.getNumberOfInstances(&newSpec.Spec) <= 0 || c.Spec.StandbyCluster != nil) { - c.logger.Debugf("syncing roles") + c.logger.Debug("syncing roles") if err = c.syncRoles(); err != nil { err = fmt.Errorf("could not sync roles: %v", err) return err } - c.logger.Debugf("syncing databases") + c.logger.Debug("syncing databases") if err = c.syncDatabases(); err != nil { err = fmt.Errorf("could not sync databases: %v", err) return err } - c.logger.Debugf("syncing prepared databases with schemas") + c.logger.Debug("syncing prepared databases with schemas") if err = c.syncPreparedDatabases(); err != nil { err = fmt.Errorf("could not sync prepared database: %v", err) return err @@ -129,6 +129,14 @@ func (c *Cluster) Sync(newSpec *acidv1.Postgresql) error { return fmt.Errorf("could not sync connection pooler: %v", err) } + if len(c.Spec.Streams) > 0 { + c.logger.Debug("syncing streams") + if err = c.syncStreams(); err != nil { + err = fmt.Errorf("could not sync streams: %v", err) + return err + } + } + // Major version upgrade must only run after success of all earlier operations, must remain last item in sync if err := c.majorVersionUpgrade(); err != nil { c.logger.Errorf("major version upgrade failed: %v", err) @@ -401,7 +409,7 @@ func (c *Cluster) syncStatefulSet() error { } // get Postgres config, compare with manifest and update via Patroni PATCH endpoint if it differs - // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used. + // Patroni's config endpoint is just a "proxy" to DCS. It is enough to patch it only once and it doesn't matter which pod is used for i, pod := range pods { patroniConfig, pgParameters, err := c.getPatroniConfig(&pod) if err != nil { @@ -414,11 +422,13 @@ func (c *Cluster) syncStatefulSet() error { // empty config probably means cluster is not fully initialized yet, e.g. restoring from backup // do not attempt a restart if !reflect.DeepEqual(patroniConfig, acidv1.Patroni{}) || len(pgParameters) > 0 { - restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, pgParameters) + // compare config returned from Patroni with what is specified in the manifest + restartMasterFirst, err = c.checkAndSetGlobalPostgreSQLConfiguration(&pod, patroniConfig, c.Spec.Patroni, pgParameters, c.Spec.Parameters) if err != nil { c.logger.Warningf("could not set PostgreSQL configuration options for pod %s: %v", pods[i].Name, err) continue } + // it could take up to LoopWait to apply the config time.Sleep(time.Duration(restartWait)*time.Second + time.Second*2) break @@ -526,40 +536,39 @@ func (c *Cluster) AnnotationsToPropagate(annotations map[string]string) map[stri // checkAndSetGlobalPostgreSQLConfiguration checks whether cluster-wide API parameters // (like max_connections) have changed and if necessary sets it via the Patroni API -func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniConfig acidv1.Patroni, effectivePgParameters map[string]string) (bool, error) { +func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectivePatroniConfig, desiredPatroniConfig acidv1.Patroni, effectivePgParameters, desiredPgParameters map[string]string) (bool, error) { configToSet := make(map[string]interface{}) parametersToSet := make(map[string]string) restartMaster := make([]bool, 0) requiresMasterRestart := false - // compare options from config with c.Spec.Patroni from manifest - desiredPatroniConfig := c.Spec.Patroni - if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != patroniConfig.LoopWait { + // compare effective and desired Patroni config options + if desiredPatroniConfig.LoopWait > 0 && desiredPatroniConfig.LoopWait != effectivePatroniConfig.LoopWait { configToSet["loop_wait"] = desiredPatroniConfig.LoopWait } - if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != patroniConfig.MaximumLagOnFailover { + if desiredPatroniConfig.MaximumLagOnFailover > 0 && desiredPatroniConfig.MaximumLagOnFailover != effectivePatroniConfig.MaximumLagOnFailover { configToSet["maximum_lag_on_failover"] = desiredPatroniConfig.MaximumLagOnFailover } - if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, patroniConfig.PgHba) { + if desiredPatroniConfig.PgHba != nil && !reflect.DeepEqual(desiredPatroniConfig.PgHba, effectivePatroniConfig.PgHba) { configToSet["pg_hba"] = desiredPatroniConfig.PgHba } - if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != patroniConfig.RetryTimeout { + if desiredPatroniConfig.RetryTimeout > 0 && desiredPatroniConfig.RetryTimeout != effectivePatroniConfig.RetryTimeout { configToSet["retry_timeout"] = desiredPatroniConfig.RetryTimeout } - if desiredPatroniConfig.SynchronousMode != patroniConfig.SynchronousMode { + if desiredPatroniConfig.SynchronousMode != effectivePatroniConfig.SynchronousMode { configToSet["synchronous_mode"] = desiredPatroniConfig.SynchronousMode } - if desiredPatroniConfig.SynchronousModeStrict != patroniConfig.SynchronousModeStrict { + if desiredPatroniConfig.SynchronousModeStrict != effectivePatroniConfig.SynchronousModeStrict { configToSet["synchronous_mode_strict"] = desiredPatroniConfig.SynchronousModeStrict } - if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != patroniConfig.TTL { + if desiredPatroniConfig.TTL > 0 && desiredPatroniConfig.TTL != effectivePatroniConfig.TTL { configToSet["ttl"] = desiredPatroniConfig.TTL } // check if specified slots exist in config and if they differ slotsToSet := make(map[string]map[string]string) for slotName, desiredSlot := range desiredPatroniConfig.Slots { - if effectiveSlot, exists := patroniConfig.Slots[slotName]; exists { + if effectiveSlot, exists := effectivePatroniConfig.Slots[slotName]; exists { if reflect.DeepEqual(desiredSlot, effectiveSlot) { continue } @@ -570,8 +579,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC configToSet["slots"] = slotsToSet } - // compare parameters under postgresql section with c.Spec.Postgresql.Parameters from manifest - desiredPgParameters := c.Spec.Parameters + // compare effective and desired parameters under postgresql section in Patroni config for desiredOption, desiredValue := range desiredPgParameters { effectiveValue := effectivePgParameters[desiredOption] if isBootstrapOnlyParameter(desiredOption) && (effectiveValue != desiredValue) { @@ -614,7 +622,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, patroniC c.logger.Debugf("patching Postgres config via Patroni API on pod %s with following options: %s", podName, configToSetJson) if err = c.patroni.SetConfig(pod, configToSet); err != nil { - return requiresMasterRestart, fmt.Errorf("could not patch postgres parameters with a pod %s: %v", podName, err) + return requiresMasterRestart, fmt.Errorf("could not patch postgres parameters within pod %s: %v", podName, err) } return requiresMasterRestart, nil diff --git a/pkg/cluster/sync_test.go b/pkg/cluster/sync_test.go index 226555a66..ea73fb97c 100644 --- a/pkg/cluster/sync_test.go +++ b/pkg/cluster/sync_test.go @@ -256,7 +256,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) { } for _, tt := range tests { - requireMasterRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, tt.pgParams) + requireMasterRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters) assert.NoError(t, err) if requireMasterRestart != tt.restartMaster { t.Errorf("%s - %s: unexpect master restart strategy, got %v, expected %v", testName, tt.subtest, requireMasterRestart, tt.restartMaster) diff --git a/pkg/generated/clientset/versioned/clientset.go b/pkg/generated/clientset/versioned/clientset.go index e0e97cf09..50f0ac841 100644 --- a/pkg/generated/clientset/versioned/clientset.go +++ b/pkg/generated/clientset/versioned/clientset.go @@ -28,6 +28,7 @@ import ( "fmt" acidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1" + zalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1" discovery "k8s.io/client-go/discovery" rest "k8s.io/client-go/rest" flowcontrol "k8s.io/client-go/util/flowcontrol" @@ -36,13 +37,15 @@ import ( type Interface interface { Discovery() discovery.DiscoveryInterface AcidV1() acidv1.AcidV1Interface + ZalandoV1() zalandov1.ZalandoV1Interface } // Clientset contains the clients for groups. Each group has exactly one // version included in a Clientset. type Clientset struct { *discovery.DiscoveryClient - acidV1 *acidv1.AcidV1Client + acidV1 *acidv1.AcidV1Client + zalandoV1 *zalandov1.ZalandoV1Client } // AcidV1 retrieves the AcidV1Client @@ -50,6 +53,11 @@ func (c *Clientset) AcidV1() acidv1.AcidV1Interface { return c.acidV1 } +// ZalandoV1 retrieves the ZalandoV1Client +func (c *Clientset) ZalandoV1() zalandov1.ZalandoV1Interface { + return c.zalandoV1 +} + // Discovery retrieves the DiscoveryClient func (c *Clientset) Discovery() discovery.DiscoveryInterface { if c == nil { @@ -75,6 +83,10 @@ func NewForConfig(c *rest.Config) (*Clientset, error) { if err != nil { return nil, err } + cs.zalandoV1, err = zalandov1.NewForConfig(&configShallowCopy) + if err != nil { + return nil, err + } cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy) if err != nil { @@ -88,6 +100,7 @@ func NewForConfig(c *rest.Config) (*Clientset, error) { func NewForConfigOrDie(c *rest.Config) *Clientset { var cs Clientset cs.acidV1 = acidv1.NewForConfigOrDie(c) + cs.zalandoV1 = zalandov1.NewForConfigOrDie(c) cs.DiscoveryClient = discovery.NewDiscoveryClientForConfigOrDie(c) return &cs @@ -97,6 +110,7 @@ func NewForConfigOrDie(c *rest.Config) *Clientset { func New(c rest.Interface) *Clientset { var cs Clientset cs.acidV1 = acidv1.New(c) + cs.zalandoV1 = zalandov1.New(c) cs.DiscoveryClient = discovery.NewDiscoveryClient(c) return &cs diff --git a/pkg/generated/clientset/versioned/fake/clientset_generated.go b/pkg/generated/clientset/versioned/fake/clientset_generated.go index 25daca07c..4c94d23a6 100644 --- a/pkg/generated/clientset/versioned/fake/clientset_generated.go +++ b/pkg/generated/clientset/versioned/fake/clientset_generated.go @@ -28,6 +28,8 @@ import ( clientset "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" acidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1" fakeacidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1/fake" + zalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1" + fakezalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/discovery" @@ -89,3 +91,8 @@ var ( func (c *Clientset) AcidV1() acidv1.AcidV1Interface { return &fakeacidv1.FakeAcidV1{Fake: &c.Fake} } + +// ZalandoV1 retrieves the ZalandoV1Client +func (c *Clientset) ZalandoV1() zalandov1.ZalandoV1Interface { + return &fakezalandov1.FakeZalandoV1{Fake: &c.Fake} +} diff --git a/pkg/generated/clientset/versioned/fake/register.go b/pkg/generated/clientset/versioned/fake/register.go index d63cab2ff..313eeacc2 100644 --- a/pkg/generated/clientset/versioned/fake/register.go +++ b/pkg/generated/clientset/versioned/fake/register.go @@ -26,6 +26,7 @@ package fake import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -38,6 +39,7 @@ var codecs = serializer.NewCodecFactory(scheme) var localSchemeBuilder = runtime.SchemeBuilder{ acidv1.AddToScheme, + zalandov1.AddToScheme, } // AddToScheme adds all types of this clientset into the given scheme. This allows composition diff --git a/pkg/generated/clientset/versioned/scheme/register.go b/pkg/generated/clientset/versioned/scheme/register.go index beb803602..823909bcb 100644 --- a/pkg/generated/clientset/versioned/scheme/register.go +++ b/pkg/generated/clientset/versioned/scheme/register.go @@ -26,6 +26,7 @@ package scheme import ( acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + zalandov1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -38,6 +39,7 @@ var Codecs = serializer.NewCodecFactory(Scheme) var ParameterCodec = runtime.NewParameterCodec(Scheme) var localSchemeBuilder = runtime.SchemeBuilder{ acidv1.AddToScheme, + zalandov1.AddToScheme, } // AddToScheme adds all types of this clientset into the given scheme. This allows composition diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/doc.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/doc.go new file mode 100644 index 000000000..ba729b8ca --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// This package has the automatically generated typed clients. +package v1 diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fabriceventstream.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fabriceventstream.go new file mode 100644 index 000000000..26c3431f2 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fabriceventstream.go @@ -0,0 +1,184 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1 + +import ( + "context" + "time" + + v1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" + scheme "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// FabricEventStreamsGetter has a method to return a FabricEventStreamInterface. +// A group's client should implement this interface. +type FabricEventStreamsGetter interface { + FabricEventStreams(namespace string) FabricEventStreamInterface +} + +// FabricEventStreamInterface has methods to work with FabricEventStream resources. +type FabricEventStreamInterface interface { + Create(ctx context.Context, fabricEventStream *v1.FabricEventStream, opts metav1.CreateOptions) (*v1.FabricEventStream, error) + Update(ctx context.Context, fabricEventStream *v1.FabricEventStream, opts metav1.UpdateOptions) (*v1.FabricEventStream, error) + Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error + Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.FabricEventStream, error) + List(ctx context.Context, opts metav1.ListOptions) (*v1.FabricEventStreamList, error) + Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.FabricEventStream, err error) + FabricEventStreamExpansion +} + +// fabricEventStreams implements FabricEventStreamInterface +type fabricEventStreams struct { + client rest.Interface + ns string +} + +// newFabricEventStreams returns a FabricEventStreams +func newFabricEventStreams(c *ZalandoV1Client, namespace string) *fabricEventStreams { + return &fabricEventStreams{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the fabricEventStream, and returns the corresponding fabricEventStream object, and an error if there is any. +func (c *fabricEventStreams) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.FabricEventStream, err error) { + result = &v1.FabricEventStream{} + err = c.client.Get(). + Namespace(c.ns). + Resource("fabriceventstreams"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of FabricEventStreams that match those selectors. +func (c *fabricEventStreams) List(ctx context.Context, opts metav1.ListOptions) (result *v1.FabricEventStreamList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1.FabricEventStreamList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("fabriceventstreams"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested fabricEventStreams. +func (c *fabricEventStreams) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("fabriceventstreams"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a fabricEventStream and creates it. Returns the server's representation of the fabricEventStream, and an error, if there is any. +func (c *fabricEventStreams) Create(ctx context.Context, fabricEventStream *v1.FabricEventStream, opts metav1.CreateOptions) (result *v1.FabricEventStream, err error) { + result = &v1.FabricEventStream{} + err = c.client.Post(). + Namespace(c.ns). + Resource("fabriceventstreams"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(fabricEventStream). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a fabricEventStream and updates it. Returns the server's representation of the fabricEventStream, and an error, if there is any. +func (c *fabricEventStreams) Update(ctx context.Context, fabricEventStream *v1.FabricEventStream, opts metav1.UpdateOptions) (result *v1.FabricEventStream, err error) { + result = &v1.FabricEventStream{} + err = c.client.Put(). + Namespace(c.ns). + Resource("fabriceventstreams"). + Name(fabricEventStream.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(fabricEventStream). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the fabricEventStream and deletes it. Returns an error if one occurs. +func (c *fabricEventStreams) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("fabriceventstreams"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *fabricEventStreams) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("fabriceventstreams"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched fabricEventStream. +func (c *fabricEventStreams) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.FabricEventStream, err error) { + result = &v1.FabricEventStream{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("fabriceventstreams"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/doc.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/doc.go new file mode 100644 index 000000000..380725443 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +// Package fake has the automatically generated clients. +package fake diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_fabriceventstream.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_fabriceventstream.go new file mode 100644 index 000000000..8d76e06de --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_fabriceventstream.go @@ -0,0 +1,136 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + zalandoorgv1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeFabricEventStreams implements FabricEventStreamInterface +type FakeFabricEventStreams struct { + Fake *FakeZalandoV1 + ns string +} + +var fabriceventstreamsResource = schema.GroupVersionResource{Group: "zalando.org", Version: "v1", Resource: "fabriceventstreams"} + +var fabriceventstreamsKind = schema.GroupVersionKind{Group: "zalando.org", Version: "v1", Kind: "FabricEventStream"} + +// Get takes name of the fabricEventStream, and returns the corresponding fabricEventStream object, and an error if there is any. +func (c *FakeFabricEventStreams) Get(ctx context.Context, name string, options v1.GetOptions) (result *zalandoorgv1.FabricEventStream, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(fabriceventstreamsResource, c.ns, name), &zalandoorgv1.FabricEventStream{}) + + if obj == nil { + return nil, err + } + return obj.(*zalandoorgv1.FabricEventStream), err +} + +// List takes label and field selectors, and returns the list of FabricEventStreams that match those selectors. +func (c *FakeFabricEventStreams) List(ctx context.Context, opts v1.ListOptions) (result *zalandoorgv1.FabricEventStreamList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(fabriceventstreamsResource, fabriceventstreamsKind, c.ns, opts), &zalandoorgv1.FabricEventStreamList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &zalandoorgv1.FabricEventStreamList{ListMeta: obj.(*zalandoorgv1.FabricEventStreamList).ListMeta} + for _, item := range obj.(*zalandoorgv1.FabricEventStreamList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested fabricEventStreams. +func (c *FakeFabricEventStreams) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(fabriceventstreamsResource, c.ns, opts)) + +} + +// Create takes the representation of a fabricEventStream and creates it. Returns the server's representation of the fabricEventStream, and an error, if there is any. +func (c *FakeFabricEventStreams) Create(ctx context.Context, fabricEventStream *zalandoorgv1.FabricEventStream, opts v1.CreateOptions) (result *zalandoorgv1.FabricEventStream, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(fabriceventstreamsResource, c.ns, fabricEventStream), &zalandoorgv1.FabricEventStream{}) + + if obj == nil { + return nil, err + } + return obj.(*zalandoorgv1.FabricEventStream), err +} + +// Update takes the representation of a fabricEventStream and updates it. Returns the server's representation of the fabricEventStream, and an error, if there is any. +func (c *FakeFabricEventStreams) Update(ctx context.Context, fabricEventStream *zalandoorgv1.FabricEventStream, opts v1.UpdateOptions) (result *zalandoorgv1.FabricEventStream, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(fabriceventstreamsResource, c.ns, fabricEventStream), &zalandoorgv1.FabricEventStream{}) + + if obj == nil { + return nil, err + } + return obj.(*zalandoorgv1.FabricEventStream), err +} + +// Delete takes name of the fabricEventStream and deletes it. Returns an error if one occurs. +func (c *FakeFabricEventStreams) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(fabriceventstreamsResource, c.ns, name), &zalandoorgv1.FabricEventStream{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeFabricEventStreams) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(fabriceventstreamsResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &zalandoorgv1.FabricEventStreamList{}) + return err +} + +// Patch applies the patch and returns the patched fabricEventStream. +func (c *FakeFabricEventStreams) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *zalandoorgv1.FabricEventStream, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(fabriceventstreamsResource, c.ns, name, pt, data, subresources...), &zalandoorgv1.FabricEventStream{}) + + if obj == nil { + return nil, err + } + return obj.(*zalandoorgv1.FabricEventStream), err +} diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_zalando.org_client.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_zalando.org_client.go new file mode 100644 index 000000000..17786c9a7 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/fake/fake_zalando.org_client.go @@ -0,0 +1,46 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + v1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1" + rest "k8s.io/client-go/rest" + testing "k8s.io/client-go/testing" +) + +type FakeZalandoV1 struct { + *testing.Fake +} + +func (c *FakeZalandoV1) FabricEventStreams(namespace string) v1.FabricEventStreamInterface { + return &FakeFabricEventStreams{c, namespace} +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *FakeZalandoV1) RESTClient() rest.Interface { + var ret *rest.RESTClient + return ret +} diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/generated_expansion.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/generated_expansion.go new file mode 100644 index 000000000..6a776af84 --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/generated_expansion.go @@ -0,0 +1,27 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1 + +type FabricEventStreamExpansion interface{} diff --git a/pkg/generated/clientset/versioned/typed/zalando.org/v1/zalando.org_client.go b/pkg/generated/clientset/versioned/typed/zalando.org/v1/zalando.org_client.go new file mode 100644 index 000000000..937fd6b7e --- /dev/null +++ b/pkg/generated/clientset/versioned/typed/zalando.org/v1/zalando.org_client.go @@ -0,0 +1,95 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1 + +import ( + v1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" + "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/scheme" + rest "k8s.io/client-go/rest" +) + +type ZalandoV1Interface interface { + RESTClient() rest.Interface + FabricEventStreamsGetter +} + +// ZalandoV1Client is used to interact with features provided by the zalando.org group. +type ZalandoV1Client struct { + restClient rest.Interface +} + +func (c *ZalandoV1Client) FabricEventStreams(namespace string) FabricEventStreamInterface { + return newFabricEventStreams(c, namespace) +} + +// NewForConfig creates a new ZalandoV1Client for the given config. +func NewForConfig(c *rest.Config) (*ZalandoV1Client, error) { + config := *c + if err := setConfigDefaults(&config); err != nil { + return nil, err + } + client, err := rest.RESTClientFor(&config) + if err != nil { + return nil, err + } + return &ZalandoV1Client{client}, nil +} + +// NewForConfigOrDie creates a new ZalandoV1Client for the given config and +// panics if there is an error in the config. +func NewForConfigOrDie(c *rest.Config) *ZalandoV1Client { + client, err := NewForConfig(c) + if err != nil { + panic(err) + } + return client +} + +// New creates a new ZalandoV1Client for the given RESTClient. +func New(c rest.Interface) *ZalandoV1Client { + return &ZalandoV1Client{c} +} + +func setConfigDefaults(config *rest.Config) error { + gv := v1.SchemeGroupVersion + config.GroupVersion = &gv + config.APIPath = "/apis" + config.NegotiatedSerializer = scheme.Codecs.WithoutConversion() + + if config.UserAgent == "" { + config.UserAgent = rest.DefaultKubernetesUserAgent() + } + + return nil +} + +// RESTClient returns a RESTClient that is used to communicate +// with API server by this client implementation. +func (c *ZalandoV1Client) RESTClient() rest.Interface { + if c == nil { + return nil + } + return c.restClient +} diff --git a/pkg/generated/informers/externalversions/factory.go b/pkg/generated/informers/externalversions/factory.go index 20a7f415f..0842aa5ae 100644 --- a/pkg/generated/informers/externalversions/factory.go +++ b/pkg/generated/informers/externalversions/factory.go @@ -32,6 +32,7 @@ import ( versioned "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" acidzalando "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/acid.zalan.do" internalinterfaces "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/internalinterfaces" + zalandoorg "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/zalando.org" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" schema "k8s.io/apimachinery/pkg/runtime/schema" @@ -179,8 +180,13 @@ type SharedInformerFactory interface { WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool Acid() acidzalando.Interface + Zalando() zalandoorg.Interface } func (f *sharedInformerFactory) Acid() acidzalando.Interface { return acidzalando.New(f, f.namespace, f.tweakListOptions) } + +func (f *sharedInformerFactory) Zalando() zalandoorg.Interface { + return zalandoorg.New(f, f.namespace, f.tweakListOptions) +} diff --git a/pkg/generated/informers/externalversions/generic.go b/pkg/generated/informers/externalversions/generic.go index 652ba0f77..afa4c6ac2 100644 --- a/pkg/generated/informers/externalversions/generic.go +++ b/pkg/generated/informers/externalversions/generic.go @@ -28,6 +28,7 @@ import ( "fmt" v1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" + zalandoorgv1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" schema "k8s.io/apimachinery/pkg/runtime/schema" cache "k8s.io/client-go/tools/cache" ) @@ -64,6 +65,10 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource case v1.SchemeGroupVersion.WithResource("postgresqls"): return &genericInformer{resource: resource.GroupResource(), informer: f.Acid().V1().Postgresqls().Informer()}, nil + // Group=zalando.org, Version=v1 + case zalandoorgv1.SchemeGroupVersion.WithResource("fabriceventstreams"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Zalando().V1().FabricEventStreams().Informer()}, nil + } return nil, fmt.Errorf("no informer found for %v", resource) diff --git a/pkg/generated/informers/externalversions/zalando.org/interface.go b/pkg/generated/informers/externalversions/zalando.org/interface.go new file mode 100644 index 000000000..afece8ac3 --- /dev/null +++ b/pkg/generated/informers/externalversions/zalando.org/interface.go @@ -0,0 +1,52 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package zalando + +import ( + internalinterfaces "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/internalinterfaces" + v1 "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/zalando.org/v1" +) + +// Interface provides access to each of this group's versions. +type Interface interface { + // V1 provides access to shared informers for resources in V1. + V1() v1.Interface +} + +type group struct { + factory internalinterfaces.SharedInformerFactory + namespace string + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// New returns a new Interface. +func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { + return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} +} + +// V1 returns a new v1.Interface. +func (g *group) V1() v1.Interface { + return v1.New(g.factory, g.namespace, g.tweakListOptions) +} diff --git a/pkg/generated/informers/externalversions/zalando.org/v1/fabriceventstream.go b/pkg/generated/informers/externalversions/zalando.org/v1/fabriceventstream.go new file mode 100644 index 000000000..13733e1d1 --- /dev/null +++ b/pkg/generated/informers/externalversions/zalando.org/v1/fabriceventstream.go @@ -0,0 +1,96 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1 + +import ( + "context" + time "time" + + zalandoorgv1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" + versioned "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" + internalinterfaces "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/internalinterfaces" + v1 "github.com/zalando/postgres-operator/pkg/generated/listers/zalando.org/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// FabricEventStreamInformer provides access to a shared informer and lister for +// FabricEventStreams. +type FabricEventStreamInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1.FabricEventStreamLister +} + +type fabricEventStreamInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewFabricEventStreamInformer constructs a new informer for FabricEventStream type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFabricEventStreamInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredFabricEventStreamInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredFabricEventStreamInformer constructs a new informer for FabricEventStream type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredFabricEventStreamInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ZalandoV1().FabricEventStreams(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ZalandoV1().FabricEventStreams(namespace).Watch(context.TODO(), options) + }, + }, + &zalandoorgv1.FabricEventStream{}, + resyncPeriod, + indexers, + ) +} + +func (f *fabricEventStreamInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredFabricEventStreamInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *fabricEventStreamInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&zalandoorgv1.FabricEventStream{}, f.defaultInformer) +} + +func (f *fabricEventStreamInformer) Lister() v1.FabricEventStreamLister { + return v1.NewFabricEventStreamLister(f.Informer().GetIndexer()) +} diff --git a/pkg/generated/informers/externalversions/zalando.org/v1/interface.go b/pkg/generated/informers/externalversions/zalando.org/v1/interface.go new file mode 100644 index 000000000..329f15503 --- /dev/null +++ b/pkg/generated/informers/externalversions/zalando.org/v1/interface.go @@ -0,0 +1,51 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1 + +import ( + internalinterfaces "github.com/zalando/postgres-operator/pkg/generated/informers/externalversions/internalinterfaces" +) + +// Interface provides access to all the informers in this group version. +type Interface interface { + // FabricEventStreams returns a FabricEventStreamInformer. + FabricEventStreams() FabricEventStreamInformer +} + +type version struct { + factory internalinterfaces.SharedInformerFactory + namespace string + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// New returns a new Interface. +func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface { + return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} +} + +// FabricEventStreams returns a FabricEventStreamInformer. +func (v *version) FabricEventStreams() FabricEventStreamInformer { + return &fabricEventStreamInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} diff --git a/pkg/generated/listers/zalando.org/v1/expansion_generated.go b/pkg/generated/listers/zalando.org/v1/expansion_generated.go new file mode 100644 index 000000000..5a90e7828 --- /dev/null +++ b/pkg/generated/listers/zalando.org/v1/expansion_generated.go @@ -0,0 +1,33 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1 + +// FabricEventStreamListerExpansion allows custom methods to be added to +// FabricEventStreamLister. +type FabricEventStreamListerExpansion interface{} + +// FabricEventStreamNamespaceListerExpansion allows custom methods to be added to +// FabricEventStreamNamespaceLister. +type FabricEventStreamNamespaceListerExpansion interface{} diff --git a/pkg/generated/listers/zalando.org/v1/fabriceventstream.go b/pkg/generated/listers/zalando.org/v1/fabriceventstream.go new file mode 100644 index 000000000..b4c895c74 --- /dev/null +++ b/pkg/generated/listers/zalando.org/v1/fabriceventstream.go @@ -0,0 +1,105 @@ +/* +Copyright 2022 Compose, Zalando SE + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1 + +import ( + v1 "github.com/zalando/postgres-operator/pkg/apis/zalando.org/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// FabricEventStreamLister helps list FabricEventStreams. +// All objects returned here must be treated as read-only. +type FabricEventStreamLister interface { + // List lists all FabricEventStreams in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1.FabricEventStream, err error) + // FabricEventStreams returns an object that can list and get FabricEventStreams. + FabricEventStreams(namespace string) FabricEventStreamNamespaceLister + FabricEventStreamListerExpansion +} + +// fabricEventStreamLister implements the FabricEventStreamLister interface. +type fabricEventStreamLister struct { + indexer cache.Indexer +} + +// NewFabricEventStreamLister returns a new FabricEventStreamLister. +func NewFabricEventStreamLister(indexer cache.Indexer) FabricEventStreamLister { + return &fabricEventStreamLister{indexer: indexer} +} + +// List lists all FabricEventStreams in the indexer. +func (s *fabricEventStreamLister) List(selector labels.Selector) (ret []*v1.FabricEventStream, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1.FabricEventStream)) + }) + return ret, err +} + +// FabricEventStreams returns an object that can list and get FabricEventStreams. +func (s *fabricEventStreamLister) FabricEventStreams(namespace string) FabricEventStreamNamespaceLister { + return fabricEventStreamNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// FabricEventStreamNamespaceLister helps list and get FabricEventStreams. +// All objects returned here must be treated as read-only. +type FabricEventStreamNamespaceLister interface { + // List lists all FabricEventStreams in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1.FabricEventStream, err error) + // Get retrieves the FabricEventStream from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1.FabricEventStream, error) + FabricEventStreamNamespaceListerExpansion +} + +// fabricEventStreamNamespaceLister implements the FabricEventStreamNamespaceLister +// interface. +type fabricEventStreamNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all FabricEventStreams in the indexer for a given namespace. +func (s fabricEventStreamNamespaceLister) List(selector labels.Selector) (ret []*v1.FabricEventStream, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1.FabricEventStream)) + }) + return ret, err +} + +// Get retrieves the FabricEventStream from the indexer for a given namespace and name. +func (s fabricEventStreamNamespaceLister) Get(name string) (*v1.FabricEventStream, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1.Resource("fabriceventstream"), name) + } + return obj.(*v1.FabricEventStream), nil +} diff --git a/pkg/util/constants/streams.go b/pkg/util/constants/streams.go new file mode 100644 index 000000000..bd70b719b --- /dev/null +++ b/pkg/util/constants/streams.go @@ -0,0 +1,14 @@ +package constants + +// PostgreSQL specific constants +const ( + EventStreamCRDApiVersion = "zalando.org/v1" + EventStreamCRDKind = "FabricEventStream" + EventStreamCRDName = "fabriceventstreams.zalando.org" + EventStreamSourcePGType = "PostgresLogicalReplication" + EventStreamSourceSlotPrefix = "fes" + EventStreamSourcePluginType = "pgoutput" + EventStreamSourceAuthType = "DatabaseAuthenticationSecret" + EventStreamFlowPgGenericType = "PostgresWalToGenericNakadiEvent" + EventStreamSinkNakadiType = "Nakadi" +) diff --git a/pkg/util/k8sutil/k8sutil.go b/pkg/util/k8sutil/k8sutil.go index dd6ec1e8b..0897777ee 100644 --- a/pkg/util/k8sutil/k8sutil.go +++ b/pkg/util/k8sutil/k8sutil.go @@ -12,8 +12,9 @@ import ( clientbatchv1beta1 "k8s.io/client-go/kubernetes/typed/batch/v1beta1" apiacidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1" - acidv1client "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" + zalandoclient "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned" acidv1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/acid.zalan.do/v1" + zalandov1 "github.com/zalando/postgres-operator/pkg/generated/clientset/versioned/typed/zalando.org/v1" "github.com/zalando/postgres-operator/pkg/spec" apiappsv1 "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" @@ -58,9 +59,11 @@ type KubernetesClient struct { acidv1.OperatorConfigurationsGetter acidv1.PostgresTeamsGetter acidv1.PostgresqlsGetter + zalandov1.FabricEventStreamsGetter - RESTClient rest.Interface - AcidV1ClientSet *acidv1client.Clientset + RESTClient rest.Interface + AcidV1ClientSet *zalandoclient.Clientset + Zalandov1ClientSet *zalandoclient.Clientset } type mockSecret struct { @@ -158,14 +161,19 @@ func NewFromConfig(cfg *rest.Config) (KubernetesClient, error) { kubeClient.CustomResourceDefinitionsGetter = apiextClient.ApiextensionsV1() - kubeClient.AcidV1ClientSet = acidv1client.NewForConfigOrDie(cfg) + kubeClient.AcidV1ClientSet = zalandoclient.NewForConfigOrDie(cfg) if err != nil { return kubeClient, fmt.Errorf("could not create acid.zalan.do clientset: %v", err) } + kubeClient.Zalandov1ClientSet = zalandoclient.NewForConfigOrDie(cfg) + if err != nil { + return kubeClient, fmt.Errorf("could not create zalando.org clientset: %v", err) + } kubeClient.OperatorConfigurationsGetter = kubeClient.AcidV1ClientSet.AcidV1() kubeClient.PostgresTeamsGetter = kubeClient.AcidV1ClientSet.AcidV1() kubeClient.PostgresqlsGetter = kubeClient.AcidV1ClientSet.AcidV1() + kubeClient.FabricEventStreamsGetter = kubeClient.Zalandov1ClientSet.ZalandoV1() return kubeClient, nil } diff --git a/pkg/util/patroni/patroni_test.go b/pkg/util/patroni/patroni_test.go index 614f77828..60f289c6f 100644 --- a/pkg/util/patroni/patroni_test.go +++ b/pkg/util/patroni/patroni_test.go @@ -185,7 +185,7 @@ func TestGetConfig(t *testing.T) { Slots: map[string]map[string]string{ "cdc": { "database": "foo", - "plugin": "wal2json", + "plugin": "pgoutput", "type": "logical", }, }, @@ -218,7 +218,7 @@ func TestGetConfig(t *testing.T) { "wal_log_hints": "on", } - configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 100, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "hot_standby", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` + configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 100, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "hot_standby", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "pgoutput", "type": "logical"}}, "ttl": 30}` r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) response := http.Response{ @@ -253,7 +253,7 @@ func TestSetPostgresParameters(t *testing.T) { "wal_level": "logical", } - configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 50, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "logical", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "wal2json", "type": "logical"}}, "ttl": 30}` + configJson := `{"loop_wait": 10, "maximum_lag_on_failover": 33554432, "postgresql": {"parameters": {"archive_mode": "on", "archive_timeout": "1800s", "autovacuum_analyze_scale_factor": 0.02, "autovacuum_max_workers": 5, "autovacuum_vacuum_scale_factor": 0.05, "checkpoint_completion_target": 0.9, "hot_standby": "on", "log_autovacuum_min_duration": 0, "log_checkpoints": "on", "log_connections": "on", "log_disconnections": "on", "log_line_prefix": "%t [%p]: [%l-1] %c %x %d %u %a %h ", "log_lock_waits": "on", "log_min_duration_statement": 500, "log_statement": "ddl", "log_temp_files": 0, "max_connections": 50, "max_replication_slots": 10, "max_wal_senders": 10, "tcp_keepalives_idle": 900, "tcp_keepalives_interval": 100, "track_functions": "all", "wal_level": "logical", "wal_log_hints": "on"}, "use_pg_rewind": true, "use_slots": true}, "retry_timeout": 10, "slots": {"cdc": {"database": "foo", "plugin": "pgoutput", "type": "logical"}}, "ttl": 30}` r := ioutil.NopCloser(bytes.NewReader([]byte(configJson))) response := http.Response{