Skip to content

Commit db527f7

Browse files
authored
Fix pq writer and few renames (#1757)
1 parent bb72994 commit db527f7

File tree

1 file changed

+23
-42
lines changed

1 file changed

+23
-42
lines changed

ydb/core/persqueue/writer/writer.cpp

+23-42
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
470470
return;
471471
}
472472

473-
const bool checkQuota = Opts.CheckRequestUnits() && IsQuotaRequired();
473+
const bool needToRequestQuota = Opts.CheckRequestUnits() && IsQuotaRequired();
474474

475475
size_t processed = 0;
476476
PendingQuotaAmount = 0;
@@ -490,23 +490,23 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
490490
cmd.SetSize(it->second.ByteSize());
491491
cmd.SetLastRequest(false);
492492

493-
if (checkQuota) {
493+
if (needToRequestQuota) {
494494
++processed;
495495
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize());
496496
PendingQuota.emplace_back(it->first);
497497
}
498498

499499
NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());
500500

501-
PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), checkQuota });
501+
PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), needToRequestQuota });
502502
Pending.erase(it);
503503

504-
if (checkQuota && processed == MAX_QUOTA_INFLIGHT) {
504+
if (needToRequestQuota && processed == MAX_QUOTA_INFLIGHT) {
505505
break;
506506
}
507507
}
508508

509-
if (checkQuota) {
509+
if (needToRequestQuota) {
510510
RequestDataQuota(PendingQuotaAmount, ctx);
511511
}
512512
}
@@ -527,18 +527,18 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
527527

528528
ReceivedReserve.emplace(it->first, std::move(it->second));
529529

530-
ProcessQuota();
530+
ProcessQuotaAndWrite();
531531
}
532532

533-
void ProcessQuota() {
533+
void ProcessQuotaAndWrite() {
534534
auto rit = ReceivedReserve.begin();
535535
auto qit = ReceivedQuota.begin();
536536

537537
while(rit != ReceivedReserve.end() && qit != ReceivedQuota.end()) {
538538
auto& request = rit->second;
539539
const auto cookie = rit->first;
540-
TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted);
541-
if (!request.QuotaChecked || request.QuotaAccepted) {
540+
TRACE("processing quota for request cookie=" << cookie << ", QuotaCheckEnabled=" << request.QuotaCheckEnabled << ", QuotaAccepted=" << request.QuotaAccepted);
541+
if (!request.QuotaCheckEnabled || request.QuotaAccepted) {
542542
// A situation when a quota was not requested or was received while waiting for a reserve
543543
Write(cookie, std::move(request.Request));
544544
ReceivedReserve.erase(rit++);
@@ -559,8 +559,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
559559
while(rit != ReceivedReserve.end()) {
560560
auto& request = rit->second;
561561
const auto cookie = rit->first;
562-
TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted);
563-
if (request.QuotaChecked && !request.QuotaAccepted) {
562+
TRACE("processing quota for request cookie=" << cookie << ", QuotaCheckEnabled=" << request.QuotaCheckEnabled << ", QuotaAccepted=" << request.QuotaAccepted);
563+
if (request.QuotaCheckEnabled && !request.QuotaAccepted) {
564564
break;
565565
}
566566

@@ -587,27 +587,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
587587
ReceivedQuota.clear();
588588
}
589589

590-
void Write(ui64 cookie) {
591-
if (PendingReserve.empty()) {
592-
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #02");
593-
Disconnected(EErrorCode::InternalError);
594-
return;
595-
}
596-
auto it = PendingReserve.begin();
597-
598-
auto cookieReserveValid = (it->first == cookie);
599-
auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie);
600-
if (!(cookieReserveValid && cookieWriteValid)) {
601-
ERROR("The cookie of Write is invalid. Cookie=" << cookie);
602-
Disconnected(EErrorCode::InternalError);
603-
return;
604-
}
605-
606-
Write(cookie, std::move(it->second.Request));
607-
608-
PendingReserve.erase(it);
609-
}
610-
611590
void Write(ui64 cookie, NKikimrClient::TPersQueueRequest&& req) {
612591
auto ev = MakeHolder<TEvPersQueue::TEvRequest>();
613592
ev->Record = std::move(req);
@@ -651,24 +630,26 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
651630
return WriteResult(EErrorCode::InternalError, error, std::move(record));
652631
}
653632

654-
WriteAccepted(cookie);
655-
656-
if (PendingReserve.empty()) {
657-
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #03");
633+
auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie);
634+
if (!cookieWriteValid) {
635+
ERROR("The cookie of Write is invalid. Cookie=" << cookie);
658636
Disconnected(EErrorCode::InternalError);
659637
return;
660638
}
639+
640+
WriteAccepted(cookie);
661641
auto it = PendingReserve.begin();
662642
auto& holder = it->second;
663643

664-
if ((holder.QuotaChecked && !holder.QuotaAccepted)|| !ReceivedReserve.empty()) {
644+
if ((holder.QuotaCheckEnabled && !holder.QuotaAccepted) || !ReceivedReserve.empty()) {
665645
// There may be two situations:
666646
// - a quota has been requested, and the quota has not been received yet
667647
// - the quota was not requested, for example, due to a change in the metering option, but the previous quota requests have not yet been processed
668648
EnqueueReservedAndProcess(cookie);
669649
} else {
670-
Write(cookie);
650+
Write(cookie, std::move(it->second.Request));
671651
}
652+
PendingReserve.erase(it);
672653
} else {
673654
if (PendingWrite.empty()) {
674655
return WriteResult(EErrorCode::InternalError, "Unexpected Write response", std::move(record));
@@ -740,7 +721,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
740721
ReceivedQuota.insert(ReceivedQuota.end(), PendingQuota.begin(), PendingQuota.end());
741722
PendingQuota.clear();
742723

743-
ProcessQuota();
724+
ProcessQuotaAndWrite();
744725

745726
break;
746727

@@ -829,12 +810,12 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
829810

830811
struct RequestHolder {
831812
NKikimrClient::TPersQueueRequest Request;
832-
bool QuotaChecked;
813+
bool QuotaCheckEnabled;
833814
bool QuotaAccepted;
834815

835-
RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaChecked)
816+
RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaCheckEnabled)
836817
: Request(std::move(request))
837-
, QuotaChecked(quotaChecked)
818+
, QuotaCheckEnabled(quotaCheckEnabled)
838819
, QuotaAccepted(false) {
839820
}
840821
};

0 commit comments

Comments
 (0)