diff --git a/ydb/core/grpc_services/base/flow_control.h b/ydb/core/grpc_services/base/flow_control.h new file mode 100644 index 000000000000..f3c2ca72dd4d --- /dev/null +++ b/ydb/core/grpc_services/base/flow_control.h @@ -0,0 +1,47 @@ +#pragma once +#include +#include +#include + +namespace NKikimr::NGRpcService { + +class TRpcFlowControlState { +public: + TRpcFlowControlState(ui64 inflightLimitBytes) + : InflightLimitBytes_(inflightLimitBytes) {} + + void PushResponse(ui64 responseSizeBytes) { + ResponseSizeQueue_.push(responseSizeBytes); + TotalResponsesSize_ += responseSizeBytes; + } + + void PopResponse() { + Y_ENSURE(!ResponseSizeQueue_.empty()); + TotalResponsesSize_ -= ResponseSizeQueue_.front(); + ResponseSizeQueue_.pop(); + } + + size_t QueueSize() const { + return ResponseSizeQueue_.size(); + } + + i64 FreeSpaceBytes() const { // Negative value temporarily stops data evaluation in DQ graph + return static_cast(InflightLimitBytes_) - static_cast(TotalResponsesSize_); + } + + ui64 InflightBytes() const { + return TotalResponsesSize_; + } + + ui64 InflightLimitBytes() const { + return InflightLimitBytes_; + } + +private: + const ui64 InflightLimitBytes_; + + TQueue ResponseSizeQueue_; + ui64 TotalResponsesSize_ = 0; +}; + +} // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/query/rpc_execute_query.cpp b/ydb/core/grpc_services/query/rpc_execute_query.cpp index 4e7b56104c6b..94831d79df2b 100644 --- a/ydb/core/grpc_services/query/rpc_execute_query.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_query.cpp @@ -1,9 +1,9 @@ #include "service_query.h" - #include #include #include #include +#include #include #include #include @@ -24,51 +24,10 @@ using TEvExecuteQueryRequest = TGrpcRequestNoOperationCall LastSeqNo; - ui64 AckedFreeSpaceBytes = 0; + i64 AckedFreeSpaceBytes = 0; TActorId ActorId; }; -class TRpcFlowControlState { -public: - TRpcFlowControlState(ui64 inflightLimitBytes) - : InflightLimitBytes_(inflightLimitBytes) {} - - void PushResponse(ui64 responseSizeBytes) { - ResponseSizeQueue_.push(responseSizeBytes); - TotalResponsesSize_ += responseSizeBytes; - } - - void PopResponse() { - Y_ENSURE(!ResponseSizeQueue_.empty()); - TotalResponsesSize_ -= ResponseSizeQueue_.front(); - ResponseSizeQueue_.pop(); - } - - size_t QueueSize() const { - return ResponseSizeQueue_.size(); - } - - ui64 FreeSpaceBytes() const { - return TotalResponsesSize_ < InflightLimitBytes_ - ? InflightLimitBytes_ - TotalResponsesSize_ - : 0; - } - - ui64 InflightBytes() const { - return TotalResponsesSize_; - } - - ui64 InflightLimitBytes() const { - return InflightLimitBytes_; - } - -private: - const ui64 InflightLimitBytes_; - - TQueue ResponseSizeQueue_; - ui64 TotalResponsesSize_ = 0; -}; - bool FillTxSettings(const Ydb::Query::TransactionSettings& from, Ydb::Table::TransactionSettings& to, NYql::TIssues& issues) { @@ -328,13 +287,13 @@ class TExecuteQueryRPC : public TActorBootstrapped { FlowControl_.PopResponse(); } - ui64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); + const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); for (auto& pair : StreamChannels_) { const auto& channelId = pair.first; auto& channel = pair.second; - if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes == 0) { + if (freeSpaceBytes > 0 && channel.LastSeqNo && channel.AckedFreeSpaceBytes <= 0) { LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << "Resume execution, " << ", channel: " << channelId << ", seqNo: " << channel.LastSeqNo @@ -363,7 +322,7 @@ class TExecuteQueryRPC : public TActorBootstrapped { Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); FlowControl_.PushResponse(out.size()); - auto freeSpaceBytes = FlowControl_.FreeSpaceBytes(); + const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); Request_->SendSerializedResult(std::move(out), Ydb::StatusIds::SUCCESS); diff --git a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp index cca7306786c9..77d34a06b6ec 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp @@ -1,5 +1,6 @@ #include "service_table.h" #include +#include #include "rpc_kqp_base.h" #include "service_table.h" @@ -154,7 +155,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrappedBecome(&TStreamExecuteScanQueryRPC::StateWork); @@ -249,32 +250,30 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrappedSelfId() << " NextReply" << ", left: " << ev->Get()->LeftInQueue - << ", queue: " << GRpcResponsesSizeQueue_.size() - << ", used memory: " << GRpcResponsesSize_ - << ", buffer size: " << RpcBufferSize_); + << ", queue: " << FlowControl_.QueueSize() + << ", inflight bytes: " << FlowControl_.InflightBytes() + << ", limit bytes: " << FlowControl_.InflightLimitBytes()); - while (GRpcResponsesSizeQueue_.size() > ev->Get()->LeftInQueue) { - GRpcResponsesSize_ -= GRpcResponsesSizeQueue_.front(); - GRpcResponsesSizeQueue_.pop(); + while (FlowControl_.QueueSize() > ev->Get()->LeftInQueue) { + FlowControl_.PopResponse(); } - Y_DEBUG_ABORT_UNLESS(GRpcResponsesSizeQueue_.empty() == (GRpcResponsesSize_ == 0)); - LastDataStreamTimestamp_ = TAppData::TimeProvider->Now(); - if (WaitOnSeqNo_ && RpcBufferSize_ > GRpcResponsesSize_) { - ui64 freeSpace = RpcBufferSize_ - GRpcResponsesSize_; + LastDataStreamTimestamp_ = TAppData::TimeProvider->Now(); + const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); + if (freeSpaceBytes > 0 && LastSeqNo_ && AckedFreeSpaceBytes_ <= 0) { LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack" - << ", seqNo: " << *WaitOnSeqNo_ - << ", freeSpace: " << freeSpace + << ", seqNo: " << *LastSeqNo_ + << ", freeSpace: " << freeSpaceBytes << ", to: " << ExecuterActorId_); auto resp = MakeHolder(); - resp->Record.SetSeqNo(*WaitOnSeqNo_); - resp->Record.SetFreeSpace(freeSpace); + resp->Record.SetSeqNo(*LastSeqNo_); + resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ExecuterActorId_, resp.Release()); - WaitOnSeqNo_.Clear(); + AckedFreeSpaceBytes_ = freeSpaceBytes; } } @@ -348,28 +347,22 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrappedGet()->Record.GetSeqNo(); + AckedFreeSpaceBytes_ = freeSpaceBytes; Request_->SendSerializedResult(std::move(out), StatusIds::SUCCESS); - ui64 freeSpace = GRpcResponsesSize_ < RpcBufferSize_ - ? RpcBufferSize_ - GRpcResponsesSize_ - : 0; - - if (freeSpace == 0) { - WaitOnSeqNo_ = ev->Get()->Record.GetSeqNo(); - } - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack" << ", seqNo: " << ev->Get()->Record.GetSeqNo() - << ", freeSpace: " << freeSpace + << ", freeSpace: " << freeSpaceBytes << ", to: " << ev->Sender - << ", queue: " << GRpcResponsesSizeQueue_.size()); + << ", queue: " << FlowControl_.QueueSize()); auto resp = MakeHolder(); resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); - resp->Record.SetFreeSpace(freeSpace); + resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ev->Sender, resp.Release()); } @@ -410,9 +403,9 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrappedNow(); TDuration timeout; LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, "Got timeout event, InactiveClientTimeout: " << InactiveClientTimeout_ - << " GRpcResponsesSizeQueue: " << GRpcResponsesSizeQueue_.size()); + << " GRpcResponsesSizeQueue: " << FlowControl_.QueueSize()); - if (InactiveClientTimeout_ && GRpcResponsesSizeQueue_.size() > 0) { + if (InactiveClientTimeout_ && FlowControl_.QueueSize() > 0) { TDuration processTime = now - LastDataStreamTimestamp_; if (processTime >= InactiveClientTimeout_) { auto message = TStringBuilder() << this->SelfId() << " Client cannot process data in " << processTime @@ -476,13 +469,12 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped Request_; - const ui64 RpcBufferSize_; + TRpcFlowControlState FlowControl_; + TMaybe LastSeqNo_; + i64 AckedFreeSpaceBytes_ = 0; TDuration InactiveClientTimeout_; - TQueue GRpcResponsesSizeQueue_; - ui64 GRpcResponsesSize_ = 0; TInstant LastDataStreamTimestamp_; - TMaybe WaitOnSeqNo_; TSchedulerCookieHolder TimeoutTimerCookieHolder_; diff --git a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp index b6d37f243dcf..9c861b7b1baf 100644 --- a/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp +++ b/ydb/core/grpc_services/rpc_stream_execute_yql_script.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include #include @@ -84,7 +85,7 @@ class TStreamExecuteYqlScriptRPC TStreamExecuteYqlScriptRPC(IRequestNoOpCtx* request, ui64 rpcBufferSize) : TBase(request) - , RpcBufferSize_(rpcBufferSize) + , FlowControl_(rpcBufferSize) , CancelationFlag(std::make_shared(false)) { // StreamExecuteYqlScript allows write in to table. @@ -230,8 +231,7 @@ class TStreamExecuteYqlScriptRPC TString out; Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); - GRpcResponsesSizeQueue_.push(out.size()); - GRpcResponsesSize_ += out.size(); + FlowControl_.PushResponse(out.size()); RequestPtr()->SendSerializedResult(std::move(out), StatusIds::SUCCESS); } @@ -268,28 +268,22 @@ class TStreamExecuteYqlScriptRPC TString out; Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out); - GRpcResponsesSizeQueue_.push(out.size()); - GRpcResponsesSize_ += out.size(); + FlowControl_.PushResponse(out.size()); + const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); + LastSeqNo_ = ev->Get()->Record.GetSeqNo(); + AckedFreeSpaceBytes_ = freeSpaceBytes; RequestPtr()->SendSerializedResult(std::move(out), StatusIds::SUCCESS); - ui64 freeSpace = GRpcResponsesSize_ < RpcBufferSize_ - ? RpcBufferSize_ - GRpcResponsesSize_ - : 0; - - if (freeSpace == 0) { - WaitOnSeqNo_ = ev->Get()->Record.GetSeqNo(); - } - LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack" << ", seqNo: " << ev->Get()->Record.GetSeqNo() - << ", freeSpace: " << freeSpace + << ", freeSpace: " << freeSpaceBytes << ", to: " << ev->Sender - << ", queue: " << GRpcResponsesSizeQueue_.size()); + << ", queue: " << FlowControl_.QueueSize()); auto resp = MakeHolder(); resp->Record.SetSeqNo(ev->Get()->Record.GetSeqNo()); - resp->Record.SetFreeSpace(freeSpace); + resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(ev->Sender, resp.Release()); } @@ -297,9 +291,9 @@ class TStreamExecuteYqlScriptRPC void Handle(TRpcServices::TEvGrpcNextReply::TPtr& ev, const TActorContext& ctx) { LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " NextReply" << ", left: " << ev->Get()->LeftInQueue - << ", queue: " << GRpcResponsesSizeQueue_.size() - << ", used memory: " << GRpcResponsesSize_ - << ", buffer size: " << RpcBufferSize_); + << ", queue: " << FlowControl_.QueueSize() + << ", inflight bytes: " << FlowControl_.InflightBytes() + << ", limit bytes: " << FlowControl_.InflightLimitBytes()); LastDataStreamTimestamp_ = TAppData::TimeProvider->Now(); if (DataQueryStreamContext) { @@ -317,27 +311,24 @@ class TStreamExecuteYqlScriptRPC } else { //ScanQuery in progress - while (GRpcResponsesSizeQueue_.size() > ev->Get()->LeftInQueue) { - GRpcResponsesSize_ -= GRpcResponsesSizeQueue_.front(); - GRpcResponsesSizeQueue_.pop(); + while (FlowControl_.QueueSize() > ev->Get()->LeftInQueue) { + FlowControl_.PopResponse(); } - Y_DEBUG_ABORT_UNLESS(GRpcResponsesSizeQueue_.empty() == (GRpcResponsesSize_ == 0)); - - if (WaitOnSeqNo_ && RpcBufferSize_ > GRpcResponsesSize_) { - ui64 freeSpace = RpcBufferSize_ - GRpcResponsesSize_; + const i64 freeSpaceBytes = FlowControl_.FreeSpaceBytes(); + if (freeSpaceBytes > 0 && LastSeqNo_ && AckedFreeSpaceBytes_ <= 0) { LOG_DEBUG_S(ctx, NKikimrServices::RPC_REQUEST, this->SelfId() << " Send stream data ack" - << ", seqNo: " << *WaitOnSeqNo_ - << ", freeSpace: " << freeSpace + << ", seqNo: " << *LastSeqNo_ + << ", freeSpace: " << freeSpaceBytes << ", to: " << GatewayRequestHandlerActorId_); auto resp = MakeHolder(); - resp->Record.SetSeqNo(*WaitOnSeqNo_); - resp->Record.SetFreeSpace(freeSpace); + resp->Record.SetSeqNo(*LastSeqNo_); + resp->Record.SetFreeSpace(freeSpaceBytes); ctx.Send(GatewayRequestHandlerActorId_, resp.Release()); - WaitOnSeqNo_.Clear(); + AckedFreeSpaceBytes_ = freeSpaceBytes; } } } @@ -397,7 +388,7 @@ class TStreamExecuteYqlScriptRPC TInstant now = TAppData::TimeProvider->Now(); TDuration timeout; - if (InactiveClientTimeout_ && GRpcResponsesSizeQueue_.size() > 0) { + if (InactiveClientTimeout_ && FlowControl_.QueueSize() > 0) { TDuration processTime = now - LastDataStreamTimestamp_; if (processTime >= InactiveClientTimeout_) { auto message = TStringBuilder() << this->SelfId() << " Client cannot process data in " << processTime @@ -476,13 +467,12 @@ class TStreamExecuteYqlScriptRPC } private: - const ui64 RpcBufferSize_; + TRpcFlowControlState FlowControl_; + TMaybe LastSeqNo_; + i64 AckedFreeSpaceBytes_ = 0; TDuration InactiveClientTimeout_; - TQueue GRpcResponsesSizeQueue_; - ui64 GRpcResponsesSize_ = 0; TInstant LastDataStreamTimestamp_; - TMaybe WaitOnSeqNo_; TSchedulerCookieHolder ClientTimeoutTimerCookieHolder_;