Skip to content

Commit dfe9074

Browse files
committed
Review comments
1 parent 3e9dcc5 commit dfe9074

File tree

7 files changed

+80
-56
lines changed

7 files changed

+80
-56
lines changed

src/ipc_message.cc

Lines changed: 9 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -57,13 +57,15 @@ IPCMessage::Create(
5757
}
5858

5959
std::unique_ptr<IPCMessage>
60-
IPCMessage::Create(IPCMessageShm* ipc_message_shm,
61-
bi::managed_external_buffer::handle_t& message_handle)
60+
IPCMessage::Create(
61+
IPCMessageShm* ipc_message_shm,
62+
bi::managed_external_buffer::handle_t& message_handle)
6263
{
63-
return std::unique_ptr<IPCMessage>(new IPCMessage(ipc_message_shm, message_handle));
64+
return std::unique_ptr<IPCMessage>(
65+
new IPCMessage(ipc_message_shm, message_handle));
6466
}
6567

66-
AllocatedSharedMemory<IPCMessageShm>&
68+
AllocatedSharedMemory<IPCMessageShm>&
6769
IPCMessage::GetAllocatedSharedMemory()
6870
{
6971
return ipc_message_shm_;
@@ -146,7 +148,9 @@ IPCMessage::IPCMessage(
146148
ipc_message_handle_ = ipc_message_shm_.handle_;
147149
}
148150

149-
IPCMessage::IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle)
151+
IPCMessage::IPCMessage(
152+
IPCMessageShm* ipc_message_shm,
153+
bi::managed_external_buffer::handle_t& handle)
150154
{
151155
ipc_message_handle_ = handle;
152156
ipc_message_shm_ptr_ = ipc_message_shm;

src/ipc_message.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ class IPCMessage {
9898
const std::unique_ptr<SharedMemoryManager>& shm_pool,
9999
bool inline_response);
100100

101-
static std::unique_ptr<IPCMessage>
102-
Create(IPCMessageShm* ipc_message_shm,
101+
static std::unique_ptr<IPCMessage> Create(
102+
IPCMessageShm* ipc_message_shm,
103103
bi::managed_external_buffer::handle_t& message_handle);
104104
static std::unique_ptr<IPCMessage> LoadFromSharedMemory(
105105
std::unique_ptr<SharedMemoryManager>& shm_pool,
@@ -135,7 +135,9 @@ class IPCMessage {
135135
AllocatedSharedMemory<bi::interprocess_mutex>& response_mutex_shm,
136136
AllocatedSharedMemory<bi::interprocess_condition>& response_cond_shm);
137137

138-
IPCMessage(IPCMessageShm* ipc_message_shm, bi::managed_external_buffer::handle_t& handle);
138+
IPCMessage(
139+
IPCMessageShm* ipc_message_shm,
140+
bi::managed_external_buffer::handle_t& handle);
139141
};
140142

141143
}}}; // namespace triton::backend::python

src/pb_stub.cc

Lines changed: 46 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,6 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
654654
py::list py_request_list =
655655
LoadRequestsFromSharedMemory(request_batch_shm_ptr);
656656
std::unique_ptr<IPCMessage> execute_response;
657-
// IPCMessage::Create(shm_pool_, false /* Inline response */);
658657

659658
std::optional<AllocatedSharedMemory<char>> response_batch;
660659
bool has_exception = false;
@@ -675,8 +674,7 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
675674
{
676675
NVTX_RANGE(nvtx_, "PyExecute " + name_);
677676

678-
execute_return =
679-
model_instance_.attr("execute")(py_request_list);
677+
execute_return = model_instance_.attr("execute")(py_request_list);
680678

681679
bool is_coroutine = py::module::import("asyncio")
682680
.attr("iscoroutine")(execute_return)
@@ -688,10 +686,12 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
688686
} else {
689687
py::object coroutine_return =
690688
RunCoroutine(execute_return, false /* in_background */);
691-
ProcessReturnedResponses(py_request_list, coroutine_return, response_batch);
689+
ProcessReturnedResponses(
690+
py_request_list, coroutine_return, response_batch);
692691
}
693692
} else {
694-
ProcessReturnedResponses(py_request_list, execute_return, response_batch);
693+
ProcessReturnedResponses(
694+
py_request_list, execute_return, response_batch);
695695
}
696696
}
697697
}
@@ -712,11 +712,14 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
712712
error_string;
713713
LOG_ERROR << err_message.c_str();
714714
if (!response_batch) {
715-
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch) + sizeof(IPCMessageShm));
716-
}
717-
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
715+
response_batch = shm_pool_->Construct<char>(
716+
sizeof(ResponseBatch) + sizeof(IPCMessageShm));
717+
}
718+
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
719+
response_batch.value().data_.get() + sizeof(IPCMessageShm));
718720

719-
response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
721+
response_batch_shm_ptr =
722+
reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get());
720723
response_batch_shm_ptr->has_error = true;
721724
error_string_shm = PbString::Create(shm_pool_, err_message);
722725
response_batch_shm_ptr->error = error_string_shm->ShmHandle();
@@ -732,14 +735,19 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
732735
}
733736

