Skip to content

Commit f691ff7

Browse files
authored
24-3 Fix memory consumption restriction in stream RPC calls (#10758) (#10866)
1 parent 8d0968e commit f691ff7

File tree

4 files changed

+105
-117
lines changed

4 files changed

+105
-117
lines changed
+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
#pragma once
2+
#include <util/generic/queue.h>
3+
#include <util/generic/yexception.h>
4+
#include <util/system/types.h>
5+
6+
namespace NKikimr::NGRpcService {
7+
8+
class TRpcFlowControlState {
9+
public:
10+
TRpcFlowControlState(ui64 inflightLimitBytes)
11+
: InflightLimitBytes_(inflightLimitBytes) {}
12+
13+
void PushResponse(ui64 responseSizeBytes) {
14+
ResponseSizeQueue_.push(responseSizeBytes);
15+
TotalResponsesSize_ += responseSizeBytes;
16+
}
17+
18+
void PopResponse() {
19+
Y_ENSURE(!ResponseSizeQueue_.empty());
20+
TotalResponsesSize_ -= ResponseSizeQueue_.front();
21+
ResponseSizeQueue_.pop();
22+
}
23+
24+
size_t QueueSize() const {
25+
return ResponseSizeQueue_.size();
26+
}
27+
28+
i64 FreeSpaceBytes() const { // Negative value temporarily stops data evaluation in DQ graph
29+
return static_cast<i64>(InflightLimitBytes_) - static_cast<i64>(TotalResponsesSize_);
30+
}
31+
32+
ui64 InflightBytes() const {
33+
return TotalResponsesSize_;
34+
}
35+
36+
ui64 InflightLimitBytes() const {
37+
return InflightLimitBytes_;
38+
}
39+
40+
private:
41+
const ui64 InflightLimitBytes_;
42+
43+
TQueue<ui64> ResponseSizeQueue_;
44+
ui64 TotalResponsesSize_ = 0;
45+
};
46+
47+
} // namespace NKikimr::NGRpcService

ydb/core/grpc_services/query/rpc_execute_query.cpp

+5-46
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
#include "service_query.h"
2-
32
#include <ydb/core/actorlib_impl/long_timer.h>
43
#include <ydb/core/base/appdata.h>
54
#include <ydb/core/grpc_services/audit_dml_operations.h>
65
#include <ydb/core/grpc_services/base/base.h>
6+
#include <ydb/core/grpc_services/base/flow_control.h>
77
#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
88
#include <ydb/core/grpc_services/rpc_kqp_base.h>
99
#include <ydb/core/kqp/executer_actor/kqp_executer.h>
@@ -23,51 +23,10 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall<Ydb::Query::ExecuteQu
2323

2424
struct TProducerState {
2525
TMaybe<ui64> LastSeqNo;
26-
ui64 AckedFreeSpaceBytes = 0;
26+
i64 AckedFreeSpaceBytes = 0;
2727
TActorId ActorId;
2828
};
2929

30-
class TRpcFlowControlState {
31-
public:
32-
TRpcFlowControlState(ui64 inflightLimitBytes)
33-
: InflightLimitBytes_(inflightLimitBytes) {}
34-
35-
void PushResponse(ui64 responseSizeBytes) {
36-
ResponseSizeQueue_.push(responseSizeBytes);
37-
TotalResponsesSize_ += responseSizeBytes;
38-
}
39-
40-
void PopResponse() {
41-
Y_ENSURE(!ResponseSizeQueue_.empty());
42-
TotalResponsesSize_ -= ResponseSizeQueue_.front();
43-
ResponseSizeQueue_.pop();
44-
}
45-
46-
size_t QueueSize() const {
47-
return ResponseSizeQueue_.size();
48-
}
49-
50-
ui64 FreeSpaceBytes() const {
51-
return TotalResponsesSize_ < InflightLimitBytes_
52-
? InflightLimitBytes_ - TotalResponsesSize_
53-
: 0;
54-
}
55-
56-
ui64 InflightBytes() const {
57-
return TotalResponsesSize_;
58-
}
59-
60-
ui64 InflightLimitBytes() const {
61-
return InflightLimitBytes_;
62-
}
63-
64-
private:
65-
const ui64 InflightLimitBytes_;
66-
67-
TQueue<ui64> ResponseSizeQueue_;
68-
ui64 TotalResponsesSize_ = 0;
69-
};
70-
7130
bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::TransactionSettings& to,
7231
NYql::TIssues& issues)
7332
{
@@ -326,13 +285,13 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
326285
FlowControl_.PopResponse();
327286
}
328287

