Skip to content

Commit 2f66865

Browse files
sbarnesthorntonSam Barnes-ThorntonJorTurFerzroubalik
authored
Add beanstalkd scaler (#6081)
Signed-off-by: Sam Barnes-Thornton <[email protected]> Signed-off-by: sbarnesthornton <[email protected]> Signed-off-by: Zbynek Roubalik <[email protected]> Co-authored-by: Sam Barnes-Thornton <[email protected]> Co-authored-by: Jorge Turrado Ferrero <[email protected]> Co-authored-by: Zbynek Roubalik <[email protected]>
1 parent 5ea1eac commit 2f66865

File tree

18 files changed

+1389
-0
lines changed

18 files changed

+1389
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio
5959

6060
- **General**: Add the generateEmbeddedObjectMeta flag to generate meta properties of JobTargetRef in ScaledJob ([#5908](https://github.com/kedacore/keda/issues/5908))
6161
- **General**: Cache miss fallback in validating webhook for ScaledObjects with direct kubernetes client ([#5973](https://github.com/kedacore/keda/issues/5973))
62+
- **General**: Introduce new Beanstalkd scaler ([#5901](https://github.com/kedacore/keda/issues/5901))
6263
- **General**: Replace wildcards in RBAC objects with explicit resources and verbs ([#6129](https://github.com/kedacore/keda/pull/6129))
6364
- **Azure Pipelines Scalar**: Print warning to log when Azure DevOps API Rate Limits are (nearly) reached ([#6284](https://github.com/kedacore/keda/issues/6284))
6465
- **CloudEventSource**: Introduce ClusterCloudEventSource ([#3533](https://github.com/kedacore/keda/issues/3533))

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ require (
117117
sigs.k8s.io/controller-tools v0.15.0
118118
sigs.k8s.io/custom-metrics-apiserver v1.29.0
119119
sigs.k8s.io/kustomize/kustomize/v5 v5.4.3
120+
github.com/beanstalkd/go-beanstalk v0.2.0
120121
)
121122

122123
// Remove this when they merge the PR and cut a release https://github.com/open-policy-agent/cert-controller/pull/202

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -951,6 +951,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.30.3/go.mod h1:zwySh8fpFyXp9yOr/KVzx
951951
github.com/aws/smithy-go v1.13.0/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
952952
github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE=
953953
github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
954+
github.com/beanstalkd/go-beanstalk v0.2.0 h1:6UOJugnu47uNB2jJO/lxyDgeD1Yds7owYi1USELqexA=
955+
github.com/beanstalkd/go-beanstalk v0.2.0/go.mod h1:/G8YTyChOtpOArwLTQPY1CHB+i212+av35bkPXXj56Y=
954956
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
955957
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
956958
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=

pkg/scalers/beanstalkd_scaler.go

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
package scalers
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"net/url"
8+
"time"
9+
10+
beanstalk "github.com/beanstalkd/go-beanstalk"
11+
"github.com/go-logr/logr"
12+
"github.com/mitchellh/mapstructure"
13+
v2 "k8s.io/api/autoscaling/v2"
14+
"k8s.io/metrics/pkg/apis/external_metrics"
15+
16+
"github.com/kedacore/keda/v2/pkg/scalers/scalersconfig"
17+
"github.com/kedacore/keda/v2/pkg/util"
18+
)
19+
20+
const (
21+
beanstalkdJobsMetricName = "jobs"
22+
beanstalkdValueConfigName = "value"
23+
beanstalkdActivationValueTriggerConfigName = "activationValue"
24+
beanstalkdMetricType = "External"
25+
beanstalkdNetworkProtocol = "tcp"
26+
)
27+
28+
type BeanstalkdScaler struct {
29+
metricType v2.MetricTargetType
30+
metadata *BeanstalkdMetadata
31+
connection *beanstalk.Conn
32+
tube *beanstalk.Tube
33+
logger logr.Logger
34+
}
35+
36+
type BeanstalkdMetadata struct {
37+
Server string `keda:"name=server, order=triggerMetadata"`
38+
Tube string `keda:"name=tube, order=triggerMetadata"`
39+
Value float64 `keda:"name=value, order=triggerMetadata"`
40+
ActivationValue float64 `keda:"name=activationValue, order=triggerMetadata, optional"`
41+
IncludeDelayed bool `keda:"name=includeDelayed, order=triggerMetadata, optional"`
42+
Timeout uint `keda:"name=timeout, order=triggerMetadata, optional, default=30"`
43+
TriggerIndex int
44+
}
45+
46+
// TubeStats represents a set of tube statistics.
47+
type tubeStats struct {
48+
TotalJobs int64 `mapstructure:"total-jobs"`
49+
JobsReady int64 `mapstructure:"current-jobs-ready"`
50+
JobsReserved int64 `mapstructure:"current-jobs-reserved"`
51+
JobsUrgent int64 `mapstructure:"current-jobs-urgent"`
52+
JobsBuried int64 `mapstructure:"current-jobs-buried"`
53+
JobsDelayed int64 `mapstructure:"current-jobs-delayed"`
54+
}
55+
56+
func NewBeanstalkdScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
57+
s := &BeanstalkdScaler{}
58+
59+
metricType, err := GetMetricTargetType(config)
60+
if err != nil {
61+
return nil, fmt.Errorf("error getting scaler metric type: %w", err)
62+
}
63+
s.metricType = metricType
64+
65+
s.logger = InitializeLogger(config, "beanstalkd_scaler")
66+
67+
meta, err := parseBeanstalkdMetadata(config)
68+
if err != nil {
69+
return nil, fmt.Errorf("error parsing beanstalkd metadata: %w", err)
70+
}
71+
s.metadata = meta
72+
73+
timeout := time.Duration(s.metadata.Timeout) * time.Second
74+
75+
conn, err := beanstalk.DialTimeout(beanstalkdNetworkProtocol, s.metadata.Server, timeout)
76+
if err != nil {
77+
return nil, fmt.Errorf("error connecting to beanstalkd: %w", err)
78+
}
79+
80+
s.connection = conn
81+
82+
s.tube = beanstalk.NewTube(s.connection, meta.Tube)
83+
84+
return s, nil
85+
}
86+
87+
func parseBeanstalkdMetadata(config *scalersconfig.ScalerConfig) (*BeanstalkdMetadata, error) {
88+
meta := &BeanstalkdMetadata{}
89+
90+
meta.TriggerIndex = config.TriggerIndex
91+
if err := config.TypedConfig(meta); err != nil {
92+
return nil, fmt.Errorf("error parsing beanstalkd metadata: %w", err)
93+
}
94+
95+
return meta, nil
96+
}
97+
98+
func (s *BeanstalkdScaler) getTubeStats(ctx context.Context) (*tubeStats, error) {
99+
errCh := make(chan error)
100+
statsCh := make(chan *tubeStats)
101+
102+
go func() {
103+
rawStats, err := s.tube.Stats()
104+
if err != nil {
105+
errCh <- fmt.Errorf("error retrieving stats from beanstalkd: %w", err)
106+
}
107+
108+
var stats tubeStats
109+
err = mapstructure.WeakDecode(rawStats, &stats)
110+
if err != nil {
111+
errCh <- fmt.Errorf("error decoding stats from beanstalkd: %w", err)
112+
}
113+
114+
statsCh <- &stats
115+
}()
116+
117+
select {
118+
case err := <-errCh:
119+
if errors.Is(err, beanstalk.ErrNotFound) {
120+
s.logger.Info("tube not found, setting stats to 0")
121+
return &tubeStats{
122+
TotalJobs: 0,
123+
JobsReady: 0,
124+
JobsDelayed: 0,
125+
JobsReserved: 0,
126+
JobsUrgent: 0,
127+
JobsBuried: 0,
128+
}, nil
129+
}
130+
return nil, err
131+
case tubeStats := <-statsCh:
132+
return tubeStats, nil
133+
case <-ctx.Done():
134+
return nil, ctx.Err()
135+
}
136+
}
137+
138+
func (s *BeanstalkdScaler) GetMetricsAndActivity(ctx context.Context, metricName string) ([]external_metrics.ExternalMetricValue, bool, error) {
139+
stats, err := s.getTubeStats(ctx)
140+
if err != nil {
141+
return []external_metrics.ExternalMetricValue{}, false, fmt.Errorf("error interacting with beanstalkd: %w", err)
142+
}
143+
144+
totalJobs := stats.JobsReady + stats.JobsReserved
145+
146+
if s.metadata.IncludeDelayed {
147+
totalJobs += stats.JobsDelayed
148+
}
149+
150+
metric := GenerateMetricInMili(metricName, float64(totalJobs))
151+
isActive := float64(totalJobs) > s.metadata.ActivationValue
152+
153+
return []external_metrics.ExternalMetricValue{metric}, isActive, nil
154+
}
155+
156+
func (s *BeanstalkdScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec {
157+
externalMetric := &v2.ExternalMetricSource{
158+
Metric: v2.MetricIdentifier{
159+
Name: GenerateMetricNameWithIndex(s.metadata.TriggerIndex, util.NormalizeString(fmt.Sprintf("beanstalkd-%s", url.QueryEscape(s.metadata.Tube)))),
160+
},
161+
Target: GetMetricTargetMili(s.metricType, s.metadata.Value),
162+
}
163+
metricSpec := v2.MetricSpec{
164+
External: externalMetric, Type: beanstalkdMetricType,
165+
}
166+
167+
return []v2.MetricSpec{metricSpec}
168+
}
169+
170+
func (s *BeanstalkdScaler) Close(context.Context) error {
171+
if s.connection != nil {
172+
err := s.connection.Close()
173+
if err != nil {
174+
s.logger.Error(err, "Error closing beanstalkd connection")
175+
return err
176+
}
177+
}
178+
return nil
179+
}

0 commit comments

Comments
 (0)