734737
if (!response_batch) {
735-
response_batch = shm_pool_->Construct<char>(sizeof(ResponseBatch) + sizeof(IPCMessageShm));
736-
ResponseBatch* response_batch_shm_ptr =reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
737-
response_batch_shm_ptr->batch_size = 0;
738-
}
739-
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
738+
response_batch = shm_pool_->Construct<char>(
739+
sizeof(ResponseBatch) + sizeof(IPCMessageShm));
740+
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
741+
response_batch.value().data_.get() + sizeof(IPCMessageShm));
742+
response_batch_shm_ptr->batch_size = 0;
743+
}
744+
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
745+
response_batch.value().data_.get() + sizeof(IPCMessageShm));
740746
response_batch_shm_ptr->has_error = false;
741747
response_batch_shm_ptr->is_error_set = false;
742-
execute_response = IPCMessage::Create(reinterpret_cast<IPCMessageShm*>(response_batch.value().data_.get()), response_batch.value().handle_);
748+
execute_response = IPCMessage::Create(
749+
reinterpret_cast<IPCMessageShm*>(response_batch.value().data_.get()),
750+
response_batch.value().handle_);
743751
execute_response->Args() = response_batch.value().handle_;
744752
execute_response->InlineResponse() = false;
745753
execute_response->Command() = PYTHONSTUB_ExecuteResponse;
@@ -761,7 +769,8 @@ Stub::ProcessResponse(InferResponse* response)
761769

762770
void
763771
Stub::ProcessReturnedResponses(
764-
py::list py_requests, py::object py_responses_obj, std::optional<AllocatedSharedMemory<char>>& response_batch)
772+
py::list py_requests, py::object py_responses_obj,
773+
std::optional<AllocatedSharedMemory<char>>& response_batch)
765774
{
766775
// Return if there is nothing to process.
767776
if (py::isinstance<py::none>(py_responses_obj)) {
@@ -812,29 +821,34 @@ Stub::ProcessReturnedResponses(
812821

813822
std::shared_ptr<InferResponse> response =
814823
py_responses[i].cast<std::shared_ptr<InferResponse>>();
815-
request->GetResponseSender()->UpdateStateAndCounters(response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
824+
request->GetResponseSender()->UpdateStateAndCounters(
825+
response, TRITONSERVER_RESPONSE_COMPLETE_FINAL);
816826
}
817827
}
818-
response_batch = std::move(shm_pool_->Construct<char>(sizeof(IPCMessageShm) +
828+
// Return all the created responses using response_batch. The reason
829+
// that both of the paths are available is that sending the responses
830+
// using response_batch is faster than using `response_sender`.
831+
response_batch = std::move(shm_pool_->Construct<char>(
832+
sizeof(IPCMessageShm) +
819833
requests_size * sizeof(bi::managed_external_buffer::handle_t) +
820834
sizeof(ResponseBatch)));
821-
ResponseBatch* response_batch_shm_ptr =
822-
reinterpret_cast<ResponseBatch*>(response_batch.value().data_.get() + sizeof(IPCMessageShm));
835+
ResponseBatch* response_batch_shm_ptr = reinterpret_cast<ResponseBatch*>(
836+
response_batch.value().data_.get() + sizeof(IPCMessageShm));
823837

824838
bi::managed_external_buffer::handle_t* responses_shm_handle =
825839
reinterpret_cast<bi::managed_external_buffer::handle_t*>(
826-
response_batch.value().data_.get() + sizeof(ResponseBatch) + sizeof(IPCMessageShm));
827-
828-
for (size_t i = 0; i < responses_size; i++) {
829-
// Check the return type of execute function.
830-
InferRequest* infer_request = py_requests[i].cast<InferRequest*>();
831-
InferResponse* infer_response = py_responses[i].cast<InferResponse*>();
832-
infer_response->PruneOutputTensors(
833-
infer_request->RequestedOutputNames());
834-
ProcessResponse(infer_response);
835-
responses_shm_handle[i] = infer_response->ShmHandle();
836-
}
837-
response_batch_shm_ptr->batch_size = requests_size;
840+
response_batch.value().data_.get() + sizeof(ResponseBatch) +
841+
sizeof(IPCMessageShm));
842+
843+
for (size_t i = 0; i < responses_size; i++) {
844+
// Check the return type of execute function.
845+
InferRequest* infer_request = py_requests[i].cast<InferRequest*>();
846+
InferResponse* infer_response = py_responses[i].cast<InferResponse*>();
847+
infer_response->PruneOutputTensors(infer_request->RequestedOutputNames());
848+
ProcessResponse(infer_response);
849+
responses_shm_handle[i] = infer_response->ShmHandle();
850+
}
851+
response_batch_shm_ptr->batch_size = requests_size;
838852
}
839853

840854
py::object

src/pb_stub.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,8 @@ class Stub {
254254
void ProcessRequests(RequestBatch* request_batch_shm_ptr);
255255

256256
void ProcessReturnedResponses(
257-
py::list py_requests, py::object py_responses_obj, std::optional<AllocatedSharedMemory<char>>& response_batch);
257+
py::list py_requests, py::object py_responses_obj,
258+
std::optional<AllocatedSharedMemory<char>>& response_batch);
258259

259260
void ProcessResponse(InferResponse* response);
260261

src/python_be.cc

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1023,7 +1023,7 @@ ModelInstanceState::SendMessageAndReceiveResponse(
10231023
std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
10241024
TRITONBACKEND_Request** requests, const uint32_t request_count)
10251025
{
1026-
SendMessageToStub(message);
1026+
SendMessageToStub(message);
10271027

10281028
bi::managed_external_buffer::handle_t response_message;
10291029
auto error = Stub()->ReceiveMessageFromStub(response_message);
@@ -1224,7 +1224,8 @@ ModelInstanceState::ResponseSendDecoupled(
12241224
if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
12251225
std::unique_ptr<
12261226
TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
1227-
lresponse_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(response_factory));
1227+
lresponse_factory(
1228+
reinterpret_cast<TRITONBACKEND_ResponseFactory*>(response_factory));
12281229
}
12291230
}
12301231

@@ -1280,12 +1281,15 @@ ModelInstanceState::ProcessRequests(
12801281
Stub()->StubMessageQueue()->Push(ipc_message->ShmHandle());
12811282
bi::managed_external_buffer::handle_t response_message;
12821283
Stub()->ReceiveMessageFromStub(response_message);
1283-
response = IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message);
1284+
response =
1285+
IPCMessage::LoadFromSharedMemory(Stub()->ShmPool(), response_message);
12841286
}
1285-
char* ipc_message_shm = reinterpret_cast<char*>(response->GetAllocatedSharedMemory().data_.get());;
1287+
char* ipc_message_shm =
1288+
reinterpret_cast<char*>(response->GetAllocatedSharedMemory().data_.get());
1289+
;
12861290
ResponseBatch* response_batch_shm_ptr =
12871291
reinterpret_cast<ResponseBatch*>(ipc_message_shm + sizeof(IPCMessageShm));
1288-
1292+
12891293
uint64_t compute_end_ns = 0;
12901294
SET_TIMESTAMP(compute_end_ns);
12911295
reporter.SetComputeEndNs(compute_end_ns);
@@ -1304,10 +1308,10 @@ ModelInstanceState::ProcessRequests(
13041308
}
13051309

