Skip to content

Commit 4bce249

Browse files
authored
Use a separate lock for Processor->Write calls (ydb-platform#7682)
1 parent 0b61b05 commit 4bce249

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -958,7 +958,8 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
958958
FirstTokenSent = true;
959959
}
960960
// Kickstart send after session reestablishment
961-
SendImpl();
961+
FormGrpcMessagesImpl();
962+
SendGrpcMessages();
962963
break;
963964
}
964965
case TServerMessage::kWriteResponse: {
@@ -1140,13 +1141,15 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
11401141

11411142
void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
11421143
TMemoryUsageChange memoryUsage;
1143-
if (!isSyncCompression) {
1144+
if (isSyncCompression) {
1145+
// The Lock is already held somewhere up the stack.
1146+
memoryUsage = OnCompressedImpl(std::move(block));
1147+
} else {
11441148
with_lock(Lock) {
11451149
memoryUsage = OnCompressedImpl(std::move(block));
11461150
}
1147-
} else {
1148-
memoryUsage = OnCompressedImpl(std::move(block));
11491151
}
1152+
SendGrpcMessages();
11501153
if (memoryUsage.NowOk && !memoryUsage.WasOk) {
11511154
EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
11521155
}
@@ -1162,7 +1165,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
11621165
(*Counters->BytesInflightCompressed) += block.Data.size();
11631166

11641167
PackedMessagesToSend.emplace(std::move(block));
1165-
SendImpl();
1168+
FormGrpcMessagesImpl();
11661169
return memoryUsage;
11671170
}
11681171

@@ -1279,7 +1282,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
12791282
}
12801283
CurrentBatch.Reset();
12811284
if (skipCompression) {
1282-
SendImpl();
1285+
FormGrpcMessagesImpl();
12831286
}
12841287
return size;
12851288
}
@@ -1343,7 +1346,16 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe
13431346
return GetTransactionId(*writeRequest) != GetTransactionId(OriginalMessagesToSend.front().Tx);
13441347
}
13451348

1346-
void TWriteSessionImpl::SendImpl() {
1349+
void TWriteSessionImpl::SendGrpcMessages() {
1350+
with_lock(ProcessorLock) {
1351+
TClientMessage message;
1352+
while (GrpcMessagesToSend.Dequeue(&message)) {
1353+
Processor->Write(std::move(message));
1354+
}
1355+
}
1356+
}
1357+
1358+
void TWriteSessionImpl::FormGrpcMessagesImpl() {
13471359
Y_ABORT_UNLESS(Lock.IsLocked());
13481360

13491361
// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
@@ -1413,7 +1425,7 @@ void TWriteSessionImpl::SendImpl() {
14131425
<< OriginalMessagesToSend.size() << " left), first sequence number is "
14141426
<< writeRequest->messages(0).seq_no()
14151427
);
1416-
Processor->Write(std::move(clientMessage));
1428+
GrpcMessagesToSend.Enqueue(std::move(clientMessage));
14171429
}
14181430
}
14191431

@@ -1475,6 +1487,7 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
14751487
with_lock(self->Lock) {
14761488
self->HandleWakeUpImpl();
14771489
}
1490+
self->SendGrpcMessages();
14781491
}
14791492
};
14801493
if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>
66

77
#include <util/generic/buffer.h>
8+
#include <util/thread/lfqueue.h>
89

910

1011
namespace NYdb::NTopic {
@@ -385,7 +386,8 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
385386
ui64 GetNextIdImpl(const TMaybe<ui64>& seqNo);
386387
ui64 GetSeqNoImpl(ui64 id);
387388
ui64 GetIdImpl(ui64 seqNo);
388-
void SendImpl();
389+
void FormGrpcMessagesImpl();
390+
void SendGrpcMessages();
389391
void AbortImpl();
390392
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
391393
void CloseImpl(EStatus statusCode, const TString& message);
@@ -446,6 +448,9 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
446448
std::queue<TOriginalMessage> SentOriginalMessages;
447449
std::queue<TBlock> SentPackedMessage;
448450

451+
TLockFreeQueue<TClientMessage> GrpcMessagesToSend;
452+
TAdaptiveLock ProcessorLock;
453+
449454
const size_t MaxBlockSize = std::numeric_limits<size_t>::max();
450455
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
451456
bool Connected = false;

0 commit comments

Comments
 (0)