Skip to content

NTHv2 core functionality #612

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Apr 14, 2022
24 changes: 3 additions & 21 deletions src/api/v1alpha1/terminator_logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,13 @@ import (
)

func (t *TerminatorSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddObject("sqs", t.Sqs)
enc.AddObject("sqs", t.SQS)
enc.AddObject("drain", t.Drain)
return nil
}

func (s SqsSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddArray("attributeNames", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, attrName := range s.AttributeNames {
enc.AppendString(attrName)
}
return nil
}))

enc.AddInt64("maxNumberOfMessages", s.MaxNumberOfMessages)

enc.AddArray("messageAttributeNames", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error {
for _, attrName := range s.MessageAttributeNames {
enc.AppendString(attrName)
}
return nil
}))

enc.AddString("queueUrl", s.QueueUrl)
enc.AddInt64("visibilityTimeoutSeconds", s.VisibilityTimeoutSeconds)
enc.AddInt64("waitTimeSeconds", s.WaitTimeSeconds)
func (s SQSSpec) MarshalLogObject(enc zapcore.ObjectEncoder) error {
enc.AddString("queueURL", s.QueueURL)
return nil
}

Expand Down
13 changes: 4 additions & 9 deletions src/api/v1alpha1/terminator_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,14 @@ type TerminatorSpec struct {
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file

Sqs SqsSpec `json:"sqs,omitempty"`
SQS SQSSpec `json:"sqs,omitempty"`
Drain DrainSpec `json:"drain,omitempty"`
}

