Skip to content

Commit b543a93

Browse files
authored
configurable write session buffer limit (#14955)
1 parent f1dcfdb commit b543a93

File tree

2 files changed

+4
-3
lines changed

2 files changed

+4
-3
lines changed

ydb/core/protos/pqconfig.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,8 @@ message TPQConfig {
207207

208208
optional uint64 BalancerWakeupIntervalSec = 54 [default = 30];
209209
optional uint64 BalancerStatsWakeupIntervalSec = 55 [default = 5];
210+
211+
optional uint64 MaxWriteSessionBytesInflight = 57 [default = 1000000];
210212
}
211213

212214
message TChannelProfile {

ydb/services/persqueue_v1/actors/write_session_actor.cpp

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,6 @@ namespace NGRpcProxy::V1 {
159159

160160
using namespace Ydb::PersQueue::V1;
161161

162-
static const ui32 MAX_BYTES_INFLIGHT = 1_MB;
163162
static const TDuration SOURCEID_UPDATE_PERIOD = TDuration::Hours(1);
164163

165164
// metering
@@ -942,7 +941,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(NPQ::TEvPartitionWriter::T
942941
if (BytesInflight) {
943942
BytesInflight.Dec(diff);
944943
}
945-
if (!NextRequestInited && BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended
944+
if (!NextRequestInited && BytesInflight_ < AppData(ctx)->PQConfig.GetMaxWriteSessionBytesInflight()) { //allow only one big request to be readed but not sended
946945
NextRequestInited = true;
947946
if (!Request->GetStreamCtx()->Read()) {
948947
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed");
@@ -1498,7 +1497,7 @@ void TWriteSessionActor<UseMigrationProtocol>::Handle(typename TEvWrite::TPtr& e
14981497
BytesInflightTotal.Inc(diff);
14991498
}
15001499

1501-
if (BytesInflight_ < MAX_BYTES_INFLIGHT) { //allow only one big request to be readed but not sended
1500+
if (BytesInflight_ < AppData(ctx)->PQConfig.GetMaxWriteSessionBytesInflight()) { //allow only one big request to be readed but not sended
15021501
Y_ABORT_UNLESS(NextRequestInited);
15031502
if (!Request->GetStreamCtx()->Read()) {
15041503
LOG_INFO_S(ctx, NKikimrServices::PQ_WRITE_PROXY, "session v1 cookie: " << Cookie << " sessionId: " << OwnerCookie << " grpc read failed");

0 commit comments

Comments
 (0)