@@ -17,6 +17,7 @@ import (
17
17
18
18
configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
19
19
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
20
+ envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
20
21
"github.com/google/go-cmp/cmp"
21
22
"google.golang.org/grpc"
22
23
"google.golang.org/grpc/credentials/insecure"
@@ -47,32 +48,73 @@ var (
47
48
scheme = runtime .NewScheme ()
48
49
)
49
50
50
- func SKIPTestHandleRequestBody (t * testing.T ) {
51
+ func TestKubeInferenceModelRequest (t * testing.T ) {
51
52
tests := []struct {
52
- name string
53
- req * extProcPb.ProcessingRequest
54
- pods []* backend.PodMetrics
55
- models map [string ]* v1alpha1.InferenceModel
56
- wantHeaders []* configPb.HeaderValueOption
57
- wantBody []byte
58
- wantErr bool
53
+ name string
54
+ req * extProcPb.ProcessingRequest
55
+ pods []* backend.PodMetrics
56
+ wantHeaders []* configPb.HeaderValueOption
57
+ wantMetadata * structpb.Struct
58
+ wantBody []byte
59
+ wantErr bool
60
+ immediateResponse * extProcPb.ImmediateResponse
59
61
}{
60
62
{
61
- name : "success " ,
63
+ name : "select lower queue and kv cache, no active lora " ,
62
64
req : extprocutils .GenerateRequest ("my-model" ),
63
- models : map [string ]* v1alpha1.InferenceModel {
64
- "my-model" : {
65
- Spec : v1alpha1.InferenceModelSpec {
66
- ModelName : "my-model" ,
67
- TargetModels : []v1alpha1.TargetModel {
68
- {
69
- Name : "my-model-v1" ,
70
- Weight : pointer (100 ),
71
- },
65
+ // pod-1 will be picked because it has relatively low queue size and low KV cache.
66
+ pods : []* backend.PodMetrics {
67
+ {
68
+ Pod : extprocutils .FakePod (0 ),
69
+ Metrics : backend.Metrics {
70
+ WaitingQueueSize : 3 ,
71
+ KVCacheUsagePercent : 0.2 ,
72
+ },
73
+ },
74
+ {
75
+ Pod : extprocutils .FakePod (1 ),
76
+ Metrics : backend.Metrics {
77
+ WaitingQueueSize : 0 ,
78
+ KVCacheUsagePercent : 0.1 ,
79
+ },
80
+ },
81
+ {
82
+ Pod : extprocutils .FakePod (2 ),
83
+ Metrics : backend.Metrics {
84
+ WaitingQueueSize : 10 ,
85
+ KVCacheUsagePercent : 0.2 ,
86
+ },
87
+ },
88
+ },
89
+ wantHeaders : []* configPb.HeaderValueOption {
90
+ {
91
+ Header : & configPb.HeaderValue {
92
+ Key : runserver .DefaultTargetEndpointKey ,
93
+ RawValue : []byte ("address-1" ),
94
+ },
95
+ },
96
+ {
97
+ Header : & configPb.HeaderValue {
98
+ Key : "Content-Length" ,
99
+ RawValue : []byte ("76" ),
100
+ },
101
+ },
102
+ },
103
+ wantMetadata : & structpb.Struct {
104
+ Fields : map [string ]* structpb.Value {
105
+ runserver .DefaultTargetEndpointKey : {
106
+ Kind : & structpb.Value_StringValue {
107
+ StringValue : "address-1" ,
72
108
},
73
109
},
74
110
},
75
111
},
112
+ wantBody : []byte ("{\" max_tokens\" :100,\" model\" :\" my-model-12345\" ,\" prompt\" :\" hello\" ,\" temperature\" :0}" ),
113
+ wantErr : false ,
114
+ },
115
+ {
116
+ name : "select active lora, low queue" ,
117
+ req : extprocutils .GenerateRequest ("sql-lora" ),
76
118
// pod-1 will be picked because it has relatively low queue size, with the requested
77
119
// model being active, and has low KV cache.
78
120
pods : []* backend.PodMetrics {
@@ -93,8 +135,8 @@ func SKIPTestHandleRequestBody(t *testing.T) {
93
135
WaitingQueueSize : 0 ,
94
136
KVCacheUsagePercent : 0.1 ,
95
137
ActiveModels : map [string ]int {
96
- "foo" : 1 ,
97
- "my-model-v1 " : 1 ,
138
+ "foo" : 1 ,
139
+ "sql-lora-1fdg2 " : 1 ,
98
140
},
99
141
},
100
142
},
@@ -119,67 +161,67 @@ func SKIPTestHandleRequestBody(t *testing.T) {
119
161
{
120
162
Header : & configPb.HeaderValue {
121
163
Key : "Content-Length" ,
122
- RawValue : []byte ("73 " ),
164
+ RawValue : []byte ("76 " ),
123
165
},
124
166
},
125
167
},
126
- wantBody : []byte ("{\" max_tokens\" :100,\" model\" :\" my-model-v1\" ,\" prompt\" :\" hello\" ,\" temperature\" :0}" ),
127
- },
128
- }
129
-
130
- for _ , test := range tests {
131
- t .Run (test .name , func (t * testing.T ) {
132
- client , cleanup := setUpServer (t , test .pods , test .models )
133
- t .Cleanup (cleanup )
134
- want := & extProcPb.ProcessingResponse {
135
- Response : & extProcPb.ProcessingResponse_RequestBody {
136
- RequestBody : & extProcPb.BodyResponse {
137
- Response : & extProcPb.CommonResponse {
138
- HeaderMutation : & extProcPb.HeaderMutation {
139
- SetHeaders : test .wantHeaders ,
140
- },
141
- BodyMutation : & extProcPb.BodyMutation {
142
- Mutation : & extProcPb.BodyMutation_Body {
143
- Body : test .wantBody ,
144
- },
145
- },
168
+ wantMetadata : & structpb.Struct {
169
+ Fields : map [string ]* structpb.Value {
170
+ runserver .DefaultTargetEndpointKey : {
171
+ Kind : & structpb.Value_StringValue {
172
+ StringValue : "address-1" ,
146
173
},
147
174
},
148
175
},
149
- }
150
- res , err := sendRequest (t , client , test .req )
151
-
152
- if (err != nil ) != test .wantErr {
153
- t .Fatalf ("Unexpected error, got %v, want %v" , err , test .wantErr )
154
- }
155
-
156
- if diff := cmp .Diff (want , res , protocmp .Transform ()); diff != "" {
157
- t .Errorf ("Unexpected response, (-want +got): %v" , diff )
158
- }
159
- })
160
- }
161
-
162
- }
163
-
164
- func TestKubeInferenceModelRequest (t * testing.T ) {
165
- tests := []struct {
166
- name string
167
- req * extProcPb.ProcessingRequest
168
- wantHeaders []* configPb.HeaderValueOption
169
- wantMetadata * structpb.Struct
170
- wantBody []byte
171
- wantErr bool
172
- }{
176
+ },
177
+ wantBody : []byte ("{\" max_tokens\" :100,\" model\" :\" sql-lora-1fdg2\" ,\" prompt\" :\" hello\" ,\" temperature\" :0}" ),
178
+ wantErr : false ,
179
+ },
173
180
{
174
- name : "success " ,
181
+ name : "select no lora despite active model, avoid excessive queue size " ,
175
182
req : extprocutils .GenerateRequest ("sql-lora" ),
176
- // pod-1 will be picked because it has relatively low queue size, with the requested
177
- // model being active, and has low KV cache.
183
+ // pod-2 will be picked despite it NOT having the requested model being active
184
+ // as it's above the affinity for queue size. Also is critical, so we should
185
+ // still honor request despite all queues > 5
186
+ pods : []* backend.PodMetrics {
187
+ {
188
+ Pod : extprocutils .FakePod (0 ),
189
+ Metrics : backend.Metrics {
190
+ WaitingQueueSize : 10 ,
191
+ KVCacheUsagePercent : 0.2 ,
192
+ ActiveModels : map [string ]int {
193
+ "foo" : 1 ,
194
+ "bar" : 1 ,
195
+ },
196
+ },
197
+ },
198
+ {
199
+ Pod : extprocutils .FakePod (1 ),
200
+ Metrics : backend.Metrics {
201
+ WaitingQueueSize : 50 ,
202
+ KVCacheUsagePercent : 0.1 ,
203
+ ActiveModels : map [string ]int {
204
+ "foo" : 1 ,
205
+ "sql-lora-1fdg2" : 1 ,
206
+ },
207
+ },
208
+ },
209
+ {
210
+ Pod : extprocutils .FakePod (2 ),
211
+ Metrics : backend.Metrics {
212
+ WaitingQueueSize : 6 ,
213
+ KVCacheUsagePercent : 0.2 ,
214
+ ActiveModels : map [string ]int {
215
+ "foo" : 1 ,
216
+ },
217
+ },
218
+ },
219
+ },
178
220
wantHeaders : []* configPb.HeaderValueOption {
179
221
{
180
222
Header : & configPb.HeaderValue {
181
223
Key : runserver .DefaultTargetEndpointKey ,
182
- RawValue : []byte ("address-1 " ),
224
+ RawValue : []byte ("address-2 " ),
183
225
},
184
226
},
185
227
{
@@ -193,48 +235,130 @@ func TestKubeInferenceModelRequest(t *testing.T) {
193
235
Fields : map [string ]* structpb.Value {
194
236
runserver .DefaultTargetEndpointKey : {
195
237
Kind : & structpb.Value_StringValue {
196
- StringValue : "address-1 " ,
238
+ StringValue : "address-2 " ,
197
239
},
198
240
},
199
241
},
200
242
},
201
243
wantBody : []byte ("{\" max_tokens\" :100,\" model\" :\" sql-lora-1fdg2\" ,\" prompt\" :\" hello\" ,\" temperature\" :0}" ),
202
244
wantErr : false ,
203
245
},
204
- }
205
-
206
- pods := []* backend.PodMetrics {
207
246
{
208
- Pod : extprocutils .FakePod (0 ),
209
- Metrics : backend.Metrics {
210
- WaitingQueueSize : 0 ,
211
- KVCacheUsagePercent : 0.2 ,
212
- ActiveModels : map [string ]int {
213
- "foo" : 1 ,
214
- "bar" : 1 ,
247
+ name : "noncritical and all models past threshold, shed request" ,
248
+ req : extprocutils .GenerateRequest ("sql-lora-sheddable" ),
249
+ // no pods will be picked as all models are either above kv threshold,
250
+ // queue threshold, or both.
251
+ pods : []* backend.PodMetrics {
252
+ {
253
+ Pod : extprocutils .FakePod (0 ),
254
+ Metrics : backend.Metrics {
255
+ WaitingQueueSize : 6 ,
256
+ KVCacheUsagePercent : 0.2 ,
257
+ ActiveModels : map [string ]int {
258
+ "foo" : 1 ,
259
+ "bar" : 1 ,
260
+ "sql-lora-1fdg3" : 1 ,
261
+ },
262
+ },
263
+ },
264
+ {
265
+ Pod : extprocutils .FakePod (1 ),
266
+ Metrics : backend.Metrics {
267
+ WaitingQueueSize : 0 ,
268
+ KVCacheUsagePercent : 0.85 ,
269
+ ActiveModels : map [string ]int {
270
+ "foo" : 1 ,
271
+ "sql-lora-1fdg3" : 1 ,
272
+ },
273
+ },
274
+ },
275
+ {
276
+ Pod : extprocutils .FakePod (2 ),
277
+ Metrics : backend.Metrics {
278
+ WaitingQueueSize : 10 ,
279
+ KVCacheUsagePercent : 0.9 ,
280
+ ActiveModels : map [string ]int {
281
+ "foo" : 1 ,
282
+ "sql-lora-1fdg3" : 1 ,
283
+ },
284
+ },
215
285
},
216
286
},
217
- },
218
- {
219
- Pod : extprocutils .FakePod (1 ),
220
- Metrics : backend.Metrics {
221
- WaitingQueueSize : 0 ,
222
- KVCacheUsagePercent : 0.1 ,
223
- ActiveModels : map [string ]int {
224
- "foo" : 1 ,
225
- "sql-lora-1fdg2" : 1 ,
287
+ wantHeaders : []* configPb.HeaderValueOption {},
288
+ wantMetadata : & structpb.Struct {},
289
+ wantBody : []byte ("" ),
290
+ wantErr : false ,
291
+ immediateResponse : & extProcPb.ImmediateResponse {
292
+ Status : & envoyTypePb.HttpStatus {
293
+ Code : envoyTypePb .StatusCode_TooManyRequests ,
226
294
},
227
295
},
228
296
},
229
297
{
230
- Pod : extprocutils .FakePod (2 ),
231
- Metrics : backend.Metrics {
232
- WaitingQueueSize : 10 ,
233
- KVCacheUsagePercent : 0.2 ,
234
- ActiveModels : map [string ]int {
235
- "foo" : 1 ,
298
+ name : "noncritical, but one server has capacity, do not shed" ,
299
+ req : extprocutils .GenerateRequest ("sql-lora-sheddable" ),
300
+ // pod 0 will be picked as all other models are above threshold
301
+ pods : []* backend.PodMetrics {
302
+ {
303
+ Pod : extprocutils .FakePod (0 ),
304
+ Metrics : backend.Metrics {
305
+ WaitingQueueSize : 4 ,
306
+ KVCacheUsagePercent : 0.2 ,
307
+ ActiveModels : map [string ]int {
308
+ "foo" : 1 ,
309
+ "bar" : 1 ,
310
+ "sql-lora-1fdg3" : 1 ,
311
+ },
312
+ },
313
+ },
314
+ {
315
+ Pod : extprocutils .FakePod (1 ),
316
+ Metrics : backend.Metrics {
317
+ WaitingQueueSize : 0 ,
318
+ KVCacheUsagePercent : 0.85 ,
319
+ ActiveModels : map [string ]int {
320
+ "foo" : 1 ,
321
+ "sql-lora-1fdg3" : 1 ,
322
+ },
323
+ },
324
+ },
325
+ {
326
+ Pod : extprocutils .FakePod (2 ),
327
+ Metrics : backend.Metrics {
328
+ WaitingQueueSize : 10 ,
329
+ KVCacheUsagePercent : 0.9 ,
330
+ ActiveModels : map [string ]int {
331
+ "foo" : 1 ,
332
+ "sql-lora-1fdg3" : 1 ,
333
+ },
334
+ },
335
+ },
336
+ },
337
+ wantHeaders : []* configPb.HeaderValueOption {
338
+ {
339
+ Header : & configPb.HeaderValue {
340
+ Key : runserver .DefaultTargetEndpointKey ,
341
+ RawValue : []byte ("address-0" ),
342
+ },
343
+ },
344
+ {
345
+ Header : & configPb.HeaderValue {
346
+ Key : "Content-Length" ,
347
+ RawValue : []byte ("76" ),
348
+ },
236
349
},
237
350
},
351
+ wantMetadata : & structpb.Struct {
352
+ Fields : map [string ]* structpb.Value {
353
+ runserver .DefaultTargetEndpointKey : {
354
+ Kind : & structpb.Value_StringValue {
355
+ StringValue : "address-0" ,
356
+ },
357
+ },
358
+ },
359
+ },
360
+ wantBody : []byte ("{\" max_tokens\" :100,\" model\" :\" sql-lora-1fdg3\" ,\" prompt\" :\" hello\" ,\" temperature\" :0}" ),
361
+ wantErr : false ,
238
362
},
239
363
}
240
364
@@ -243,7 +367,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
243
367
244
368
for _ , test := range tests {
245
369
t .Run (test .name , func (t * testing.T ) {
246
- client , cleanup := setUpHermeticServer (t , pods )
370
+ client , cleanup := setUpHermeticServer (test . pods )
247
371
t .Cleanup (cleanup )
248
372
want := & extProcPb.ProcessingResponse {
249
373
Response : & extProcPb.ProcessingResponse_RequestBody {
@@ -264,78 +388,24 @@ func TestKubeInferenceModelRequest(t *testing.T) {
264
388
}
265
389
res , err := sendRequest (t , client , test .req )
266
390
267
- if err != nil {
268
- if ! test .wantErr {
269
- t .Errorf ("Unexpected error, got: %v, want error: %v" , err , test .wantErr )
391
+ if err != nil && ! test .wantErr {
392
+ t .Errorf ("Unexpected error, got: %v, want error: %v" , err , test .wantErr )
393
+ }
394
+ if test .immediateResponse != nil {
395
+ want = & extProcPb.ProcessingResponse {
396
+ Response : & extProcPb.ProcessingResponse_ImmediateResponse {
397
+ ImmediateResponse : test .immediateResponse ,
398
+ },
270
399
}
271
- } else if diff := cmp .Diff (want , res , protocmp .Transform ()); diff != "" {
400
+ }
401
+ if diff := cmp .Diff (want , res , protocmp .Transform ()); diff != "" {
272
402
t .Errorf ("Unexpected response, (-want +got): %v" , diff )
273
403
}
274
404
})
275
405
}
276
406
}
277
407
278
- func setUpServer (t * testing.T , pods []* backend.PodMetrics , models map [string ]* v1alpha1.InferenceModel ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
279
- t .Logf ("Setting up ExtProc server" )
280
- server := extprocutils .StartExtProc (port , time .Second , time .Second , pods , models )
281
-
282
- address := fmt .Sprintf ("localhost:%v" , port )
283
- // Create a grpc connection
284
- conn , err := grpc .NewClient (address , grpc .WithTransportCredentials (insecure .NewCredentials ()))
285
- if err != nil {
286
- log .Fatalf ("Failed to connect to %v: %v" , address , err )
287
- }
288
-
289
- ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Second )
290
- client , err = extProcPb .NewExternalProcessorClient (conn ).Process (ctx )
291
- if err != nil {
292
- log .Fatalf ("Failed to create client: %v" , err )
293
- }
294
- return client , func () {
295
- cancel ()
296
- conn .Close ()
297
- server .GracefulStop ()
298
- }
299
- }
300
-
301
- func setUpHermeticServer (t * testing.T , pods []* backend.PodMetrics ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
302
- t .Logf ("Setting up hermetic ExtProc server" )
303
- klog .InitFlags (nil )
304
- flag .Parse ()
305
- // Configure klog verbosity levels to print ext proc logs.
306
- _ = flag .Lookup ("v" ).Value .Set ("3" )
307
-
308
- // Unmarshal CRDs from file into structs
309
- manifestsPath := filepath .Join (".." , "testdata" , "inferencepool-with-model-hermetic.yaml" )
310
- docs , err := readDocuments (manifestsPath )
311
- if err != nil {
312
- log .Fatalf ("Can't read object manifests at path %v, %v" , manifestsPath , err )
313
- }
314
-
315
- for _ , doc := range docs {
316
- inferenceModel := & v1alpha1.InferenceModel {}
317
- if err = yaml .Unmarshal (doc , inferenceModel ); err != nil {
318
- log .Fatalf ("Can't unmarshal object: %v" , doc )
319
- }
320
- if inferenceModel .Kind == "InferenceModel" {
321
- t .Logf ("Creating inference model: %+v" , inferenceModel )
322
- if err := k8sClient .Create (context .Background (), inferenceModel ); err != nil {
323
- log .Fatalf ("unable to create inferenceModel %v: %v" , inferenceModel .Name , err )
324
- }
325
- }
326
- }
327
- for _ , doc := range docs {
328
- inferencePool := & v1alpha1.InferencePool {}
329
- if err = yaml .Unmarshal (doc , inferencePool ); err != nil {
330
- log .Fatalf ("Can't unmarshal object: %v" , doc )
331
- }
332
- if inferencePool .Kind == "InferencePool" {
333
- t .Logf ("Creating inference pool: %+v" , inferencePool )
334
- if err := k8sClient .Create (context .Background (), inferencePool ); err != nil {
335
- log .Fatalf ("unable to create inferencePool %v: %v" , inferencePool .Name , err )
336
- }
337
- }
338
- }
408
+ func setUpHermeticServer (pods []* backend.PodMetrics ) (client extProcPb.ExternalProcessor_ProcessClient , cleanup func ()) {
339
409
340
410
ps := make (backend.PodSet )
341
411
pms := make (map [backend.Pod ]* backend.PodMetrics )
@@ -346,9 +416,6 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr
346
416
pmc := & backend.FakePodMetricsClient {Res : pms }
347
417
348
418
server := serverRunner .Start (backend .NewK8sDataStore (backend .WithPods (pods )), pmc )
349
- if err != nil {
350
- log .Fatalf ("Ext-proc failed with the err: %v" , err )
351
- }
352
419
353
420
// Wait the reconciler to populate the datastore.
354
421
time .Sleep (10 * time .Second )
@@ -408,6 +475,44 @@ func BeforeSuit() {
408
475
go func () {
409
476
serverRunner .StartManager ()
410
477
}()
478
+
479
+ klog .Info ("Setting up hermetic ExtProc server" )
480
+ klog .InitFlags (nil )
481
+ flag .Parse ()
482
+ // Configure klog verbosity levels to print ext proc logs.
483
+ _ = flag .Lookup ("v" ).Value .Set ("3" )
484
+
485
+ // Unmarshal CRDs from file into structs
486
+ manifestsPath := filepath .Join (".." , "testdata" , "inferencepool-with-model-hermetic.yaml" )
487
+ docs , err := readDocuments (manifestsPath )
488
+ if err != nil {
489
+ log .Fatalf ("Can't read object manifests at path %v, %v" , manifestsPath , err )
490
+ }
491
+
492
+ for _ , doc := range docs {
493
+ inferenceModel := & v1alpha1.InferenceModel {}
494
+ if err = yaml .Unmarshal (doc , inferenceModel ); err != nil {
495
+ log .Fatalf ("Can't unmarshal object: %v" , doc )
496
+ }
497
+ if inferenceModel .Kind == "InferenceModel" {
498
+ klog .Infof ("Creating inference model: %+v" , inferenceModel )
499
+ if err := k8sClient .Create (context .Background (), inferenceModel ); err != nil {
500
+ log .Fatalf ("unable to create inferenceModel %v: %v" , inferenceModel .Name , err )
501
+ }
502
+ }
503
+ }
504
+ for _ , doc := range docs {
505
+ inferencePool := & v1alpha1.InferencePool {}
506
+ if err = yaml .Unmarshal (doc , inferencePool ); err != nil {
507
+ log .Fatalf ("Can't unmarshal object: %v" , doc )
508
+ }
509
+ if inferencePool .Kind == "InferencePool" {
510
+ klog .Infof ("Creating inference pool: %+v" , inferencePool )
511
+ if err := k8sClient .Create (context .Background (), inferencePool ); err != nil {
512
+ log .Fatalf ("unable to create inferencePool %v: %v" , inferencePool .Name , err )
513
+ }
514
+ }
515
+ }
411
516
}
412
517
413
518
func sendRequest (t * testing.T , client extProcPb.ExternalProcessor_ProcessClient , req * extProcPb.ProcessingRequest ) (* extProcPb.ProcessingResponse , error ) {
@@ -448,6 +553,3 @@ func readDocuments(fp string) ([][]byte, error) {
448
553
}
449
554
return docs , nil
450
555
}
451
- func pointer (v int32 ) * int32 {
452
- return & v
453
- }
0 commit comments