Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 99bcfdc

Browse files
committed Mar 5, 2025
Extracting immediate response error handling logic to a func and implementing on both servers
1 parent a331092 commit 99bcfdc

File tree

3 files changed

+81
-55
lines changed

3 files changed

+81
-55
lines changed
 

‎config/manifests/ext_proc.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ spec:
7575
imagePullPolicy: Always
7676
args:
7777
- -poolName
78-
- "my-pool"
78+
- "vllm-llama2-7b-pool"
7979
- -v
8080
- "4"
8181
- -grpcPort

‎pkg/epp/handlers/server.go

+57-47
Original file line numberDiff line numberDiff line change
@@ -132,53 +132,9 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
132132

133133
if err != nil {
134134
logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req)
135-
switch errutil.CanonicalCode(err) {
136-
// This code can be returned by scheduler when there is no capacity for sheddable
137-
// requests.
138-
case errutil.InferencePoolResourceExhausted:
139-
resp = &extProcPb.ProcessingResponse{
140-
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
141-
ImmediateResponse: &extProcPb.ImmediateResponse{
142-
Status: &envoyTypePb.HttpStatus{
143-
Code: envoyTypePb.StatusCode_TooManyRequests,
144-
},
145-
},
146-
},
147-
}
148-
// This code can be returned when EPP processes the request and runs into server-side errors.
149-
case errutil.Internal:
150-
resp = &extProcPb.ProcessingResponse{
151-
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
152-
ImmediateResponse: &extProcPb.ImmediateResponse{
153-
Status: &envoyTypePb.HttpStatus{
154-
Code: envoyTypePb.StatusCode_InternalServerError,
155-
},
156-
},
157-
},
158-
}
159-
// This code can be returned when users provide an invalid JSON request.
160-
case errutil.BadRequest:
161-
resp = &extProcPb.ProcessingResponse{
162-
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
163-
ImmediateResponse: &extProcPb.ImmediateResponse{
164-
Status: &envoyTypePb.HttpStatus{
165-
Code: envoyTypePb.StatusCode_BadRequest,
166-
},
167-
},
168-
},
169-
}
170-
case errutil.BadConfiguration:
171-
resp = &extProcPb.ProcessingResponse{
172-
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
173-
ImmediateResponse: &extProcPb.ImmediateResponse{
174-
Status: &envoyTypePb.HttpStatus{
175-
Code: envoyTypePb.StatusCode_NotFound,
176-
},
177-
},
178-
},
179-
}
180-
default:
181-
return status.Errorf(status.Code(err), "failed to handle request: %v", err)
135+
resp, err = BuildErrResponse(err)
136+
if err != nil {
137+
return err
182138
}
183139
}
184140

@@ -190,6 +146,60 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
190146
}
191147
}
192148

