From a1c2278f90582c7c9b5518e16df2b7228e44bc34 Mon Sep 17 00:00:00 2001 From: Jerad C Date: Tue, 17 May 2022 13:28:16 -0500 Subject: [PATCH 1/6] add webhook spec to CRD --- .../templates/node.k8s.aws_terminators.yaml | 36 +++++++++++++++++++ .../values.yaml | 5 +++ 2 files changed, 41 insertions(+) diff --git a/src/charts/aws-node-termination-handler-2/templates/node.k8s.aws_terminators.yaml b/src/charts/aws-node-termination-handler-2/templates/node.k8s.aws_terminators.yaml index aa57064f..be1f33ec 100644 --- a/src/charts/aws-node-termination-handler-2/templates/node.k8s.aws_terminators.yaml +++ b/src/charts/aws-node-termination-handler-2/templates/node.k8s.aws_terminators.yaml @@ -136,6 +136,42 @@ spec: - Cordon - NoAction default: CordonAndDrain + webhook: + description: Send notification of handled events. + type: object + properties: + url: + description: URL to send notifications. + type: string + proxyURL: + description: Proxy URL to use to send notifications. + type: string + headers: + description: HTTP headers to include when sending notifications. + type: array + items: + type: object + properties: + name: + description: Header name. + type: string + value: + description: Header value. + type: string + required: + - name + - value + {{- with .Values.terminator.defaults.webhook.headers }} + default: + {{- toYaml . | nindent 22 }} + {{- end }} + template: + description: Used to generate the request payload. + type: string + {{- with .Values.terminator.defaults.webhook.template }} + default: | + {{ . }} + {{- end }} status: description: TerminatorStatus defines the observed state of Terminator type: object diff --git a/src/charts/aws-node-termination-handler-2/values.yaml b/src/charts/aws-node-termination-handler-2/values.yaml index 1eac2a8f..9923fd61 100644 --- a/src/charts/aws-node-termination-handler-2/values.yaml +++ b/src/charts/aws-node-termination-handler-2/values.yaml @@ -18,6 +18,11 @@ terminator: ignoreAllDaemonSets: true deleteEmptyDirData: true timeoutSeconds: 120 + webhook: + headers: + - name: Content-Type + value: application/json + template: '{"text":"[NTH][Instance Interruption] EventID: {{ .EventID }} - Kind: {{ .Kind }} - Instance: {{ .InstanceID }} - Node: {{ .NodeName }} - Start Time: {{ .StartTime }}"}' aws: # AWS region to use in API calls. From 853839c082a498883224f003bf2b3b95fa812cc6 Mon Sep 17 00:00:00 2001 From: Jerad C Date: Tue, 17 May 2022 13:29:10 -0500 Subject: [PATCH 2/6] add webhook spec to Terminator --- src/api/v1alpha1/terminator_logging.go | 19 ++++++++++++++++ src/api/v1alpha1/terminator_types.go | 15 +++++++++++++ src/api/v1alpha1/terminator_validation.go | 27 +++++++++++++++++++++++ 3 files changed, 61 insertions(+) diff --git a/src/api/v1alpha1/terminator_logging.go b/src/api/v1alpha1/terminator_logging.go index 95797d21..a44c162e 100644 --- a/src/api/v1alpha1/terminator_logging.go +++ b/src/api/v1alpha1/terminator_logging.go @@ -57,3 +57,22 @@ func (e EventsSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddString("stateChange", e.StateChange) return nil } + +func (w WebhookSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("url", w.URL) + enc.AddString("proxyURL", w.ProxyURL) + + enc.AddArray("headers", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { + for _, header := range w.Headers { + enc.AppendObject(zapcore.ObjectMarshalerFunc(func(enc zapcore.ObjectEncoder) error { + enc.AddString("name", header.Name) + enc.AddString("value", header.Value) + return nil + })) + } + return nil + })) + + enc.AddString("template", w.Template) + return nil +} diff --git a/src/api/v1alpha1/terminator_types.go b/src/api/v1alpha1/terminator_types.go index 70edb4f6..5f5d27fc 100644 --- a/src/api/v1alpha1/terminator_types.go +++ b/src/api/v1alpha1/terminator_types.go @@ -34,6 +34,7 @@ type TerminatorSpec struct { SQS SQSSpec `json:"sqs,omitempty"` Drain DrainSpec `json:"drain,omitempty"` Events EventsSpec `json:"events,omitempty"` + Webhook WebhookSpec `json:"webhook,omitempty"` } // SQSSpec defines inputs to SQS "receive messages" requests. @@ -73,6 +74,20 @@ type EventsSpec struct { StateChange Action `json:"stateChange,omitempty"` } +// HeaderSpec defines the HTTP headers to include with webhook notifications. +type HeaderSpec struct { + Name string `json:"name,omitempty"` + Value string `json:"value,omitempty"` +} + +// WebhookSpec defines the configuration of webhook notifications to send when events are handled. +type WebhookSpec struct { + URL string `json:"url,omitempty"` + ProxyURL string `json:"proxyURL,omitempty"` + Headers []HeaderSpec `json:"headers,omitempty"` + Template string `json:"template,omitempty"` +} + // TerminatorStatus defines the observed state of Terminator type TerminatorStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster diff --git a/src/api/v1alpha1/terminator_validation.go b/src/api/v1alpha1/terminator_validation.go index b4e0642a..f52a4830 100644 --- a/src/api/v1alpha1/terminator_validation.go +++ b/src/api/v1alpha1/terminator_validation.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/url" + "text/template" "k8s.io/apimachinery/pkg/util/sets" @@ -44,6 +45,7 @@ func (t *TerminatorSpec) validate() (errs *apis.FieldError) { t.validateMatchLabels().ViaField("matchLabels"), t.SQS.validate().ViaField("sqs"), t.Events.validate().ViaField("events"), + t.Webhook.validate().ViaField("webhook"), ) } @@ -78,3 +80,28 @@ func (e *EventsSpec) validate() (errs *apis.FieldError) { } return errs } + +func (w *WebhookSpec) validate() (errs *apis.FieldError) { + if _, err := url.Parse(w.URL); err != nil { + errs = errs.Also(apis.ErrInvalidValue(w.URL, "url", err.Error())) + } + + if _, err := url.Parse(w.ProxyURL); err != nil && w.ProxyURL != "" { + errs = errs.Also(apis.ErrInvalidValue(w.ProxyURL, "proxyURL", "must be a valid URL")) + } + + for i, h := range w.Headers { + if h.Name != "" { + continue + } + errs = errs.Also(apis.ErrInvalidValue(h.Name, "name", "must not be empty").ViaFieldIndex("headers", i)) + } + + if w.Template == "" { + errs = errs.Also(apis.ErrInvalidValue(w.Template, "template", "must not be empty")) + } else if _, err := template.New("Validate").Parse(w.Template); err != nil { + errs = errs.Also(apis.ErrInvalidValue(w.Template, "template", err.Error())) + } + + return errs +} From 59fcef09a49f07c29563e58594ae514224e0d03e Mon Sep 17 00:00:00 2001 From: Jerad C Date: Tue, 17 May 2022 13:30:25 -0500 Subject: [PATCH 3/6] add webhook client --- src/pkg/webhook/builder.go | 79 +++++++++++++++++++++++++++++++++ src/pkg/webhook/client.go | 76 +++++++++++++++++++++++++++++++ src/pkg/webhook/notification.go | 27 +++++++++++ src/pkg/webhook/request.go | 50 +++++++++++++++++++++ 4 files changed, 232 insertions(+) create mode 100644 src/pkg/webhook/builder.go create mode 100644 src/pkg/webhook/client.go create mode 100644 src/pkg/webhook/notification.go create mode 100644 src/pkg/webhook/request.go diff --git a/src/pkg/webhook/builder.go b/src/pkg/webhook/builder.go new file mode 100644 index 00000000..e99eee52 --- /dev/null +++ b/src/pkg/webhook/builder.go @@ -0,0 +1,79 @@ +/* +Copyright 2022 Amazon.com, Inc. or its affiliates. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhook + +import ( + "fmt" + "net/http" + "net/url" + urlpkg "net/url" + templatepkg "text/template" + "time" +) + +type ( + HttpSendFunc = func(*http.Request) (*http.Response, error) + ProxyFunc = func(*http.Request) (*url.URL, error) + + ClientBuilder func(ProxyFunc) HttpSendFunc +) + +func NewHttpClientDo(proxy ProxyFunc) HttpSendFunc { + c := &http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + IdleConnTimeout: 1 * time.Second, + Proxy: proxy, + }, + } + return c.Do +} + +func (b ClientBuilder) NewClient(url, proxyURL, template string, headers http.Header) (Client, error) { + c := Client{} + + if url == "" { + return c, nil + } + + proxy := noopProxy + if proxyURL != "" { + proxyURL, err := urlpkg.Parse(proxyURL) + if err != nil { + return c, fmt.Errorf("failed to parse proxy URL: %w", err) + } + + proxy = func(_ *http.Request) (*urlpkg.URL, error) { + return proxyURL, nil + } + } + + tmpl, err := templatepkg.New("webhook").Parse(template) + if err != nil { + return c, fmt.Errorf("failed to parse template: %w", err) + } + + c.url = url + c.headers = headers + c.sendFunc = b(proxy) + c.template = tmpl + return c, nil +} + +func noopProxy(_ *http.Request) (*url.URL, error) { + return nil, nil +} diff --git a/src/pkg/webhook/client.go b/src/pkg/webhook/client.go new file mode 100644 index 00000000..b78fcc40 --- /dev/null +++ b/src/pkg/webhook/client.go @@ -0,0 +1,76 @@ +/* +Copyright 2022 Amazon.com, Inc. or its affiliates. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhook + +import ( + "bytes" + "context" + "fmt" + "net/http" + templatepkg "text/template" + + "github.com/aws/aws-node-termination-handler/pkg/logging" +) + +type Client struct { + url string + headers http.Header + sendFunc HttpSendFunc + template *templatepkg.Template +} + +func (c Client) NewRequest() Request { + return Request{sendFunc: c.send} +} + +func (c Client) send(ctx context.Context, n Notification) error { + ctx = logging.WithLogger(ctx, logging.FromContext(ctx).Named("webhook")) + + if c.url == "" { + return nil + } + + var buf bytes.Buffer + if err := c.template.Execute(&buf, n); err != nil { + msg := "failed to populate template" + logging.FromContext(ctx).With("error", err).Error(msg) + return fmt.Errorf("%s: %w", msg, err) + } + + req, err := http.NewRequest(http.MethodPost, c.url, &buf) + if err != nil { + msg := "failed to create request" + logging.FromContext(ctx).With("error", err).Error(msg) + return fmt.Errorf("%s: %w", msg, err) + } + + req.Header = c.headers + + resp, err := c.sendFunc(req) + if err != nil { + msg := "request failed" + logging.FromContext(ctx).With("error", err).Error(msg) + return fmt.Errorf("%s: %w", msg, err) + } + if resp != nil { + logging.FromContext(ctx). + With("status", resp.StatusCode). + Info("response status") + } + + return nil +} diff --git a/src/pkg/webhook/notification.go b/src/pkg/webhook/notification.go new file mode 100644 index 00000000..ef31bed7 --- /dev/null +++ b/src/pkg/webhook/notification.go @@ -0,0 +1,27 @@ +/* +Copyright 2022 Amazon.com, Inc. or its affiliates. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhook + +import "time" + +type Notification struct { + EventID string + InstanceID string + Kind string + NodeName string + StartTime time.Time +} diff --git a/src/pkg/webhook/request.go b/src/pkg/webhook/request.go new file mode 100644 index 00000000..e8d41661 --- /dev/null +++ b/src/pkg/webhook/request.go @@ -0,0 +1,50 @@ +/* +Copyright 2022 Amazon.com, Inc. or its affiliates. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package webhook + +import ( + "context" + "time" +) + +type ( + sendFuncType func(context.Context, Notification) error + + Event interface { + EventID() string + Kind() string + StartTime() time.Time + } + + Request struct { + sendFunc sendFuncType + + Event Event + InstanceID string + NodeName string + } +) + +func (r Request) Send(ctx context.Context) error { + return r.sendFunc(ctx, Notification{ + EventID: r.Event.EventID(), + InstanceID: r.InstanceID, + Kind: r.Event.Kind(), + NodeName: r.NodeName, + StartTime: r.Event.StartTime(), + }) +} From 2aa725861d834929a7de71d45910d3d5bfbc61a6 Mon Sep 17 00:00:00 2001 From: Jerad C Date: Tue, 17 May 2022 13:31:29 -0500 Subject: [PATCH 4/6] update event handlers --- src/pkg/event/asgterminate/v1/handler.go | 13 +++++++++++-- src/pkg/event/asgterminate/v2/handler.go | 13 +++++++++++-- src/pkg/event/noop.go | 13 +++++++++++-- src/pkg/event/rebalancerecommendation/v0/handler.go | 13 +++++++++++-- src/pkg/event/scheduledchange/v1/handler.go | 13 +++++++++++-- src/pkg/event/spotinterruption/v1/handler.go | 13 +++++++++++-- src/pkg/event/statechange/v1/handler.go | 13 +++++++++++-- 7 files changed, 77 insertions(+), 14 deletions(-) diff --git a/src/pkg/event/asgterminate/v1/handler.go b/src/pkg/event/asgterminate/v1/handler.go index f386b736..421610f2 100644 --- a/src/pkg/event/asgterminate/v1/handler.go +++ b/src/pkg/event/asgterminate/v1/handler.go @@ -18,6 +18,7 @@ package v1 import ( "context" + "time" "github.com/aws/aws-node-termination-handler/pkg/event/asgterminate/lifecycleaction" "github.com/aws/aws-node-termination-handler/pkg/terminator" @@ -31,8 +32,8 @@ type EC2InstanceTerminateLifecycleAction struct { AWSEvent } -func (EC2InstanceTerminateLifecycleAction) Kind() terminator.EventKind { - return terminator.EventKinds.AutoScalingTermination +func (e EC2InstanceTerminateLifecycleAction) EventID() string { + return e.AWSEvent.ID } func (e EC2InstanceTerminateLifecycleAction) EC2InstanceIDs() []string { @@ -48,7 +49,15 @@ func (e EC2InstanceTerminateLifecycleAction) Done(ctx context.Context) (bool, er }) } +func (EC2InstanceTerminateLifecycleAction) Kind() terminator.EventKind { + return terminator.EventKinds.AutoScalingTermination +} + func (e EC2InstanceTerminateLifecycleAction) MarshalLogObject(enc zapcore.ObjectEncoder) error { zap.Inline(e.AWSEvent).AddTo(enc) return nil } + +func (e EC2InstanceTerminateLifecycleAction) StartTime() time.Time { + return e.AWSEvent.Time +} diff --git a/src/pkg/event/asgterminate/v2/handler.go b/src/pkg/event/asgterminate/v2/handler.go index 01550074..fe2edb4d 100644 --- a/src/pkg/event/asgterminate/v2/handler.go +++ b/src/pkg/event/asgterminate/v2/handler.go @@ -18,6 +18,7 @@ package v2 import ( "context" + "time" "github.com/aws/aws-node-termination-handler/pkg/event/asgterminate/lifecycleaction" "github.com/aws/aws-node-termination-handler/pkg/terminator" @@ -31,8 +32,8 @@ type EC2InstanceTerminateLifecycleAction struct { AWSEvent } -func (EC2InstanceTerminateLifecycleAction) Kind() terminator.EventKind { - return terminator.EventKinds.AutoScalingTermination +func (e EC2InstanceTerminateLifecycleAction) EventID() string { + return e.AWSEvent.ID } func (e EC2InstanceTerminateLifecycleAction) EC2InstanceIDs() []string { @@ -48,7 +49,15 @@ func (e EC2InstanceTerminateLifecycleAction) Done(ctx context.Context) (bool, er }) } +func (EC2InstanceTerminateLifecycleAction) Kind() terminator.EventKind { + return terminator.EventKinds.AutoScalingTermination +} + func (e EC2InstanceTerminateLifecycleAction) MarshalLogObject(enc zapcore.ObjectEncoder) error { zap.Inline(e.AWSEvent).AddTo(enc) return nil } + +func (e EC2InstanceTerminateLifecycleAction) StartTime() time.Time { + return e.AWSEvent.Time +} diff --git a/src/pkg/event/noop.go b/src/pkg/event/noop.go index 2f70ab78..abab3cbc 100644 --- a/src/pkg/event/noop.go +++ b/src/pkg/event/noop.go @@ -18,6 +18,7 @@ package event import ( "context" + "time" "github.com/aws/aws-node-termination-handler/pkg/terminator" @@ -27,8 +28,8 @@ import ( type noop AWSMetadata -func (noop) Kind() terminator.EventKind { - return terminator.EventKinds.Noop +func (noop) EventID() string { + return "" } func (noop) EC2InstanceIDs() []string { @@ -39,7 +40,15 @@ func (noop) Done(_ context.Context) (bool, error) { return true, nil } +func (noop) Kind() terminator.EventKind { + return terminator.EventKinds.Noop +} + func (n noop) MarshalLogObject(enc zapcore.ObjectEncoder) error { zap.Inline(AWSMetadata(n)).AddTo(enc) return nil } + +func (noop) StartTime() time.Time { + return time.Now() +} diff --git a/src/pkg/event/rebalancerecommendation/v0/handler.go b/src/pkg/event/rebalancerecommendation/v0/handler.go index 7b8dbf37..19240883 100644 --- a/src/pkg/event/rebalancerecommendation/v0/handler.go +++ b/src/pkg/event/rebalancerecommendation/v0/handler.go @@ -18,6 +18,7 @@ package v0 import ( "context" + "time" "github.com/aws/aws-node-termination-handler/pkg/terminator" @@ -27,8 +28,8 @@ import ( type EC2InstanceRebalanceRecommendation AWSEvent -func (EC2InstanceRebalanceRecommendation) Kind() terminator.EventKind { - return terminator.EventKinds.RebalanceRecommendation +func (e EC2InstanceRebalanceRecommendation) EventID() string { + return e.ID } func (e EC2InstanceRebalanceRecommendation) EC2InstanceIDs() []string { @@ -39,7 +40,15 @@ func (EC2InstanceRebalanceRecommendation) Done(_ context.Context) (bool, error) return false, nil } +func (EC2InstanceRebalanceRecommendation) Kind() terminator.EventKind { + return terminator.EventKinds.RebalanceRecommendation +} + func (e EC2InstanceRebalanceRecommendation) MarshalLogObject(enc zapcore.ObjectEncoder) error { zap.Inline(AWSEvent(e)).AddTo(enc) return nil } + +func (e EC2InstanceRebalanceRecommendation) StartTime() time.Time { + return e.Time +} diff --git a/src/pkg/event/scheduledchange/v1/handler.go b/src/pkg/event/scheduledchange/v1/handler.go index 9e93ffbd..cd83e75b 100644 --- a/src/pkg/event/scheduledchange/v1/handler.go +++ b/src/pkg/event/scheduledchange/v1/handler.go @@ -18,6 +18,7 @@ package v1 import ( "context" + "time" "github.com/aws/aws-node-termination-handler/pkg/terminator" @@ -27,8 +28,8 @@ import ( type AWSHealthEvent AWSEvent -func (AWSHealthEvent) Kind() terminator.EventKind { - return terminator.EventKinds.ScheduledChange +func (e AWSHealthEvent) EventID() string { + return e.ID } func (e AWSHealthEvent) EC2InstanceIDs() []string { @@ -43,7 +44,15 @@ func (AWSHealthEvent) Done(_ context.Context) (bool, error) { return false, nil } +func (AWSHealthEvent) Kind() terminator.EventKind { + return terminator.EventKinds.ScheduledChange +} + func (e AWSHealthEvent) MarshalLogObject(enc zapcore.ObjectEncoder) error { zap.Inline(AWSEvent(e)).AddTo(enc) return nil } + +func (e AWSHealthEvent) StartTime() time.Time { + return e.Time +} diff --git a/src/pkg/event/spotinterruption/v1/handler.go b/src/pkg/event/spotinterruption/v1/handler.go index d8da27bf..90a0d288 100644 --- a/src/pkg/event/spotinterruption/v1/handler.go +++ b/src/pkg/event/spotinterruption/v1/handler.go @@ -18,6 +18,7 @@ package v1 import ( "context" + "time" "github.com/aws/aws-node-termination-handler/pkg/terminator" @@ -27,8 +28,8 @@ import ( type EC2SpotInstanceInterruptionWarning AWSEvent -func (EC2SpotInstanceInterruptionWarning) Kind() terminator.EventKind { - return terminator.EventKinds.SpotInterruption +func (e EC2SpotInstanceInterruptionWarning) EventID() string { + return e.ID } func (e EC2SpotInstanceInterruptionWarning) EC2InstanceIDs() []string { @@ -39,7 +40,15 @@ func (EC2SpotInstanceInterruptionWarning) Done(_ context.Context) (bool, error) return false, nil } +func (EC2SpotInstanceInterruptionWarning) Kind() terminator.EventKind { + return terminator.EventKinds.SpotInterruption +} + func (e EC2SpotInstanceInterruptionWarning) MarshalLogObject(enc zapcore.ObjectEncoder) error { zap.Inline(AWSEvent(e)).AddTo(enc) return nil } + +func (e EC2SpotInstanceInterruptionWarning) StartTime() time.Time { + return e.Time +} diff --git a/src/pkg/event/statechange/v1/handler.go b/src/pkg/event/statechange/v1/handler.go index 02781679..767e02cc 100644 --- a/src/pkg/event/statechange/v1/handler.go +++ b/src/pkg/event/statechange/v1/handler.go @@ -18,6 +18,7 @@ package v1 import ( "context" + "time" "github.com/aws/aws-node-termination-handler/pkg/terminator" @@ -27,8 +28,8 @@ import ( type EC2InstanceStateChangeNotification AWSEvent -func (EC2InstanceStateChangeNotification) Kind() terminator.EventKind { - return terminator.EventKinds.StateChange +func (e EC2InstanceStateChangeNotification) EventID() string { + return e.ID } func (e EC2InstanceStateChangeNotification) EC2InstanceIDs() []string { @@ -39,7 +40,15 @@ func (EC2InstanceStateChangeNotification) Done(_ context.Context) (bool, error) return false, nil } +func (EC2InstanceStateChangeNotification) Kind() terminator.EventKind { + return terminator.EventKinds.StateChange +} + func (e EC2InstanceStateChangeNotification) MarshalLogObject(enc zapcore.ObjectEncoder) error { zap.Inline(AWSEvent(e)).AddTo(enc) return nil } + +func (e EC2InstanceStateChangeNotification) StartTime() time.Time { + return e.Time +} From f0eda14ee4ef326a6419ebb887e05cb66aa6af38 Mon Sep 17 00:00:00 2001 From: Jerad C Date: Tue, 17 May 2022 13:32:09 -0500 Subject: [PATCH 5/6] update reconciler --- src/cmd/controller/main.go | 4 ++ src/pkg/terminator/adapter/webhookclient.go | 41 +++++++++++++++++++++ src/pkg/terminator/eventkind.go | 6 +-- src/pkg/terminator/reconciler.go | 38 ++++++++++++++++--- 4 files changed, 78 insertions(+), 11 deletions(-) create mode 100644 src/pkg/terminator/adapter/webhookclient.go diff --git a/src/cmd/controller/main.go b/src/cmd/controller/main.go index 4366cac1..f56cd7b4 100644 --- a/src/cmd/controller/main.go +++ b/src/cmd/controller/main.go @@ -57,6 +57,7 @@ import ( "github.com/aws/aws-node-termination-handler/pkg/sqsmessage" "github.com/aws/aws-node-termination-handler/pkg/terminator" terminatoradapter "github.com/aws/aws-node-termination-handler/pkg/terminator/adapter" + "github.com/aws/aws-node-termination-handler/pkg/webhook" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/ec2metadata" @@ -172,6 +173,9 @@ func main() { Drainer: kubectlcordondrainer.DefaultDrainer, }, }, + WebhookClientBuilder: terminatoradapter.WebhookClientBuilder( + webhook.ClientBuilder(webhook.NewHttpClientDo).NewClient, + ), } if err = rec.BuildController( ctrl.NewControllerManagedBy(mgr). diff --git a/src/pkg/terminator/adapter/webhookclient.go b/src/pkg/terminator/adapter/webhookclient.go new file mode 100644 index 00000000..955a7ff5 --- /dev/null +++ b/src/pkg/terminator/adapter/webhookclient.go @@ -0,0 +1,41 @@ +/* +Copyright 2022 Amazon.com, Inc. or its affiliates. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package adapter + +import ( + "net/http" + + "github.com/aws/aws-node-termination-handler/api/v1alpha1" + "github.com/aws/aws-node-termination-handler/pkg/terminator" + "github.com/aws/aws-node-termination-handler/pkg/webhook" +) + +type WebhookClientBuilder func(url, proxyURL, template string, headers http.Header) (webhook.Client, error) + +func (b WebhookClientBuilder) NewWebhookClient(terminator *v1alpha1.Terminator) (terminator.WebhookClient, error) { + headers := http.Header{} + for _, h := range terminator.Spec.Webhook.Headers { + headers.Add(h.Name, h.Value) + } + + return b( + terminator.Spec.Webhook.URL, + terminator.Spec.Webhook.ProxyURL, + terminator.Spec.Webhook.Template, + headers, + ) +} diff --git a/src/pkg/terminator/eventkind.go b/src/pkg/terminator/eventkind.go index c4546b32..26a9cd47 100644 --- a/src/pkg/terminator/eventkind.go +++ b/src/pkg/terminator/eventkind.go @@ -16,7 +16,7 @@ limitations under the License. package terminator -type EventKind string +type EventKind = string var EventKinds = struct { AutoScalingTermination, @@ -33,7 +33,3 @@ var EventKinds = struct { StateChange: EventKind("stateChange"), Noop: EventKind("noop"), } - -func (e EventKind) String() string { - return string(e) -} diff --git a/src/pkg/terminator/reconciler.go b/src/pkg/terminator/reconciler.go index cb35aebc..74f7fcf3 100644 --- a/src/pkg/terminator/reconciler.go +++ b/src/pkg/terminator/reconciler.go @@ -23,6 +23,7 @@ import ( "github.com/aws/aws-node-termination-handler/api/v1alpha1" "github.com/aws/aws-node-termination-handler/pkg/logging" + "github.com/aws/aws-node-termination-handler/pkg/webhook" "github.com/aws/aws-sdk-go/service/sqs" @@ -47,6 +48,7 @@ type ( } Event interface { + webhook.Event zapcore.ObjectMarshaler Done(context.Context) (tryAgain bool, err error) @@ -83,13 +85,22 @@ type ( Parse(context.Context, *sqs.Message) Event } + WebhookClient interface { + NewRequest() webhook.Request + } + + WebhookClientBuilder interface { + NewWebhookClient(*v1alpha1.Terminator) (WebhookClient, error) + } + Reconciler struct { + CordonDrainerBuilder + Getter NodeGetterBuilder NodeNameGetter SQSClientBuilder SQSMessageParser - CordonDrainerBuilder - Getter + WebhookClientBuilder Name string RequeueInterval time.Duration @@ -109,6 +120,11 @@ func (r Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (recon nodeGetter := r.NewNodeGetter(terminator) + webhookClient, err := r.NewWebhookClient(terminator) + if err != nil { + logging.FromContext(ctx).With("error", err).Warn("failed to initialize webhook client") + } + cordondrainer, err := r.NewCordonDrainer(terminator) if err != nil { return reconcile.Result{}, err @@ -125,7 +141,7 @@ func (r Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (recon } for _, msg := range sqsMessages { - e := r.handleMessage(ctx, msg, terminator, nodeGetter, cordondrainer, sqsClient) + e := r.handleMessage(ctx, msg, terminator, nodeGetter, cordondrainer, sqsClient, webhookClient.NewRequest()) err = multierr.Append(err, e) } @@ -135,19 +151,22 @@ func (r Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (recon return reconcile.Result{RequeueAfter: r.RequeueInterval}, nil } -func (r Reconciler) handleMessage(ctx context.Context, msg *sqs.Message, terminator *v1alpha1.Terminator, nodeGetter NodeGetter, cordondrainer CordonDrainer, sqsClient SQSClient) (err error) { +func (r Reconciler) handleMessage(ctx context.Context, msg *sqs.Message, terminator *v1alpha1.Terminator, nodeGetter NodeGetter, cordondrainer CordonDrainer, sqsClient SQSClient, webhookRequest webhook.Request) (err error) { ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("sqsMessage", logging.NewMessageMarshaler(msg))) evt := r.Parse(ctx, msg) ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("event", evt)) + webhookRequest.Event = evt + evtAction := actionForEvent(evt, terminator) ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("action", evtAction)) allInstancesHandled := true if evtAction != v1alpha1.Actions.NoAction { for _, ec2InstanceID := range evt.EC2InstanceIDs() { - instanceHandled, e := r.handleInstance(ctx, ec2InstanceID, evtAction, nodeGetter, cordondrainer) + webhookRequest.InstanceID = ec2InstanceID + instanceHandled, e := r.handleInstance(ctx, ec2InstanceID, evtAction, nodeGetter, cordondrainer, webhookRequest) err = multierr.Append(err, e) allInstancesHandled = allInstancesHandled && instanceHandled } @@ -165,7 +184,7 @@ func (r Reconciler) handleMessage(ctx context.Context, msg *sqs.Message, termina return multierr.Append(err, sqsClient.DeleteSQSMessage(ctx, msg)) } -func (r Reconciler) handleInstance(ctx context.Context, ec2InstanceID string, evtAction v1alpha1.Action, nodeGetter NodeGetter, cordondrainer CordonDrainer) (bool, error) { +func (r Reconciler) handleInstance(ctx context.Context, ec2InstanceID string, evtAction v1alpha1.Action, nodeGetter NodeGetter, cordondrainer CordonDrainer, webhookRequest webhook.Request) (bool, error) { ctx = logging.WithLogger(ctx, logging.FromContext(ctx).With("ec2InstanceID", ec2InstanceID)) nodeName, err := r.GetNodeName(ctx, ec2InstanceID) @@ -185,6 +204,13 @@ func (r Reconciler) handleInstance(ctx context.Context, ec2InstanceID string, ev return false, nil } + webhookRequest.NodeName = nodeName + if err = webhookRequest.Send(ctx); err != nil { + logging.FromContext(ctx). + With("error", err). + Error("webhook notification failed") + } + if err = cordondrainer.Cordon(ctx, node); err != nil { return true, err } From eac92610127fcc25016b0f9471058182904148d8 Mon Sep 17 00:00:00 2001 From: Jerad C Date: Tue, 17 May 2022 13:32:27 -0500 Subject: [PATCH 6/6] add tests --- src/test/reconciliation_test.go | 217 ++++++++++++++++++++++++++++++++ 1 file changed, 217 insertions(+) diff --git a/src/test/reconciliation_test.go b/src/test/reconciliation_test.go index 339e0064..97406d87 100644 --- a/src/test/reconciliation_test.go +++ b/src/test/reconciliation_test.go @@ -21,6 +21,8 @@ import ( "errors" "fmt" "io" + "io/ioutil" + "net/http" "reflect" "time" @@ -56,6 +58,7 @@ import ( "github.com/aws/aws-node-termination-handler/pkg/sqsmessage" "github.com/aws/aws-node-termination-handler/pkg/terminator" terminatoradapter "github.com/aws/aws-node-termination-handler/pkg/terminator/adapter" + "github.com/aws/aws-node-termination-handler/pkg/webhook" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -124,6 +127,8 @@ var _ = Describe("Reconciliation", func() { resizeCluster func(nodeCount uint) // Create an ASG lifecycle action state entry for an EC2 instance ID. createPendingASGLifecycleAction func(EC2InstanceID) + // Requests sent to the configured webhook. + webhookRequests []*http.Request // Name of default terminator. terminatorNamespaceName types.NamespacedName @@ -144,6 +149,7 @@ var _ = Describe("Reconciliation", func() { deleteSQSMessageFunc DeleteSQSMessageFunc cordonFunc kubectlcordondrain.CordonFunc drainFunc kubectlcordondrain.DrainFunc + webhookSendFunc webhook.HttpSendFunc ) When("the SQS queue is empty", func() { @@ -1427,6 +1433,196 @@ var _ = Describe("Reconciliation", func() { }) }) + When("the terminator has webhook configuration", func() { + const webhookURL = "http://webhook.example.aws" + webhookHeaders := []v1alpha1.HeaderSpec{{Name: "Content-Type", Value: "application/json"}} + webhookTemplate := fmt.Sprintf( + `EventID={{ .EventID }}, Kind={{ .Kind }}, InstanceID={{ .InstanceID }}, NodeName={{ .NodeName }}, StartTime={{ (.StartTime.Format "%s") }}`, + time.RFC3339, + ) + + When("the reconciliation takes no action", func() { + BeforeEach(func() { + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Webhook.URL = webhookURL + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("does not send any webhook requests", func() { + Expect(webhookRequests).To(BeEmpty()) + }) + }) + + When("the reconciliation acts on a node", func() { + const msgID = "id-123" + msgTime := time.Now().Format(time.RFC3339) + + BeforeEach(func() { + resizeCluster(3) + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "id": "%s", + "time": "%s", + "source": "aws.ec2", + "detail-type": "EC2 Spot Instance Interruption Warning", + "version": "1", + "detail": { + "instance-id": "%s" + } + }`, msgID, msgTime, instanceIDs[1])), + }) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Webhook.URL = webhookURL + terminator.Spec.Webhook.Headers = webhookHeaders + terminator.Spec.Webhook.Template = webhookTemplate + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("sends a webhook notification", func() { + Expect(webhookRequests).To(HaveLen(1)) + Expect(webhookRequests[0].Method).To(Equal(http.MethodPost)) + Expect(webhookRequests[0].URL.String()).To(Equal(webhookURL)) + Expect(webhookRequests[0].Header).To(And( + HaveLen(1), + HaveKeyWithValue("Content-Type", And( + HaveLen(1), + ContainElement("application/json"), + )))) + + Expect(ReadAll(webhookRequests[0].Body)).To(Equal(fmt.Sprintf( + "EventID=%s, Kind=spotInterruption, InstanceID=%s, NodeName=%s, StartTime=%s", + msgID, instanceIDs[1], nodeNames[1], msgTime, + ))) + }) + }) + + When("the reconciliation acts on multiple nodes", func() { + msgIDs := []string{"msg-1", "msg-2", "msg-2"} + msgBaseTime := time.Now() + msgTimes := []string{ + msgBaseTime.Add(-1 * time.Minute).Format(time.RFC3339), + msgBaseTime.Format(time.RFC3339), + msgBaseTime.Format(time.RFC3339), + } + kinds := []string{"spotInterruption", "scheduledChange", "scheduledChange"} + + BeforeEach(func() { + resizeCluster(5) + + sqsQueues[queueURL] = append(sqsQueues[queueURL], + &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "id": "%s", + "time": "%s", + "source": "aws.ec2", + "detail-type": "EC2 Spot Instance Interruption Warning", + "version": "1", + "detail": { + "instance-id": "%s" + } + }`, msgIDs[0], msgTimes[0], instanceIDs[1])), + }, + &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "id": "%s", + "time": "%s", + "source": "aws.health", + "detail-type": "AWS Health Event", + "version": "1", + "detail": { + "service": "EC2", + "eventTypeCategory": "scheduledChange", + "affectedEntities": [ + {"entityValue": "%s"}, + {"entityValue": "%s"} + ] + } + }`, msgIDs[1], msgTimes[1], instanceIDs[2], instanceIDs[3])), + }, + ) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Webhook.URL = webhookURL + terminator.Spec.Webhook.Headers = webhookHeaders + terminator.Spec.Webhook.Template = webhookTemplate + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + + It("sends a webhook notification", func() { + Expect(webhookRequests).To(HaveLen(3)) + + for i := 0; i < 3; i++ { + Expect(webhookRequests[i].Method).To(Equal(http.MethodPost), "request #%d", i) + Expect(webhookRequests[i].URL.String()).To(Equal(webhookURL), "request #%d", i) + Expect(webhookRequests[i].Header).To(And( + HaveLen(1), + HaveKeyWithValue("Content-Type", And( + HaveLen(1), + ContainElement("application/json"), + ))), + "request #%d", i) + + Expect(ReadAll(webhookRequests[i].Body)).To(Equal(fmt.Sprintf( + "EventID=%s, Kind=%s, InstanceID=%s, NodeName=%s, StartTime=%s", + msgIDs[i], kinds[i], instanceIDs[i+1], nodeNames[i+1], msgTimes[i], + )), + "request #%d", i, + ) + } + }) + }) + + When("there is an error sending the request", func() { + const msgID = "id-123" + msgTime := time.Now().Format(time.RFC3339) + + BeforeEach(func() { + resizeCluster(3) + + sqsQueues[queueURL] = append(sqsQueues[queueURL], &sqs.Message{ + ReceiptHandle: aws.String("msg-1"), + Body: aws.String(fmt.Sprintf(`{ + "id": "%s", + "time": "%s", + "source": "aws.ec2", + "detail-type": "EC2 Spot Instance Interruption Warning", + "version": "1", + "detail": { + "instance-id": "%s" + } + }`, msgID, msgTime, instanceIDs[1])), + }) + + terminator := terminators[terminatorNamespaceName] + terminator.Spec.Webhook.URL = webhookURL + terminator.Spec.Webhook.Headers = webhookHeaders + terminator.Spec.Webhook.Template = webhookTemplate + + webhookSendFunc = func(_ *http.Request) (*http.Response, error) { + return nil, errors.New("test error") + } + }) + + It("returns success and requeues the request with the reconciler's configured interval", func() { + Expect(result, err).To(HaveField("RequeueAfter", Equal(reconciler.RequeueInterval))) + }) + }) + }) + When("there is an error deleting an SQS message", func() { BeforeEach(func() { resizeCluster(3) @@ -2340,6 +2536,12 @@ var _ = Describe("Reconciliation", func() { asgLifecycleActions[instanceID] = StatePending } + webhookRequests = []*http.Request{} + webhookSendFunc = func(req *http.Request) (*http.Response, error) { + webhookRequests = append(webhookRequests, req) + return &http.Response{StatusCode: 200}, nil + } + // 2. Setup stub clients. describeEC2InstancesFunc = func(ctx aws.Context, input *ec2.DescribeInstancesInput, _ ...awsrequest.Option) (*ec2.DescribeInstancesOutput, error) { @@ -2493,6 +2695,10 @@ var _ = Describe("Reconciliation", func() { Drainer: drainer, } + newHttpClientDoFunc := func(_ webhook.ProxyFunc) webhook.HttpSendFunc { + return webhookSendFunc + } + reconciler = terminator.Reconciler{ Name: "terminator", RequeueInterval: time.Duration(10) * time.Second, @@ -2508,6 +2714,9 @@ var _ = Describe("Reconciliation", func() { CordonDrainerBuilder: terminatoradapter.CordonDrainerBuilder{ Builder: cordonDrainerBuilder, }, + WebhookClientBuilder: terminatoradapter.WebhookClientBuilder( + webhook.ClientBuilder(newHttpClientDoFunc).NewClient, + ), } }) @@ -2516,3 +2725,11 @@ var _ = Describe("Reconciliation", func() { result, err = reconciler.Reconcile(ctx, request) }) }) + +func ReadAll(r io.Reader) (string, error) { + bs, err := ioutil.ReadAll(r) + if err != nil { + return "", err + } + return string(bs), nil +}