Skip to content

Commit e2e92e4

Browse files
authored
Fix splitted commit message for EvWrite (#15062)
1 parent e0f640d commit e2e92e4

File tree

6 files changed

+11
-12
lines changed

6 files changed

+11
-12
lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,6 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
457457

458458
ptr->InFlightMemoryLimitPerActorBytes = settings.GetInFlightMemoryLimitPerActorBytes();
459459
ptr->MemoryLimitPerMessageBytes = settings.GetMemoryLimitPerMessageBytes();
460-
ptr->MaxBatchesPerMessage = settings.GetMaxBatchesPerMessage();
461460

462461
ptr->StartRetryDelay = TDuration::MilliSeconds(settings.GetStartRetryDelayMs());
463462
ptr->MaxRetryDelay = TDuration::MilliSeconds(settings.GetMaxRetryDelayMs());

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,12 +239,12 @@ class TKqpTableWriteActor : public TActorBootstrapped<TKqpTableWriteActor> {
239239
, Counters(counters)
240240
, TableWriteActorSpan(TWilsonKqp::TableWriteActor, NWilson::TTraceId(traceId), "TKqpTableWriteActor")
241241
{
242+
AFL_ENSURE(MessageSettings.InFlightMemoryLimitPerActorBytes >= MessageSettings.MemoryLimitPerMessageBytes);
242243
LogPrefix = TStringBuilder() << "Table: `" << TablePath << "` (" << TableId << "), " << "SessionActorId: " << sessionActorId;
243244
ShardedWriteController = CreateShardedWriteController(
244245
TShardedWriteControllerSettings {
245246
.MemoryLimitTotal = MessageSettings.InFlightMemoryLimitPerActorBytes,
246247
.MemoryLimitPerMessage = MessageSettings.MemoryLimitPerMessageBytes,
247-
.MaxBatchesPerMessage = MessageSettings.MaxBatchesPerMessage,
248248
},
249249
TypeEnv,
250250
Alloc);

ydb/core/kqp/runtime/kqp_write_actor_settings.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ namespace NKqp {
1010
struct TWriteActorSettings : TAtomicRefCount<TWriteActorSettings> {
1111
i64 InFlightMemoryLimitPerActorBytes = 64_MB;
1212
i64 MemoryLimitPerMessageBytes = 64_MB;
13-
i64 MaxBatchesPerMessage = 1000;
1413
i64 MaxForwardedSize = 64_MB;
1514

1615
TDuration StartRetryDelay = TDuration::Seconds(1);

ydb/core/kqp/runtime/kqp_write_table.cpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -927,20 +927,21 @@ class TShardsInfo {
927927
return IsClosed() && IsEmpty();
928928
}
929929

930-
void MakeNextBatches(i64 maxDataSize, ui64 maxCount) {
930+
void MakeNextBatches(i64 maxDataSize, std::optional<ui64> maxCount) {
931931
YQL_ENSURE(BatchesInFlight == 0);
932932
YQL_ENSURE(!IsEmpty());
933-
YQL_ENSURE(maxCount != 0);
934933
i64 dataSize = 0;
935934
// For columnshard batch can be slightly larger than the limit.
936-
while (BatchesInFlight < maxCount
935+
while ((!maxCount || BatchesInFlight < *maxCount)
937936
&& BatchesInFlight < Batches.size()
938937
&& (dataSize + GetBatch(BatchesInFlight).GetMemory() <= maxDataSize || BatchesInFlight == 0)) {
939938
dataSize += GetBatch(BatchesInFlight).GetMemory();
940939
++BatchesInFlight;
941940
}
942941
YQL_ENSURE(BatchesInFlight != 0);
943-
YQL_ENSURE(BatchesInFlight == Batches.size() || BatchesInFlight >= maxCount || dataSize + GetBatch(BatchesInFlight).GetMemory() > maxDataSize);
942+
YQL_ENSURE(BatchesInFlight == Batches.size()
943+
|| (maxCount && BatchesInFlight >= *maxCount)
944+
|| dataSize + GetBatch(BatchesInFlight).GetMemory() > maxDataSize);
944945
}
945946

946947
TBatchWithMetadata& GetBatch(size_t index) {
@@ -1451,9 +1452,11 @@ class TShardedWriteController : public IShardedWriteController {
14511452
void BuildBatchesForShard(TShardsInfo::TShardInfo& shard) {
14521453
if (shard.GetBatchesInFlight() == 0) {
14531454
YQL_ENSURE(IsOlap != std::nullopt);
1454-
shard.MakeNextBatches(
1455-
Settings.MemoryLimitPerMessage,
1456-
(*IsOlap) ? 1 : Settings.MaxBatchesPerMessage);
1455+
if (*IsOlap) {
1456+
shard.MakeNextBatches(Settings.MemoryLimitPerMessage, 1);
1457+
} else {
1458+
shard.MakeNextBatches(Settings.MemoryLimitPerMessage, std::nullopt);
1459+
}
14571460
}
14581461
}
14591462

ydb/core/kqp/runtime/kqp_write_table.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ using IShardedWriteControllerPtr = TIntrusivePtr<IShardedWriteController>;
116116
struct TShardedWriteControllerSettings {
117117
i64 MemoryLimitTotal;
118118
i64 MemoryLimitPerMessage;
119-
i64 MaxBatchesPerMessage;
120119
bool Inconsistent;
121120
};
122121

ydb/core/protos/table_service_config.proto

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -330,7 +330,6 @@ message TTableServiceConfig {
330330
message TWriteActorSettings {
331331
optional uint64 InFlightMemoryLimitPerActorBytes = 1 [ default = 67108864 ];
332332
optional uint64 MemoryLimitPerMessageBytes = 2 [ default = 67108864 ];
333-
optional uint64 MaxBatchesPerMessage = 3 [ default = 1000 ];
334333

335334
optional uint64 StartRetryDelayMs = 4 [ default = 1000 ];
336335
optional uint64 MaxRetryDelayMs = 5 [ default = 10000 ];

0 commit comments

Comments
 (0)