329-
ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
288+
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
330289

331290
for (auto& pair : StreamChannels_) {
332291
const auto& channelId = pair.first;
333292
auto& channel = pair.second;
334293

335-
if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes == 0) {
294+
if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes <= 0) {
336295
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, "
337296
<< ", channel: " << channelId
338297
<< ", seqNo: " << channel.LastSeqNo
@@ -361,7 +320,7 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
361320
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
362321

363322
FlowControl_.PushResponse(out.size());
364-
auto freeSpaceBytes = FlowControl_.FreeSpaceBytes();
323+
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
365324

366325
Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS);
367326

ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp

+27-35
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "service_table.h"
22
#include <ydb/core/grpc_services/base/base.h>
3+
#include <ydb/core/grpc_services/base/flow_control.h>
34

45
#include "rpc_common/rpc_common.h"
56
#include "rpc_kqp_base.h"
@@ -155,7 +156,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
155156

156157
TStreamExecuteScanQueryRPC(TEvStreamExecuteScanQueryRequest* request, ui64 rpcBufferSize)
157158
: Request_(request)
158-
, RpcBufferSize_(rpcBufferSize) {}
159+
, FlowControl_(rpcBufferSize) {}
159160

160161
void Bootstrap(const TActorContext &ctx) {
161162
this->Become(&TStreamExecuteScanQueryRPC::StateWork);
@@ -250,32 +251,30 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
250251
void Handle(TRpcServices::TEvGrpcNextReply::TPtr& ev, const TActorContext& ctx) {
251252
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " NextReply"
252253
<< ", left: " << ev->Get()->LeftInQueue
253-
<< ", queue: " << GRpcResponsesSizeQueue_.size()
254-
<< ", used memory: " << GRpcResponsesSize_
255-
<< ", buffer size: " << RpcBufferSize_);
254+
<< ", queue: " << FlowControl_.QueueSize()
255+
<< ", inflight bytes: " << FlowControl_.InflightBytes()
256+
<< ", limit bytes: " << FlowControl_.InflightLimitBytes());
256257

257-
while (GRpcResponsesSizeQueue_.size() > ev->Get()->LeftInQueue) {
258-
GRpcResponsesSize_ -= GRpcResponsesSizeQueue_.front();
259-
GRpcResponsesSizeQueue_.pop();
258+
while (FlowControl_.QueueSize() > ev->Get()->LeftInQueue) {
259+
FlowControl_.PopResponse();
260260
}
261-
Y_DEBUG_ABORT_UNLESS(GRpcResponsesSizeQueue_.empty() == (GRpcResponsesSize_ == 0));
262-
LastDataStreamTimestamp_ = TAppData::TimeProvider->Now();
263261

264-
if (WaitOnSeqNo_ && RpcBufferSize_ > GRpcResponsesSize_) {
265-
ui64 freeSpace = RpcBufferSize_ - GRpcResponsesSize_;
262+
LastDataStreamTimestamp_ = TAppData::TimeProvider->Now();
266263

264+
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
265+
if (freeSpaceBytes > 0 && LastSeqNo_ && AckedFreeSpaceBytes_ <= 0) {
267266
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
268-
<< ", seqNo: " << *WaitOnSeqNo_
269-
<< ", freeSpace: " << freeSpace
267+
<< ", seqNo: " << *LastSeqNo_
268+
<< ", freeSpace: " << freeSpaceBytes
270269
<< ", to: " << ExecuterActorId_);
271270

272271
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
273-
resp->Record.SetSeqNo(*WaitOnSeqNo_);
274-
resp->Record.SetFreeSpace(freeSpace);
272+
resp->Record.SetSeqNo(*LastSeqNo_);
273+
resp->Record.SetFreeSpace(freeSpaceBytes);
275274

276275
ctx.Send(ExecuterActorId_, resp.Release());
277276

278-
WaitOnSeqNo_.Clear();
277+
AckedFreeSpaceBytes_ = freeSpaceBytes;
279278
}
280279
}
281280

