@@ -112,7 +112,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 				{
 					Header: &configPb.HeaderValue{
 						Key:      runserver.DefaultDestinationEndpointHintKey,
-						RawValue: []byte("address-1:8000"),
+						RawValue: []byte("192.168.1.2:8000"),
 					},
 				},
 				{
@@ -122,7 +122,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetadata: makeMetadata("address-1:8000"),
+			wantMetadata: makeMetadata("192.168.1.2:8000"),
 			wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"),
 			wantMetrics: `
				# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
@@ -165,7 +165,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 				{
 					Header: &configPb.HeaderValue{
 						Key:      runserver.DefaultDestinationEndpointHintKey,
-						RawValue: []byte("address-1:8000"),
+						RawValue: []byte("192.168.1.2:8000"),
 					},
 				},
 				{
@@ -175,7 +175,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetadata: makeMetadata("address-1:8000"),
+			wantMetadata: makeMetadata("192.168.1.2:8000"),
 			wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"),
 			wantMetrics: `
				# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
@@ -219,7 +219,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 				{
 					Header: &configPb.HeaderValue{
 						Key:      runserver.DefaultDestinationEndpointHintKey,
-						RawValue: []byte("address-2:8000"),
+						RawValue: []byte("192.168.1.3:8000"),
 					},
 				},
 				{
@@ -229,7 +229,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetadata: makeMetadata("address-2:8000"),
+			wantMetadata: makeMetadata("192.168.1.3:8000"),
 			wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"),
 			wantMetrics: `
				# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
@@ -316,7 +316,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 				{
 					Header: &configPb.HeaderValue{
 						Key:      runserver.DefaultDestinationEndpointHintKey,
-						RawValue: []byte("address-0:8000"),
+						RawValue: []byte("192.168.1.1:8000"),
 					},
 				},
 				{
@@ -326,7 +326,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 					},
 				},
 			},
-			wantMetadata: makeMetadata("address-0:8000"),
+			wantMetadata: makeMetadata("192.168.1.1:8000"),
 			wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"),
 			wantMetrics: `
				# HELP inference_model_request_total [ALPHA] Counter of inference model requests broken out for each model and target model.
@@ -343,7 +343,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			client, cleanup := setUpHermeticServer(test.pods)
+			client, cleanup := setUpHermeticServer(t, test.pods)
 			t.Cleanup(cleanup)
 			want := &extProcPb.ProcessingResponse{
 				Response: &extProcPb.ProcessingResponse_RequestBody{
@@ -389,31 +389,52 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 	}
 }
 
-func setUpHermeticServer(podMetrics []*datastore.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
+func setUpHermeticServer(t *testing.T, podMetrics []*datastore.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
 	pms := make(map[types.NamespacedName]*datastore.PodMetrics)
 	for _, pm := range podMetrics {
 		pms[pm.NamespacedName] = pm
 	}
 	pmc := &backend.FakePodMetricsClient{Res: pms}
 
 	serverCtx, stopServer := context.WithCancel(context.Background())
-	go func() {
-		serverRunner.Datastore.PodDeleteAll()
-		for _, pm := range podMetrics {
-			pod := utiltesting.MakePod(pm.NamespacedName.Name).
-				Namespace(pm.NamespacedName.Namespace).
-				ReadyCondition().
-				IP(pm.Address).
-				ObjRef()
-			serverRunner.Datastore.PodUpdateOrAddIfNotExist(pod)
-			serverRunner.Datastore.PodUpdateMetricsIfExist(pm.NamespacedName, &pm.Metrics)
+
+	// TODO: this should be consistent with the inference pool
+	podLabels := map[string]string{
+		"app": "vllm-llama2-7b-pool",
+	}
+
+	for _, pm := range podMetrics {
+		pod := utiltesting.MakePod(pm.NamespacedName.Name).
+			Namespace(pm.NamespacedName.Namespace).
+			ReadyCondition().
+			Labels(podLabels).
+			IP(pm.Address).
+			Complete().
+			ObjRef()
+
+		copy := pod.DeepCopy()
+		if err := k8sClient.Create(context.Background(), copy); err != nil {
+			logutil.Fatal(logger, err, "Failed to create pod", "pod", pm.NamespacedName)
 		}
-		serverRunner.Provider = backend.NewProvider(pmc, serverRunner.Datastore)
+
+		// since no pod controllers deployed in fake environment, we manually update pod status
+		copy.Status = pod.Status
+		if err := k8sClient.Status().Update(context.Background(), copy); err != nil {
+			logutil.Fatal(logger, err, "Failed to update pod status", "pod", pm.NamespacedName)
+		}
+	}
+	serverRunner.Provider = backend.NewProvider(pmc, serverRunner.Datastore)
+	go func() {
 		if err := serverRunner.AsRunnable(logger.WithName("ext-proc")).Start(serverCtx); err != nil {
 			logutil.Fatal(logger, err, "Failed to start ext-proc server")
 		}
 	}()
 
+	// check if all pods are synced to datastore
+	assert.EventuallyWithT(t, func(t *assert.CollectT) {
+		assert.Len(t, serverRunner.Datastore.PodGetAll(), len(podMetrics), "Datastore not synced")
+	}, 10*time.Second, time.Second)
+
 	address := fmt.Sprintf("localhost:%v", port)
 	// Create a grpc connection
 	conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
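
Note on the setup change above: pods are now created as real API objects through k8sClient so that the pod reconciler, rather than the test, fills the datastore. Because envtest runs no kubelet or pod controllers, Create persists only the spec; the Ready condition and pod IP have to be written separately through the status subresource, and the EventuallyWithT call gives the reconciler time to sync before the gRPC client connects. A minimal standalone sketch of that pattern follows; it is not part of the commit, and the pod name, namespace, image, and the `c` and `countPods` parameters are illustrative assumptions.

package sketch

import (
	"context"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// createReadyPod creates a Pod and then marks it Ready by hand, since envtest
// has no kubelet to do it. Name, namespace, labels, and image are assumptions.
func createReadyPod(ctx context.Context, c client.Client) error {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "pod-1",
			Namespace: "default",
			Labels:    map[string]string{"app": "vllm-llama2-7b-pool"},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{{Name: "vllm", Image: "vllm/vllm-openai"}},
		},
	}
	// Create persists the spec only; status stays empty without a kubelet.
	if err := c.Create(ctx, pod); err != nil {
		return err
	}
	// Write the IP and Ready condition through the status subresource.
	pod.Status = corev1.PodStatus{
		PodIP: "192.168.1.2",
		Conditions: []corev1.PodCondition{{
			Type:   corev1.PodReady,
			Status: corev1.ConditionTrue,
		}},
	}
	return c.Status().Update(ctx, pod)
}

// waitForSync blocks until countPods reports the expected number of pods,
// mirroring the assert.EventuallyWithT wait added in the diff.
func waitForSync(t *testing.T, countPods func() int, want int) {
	assert.EventuallyWithT(t, func(ct *assert.CollectT) {
		assert.Equal(ct, want, countPods(), "datastore not synced")
	}, 10*time.Second, time.Second)
}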
@@ -430,6 +451,16 @@ func setUpHermeticServer(podMetrics []*datastore.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
 		cancel()
 		conn.Close()
 		stopServer()
+
+		// clear created pods
+		for _, pm := range podMetrics {
+			pod := utiltesting.MakePod(pm.NamespacedName.Name).
+				Namespace(pm.NamespacedName.Namespace).Complete().ObjRef()
+
+			if err := k8sClient.Delete(context.Background(), pod); err != nil {
+				logutil.Fatal(logger, err, "Failed to delete pod", "pod", pm.NamespacedName)
+			}
+		}
 		// wait a little until the goroutines actually exit
 		time.Sleep(5 * time.Second)
 	}
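
The new cleanup deletes each created pod individually by rebuilding its name and namespace. Because every test pod carries the shared "app" label added during setup, a labeled DeleteAllOf is one possible alternative; the sketch below is not what the commit does, and the single "default" namespace is an illustrative assumption.

package sketch

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// deleteTestPods removes every pod carrying the test label in one call
// instead of deleting pods one by one.
func deleteTestPods(ctx context.Context, c client.Client) error {
	return c.DeleteAllOf(ctx, &corev1.Pod{},
		client.InNamespace("default"),
		client.MatchingLabels{"app": "vllm-llama2-7b-pool"},
	)
}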