Skip to content

Commit c998e50

Browse files
authored
Added controller and datastore package (#363)
* Added controller and datastore package
* Fix lint
1 parent 21d0c13 commit c998e50

25 files changed

+369
-377
lines changed

Diff for: pkg/ext-proc/backend/fake.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ import (
66
"k8s.io/apimachinery/pkg/types"
77
"sigs.k8s.io/controller-runtime/pkg/log"
88
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
9+
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
910
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1011
)
1112

1213
type FakePodMetricsClient struct {
1314
Err map[types.NamespacedName]error
14-
Res map[types.NamespacedName]*PodMetrics
15+
Res map[types.NamespacedName]*datastore.PodMetrics
1516
}
1617

17-
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *PodMetrics) (*PodMetrics, error) {
18+
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error) {
1819
if err, ok := f.Err[existing.NamespacedName]; ok {
1920
return nil, err
2021
}

Diff for: pkg/ext-proc/backend/provider.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/go-logr/logr"
1010
"go.uber.org/multierr"
1111
"sigs.k8s.io/controller-runtime/pkg/log"
12+
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
1213
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
1314
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1415
)
@@ -17,7 +18,7 @@ const (
1718
fetchMetricsTimeout = 5 * time.Second
1819
)
1920

20-
func NewProvider(pmc PodMetricsClient, datastore Datastore) *Provider {
21+
func NewProvider(pmc PodMetricsClient, datastore datastore.Datastore) *Provider {
2122
p := &Provider{
2223
pmc: pmc,
2324
datastore: datastore,
@@ -28,11 +29,11 @@ func NewProvider(pmc PodMetricsClient, datastore Datastore) *Provider {
2829
// Provider provides backend pods and information such as metrics.
2930
type Provider struct {
3031
pmc PodMetricsClient
31-
datastore Datastore
32+
datastore datastore.Datastore
3233
}
3334

3435
type PodMetricsClient interface {
35-
FetchMetrics(ctx context.Context, existing *PodMetrics) (*PodMetrics, error)
36+
FetchMetrics(ctx context.Context, existing *datastore.PodMetrics) (*datastore.PodMetrics, error)
3637
}
3738

3839
func (p *Provider) Init(ctx context.Context, refreshMetricsInterval, refreshPrometheusMetricsInterval time.Duration) error {
@@ -100,7 +101,7 @@ func (p *Provider) refreshMetricsOnce(logger logr.Logger) error {
100101
errCh := make(chan error)
101102
processOnePod := func(key, value any) bool {
102103
loggerTrace.Info("Pod and metric being processed", "pod", key, "metric", value)
103-
existing := value.(*PodMetrics)
104+
existing := value.(*datastore.PodMetrics)
104105
wg.Add(1)
105106
go func() {
106107
defer wg.Done()

Diff for: pkg/ext-proc/backend/provider_test.go

+24-28
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,17 @@ import (
1111
"github.com/google/go-cmp/cmp/cmpopts"
1212
"github.com/stretchr/testify/assert"
1313
"k8s.io/apimachinery/pkg/types"
14+
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
1415
)
1516

1617
var (
17-
pod1 = &PodMetrics{
18-
Pod: Pod{
18+
pod1 = &datastore.PodMetrics{
19+
Pod: datastore.Pod{
1920
NamespacedName: types.NamespacedName{
2021
Name: "pod1",
2122
},
2223
},
23-
Metrics: Metrics{
24+
Metrics: datastore.Metrics{
2425
WaitingQueueSize: 0,
2526
KVCacheUsagePercent: 0.2,
2627
MaxActiveModels: 2,
@@ -30,13 +31,13 @@ var (
3031
},
3132
},
3233
}
33-
pod2 = &PodMetrics{
34-
Pod: Pod{
34+
pod2 = &datastore.PodMetrics{
35+
Pod: datastore.Pod{
3536
NamespacedName: types.NamespacedName{
3637
Name: "pod2",
3738
},
3839
},
39-
Metrics: Metrics{
40+
Metrics: datastore.Metrics{
4041
WaitingQueueSize: 1,
4142
KVCacheUsagePercent: 0.2,
4243
MaxActiveModels: 2,
@@ -52,37 +53,33 @@ func TestProvider(t *testing.T) {
5253
tests := []struct {
5354
name string
5455
pmc PodMetricsClient
55-
datastore Datastore
56-
want []*PodMetrics
56+
datastore datastore.Datastore
57+
want []*datastore.PodMetrics
5758
}{
5859
{
5960
name: "Probing metrics success",
6061
pmc: &FakePodMetricsClient{
61-
Res: map[types.NamespacedName]*PodMetrics{
62+
Res: map[types.NamespacedName]*datastore.PodMetrics{
6263
pod1.NamespacedName: pod1,
6364
pod2.NamespacedName: pod2,
6465
},
6566
},
66-
datastore: &datastore{
67-
pods: populateMap(pod1, pod2),
68-
},
69-
want: []*PodMetrics{
67+
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
68+
want: []*datastore.PodMetrics{
7069
pod1,
7170
pod2,
7271
},
7372
},
7473
{
7574
name: "Only pods in the datastore are probed",
7675
pmc: &FakePodMetricsClient{
77-
Res: map[types.NamespacedName]*PodMetrics{
76+
Res: map[types.NamespacedName]*datastore.PodMetrics{
7877
pod1.NamespacedName: pod1,
7978
pod2.NamespacedName: pod2,
8079
},
8180
},
82-
datastore: &datastore{
83-
pods: populateMap(pod1),
84-
},
85-
want: []*PodMetrics{
81+
datastore: datastore.NewFakeDatastore(populateMap(pod1), nil, nil),
82+
want: []*datastore.PodMetrics{
8683
pod1,
8784
},
8885
},
@@ -92,19 +89,18 @@ func TestProvider(t *testing.T) {
9289
Err: map[types.NamespacedName]error{
9390
pod2.NamespacedName: errors.New("injected error"),
9491
},
95-
Res: map[types.NamespacedName]*PodMetrics{
92+
Res: map[types.NamespacedName]*datastore.PodMetrics{
9693
pod1.NamespacedName: pod1,
9794
},
9895
},
99-
datastore: &datastore{
100-
pods: populateMap(pod1, pod2),
101-
},
102-
want: []*PodMetrics{
96+
datastore: datastore.NewFakeDatastore(populateMap(pod1, pod2), nil, nil),
97+
98+
want: []*datastore.PodMetrics{
10399
pod1,
104100
// Fetching pod2 metrics failed, so pod2 keeps its default metric values.
105101
{
106-
Pod: Pod{NamespacedName: pod2.NamespacedName},
107-
Metrics: Metrics{
102+
Pod: datastore.Pod{NamespacedName: pod2.NamespacedName},
103+
Metrics: datastore.Metrics{
108104
WaitingQueueSize: 0,
109105
KVCacheUsagePercent: 0,
110106
MaxActiveModels: 0,
@@ -122,7 +118,7 @@ func TestProvider(t *testing.T) {
122118
_ = p.Init(ctx, time.Millisecond, time.Millisecond)
123119
assert.EventuallyWithT(t, func(t *assert.CollectT) {
124120
metrics := test.datastore.PodGetAll()
125-
diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *PodMetrics) bool {
121+
diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(func(a, b *datastore.PodMetrics) bool {
126122
return a.String() < b.String()
127123
}))
128124
assert.Equal(t, "", diff, "Unexpected diff (+got/-want)")
@@ -131,10 +127,10 @@ func TestProvider(t *testing.T) {
131127
}
132128
}
133129

134-
func populateMap(pods ...*PodMetrics) *sync.Map {
130+
func populateMap(pods ...*datastore.PodMetrics) *sync.Map {
135131
newMap := &sync.Map{}
136132
for _, pod := range pods {
137-
newMap.Store(pod.NamespacedName, &PodMetrics{Pod: Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}})
133+
newMap.Store(pod.NamespacedName, &datastore.PodMetrics{Pod: datastore.Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}})
138134
}
139135
return newMap
140136
}

Diff for: pkg/ext-proc/backend/vllm/metrics.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/prometheus/common/expfmt"
1515
"go.uber.org/multierr"
1616
"sigs.k8s.io/controller-runtime/pkg/log"
17-
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
17+
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
1818
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1919
)
2020

@@ -38,8 +38,8 @@ type PodMetricsClientImpl struct{}
3838
// FetchMetrics fetches metrics from a given pod.
3939
func (p *PodMetricsClientImpl) FetchMetrics(
4040
ctx context.Context,
41-
existing *backend.PodMetrics,
42-
) (*backend.PodMetrics, error) {
41+
existing *datastore.PodMetrics,
42+
) (*datastore.PodMetrics, error) {
4343
logger := log.FromContext(ctx)
4444
loggerDefault := logger.V(logutil.DEFAULT)
4545

@@ -79,8 +79,8 @@ func (p *PodMetricsClientImpl) FetchMetrics(
7979
func promToPodMetrics(
8080
logger logr.Logger,
8181
metricFamilies map[string]*dto.MetricFamily,
82-
existing *backend.PodMetrics,
83-
) (*backend.PodMetrics, error) {
82+
existing *datastore.PodMetrics,
83+
) (*datastore.PodMetrics, error) {
8484
var errs error
8585
updated := existing.Clone()
8686
runningQueueSize, err := getLatestMetric(logger, metricFamilies, RunningQueueSizeMetricName)

Diff for: pkg/ext-proc/backend/vllm/metrics_test.go

+7-7
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77
dto "github.com/prometheus/client_model/go"
88
"github.com/stretchr/testify/assert"
99
"google.golang.org/protobuf/proto"
10-
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
10+
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
1111
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1212
)
1313

@@ -17,9 +17,9 @@ func TestPromToPodMetrics(t *testing.T) {
1717
testCases := []struct {
1818
name string
1919
metricFamilies map[string]*dto.MetricFamily
20-
expectedMetrics *backend.Metrics
20+
expectedMetrics *datastore.Metrics
2121
expectedErr error
22-
initialPodMetrics *backend.PodMetrics
22+
initialPodMetrics *datastore.PodMetrics
2323
}{
2424
{
2525
name: "all metrics available",
@@ -107,7 +107,7 @@ func TestPromToPodMetrics(t *testing.T) {
107107
},
108108
},
109109
},
110-
expectedMetrics: &backend.Metrics{
110+
expectedMetrics: &datastore.Metrics{
111111
RunningQueueSize: 15,
112112
WaitingQueueSize: 25,
113113
KVCacheUsagePercent: 0.9,
@@ -117,7 +117,7 @@ func TestPromToPodMetrics(t *testing.T) {
117117
},
118118
MaxActiveModels: 2,
119119
},
120-
initialPodMetrics: &backend.PodMetrics{},
120+
initialPodMetrics: &datastore.PodMetrics{},
121121
expectedErr: nil,
122122
},
123123
{
@@ -206,7 +206,7 @@ func TestPromToPodMetrics(t *testing.T) {
206206
},
207207
},
208208
},
209-
expectedMetrics: &backend.Metrics{
209+
expectedMetrics: &datastore.Metrics{
210210
RunningQueueSize: 15,
211211
WaitingQueueSize: 25,
212212
KVCacheUsagePercent: 0.9,
@@ -216,7 +216,7 @@ func TestPromToPodMetrics(t *testing.T) {
216216
},
217217
MaxActiveModels: 0,
218218
},
219-
initialPodMetrics: &backend.PodMetrics{},
219+
initialPodMetrics: &datastore.PodMetrics{},
220220
expectedErr: errors.New("strconv.Atoi: parsing '2a': invalid syntax"),
221221
},
222222
}

Diff for: pkg/ext-proc/backend/inferencemodel_reconciler.go renamed to pkg/ext-proc/controller/inferencemodel_reconciler.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package backend
1+
package controller
22

33
import (
44
"context"
@@ -12,14 +12,15 @@ import (
1212
"sigs.k8s.io/controller-runtime/pkg/client"
1313
"sigs.k8s.io/controller-runtime/pkg/log"
1414
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
15+
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/datastore"
1516
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1617
)
1718

1819
type InferenceModelReconciler struct {
1920
client.Client
2021
Scheme *runtime.Scheme
2122
Record record.EventRecorder
22-
Datastore Datastore
23+
Datastore datastore.Datastore
2324
PoolNamespacedName types.NamespacedName
2425
}
2526

0 commit comments

Comments
 (0)