13061310
if (response_batch_shm_ptr->batch_size > 0) {
1307-
std::shared_ptr<std::vector<TRITONBACKEND_Response*>> responses(
1308-
new std::vector<TRITONBACKEND_Response*>());
1311+
std::shared_ptr<std::vector<TRITONBACKEND_Response*>> responses(
1312+
new std::vector<TRITONBACKEND_Response*>());
13091313
responses->reserve(request_count);
1310-
for (size_t i = 0; i < request_count; i++) {
1314+
for (size_t i = 0; i < request_count; i++) {
13111315
TRITONBACKEND_Response* response;
13121316
auto err = TRITONBACKEND_ResponseNew(&response, requests[i]);
13131317
if (err == nullptr) {
@@ -1324,7 +1328,6 @@ ModelInstanceState::ProcessRequests(
13241328

13251329
// If the output provided by the model is in GPU, we will pass the list of
13261330
// buffers provided by Triton to the stub process.
1327-
// bool has_gpu_output = false;
13281331
std::vector<bool> requires_deferred_callback;
13291332

13301333
bool has_gpu_output = false;
@@ -1429,6 +1432,8 @@ ModelInstanceState::ProcessRequests(
14291432
}
14301433
}
14311434

1435+
execute_finalize.Complete();
1436+
14321437
// If the output tensor is in GPU, there will be a second round trip
14331438
// required for filling the GPU buffers provided by the main process.
14341439
if (has_gpu_output) {

src/python_be.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -365,12 +365,11 @@ class ModelInstanceState : public BackendModelInstance {
365365
TRITONBACKEND_Request** requests, const uint32_t request_count);
366366

367367
void RespondErrorToAllRequests(
368-
const char* message,
369-
std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
370-
TRITONBACKEND_Request** requests, const uint32_t request_count);
368+
const char* message,
369+
std::shared_ptr<std::vector<TRITONBACKEND_Response*>>& responses,
370+
TRITONBACKEND_Request** requests, const uint32_t request_count);
371371

372-
void SendMessageToStub(
373-
bi::managed_external_buffer::handle_t message);
372+
void SendMessageToStub(bi::managed_external_buffer::handle_t message);
374373

375374
// Model instance stub
376375
std::unique_ptr<StubLauncher>& Stub() { return model_instance_stub_; }

src/response_sender.cc

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,6 @@ ResponseSender::Send(
250250
"An error occurred while sending a response.");
251251
}
252252
}
253-
254253
}
255254

256255
bool

0 commit comments

Comments
 (0)