Skip to content

create CDC event stream CRD #1570

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 55 commits into from
Feb 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
1bd1296
provide event stream API
FxKu Jul 9, 2021
11f3715
check manifest settings for logical decoding before creating streams
FxKu Jul 29, 2021
70df451
intermediate commit
FxKu Jul 30, 2021
2fadb74
Merge branch 'master' into fes-support
FxKu Aug 9, 2021
eb70505
operator updates Postgres config and creates replication user
FxKu Aug 10, 2021
14e989a
update log messages
FxKu Aug 10, 2021
4862b05
fix Postgres config sync
FxKu Aug 10, 2021
1f8cfc5
small fix for setting bools in Postgres config
FxKu Aug 11, 2021
54257b8
update docs and revert codegen
FxKu Aug 17, 2021
6120ef7
name FES like the Postgres cluster
FxKu Aug 20, 2021
2726492
add delete case and fix updating streams + update unit test
FxKu Aug 23, 2021
7a84c08
remove wal type and distinguish source between nakadi and sqs
FxKu Aug 25, 2021
d2addd3
re-add 3rd type, remove callHome flow
FxKu Aug 27, 2021
79bd69d
Merge branch 'master' into fes-support
FxKu Sep 16, 2021
e4c7e36
remove sqs and a few bugs
FxKu Sep 16, 2021
a17d630
remove fields from FES api and fix update
FxKu Sep 21, 2021
223ffa7
remove streamType field
FxKu Sep 22, 2021
c5ea298
check if fes CRD exists before syncing
FxKu Sep 22, 2021
9abd6f7
avoid CRD getter in unit test
FxKu Sep 22, 2021
c8d89f9
reflect code review
FxKu Oct 1, 2021
2f4b554
resolve conflict with latest config sync
FxKu Oct 18, 2021
db76b8c
minor fix for restart TTL
FxKu Oct 18, 2021
a5acfd3
reflect latest change in restart logic
FxKu Oct 26, 2021
fa63803
minor changes for comments
FxKu Oct 26, 2021
8d52e8d
Merge branch 'master' into fes-support
FxKu Nov 25, 2021
06956d1
existing slot must use the same plugin
FxKu Nov 25, 2021
bdf87da
Merge branch 'master' into fes-support
FxKu Nov 29, 2021
837969e
Update docs/reference/cluster_manifest.md
FxKu Dec 2, 2021
bde522b
make id and payload columns configurable
FxKu Dec 2, 2021
50a8bf5
fix typo in CRD schema
FxKu Dec 2, 2021
8f2f70b
set applicationId of FES CRD by taking label from manifest
FxKu Dec 3, 2021
3804bc5
resolve conflict
FxKu Dec 3, 2021
4173085
resolve conflict
FxKu Dec 6, 2021
90d6016
sync streams only when they are defined in manifest
FxKu Dec 6, 2021
96a2da1
use defined fes CRD in unit test
FxKu Dec 7, 2021
74ee530
introduce applicationId for separate stream CRDs
FxKu Dec 8, 2021
8690cc9
add FES to RBAC in chart
FxKu Dec 9, 2021
ff1ac1d
replace dash with underscroe for slot name
FxKu Dec 14, 2021
7079d0e
fix unit test, too
FxKu Dec 15, 2021
dfec6fd
fix codegen to support both apis
FxKu Dec 15, 2021
2846495
disable streams in chart
FxKu Dec 15, 2021
a686824
move FES to v1
FxKu Jan 24, 2022
e499567
resolve conflict
FxKu Jan 24, 2022
179fcb1
switch to pgoutput plugin and let operator create publications
FxKu Jan 25, 2022
eb51428
quotes for schema and table
FxKu Jan 25, 2022
31ec300
alter publications if needed
FxKu Jan 26, 2022
2630d8e
minor fixes
FxKu Jan 26, 2022
3f0ed26
fix sync of publications
FxKu Jan 26, 2022
6682407
cosmetic changes
FxKu Jan 28, 2022
47aa7b1
Merge branch 'master' into fes-support
FxKu Feb 21, 2022
9007475
reflect code review and additional refactoring
FxKu Feb 21, 2022
34cad46
fix warning formatting
FxKu Feb 21, 2022
795cd5d
sync publication with debug messages
FxKu Feb 21, 2022
d6ed3c2
fix manifest example for password rotation
FxKu Feb 21, 2022
22d3c72
resolve conflicts
FxKu Feb 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions charts/postgres-operator/crds/postgresqls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,39 @@ spec:
type: string
gs_wal_path:
type: string
streams:
type: array
nullable: true
items:
type: object
required:
- applicationId
- database
- tables
properties:
applicationId:
type: string
batchSize:
type: integer
database:
type: string
filter:
type: object
additionalProperties:
type: string
tables:
type: object
additionalProperties:
type: object
required:
- eventType
properties:
eventType:
type: string
idColumn:
type: string
payloadColumn:
type: string
teamId:
type: string
tls:
Expand Down
16 changes: 16 additions & 0 deletions charts/postgres-operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ rules:
- get
- list
- watch
# all verbs allowed for event streams
{{- if .Values.enableStreams }}
- apiGroups:
- zalando.org
resources:
- fabriceventstreams
verbs:
- create
- delete
- deletecollection
- get
- list
- patch
- update
- watch
{{- end }}
# to create or get/update CRDs when starting up
- apiGroups:
- apiextensions.k8s.io
Expand Down
3 changes: 3 additions & 0 deletions charts/postgres-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ configConnectionPooler:
connection_pooler_default_cpu_limit: "1"
connection_pooler_default_memory_limit: 100Mi