@@ -349,28 +348,22 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
349348
TString out;
350349
Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
351350

352-
GRpcResponsesSizeQueue_.push(out.size());
353-
GRpcResponsesSize_ += out.size();
351+
FlowControl_.PushResponse(out.size());
352+
const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes();
353+
LastSeqNo_ = ev->Get()->Record.GetSeqNo();
354+
AckedFreeSpaceBytes_ = freeSpaceBytes;
354355

355356
Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS);
356357

357-
ui64 freeSpace = GRpcResponsesSize_ < RpcBufferSize_
358-
? RpcBufferSize_ - GRpcResponsesSize_
359-
: 0;
360-
361-
if (freeSpace == 0) {
362-
WaitOnSeqNo_ = ev->Get()->Record.GetSeqNo();
363-
}
364-
365358
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack"
366359
<< ", seqNo: " << ev->Get()->Record.GetSeqNo()
367-
<< ", freeSpace: " << freeSpace
360+
<< ", freeSpace: " << freeSpaceBytes
368361
<< ", to: " << ev->Sender
369-
<< ", queue: " << GRpcResponsesSizeQueue_.size());
362+
<< ", queue: " << FlowControl_.QueueSize());
370363

371364
auto resp = MakeHolder<NKqp::TEvKqpExecuter::TEvStreamDataAck>();
372365
resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo());
373-
resp->Record.SetFreeSpace(freeSpace);
366+
resp->Record.SetFreeSpace(freeSpaceBytes);
374367

375368
ctx.Send(ev->Sender, resp.Release());
376369
}
@@ -411,9 +404,9 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
411404
TInstant now = TAppData::TimeProvider->Now();
412405
TDuration timeout;
413406
LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Got timeout event, InactiveClientTimeout: " << InactiveClientTimeout_
414-
<< " GRpcResponsesSizeQueue: " << GRpcResponsesSizeQueue_.size());
407+
<< " GRpcResponsesSizeQueue: " << FlowControl_.QueueSize());
415408

416-
if (InactiveClientTimeout_ && GRpcResponsesSizeQueue_.size() > 0) {
409+
if (InactiveClientTimeout_ && FlowControl_.QueueSize() > 0) {
417410
TDuration processTime = now - LastDataStreamTimestamp_;
418411
if (processTime >= InactiveClientTimeout_) {
419412
auto message = TStringBuilder() << this->SelfId() << " Client cannot process data in " << processTime
@@ -477,13 +470,12 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
477470

478471
private:
479472
std::shared_ptr<TEvStreamExecuteScanQueryRequest> Request_;
480-
const ui64 RpcBufferSize_;
473+
TRpcFlowControlState FlowControl_;
474+
TMaybe<ui64> LastSeqNo_;
475+
i64 AckedFreeSpaceBytes_ = 0;
481476

482477
TDuration InactiveClientTimeout_;
483-
TQueue<ui64> GRpcResponsesSizeQueue_;
484-
ui64 GRpcResponsesSize_ = 0;
485478
TInstant LastDataStreamTimestamp_;
486-
TMaybe<ui64> WaitOnSeqNo_;
487479

488480
TSchedulerCookieHolder TimeoutTimerCookieHolder_;
489481

0 commit comments

Comments
 (0)