Skip to content

Commit 4eb7149

Browse files
Support multiple queues at the IBMMQ scaler (#6182)
Signed-off-by: rickbrouwer <[email protected]> Signed-off-by: Rick Brouwer <[email protected]> Signed-off-by: Jorge Turrado Ferrero <[email protected]> Co-authored-by: Jorge Turrado Ferrero <[email protected]>
1 parent 1e90416 commit 4eb7149

File tree

3 files changed

+98
-41
lines changed

3 files changed

+98
-41
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ Here is an overview of all new **experimental** features:
8585
- **GitHub Scaler**: Add support to not scale on default runner labels ([#6127](https://github.com/kedacore/keda/issues/6127))
8686
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
8787
- **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214))
88+
- **IBMMQ Scaler**: Support multiple queues at the IBMMQ scaler ([#6181](https://github.com/kedacore/keda/issues/6181))
8889
- **Kafka**: Allow disabling FAST negotation when using Kerberos ([#6188](https://github.com/kedacore/keda/issues/6188))
8990
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
9091
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))

pkg/scalers/ibmmq_scaler.go

Lines changed: 87 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,19 @@ type ibmmqScaler struct {
2626
}
2727

2828
type ibmmqMetadata struct {
29-
Host string `keda:"name=host, order=triggerMetadata"`
30-
QueueName string `keda:"name=queueName, order=triggerMetadata"`
31-
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
32-
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
33-
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
34-
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
35-
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
36-
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
37-
CA string `keda:"name=ca, order=authParams, optional"`
38-
Cert string `keda:"name=cert, order=authParams, optional"`
39-
Key string `keda:"name=key, order=authParams, optional"`
40-
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
29+
Host string `keda:"name=host, order=triggerMetadata"`
30+
QueueName []string `keda:"name=queueName;queueNames, order=triggerMetadata"`
31+
QueueDepth int64 `keda:"name=queueDepth, order=triggerMetadata, default=20"`
32+
ActivationQueueDepth int64 `keda:"name=activationQueueDepth, order=triggerMetadata, default=0"`
33+
Operation string `keda:"name=operation, order=triggerMetadata, enum=max;avg;sum, default=max"`
34+
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata"`
35+
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata"`
36+
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, default=false"`
37+
TLS bool `keda:"name=tls, order=triggerMetadata, default=false"` // , deprecated=use unsafeSsl instead
38+
CA string `keda:"name=ca, order=authParams, optional"`
39+
Cert string `keda:"name=cert, order=authParams, optional"`
40+
Key string `keda:"name=key, order=authParams, optional"`
41+
KeyPassword string `keda:"name=keyPassword, order=authParams, optional"`
4142

4243
triggerIndex int
4344
}
@@ -129,54 +130,101 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (ibmmqMetadata, erro
129130
}
130131

131132
func (s *ibmmqScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
132-
queue := s.metadata.QueueName
133+
depths := make([]int64, 0, len(s.metadata.QueueName))
133134
url := s.metadata.Host
134135

135-
var requestJSON = []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queue + `", "responseParameters" : ["CURDEPTH"]}`)
136-
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(requestJSON))
136+
req, err := http.NewRequestWithContext(ctx, "POST", url, nil)
137137
if err != nil {
138-
return 0, fmt.Errorf("failed to request queue depth: %w", err)
138+
return 0, fmt.Errorf("failed to create HTTP request: %w", err)
139139
}
140140
req.Header.Set("ibm-mq-rest-csrf-token", "value")
141141
req.Header.Set("Content-Type", "application/json")
142-
143142
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)
144143

145-
resp, err := s.httpClient.Do(req)
146-
if err != nil {
147-
return 0, fmt.Errorf("failed to contact MQ via REST: %w", err)
144+
for _, queueName := range s.metadata.QueueName {
145+
requestJSON := []byte(`{"type": "runCommandJSON", "command": "display", "qualifier": "qlocal", "name": "` + queueName + `", "responseParameters" : ["CURDEPTH"]}`)
146+
req.Body = io.NopCloser(bytes.NewBuffer(requestJSON))
147+
148+
resp, err := s.httpClient.Do(req)
149+
if err != nil {
150+
return 0, fmt.Errorf("failed to contact MQ via REST for queue %s: %w", queueName, err)
151+
}
152+
defer resp.Body.Close()
153+
154+
if resp.StatusCode == http.StatusUnauthorized {
155+
return 0, fmt.Errorf("authentication failed: incorrect username or password")
156+
}
157+
158+
body, err := io.ReadAll(resp.Body)
159+
if err != nil {
160+
return 0, fmt.Errorf("failed to read body of request for queue %s: %w", queueName, err)
161+
}
162+
163+
var response CommandResponse
164+
err = json.Unmarshal(body, &response)
165+
if err != nil {
166+
return 0, fmt.Errorf("failed to parse JSON for queue %s: %w", queueName, err)
167+
}
168+
169+
if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
170+
return 0, fmt.Errorf("failed to parse response from REST call for queue %s", queueName)
171+
}
172+
173+
if response.CommandResponse[0].Parameters == nil {
174+
var reason string
175+
message := strings.Join(response.CommandResponse[0].Message, " ")
176+
if message != "" {
177+
reason = fmt.Sprintf(", reason: %s", message)
178+
}
179+
return 0, fmt.Errorf("failed to get the current queue depth parameter for queue %s%s", queueName, reason)
180+
}
181+
182+
depth := int64(response.CommandResponse[0].Parameters.Curdepth)
183+
depths = append(depths, depth)
148184
}
149-
defer resp.Body.Close()
150185

151-
body, err := io.ReadAll(resp.Body)
152-
if err != nil {
153-
return 0, fmt.Errorf("failed to read body of request: %w", err)
186+
switch s.metadata.Operation {
187+
case sumOperation:
188+
return sumDepths(depths), nil
189+
case avgOperation:
190+
return avgDepths(depths), nil
191+
case maxOperation:
192+
return maxDepth(depths), nil
193+
default:
194+
return 0, nil
154195
}
196+
}
155197

156-
var response CommandResponse
157-
err = json.Unmarshal(body, &response)
158-
if err != nil {
159-
return 0, fmt.Errorf("failed to parse JSON: %w", err)
198+
func sumDepths(depths []int64) int64 {
199+
var sum int64
200+
for _, depth := range depths {
201+
sum += depth
160202
}
203+
return sum
204+
}
161205

162-
if response.CommandResponse == nil || len(response.CommandResponse) == 0 {
163-
return 0, fmt.Errorf("failed to parse response from REST call")
206+
func avgDepths(depths []int64) int64 {
207+
if len(depths) == 0 {
208+
return 0
164209
}
210+
return sumDepths(depths) / int64(len(depths))
211+
}
165212

166-
if response.CommandResponse[0].Parameters == nil {
167-
var reason string
168-
message := strings.Join(response.CommandResponse[0].Message, " ")
169-
if message != "" {
170-
reason = fmt.Sprintf(", reason: %s", message)
213+
func maxDepth(depths []int64) int64 {
214+
if len(depths) == 0 {
215+
return 0
216+
}
217+
max := depths[0]
218+
for _, depth := range depths[1:] {
219+
if depth > max {
220+
max = depth
171221
}
172-
return 0, fmt.Errorf("failed to get the current queue depth parameter%s", reason)
173222
}
174-
175-
return int64(response.CommandResponse[0].Parameters.Curdepth), nil
223+
return max
176224
}
177225

178226
func (s *ibmmqScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
179-
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName))
227+
metricName := kedautil.NormalizeString(fmt.Sprintf("ibmmq-%s", s.metadata.QueueName[0]))
180228
externalMetric := &v2.ExternalMetricSource{
181229
Metric: v2.MetricIdentifier{
182230
Name: GenerateMetricNameWithIndex(s.metadata.triggerIndex, metricName),

pkg/scalers/ibmmq_scaler_test.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{
5151
{map[string]string{}, true, map[string]string{}},
5252
// Properly formed metadata
5353
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
54+
// Properly formed metadata with 2 queues
55+
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
56+
// Properly formed metadata with 2 queues with param queueNames
57+
{map[string]string{"host": testValidMQQueueURL, "queueNames": "testQueue1, testQueue2", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
58+
// Invalid operation
59+
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue1, testQueue2", "operation": "test", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
5460
// Invalid queueDepth using a string
5561
{map[string]string{"host": testValidMQQueueURL, "queueName": "testQueue", "queueDepth": "AA"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
5662
// Invalid activationQueueDepth using a string
@@ -89,7 +95,7 @@ func TestIBMMQParseMetadata(t *testing.T) {
8995
t.Error("Expected error but got success")
9096
fmt.Println(testData)
9197
}
92-
if metadata != (ibmmqMetadata{}) && metadata.Password != "" && metadata.Password != testData.authParams["password"] {
98+
if metadata.Password != "" && metadata.Password != testData.authParams["password"] {
9399
t.Error("Expected password from configuration but found something else: ", metadata.Password)
94100
fmt.Println(testData)
95101
}
@@ -216,7 +222,9 @@ func TestIBMMQScalerGetQueueDepthViaHTTP(t *testing.T) {
216222

217223
scaler := ibmmqScaler{
218224
metadata: ibmmqMetadata{
219-
Host: server.URL,
225+
Host: server.URL,
226+
QueueName: []string{"TEST.QUEUE"},
227+
Operation: "max",
220228
},
221229
httpClient: server.Client(),
222230
}

0 commit comments

Comments
 (0)