Skip to content

Commit 9f656ac

Browse files
authored
Revert separate lock for Processor->Write calls (#7837)
1 parent 752095e commit 9f656ac

File tree

2 files changed

+9
-27
lines changed

2 files changed

+9
-27
lines changed

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -958,8 +958,7 @@ TWriteSessionImpl::TProcessSrvMessageResult TWriteSessionImpl::ProcessServerMess
958958
FirstTokenSent = true;
959959
}
960960
// Kickstart send after session reestablishment
961-
FormGrpcMessagesImpl();
962-
SendGrpcMessages();
961+
SendImpl();
963962
break;
964963
}
965964
case TServerMessage::kWriteResponse: {
@@ -1147,15 +1146,13 @@ void TWriteSessionImpl::CompressImpl(TBlock&& block_) {
11471146

11481147
void TWriteSessionImpl::OnCompressed(TBlock&& block, bool isSyncCompression) {
11491148
TMemoryUsageChange memoryUsage;
1150-
if (isSyncCompression) {
1151-
// The Lock is already held somewhere up the stack.
1152-
memoryUsage = OnCompressedImpl(std::move(block));
1153-
} else {
1149+
if (!isSyncCompression) {
11541150
with_lock(Lock) {
11551151
memoryUsage = OnCompressedImpl(std::move(block));
11561152
}
1153+
} else {
1154+
memoryUsage = OnCompressedImpl(std::move(block));
11571155
}
1158-
SendGrpcMessages();
11591156
if (memoryUsage.NowOk && !memoryUsage.WasOk) {
11601157
EventsQueue->PushEvent(TWriteSessionEvent::TReadyToAcceptEvent{IssueContinuationToken()});
11611158
}
@@ -1171,7 +1168,7 @@ TMemoryUsageChange TWriteSessionImpl::OnCompressedImpl(TBlock&& block) {
11711168
(*Counters->BytesInflightCompressed) += block.Data.size();
11721169

11731170
PackedMessagesToSend.emplace(std::move(block));
1174-
FormGrpcMessagesImpl();
1171+
SendImpl();
11751172
return memoryUsage;
11761173
}
11771174

@@ -1288,7 +1285,7 @@ size_t TWriteSessionImpl::WriteBatchImpl() {
12881285
}
12891286
CurrentBatch.Reset();
12901287
if (skipCompression) {
1291-
FormGrpcMessagesImpl();
1288+
SendImpl();
12921289
}
12931290
return size;
12941291
}
@@ -1352,16 +1349,7 @@ bool TWriteSessionImpl::TxIsChanged(const Ydb::Topic::StreamWriteMessage_WriteRe
13521349
return GetTransactionId(*writeRequest) != GetTransactionId(OriginalMessagesToSend.front().Tx);
13531350
}
13541351

1355-
void TWriteSessionImpl::SendGrpcMessages() {
1356-
with_lock(ProcessorLock) {
1357-
TClientMessage message;
1358-
while (GrpcMessagesToSend.Dequeue(&message)) {
1359-
Processor->Write(std::move(message));
1360-
}
1361-
}
1362-
}
1363-
1364-
void TWriteSessionImpl::FormGrpcMessagesImpl() {
1352+
void TWriteSessionImpl::SendImpl() {
13651353
Y_ABORT_UNLESS(Lock.IsLocked());
13661354

13671355
// External cycle splits ready blocks into multiple gRPC messages. Current gRPC message size hard limit is 64MiB.
@@ -1431,7 +1419,7 @@ void TWriteSessionImpl::FormGrpcMessagesImpl() {
14311419
<< OriginalMessagesToSend.size() << " left), first sequence number is "
14321420
<< writeRequest->messages(0).seq_no()
14331421
);
1434-
GrpcMessagesToSend.Enqueue(std::move(clientMessage));
1422+
Processor->Write(std::move(clientMessage));
14351423
}
14361424
}
14371425

@@ -1493,7 +1481,6 @@ void TWriteSessionImpl::HandleWakeUpImpl() {
14931481
with_lock(self->Lock) {
14941482
self->HandleWakeUpImpl();
14951483
}
1496-
self->SendGrpcMessages();
14971484
}
14981485
};
14991486
if (TInstant::Now() - LastTokenUpdate > UPDATE_TOKEN_PERIOD) {

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.h

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
#include <ydb/public/sdk/cpp/client/ydb_topic/impl/topic_impl.h>
66

77
#include <util/generic/buffer.h>
8-
#include <util/thread/lfqueue.h>
98

109

1110
namespace NYdb::NTopic {
@@ -386,8 +385,7 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
386385
ui64 GetNextIdImpl(const TMaybe<ui64>& seqNo);
387386
ui64 GetSeqNoImpl(ui64 id);
388387
ui64 GetIdImpl(ui64 seqNo);
389-
void FormGrpcMessagesImpl();
390-
void SendGrpcMessages();
388+
void SendImpl();
391389
void AbortImpl();
392390
void CloseImpl(EStatus statusCode, NYql::TIssues&& issues);
393391
void CloseImpl(EStatus statusCode, const TString& message);
@@ -448,9 +446,6 @@ class TWriteSessionImpl : public TContinuationTokenIssuer,
448446
std::queue<TOriginalMessage> SentOriginalMessages;
449447
std::queue<TBlock> SentPackedMessage;
450448

451-
TLockFreeQueue<TClientMessage> GrpcMessagesToSend;
452-
TAdaptiveLock ProcessorLock;
453-
454449
const size_t MaxBlockSize = std::numeric_limits<size_t>::max();
455450
const size_t MaxBlockMessageCount = 1; //!< Max message count that can be packed into a single block. In block version 0 is equal to 1 for compatibility
456451
bool Connected = false;

0 commit comments

Comments
 (0)