149+
func BuildErrResponse(err error) (*extProcPb.ProcessingResponse, error) {
150+
var resp *extProcPb.ProcessingResponse
151+
152+
switch errutil.CanonicalCode(err) {
153+
// This code can be returned by scheduler when there is no capacity for sheddable
154+
// requests.
155+
case errutil.InferencePoolResourceExhausted:
156+
resp = &extProcPb.ProcessingResponse{
157+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
158+
ImmediateResponse: &extProcPb.ImmediateResponse{
159+
Status: &envoyTypePb.HttpStatus{
160+
Code: envoyTypePb.StatusCode_TooManyRequests,
161+
},
162+
},
163+
},
164+
}
165+
// This code can be returned when EPP processes the request and runs into server-side errors.
166+
case errutil.Internal:
167+
resp = &extProcPb.ProcessingResponse{
168+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
169+
ImmediateResponse: &extProcPb.ImmediateResponse{
170+
Status: &envoyTypePb.HttpStatus{
171+
Code: envoyTypePb.StatusCode_InternalServerError,
172+
},
173+
},
174+
},
175+
}
176+
// This code can be returned when users provide an invalid JSON request.
177+
case errutil.BadRequest:
178+
resp = &extProcPb.ProcessingResponse{
179+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
180+
ImmediateResponse: &extProcPb.ImmediateResponse{
181+
Status: &envoyTypePb.HttpStatus{
182+
Code: envoyTypePb.StatusCode_BadRequest,
183+
},
184+
},
185+
},
186+
}
187+
case errutil.BadConfiguration:
188+
resp = &extProcPb.ProcessingResponse{
189+
Response: &extProcPb.ProcessingResponse_ImmediateResponse{
190+
ImmediateResponse: &extProcPb.ImmediateResponse{
191+
Status: &envoyTypePb.HttpStatus{
192+
Code: envoyTypePb.StatusCode_NotFound,
193+
},
194+
},
195+
},
196+
}
197+
default:
198+
return nil, status.Errorf(status.Code(err), "failed to handle request: %v", err)
199+
}
200+
return resp, nil
201+
}
202+
193203
// RequestContext stores context information during the lifetime of an HTTP request.
194204
type RequestContext struct {
195205
TargetPod string

‎pkg/epp/handlers/streamingserver.go

+23-7
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,20 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
184184
case *extProcPb.ProcessingRequest_ResponseTrailers:
185185
// This is currently unused.
186186
}
187+
188+
if err != nil {
189+
logger.V(logutil.DEFAULT).Error(err, "Failed to process request", "request", req)
190+
resp, err := BuildErrResponse(err)
191+
if err != nil {
192+
return err
193+
} else {
194+
if err := srv.Send(resp); err != nil {
195+
logger.V(logutil.DEFAULT).Error(err, "Send failed")
196+
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
197+
}
198+
return nil
199+
}
200+
}
187201
loggerVerbose.Info("checking", "request state", reqCtx.RequestState)
188202
if err := reqCtx.updateStateAndSendIfNeeded(srv, loggerVerbose); err != nil {
189203
return err
@@ -280,6 +294,7 @@ const (
280294
TrailerResponseResponsesComplete StreamRequestState = 7
281295
)
282296

297+
// HandleRequestBody always returns the requestContext even in the error case, as the request context is used in error handling.
283298
func (s *StreamingServer) HandleRequestBody(
284299
ctx context.Context,
285300
reqCtx *StreamingRequestContext,
@@ -294,7 +309,7 @@ func (s *StreamingServer) HandleRequestBody(
294309
// Resolve target models.
295310
model, ok := requestBodyMap["model"].(string)
296311
if !ok {
297-
return nil, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
312+
return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
298313
}
299314
loggerVerbose.Info("Model requested", "model", model)
300315
modelName := model
@@ -304,12 +319,12 @@ func (s *StreamingServer) HandleRequestBody(
304319
// are able to be requested by using their distinct name.
305320
modelObj := s.datastore.ModelGet(model)
306321
if modelObj == nil {
307-
return nil, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error finding a model object in InferenceModel for input %v", model)}
322+
return reqCtx, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error finding a model object in InferenceModel for input %v", model)}
308323
}
309324
if len(modelObj.Spec.TargetModels) > 0 {
310325
modelName = datastore.RandomWeightedDraw(logger, modelObj, 0)
311326
if modelName == "" {
312-
return nil, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error getting target model name for model %v", modelObj.Name)}
327+
return reqCtx, errutil.Error{Code: errutil.BadConfiguration, Msg: fmt.Sprintf("error getting target model name for model %v", modelObj.Name)}
313328
}
314329
}
315330
llmReq := &scheduling.LLMRequest{
@@ -326,21 +341,21 @@ func (s *StreamingServer) HandleRequestBody(
326341
requestBodyBytes, err = json.Marshal(requestBodyMap)
327342
if err != nil {
328343
logger.V(logutil.DEFAULT).Error(err, "Error marshaling request body")
329-
return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
344+
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
330345
}
331346
loggerVerbose.Info("Updated request body marshalled", "body", string(requestBodyBytes))
332347
}
333348

334349
targetPod, err := s.scheduler.Schedule(ctx, llmReq)
335350
if err != nil {
336-
return nil, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
351+
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
337352
}
338353

339354
// Insert target endpoint to instruct Envoy to route requests to the specified target pod.
340355
// Attach the port number
341356
pool, err := s.datastore.PoolGet()
342357
if err != nil {
343-
return nil, err
358+
return reqCtx, err
344359
}
345360
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
346361

@@ -432,6 +447,7 @@ func (s *StreamingServer) HandleRequestBody(
432447
return reqCtx, nil
433448
}
434449

450+
// HandleResponseBody always returns the requestContext even in the error case, as the request context is used in error handling.
435451
func (s *StreamingServer) HandleResponseBody(
436452
ctx context.Context,
437453
reqCtx *StreamingRequestContext,
@@ -443,7 +459,7 @@ func (s *StreamingServer) HandleResponseBody(
443459
responseBytes, err := json.Marshal(response)
444460
if err != nil {
445461
logger.V(logutil.DEFAULT).Error(err, "error marshalling responseBody")
446-
return nil, err
462+
return reqCtx, err
447463
}
448464
if response["usage"] != nil {
449465
usg := response["usage"].(map[string]interface{})

0 commit comments

Comments
 (0)
Please sign in to comment.