Skip to content

Commit b6219a1

Browse files
authored
Fix EvWrite Sink memory tracking (#3259)
1 parent 3966efa commit b6219a1

File tree

2 files changed

+23
-12
lines changed

2 files changed

+23
-12
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

+22-12
Original file line number | Diff line number | Diff line change
@@ -24,8 +24,8 @@
2424

2525
namespace {
2626
struct TWriteActorBackoffSettings {
27-
TDuration StartRetryDelay = TDuration::MilliSeconds(150);
28-
TDuration MaxRetryDelay = TDuration::Seconds(5);
27+
TDuration StartRetryDelay = TDuration::MilliSeconds(250);
28+
TDuration MaxRetryDelay = TDuration::Seconds(10);
2929
double UnsertaintyRatio = 0.5;
3030
double Multiplier = 2.0;
3131

@@ -218,16 +218,15 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
218218

219219
// Decides whether the write actor should be asked to resume execution, based on
// the free space reported by Writer.GetFreeSpace().
//
// This block is shown as a diff, so it contains both variants of the logic:
//  * removed variant (lines after an `N-` marker): used the NeedToResume flag.
//    The flag was set once free space dropped to <= 0; while it was set and
//    free space was > 0, Writer.Callbacks->ResumeExecution() was called.
//  * added variant (lines after an `N+` marker): calls Writer.ResumeExecution()
//    whenever the current free space is strictly greater than the value seen on
//    the previous call, then stores the current value in LastFreeMemory.
void CheckMemory() {
220220
const auto freeSpace = Writer.GetFreeSpace();
221-
if (NeedToResume && freeSpace > 0) {
222-
Writer.Callbacks->ResumeExecution();
223-
} else if (!NeedToResume && freeSpace <= 0) {
224-
NeedToResume = true;
221+
if (freeSpace > LastFreeMemory) {
222+
Writer.ResumeExecution();
225223
}
224+
LastFreeMemory = freeSpace;
226225
}
227226

228227
private:
229228
TKqpWriteActor& Writer;
230-
bool NeedToResume = false;
229+
i64 LastFreeMemory = std::numeric_limits<i64>::max();
231230
};
232231

233232
friend class TResumeNotificationManager;
@@ -291,9 +290,14 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
291290
}
292291

293292
// Returns the remaining memory budget of the write actor, in bytes.
//
// When Serializer is set, the result is MemoryLimit minus the memory reported by
// Serializer->GetMemory() and ShardsInfo.GetMemory(); it can be zero or negative.
// When Serializer is not set, the minimum i64 value is returned instead.
//
// This block is shown as a diff: the removed variant (after the `N-` marker)
// returned the expression directly. The added variant (after `N+` markers)
// stores it in `result`, emits a CA_LOG_D message when `result <= 0`, and then
// returns it.
i64 GetFreeSpace() const final {
294-
return Serializer
293+
const i64 result = Serializer
295294
? MemoryLimit - Serializer->GetMemory() - ShardsInfo.GetMemory()
296295
: std::numeric_limits<i64>::min(); // Can't use zero here because compute can use overcommit!
296+
297+
if (result <= 0) {
298+
CA_LOG_D("No free space left. FreeSpace=" << result << " bytes.");
299+
}
300+
return result;
297301
}
298302

299303
TMaybe<google::protobuf::Any> ExtraData() override {
@@ -308,12 +312,14 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
308312
return result;
309313
}
310314

311-
void SendData(NMiniKQL::TUnboxedValueBatch&& data, i64, const TMaybe<NYql::NDqProto::TCheckpoint>&, bool finished) final {
315+
void SendData(NMiniKQL::TUnboxedValueBatch&& data, i64 size, const TMaybe<NYql::NDqProto::TCheckpoint>&, bool finished) final {
312316
YQL_ENSURE(!data.IsWide(), "Wide stream is not supported yet");
313317
YQL_ENSURE(!Finished);
314318
Finished = finished;
315319
EgressStats.Resume();
316320

321+
CA_LOG_D("New data: size=" << size << ", finished=" << finished << ".");
322+
317323
YQL_ENSURE(Serializer);
318324
try {
319325
Serializer->AddData(std::move(data), Finished);
@@ -394,8 +400,6 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
394400
}
395401

396402
Prepare();
397-
398-
Callbacks->ResumeExecution();
399403
}
400404

401405
void Handle(NKikimr::NEvents::TDataEvents::TEvWriteResult::TPtr& ev) {
@@ -554,7 +558,8 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
554558

555559
CA_LOG_D("Send EvWrite to ShardID=" << shardId << ", TxId=" << std::get<ui64>(TxId)
556560
<< ", LockTxId=" << Settings.GetLockTxId() << ", LockNodeId=" << Settings.GetLockNodeId()
557-
<< ", Size=" << inFlightBatch.Data.size() << ", Cookie=" << inFlightBatch.Cookie);
561+
<< ", Size=" << inFlightBatch.Data.size() << ", Cookie=" << inFlightBatch.Cookie
562+
<< "; ShardBatchesLeft=" << shard.Size() << ", ShardClosed=" << shard.IsClosed());
558563
Send(
559564
PipeCacheId,
560565
new TEvPipeCache::TEvForward(evWrite.release(), shardId, true),
@@ -628,13 +633,18 @@ class TKqpWriteActor : public TActorBootstrapped<TKqpWriteActor>, public NYql::N
628633
*SchemeEntry,
629634
columnsMetadata,
630635
TypeEnv);
636+
ResumeExecution();
631637
} catch (...) {
632638
RuntimeError(
633639
CurrentExceptionMessage(),
634640
NYql::NDqProto::StatusIds::INTERNAL_ERROR);
635641
}
636642
}
637643

644+
// Helper added by this commit (every line follows an `N+` marker): emits a
// CA_LOG_D message and then forwards to Callbacks->ResumeExecution(), so each
// resume request made through this helper is logged.
void ResumeExecution() {
645+
CA_LOG_D("Resuming execution.");
646+
Callbacks->ResumeExecution();
647+
}
638648

639649
NActors::TActorId TxProxyId = MakeTxProxyID();
640650
NActors::TActorId PipeCacheId = NKikimr::MakePipePeNodeCacheID(false);

ydb/core/kqp/runtime/kqp_write_table.cpp

+1
Original file line number | Diff line number | Diff line change
@@ -191,6 +191,7 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer {
191191
Y_UNUSED(force);
192192
TBatches newBatches;
193193
std::swap(Batches, newBatches);
194+
Memory = 0;
194195
return std::move(newBatches);
195196
}
196197

0 commit comments

Comments
 (0)