Skip to content

Commit 9f52024

Browse files
qyryqGazizonoki
authored andcommitted
Moved commit "ydb_topic: schedule SendImpl from OnCompressedImpl instead of calling it directly" from ydb repo
1 parent c025731 commit 9f52024

File tree

3 files changed

+15
-3
lines changed

3 files changed

+15
-3
lines changed

src/client/topic/impl/write_session_impl.cpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1187,7 +1187,17 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
11871187
(*Counters->BytesInflightCompressed) += block.Data.size();
11881188

11891189
PackedMessagesToSend.emplace(std::move(block));
1190-
SendImpl();
1190+
1191+
if (!SendImplScheduled.exchange(true)) {
1192+
CompressionExecutor->Post([cbContext = SelfContext]() {
1193+
if (auto self = cbContext->LockShared()) {
1194+
self->SendImplScheduled = false;
1195+
with_lock (self->Lock) {
1196+
self->SendImpl();
1197+
}
1198+
}
1199+
});
1200+
}
11911201
return memoryUsage;
11921202
}
11931203

src/client/topic/impl/write_session_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
455455
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
456456
bool Connected = false;
457457
bool Started = false;
458+
std::atomic<bool> SendImplScheduled = false;
458459
std::atomic<int> Aborting = 0;
459460
bool SessionEstablished = false;
460461
ui32 PartitionId = 0;

src/client/topic/ut/basic_usage_ut.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,14 +455,15 @@ Y_UNIT_TEST_SUITE(BasicUsage) {
455455

456456
UNIT_ASSERT(!futureWrite.HasValue());
457457
Cerr << ">>>TEST: future write has no value " << Endl;
458-
RunTasks(stepByStepExecutor, {0});
458+
RunTasks(stepByStepExecutor, {0}); // Run compression task.
459+
RunTasks(stepByStepExecutor, {1}); // Run send task.
459460
futureWrite.GetValueSync();
460461
UNIT_ASSERT(futureWrite.HasValue());
461462
Cerr << ">>>TEST: future write has value " << Endl;
462463

463464
UNIT_ASSERT(!futureRead.HasValue());
464465
Cerr << ">>>TEST: future read has no value " << Endl;
465-
RunTasks(stepByStepExecutor, {1});
466+
RunTasks(stepByStepExecutor, {2}); // Run decompression task.
466467
futureRead.GetValueSync();
467468
UNIT_ASSERT(futureRead.HasValue());
468469
Cerr << ">>>TEST: future read has value " << Endl;

0 commit comments

Comments
 (0)