// SqsSpec defines inputs to SQS "receive messages" requests.
type SqsSpec struct {
// SQSSpec defines inputs to SQS "receive messages" requests.
type SQSSpec struct {
// https://pkg.go.dev/github.com/aws/[email protected]/service/sqs#ReceiveMessageInput
AttributeNames []string `json:"attributeNames,omitempty"`
MaxNumberOfMessages int64 `json:"maxNumberOfMessages,omitempty"`
MessageAttributeNames []string `json:"messageAttributeNames,omitempty"`
QueueUrl string `json:"queueUrl,omitempty"`
VisibilityTimeoutSeconds int64 `json:"visibilityTimeoutSeconds,omitempty"`
WaitTimeSeconds int64 `json:"waitTimeSeconds,omitempty"`
QueueURL string `json:"queueURL,omitempty"`
}

// DrainSpec defines inputs to the cordon and drain operations.
Expand Down
49 changes: 5 additions & 44 deletions src/api/v1alpha1/terminator_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package v1alpha1
import (
"context"
"net/url"
"strings"

"github.com/aws/aws-sdk-go/service/sqs"

Expand All @@ -30,7 +29,7 @@ import (

var (
// https://github.com/aws/aws-sdk-go/blob/v1.38.55/service/sqs/api.go#L3966-L3994
knownSqsAttributeNames = sets.NewString(sqs.MessageSystemAttributeName_Values()...)
knownSQSAttributeNames = sets.NewString(sqs.MessageSystemAttributeName_Values()...)
)

func (t *Terminator) Validate(_ context.Context) (errs *apis.FieldError) {
Expand All @@ -41,50 +40,12 @@ func (t *Terminator) Validate(_ context.Context) (errs *apis.FieldError) {
}

func (t *TerminatorSpec) validate() (errs *apis.FieldError) {
return t.Sqs.validate().ViaField("sqs")
return t.SQS.validate().ViaField("sqs")
}

func (s *SqsSpec) validate() (errs *apis.FieldError) {
for _, attrName := range s.AttributeNames {
if !knownSqsAttributeNames.Has(attrName) {
errs = errs.Also(apis.ErrInvalidValue(attrName, "attributeNames"))
}
func (s *SQSSpec) validate() (errs *apis.FieldError) {
if _, err := url.Parse(s.QueueURL); err != nil {
errs = errs.Also(apis.ErrInvalidValue(s.QueueURL, "queueURL", "must be a valid URL"))
}

// https://github.com/aws/aws-sdk-go/blob/v1.38.55/service/sqs/api.go#L3996-L3999
if s.MaxNumberOfMessages < 1 || 10 < s.MaxNumberOfMessages {
errs = errs.Also(apis.ErrInvalidValue(s.MaxNumberOfMessages, "maxNumberOfMessages", "must be in range 1-10"))
}

// https://github.com/aws/aws-sdk-go/blob/v1.38.55/service/sqs/api.go#L4001-L4021
//
// Simple checks are done below. More indepth checks are left to the SQS client/service.
for _, attrName := range s.MessageAttributeNames {
if len(attrName) > 256 {
errs = errs.Also(apis.ErrInvalidValue(attrName, "messageAttributeNames", "must be 256 characters or less"))
}

lcAttrName := strings.ToLower(attrName)
if strings.HasPrefix(lcAttrName, "aws") || strings.HasPrefix(lcAttrName, "amazon") {
errs = errs.Also(apis.ErrInvalidValue(attrName, "messageAttributeNames", `must not use reserved prefixes "AWS" or "Amazon"`))
}

if strings.HasPrefix(attrName, ".") || strings.HasSuffix(attrName, ".") {
errs = errs.Also(apis.ErrInvalidValue(attrName, "messageAttributeNames", "must not begin or end with a period (.)"))
}
}

if _, err := url.Parse(s.QueueUrl); err != nil {
errs = errs.Also(apis.ErrInvalidValue(s.QueueUrl, "queueUrl", "must be a valid URL"))
}

if s.VisibilityTimeoutSeconds < 0 {
errs = errs.Also(apis.ErrInvalidValue(s.VisibilityTimeoutSeconds, "visibilityTimeoutSeconds", "must be zero or greater"))
}

if s.WaitTimeSeconds < 0 {
errs = errs.Also(apis.ErrInvalidValue(s.WaitTimeSeconds, "waitTimeSeconds", "must be zero or greater"))
}

return errs
}
20 changes: 5 additions & 15 deletions src/api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
{{- with .Values.aws.region }}
- name: AWS_REGION
value: {{ . | quote}}
{{- end }}
{{- with .Values.controller.env }}
{{- toYaml . | nindent 22 }}
{{- end }}
Expand Down Expand Up @@ -123,9 +127,13 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
{{- with .Values.webhook.env }}
{{- toYaml . | nindent 26 }}
{{- end }}
{{- with .Values.aws.region }}
- name: AWS_REGION
value: {{ . | quote}}
{{- end }}
{{- with .Values.webhook.env }}
{{- toYaml . | nindent 26 }}
{{- end }}
ports:
- name: https-webhook
containerPort: {{ .Values.webhook.port }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,117 +40,14 @@ spec:
description: AWS SQS queue configuration.
type: object
properties:
attributeNames:
description: |
A list of attributes that need to be returned along with each message. These
attributes include:

* All – Returns all values.

* ApproximateFirstReceiveTimestamp – Returns the time the message was
first received from the queue (epoch time (http://en.wikipedia.org/wiki/Unix_time)
in milliseconds).

* ApproximateReceiveCount – Returns the number of times a message has
been received across all queues but not deleted.

* AWSTraceHeader – Returns the AWS X-Ray trace header string.

* SenderId For an IAM user, returns the IAM user ID, for example ABCDEFGHI1JKLMNOPQ23R.
For an IAM role, returns the IAM role ID, for example ABCDE1F2GH3I4JK5LMNOP:i-a123b456.

* SentTimestamp – Returns the time the message was sent to the queue
(epoch time (http://en.wikipedia.org/wiki/Unix_time) in milliseconds).

* MessageDeduplicationId – Returns the value provided by the producer
that calls the SendMessage action.

* MessageGroupId – Returns the value provided by the producer that calls
the SendMessage action. Messages with the same MessageGroupId are returned
in sequence.

* SequenceNumber – Returns the value provided by Amazon SQS.
type: array
items:
type: string
{{- with .Values.terminator.defaults.sqs.attributeNames }}
default:
{{- toYaml . | nindent 22 }}
{{- end }}
maxNumberOfMessages:
description: |
The maximum number of messages to return. Amazon SQS never returns more messages
than this value (however, fewer messages might be returned). Valid values:
1 to 10.
type: integer
format: int64
{{- with .Values.terminator.defaults.sqs.maxNumberOfMessages }}
default: {{ . }}
{{- end }}
messageAttributeNames:
description: |
The name of the message attribute, where N is the index.

* The name can contain alphanumeric characters and the underscore (_),
hyphen (-), and period (.).

* The name is case-sensitive and must be unique among all attribute names
for the message.

* The name must not start with AWS-reserved prefixes such as AWS. or Amazon.
(or any casing variants).

* The name must not start or end with a period (.), and it should not
have periods in succession (..).

* The name can be up to 256 characters long.

When using ReceiveMessage, you can send a list of attribute names to receive,
or you can return all of the attributes by specifying All or .* in your request.
You can also use all message attributes starting with a prefix, for example
bar.*.
type: array
items:
type: string
{{- with .Values.terminator.defaults.sqs.messageAttributeNames }}
default:
{{- toYaml . | nindent 22 }}
{{- end }}
queueUrl:
queueURL:
description: |
The URL of the Amazon SQS queue from which messages are received.

* Queue URLs and names are case-sensitive.

* QueueUrl is a required field
* QueueURL is a required field
type: string
visibilityTimeoutSeconds:
description: |
The duration (in seconds) that the received messages are hidden from subsequent
retrieve requests after being retrieved by a ReceiveMessage request.
type: integer
format: int64
{{- with .Values.terminator.defaults.sqs.visibilityTimeoutSeconds }}
default: {{ . }}
{{- end }}
waitTimeSeconds:
description: |
The duration (in seconds) for which the call waits for a message to arrive
in the queue before returning. If a message is available, the call returns
sooner than WaitTimeSeconds. If no messages are available and the wait time
expires, the call returns successfully with an empty list of messages.

To avoid HTTP errors, ensure that the HTTP response timeout for ReceiveMessage
requests is longer than the WaitTimeSeconds parameter. For example, with
the Java SDK, you can set HTTP transport settings using the NettyNioAsyncHttpClient
(https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/nio/netty/NettyNioAsyncHttpClient.html)
for asynchronous clients, or the ApacheHttpClient (https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/http/apache/ApacheHttpClient.html)
for synchronous clients.
type: integer
format: int64
{{- with .Values.terminator.defaults.sqs.waitTimeSeconds }}
default: {{ . }}
{{- end }}
drain:
description: TBD
type: object
Expand Down
12 changes: 3 additions & 9 deletions src/charts/aws-node-termination-handler-2/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,16 @@ annotations: {}

terminator:
defaults:
sqs:
attributeNames:
- SentTimestamp
maxNumberOfMessages: 10
messageAttributeNames:
- All
visibilityTimeoutSeconds: 20
waitTimeSeconds: 20
drain:
force: true
gracePeriodSeconds: -1
ignoreAllDaemonSets: true
deleteEmptyDirData: true
timeoutSeconds: 120


aws:
# AWS region to use in API calls.
region: ""

# Global logging configuration.
# See https://github.com/uber-go/zap/blob/2314926ec34c23ee21f3dd4399438469668f8097/config.go#L58-L94
Expand Down
Loading