# Zalando's internal CDC stream feature
enableStreams: false

rbac:
# Specifies whether RBAC resources should be created
create: true
Expand Down
51 changes: 51 additions & 0 deletions docs/reference/cluster_manifest.md
Original file line number Diff line number Diff line change
Expand Up @@ -536,3 +536,54 @@ Those parameters are grouped under the `tls` top-level key.
relative to the "/tls/", which is mount path of the tls secret.
If `caSecretName` is defined, the ca.crt path is relative to "/tlsca/",
otherwise to the same "/tls/".

## Change data capture streams

This sections enables change data capture (CDC) streams via Postgres'
[logical decoding](https://www.postgresql.org/docs/14/logicaldecoding.html)
feature and `pgoutput` plugin. While the Postgres operator takes responsibility
for providing the setup to publish change events, it relies on external tools
to consume them. At Zalando, we are using a workflow based on
[Debezium Connector](https://debezium.io/documentation/reference/stable/connectors/postgresql.html)
which can feed streams into Zalando’s distributed event broker [Nakadi](https://nakadi.io/)
among others.

The Postgres Operator creates custom resources for Zalando's internal CDC
operator which will be used to set up the consumer part. Each stream object
can have the following properties:

* **applicationId**
The application name to which the database and CDC belongs to. For each
set of streams with a distinct `applicationId` a separate stream CR as well
as a separate logical replication slot will be created. This means there can
be different streams in the same database and streams with the same
`applicationId` are bundled in one stream CR. The stream CR will be called
like the Postgres cluster plus "-<applicationId>" suffix. Required.

* **database**
Name of the database from where events will be published via Postgres'
logical decoding feature. The operator will take care of updating the
database configuration (setting `wal_level: logical`, creating logical
replication slots, using output plugin `pgoutput` and creating a dedicated
replication user). Required.

* **tables**
Defines a map of table names and their properties (`eventType`, `idColumn`
and `payloadColumn`). The CDC operator is following the [outbox pattern](https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/).
The application is responsible for putting events into a (JSON/B or VARCHAR)
payload column of the outbox table in the structure of the specified target
event type. The operator will create a [PUBLICATION](https://www.postgresql.org/docs/14/logical-replication-publication.html)
in Postgres for all tables specified for one `database` and `applicationId`.
The CDC operator will consume from it shortly after transactions are
committed to the outbox table. The `idColumn` will be used in telemetry for
the CDC operator. The names for `idColumn` and `payloadColumn` can be
configured. Defaults are `id` and `payload`. The target `eventType` has to
be defined. Required.

* **filter**
Streamed events can be filtered by a jsonpath expression for each table.
Optional.

* **batchSize**
Defines the size of batches in which events are consumed. Optional.
Defaults to 1.
2 changes: 1 addition & 1 deletion hack/update-codegen.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ trap "cleanup" EXIT SIGINT

bash "${CODEGEN_PKG}/generate-groups.sh" all \
"${OPERATOR_PACKAGE_ROOT}/pkg/generated" "${OPERATOR_PACKAGE_ROOT}/pkg/apis" \
"acid.zalan.do:v1" \
"acid.zalan.do:v1 zalando.org:v1" \
--go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt

cp -r "${OPERATOR_PACKAGE_ROOT}"/pkg/* "${TARGET_CODE_DIR}"
Expand Down
21 changes: 20 additions & 1 deletion manifests/complete-postgres-manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ kind: postgresql
metadata:
name: acid-test-cluster
# labels:
# application: test-app
# environment: demo
# annotations:
# "acid.zalan.do/controller": "second-operator"
Expand All @@ -17,7 +18,7 @@ spec:
- superuser
- createdb
foo_user: []
# flyway: []
# flyway: []
# usersWithSecretRotation:
# - foo_user
# usersWithInPlaceSecretRotation:
Expand Down Expand Up @@ -203,3 +204,21 @@ spec:
# operator: In
# values:
# - enabled

# Enables change data capture streams for defined database tables
# streams:
# - applicationId: test-app
# database: foo
# tables:
# data.state_pending_outbox:
# eventType: test-app.status-pending
# data.state_approved_outbox:
# eventType: test-app.status-approved
# data.orders_outbox:
# eventType: test-app.order-completed
# idColumn: o_id
# payloadColumn: o_payload
# # Optional. Filter ignores events before a certain txnId and lsn. Can be used to skip bad events
# filter:
# data.orders_outbox: "[?(@.source.txId > 500 && @.source.lsn > 123456)]"
# batchSize: 1000
14 changes: 14 additions & 0 deletions manifests/operator-service-account-rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ rules:
- get
- list
- watch
# all verbs allowed for event streams (Zalando-internal feature)
# - apiGroups:
# - zalando.org
# resources:
# - fabriceventstreams
# verbs:
# - create
# - delete
# - deletecollection
# - get
# - list
# - patch
# - update
# - watch
# to create or get/update CRDs when starting up
- apiGroups:
- apiextensions.k8s.io
Expand Down
33 changes: 33 additions & 0 deletions manifests/postgresql.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,39 @@ spec:
type: string
gs_wal_path:
type: string
streams:
type: array
nullable: true
items:
type: object
required:
- applicationId
- database
- tables
properties:
applicationId:
type: string
batchSize:
type: integer
database:
type: string
filter:
type: object
additionalProperties:
type: string
tables:
type: object
additionalProperties:
type: object
required:
- eventType
properties:
eventType:
type: string
idColumn:
type: string
payloadColumn:
type: string
teamId:
type: string
tls:
Expand Down
48 changes: 48 additions & 0 deletions pkg/apis/acid.zalan.do/v1/crds.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,54 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
},
},
},
"streams": {
Type: "array",
Items: &apiextv1.JSONSchemaPropsOrArray{
Schema: &apiextv1.JSONSchemaProps{
Type: "object",
Required: []string{"applicationId", "database", "tables"},
Properties: map[string]apiextv1.JSONSchemaProps{
"applicationId": {
Type: "string",
},
"batchSize": {
Type: "integer",
},
"database": {
Type: "string",
},
"filter": {
Type: "object",
AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{
Schema: &apiextv1.JSONSchemaProps{
Type: "string",
},
},
},
"tables": {
Type: "object",
AdditionalProperties: &apiextv1.JSONSchemaPropsOrBool{
Schema: &apiextv1.JSONSchemaProps{
Type: "object",
Required: []string{"eventType"},
Properties: map[string]apiextv1.JSONSchemaProps{
"eventType": {
Type: "string",
},
"idColumn": {
Type: "string",
},
"payloadColumn": {
Type: "string",
},
},
},
},
},
},
},
},
},
"teamId": {
Type: "string",
},
Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/acid.zalan.do/v1/postgresql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type PostgresSpec struct {
ServiceAnnotations map[string]string `json:"serviceAnnotations,omitempty"`
TLS *TLSDescription `json:"tls,omitempty"`
AdditionalVolumes []AdditionalVolume `json:"additionalVolumes,omitempty"`
Streams []Stream `json:"streams,omitempty"`

// deprecated json tags
InitContainersOld []v1.Container `json:"init_containers,omitempty"`
Expand Down Expand Up @@ -231,3 +232,17 @@ type ConnectionPooler struct {

Resources `json:"resources,omitempty"`
}

type Stream struct {
ApplicationId string `json:"applicationId"`
Database string `json:"database"`
Tables map[string]StreamTable `json:"tables"`
Filter map[string]string `json:"filter,omitempty"`
BatchSize uint32 `json:"batchSize,omitempty"`
}

type StreamTable struct {
EventType string `json:"eventType"`
IdColumn string `json:"idColumn,omitempty" defaults:"id"`
PayloadColumn string `json:"payloadColumn,omitempty" defaults:"payload"`
}
53 changes: 53 additions & 0 deletions pkg/apis/acid.zalan.do/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions pkg/apis/zalando.org/register.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package zalando

const (
// GroupName is the group name for the operator CRDs
GroupName = "zalando.org"
)
Loading