Skip to content

Commit 29ba972

Browse files
one cycle for processing transactions, writing operations and promoting offsets (ydb-platform#3546)
1 parent 1abde1c commit 29ba972

9 files changed

+567
-621
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 190 additions & 132 deletions
Large diffs are not rendered by default.

ydb/core/persqueue/partition.h

Lines changed: 42 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -114,12 +114,14 @@ class TPartition : public TActorBootstrapped<TPartition> {
114114

115115
void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
116116
void AnswerCurrentWrites(const TActorContext& ctx);
117-
void CancelAllWritesOnIdle(const TActorContext& ctx);
118-
void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TPartitionSourceManager::TModificationBatch& sourceIdBatch, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST);
117+
void AnswerCurrentReplies(const TActorContext& ctx);
118+
void CancelOneWriteOnWrite(const TActorContext& ctx,
119+
const TString& errorStr,
120+
const TWriteMsg& p,
121+
NPersQueue::NErrorCode::EErrorCode errorCode);
119122
void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request);
120123
void CreateMirrorerActor();
121124
void DoRead(TEvPQ::TEvRead::TPtr&& ev, TDuration waitQuotaTime, const TActorContext& ctx);
122-
void FailBadClient(const TActorContext& ctx);
123125
void FillReadFromTimestamps(const TActorContext& ctx);
124126
void FilterDeadlinedWrites(const TActorContext& ctx);
125127
void FilterDeadlinedWrites(const TActorContext& ctx, TMessageQueue& requests);
@@ -205,33 +207,24 @@ class TPartition : public TActorBootstrapped<TPartition> {
205207
void UpdateAvailableSize(const TActorContext& ctx);
206208

207209
void AddMetaKey(TEvKeyValue::TEvRequest* request);
208-
void BecomeIdle();
209-
void BecomeWrite();
210210
void CheckHeadConsistency() const;
211211
void HandlePendingRequests(const TActorContext& ctx);
212212
void HandleQuotaWaitingRequests(const TActorContext& ctx);
213-
void HandleRequests(const TActorContext& ctx);
214213
void RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie);
215214
bool RequestBlobQuota();
216215
void RequestBlobQuota(size_t quotaSize);
217216
void ConsumeBlobQuota();
218-
void WritePendingBlob(THolder<TEvKeyValue::TEvRequest> request);
219217
void UpdateAfterWriteCounters(bool writeComplete);
220218
void UpdateUserInfoEndOffset(const TInstant& now);
221219
void UpdateWriteBufferIsFullState(const TInstant& now);
222-
void CancelReserveRequests(const TActorContext& ctx);
223-
void CancelRequests(const TActorContext& ctx, TMessageQueue& requests);
224220

225221
TInstant GetWriteTimeEstimate(ui64 offset) const;
226-
bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
227-
TPartitionSourceManager::TModificationBatch& sourceIdBatch);
228222
bool CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
229223

230224
// Removes blobs that are no longer required. Blobs are no longer required if the storage time of all messages
231225
// stored in this blob has expired and they have been read by all important consumers.
232226
bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorContext& ctx);
233227
bool IsQuotingEnabled() const;
234-
bool ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx);
235228
bool WaitingForPreviousBlobQuota() const;
236229
bool WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 withSize = 0) const;
237230
size_t GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request);
@@ -265,7 +258,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
265258
void ProcessDistrTx(const TActorContext& ctx);
266259

267260
void AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> event);
268-
void RemoveImmediateTx();
269261
void ProcessImmediateTxs(const TActorContext& ctx);
270262
void ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
271263
const TActorContext& ctx);
@@ -299,8 +291,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
299291
ui64 offset, ui32 gen, ui32 step, const TString& session,
300292
ui64 readOffsetRewindSum,
301293
ui64 readRuleGeneration);
302-
void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request,
303-
ui64 step, ui64 txId);
294+
void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request);
304295
void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request);
305296
void AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request);
306297
void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
@@ -330,7 +321,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
330321

331322
void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
332323
const TActorContext& ctx);
333-
void OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx);
324+
void OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx);
334325
void EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
335326
NPersQueue::TTopicConverterPtr topicConverter,
336327
const TActorContext& ctx);
@@ -469,6 +460,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
469460
switch (ev->GetTypeRewrite()) {
470461
CFunc(TEvents::TSystem::Wakeup, HandleWakeup);
471462
HFuncTraced(TEvKeyValue::TEvResponse, Handle);
463+
HFuncTraced(TEvPQ::TEvHandleWriteResponse, Handle);
472464
HFuncTraced(TEvPQ::TEvBlobResponse, Handle);
473465
HFuncTraced(TEvPQ::TEvWrite, HandleOnIdle);
474466
HFuncTraced(TEvPQ::TEvRead, Handle);
@@ -516,67 +508,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
516508
};
517509
}
518510

519-
STFUNC(StateWrite)
520-
{
521-
NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
522-
523-
ALOG_TRACE(NKikimrServices::PERSQUEUE, EventStr("StateWrite", ev));
524-
525-
TRACE_EVENT(NKikimrServices::PERSQUEUE);
526-
switch (ev->GetTypeRewrite()) {
527-
CFunc(TEvents::TSystem::Wakeup, HandleWakeup);
528-
HFuncTraced(TEvKeyValue::TEvResponse, Handle);
529-
HFuncTraced(TEvPQ::TEvHandleWriteResponse, Handle);
530-
HFuncTraced(TEvPQ::TEvBlobResponse, Handle);
531-
HFuncTraced(TEvPQ::TEvWrite, HandleOnWrite);
532-
HFuncTraced(TEvPQ::TEvRead, Handle);
533-
HFuncTraced(TEvPQ::TEvApproveReadQuota, Handle);
534-
HFuncTraced(TEvPQ::TEvReadTimeout, Handle);
535-
HFuncTraced(TEvents::TEvPoisonPill, Handle);
536-
HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring);
537-
HFuncTraced(TEvPQ::TEvGetMaxSeqNoRequest, Handle);
538-
HFuncTraced(TEvPQ::TEvGetClientOffset, Handle);
539-
HFuncTraced(TEvPQ::TEvUpdateWriteTimestamp, Handle);
540-
HFuncTraced(TEvPQ::TEvSetClientInfo, Handle);
541-
HFuncTraced(TEvPQ::TEvPartitionOffsets, Handle);
542-
HFuncTraced(TEvPQ::TEvPartitionStatus, Handle);
543-
HFuncTraced(TEvPersQueue::TEvReportPartitionError, Handle);
544-
HFuncTraced(TEvPQ::TEvChangeOwner, Handle);
545-
HFuncTraced(TEvPQ::TEvChangePartitionConfig, Handle);
546-
HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle);
547-
HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
548-
HFuncTraced(TEvPQ::TEvProxyResponse, Handle);
549-
HFuncTraced(TEvPQ::TEvError, Handle);
550-
HFuncTraced(TEvPQ::TEvReserveBytes, Handle);
551-
HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle);
552-
HFuncTraced(TEvPQ::TEvPipeDisconnected, Handle);
553-
HFuncTraced(TEvPQ::TEvUpdateAvailableSize, HandleOnWrite);
554-
HFuncTraced(TEvPQ::TEvQuotaDeadlineCheck, Handle);
555-
HFuncTraced(TEvPQ::TEvApproveWriteQuota, Handle);
556-
HFuncTraced(TEvPQ::TEvRegisterMessageGroup, HandleOnWrite);
557-
HFuncTraced(TEvPQ::TEvDeregisterMessageGroup, HandleOnWrite);
558-
HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnWrite);
559-
HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
560-
HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle);
561-
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
562-
HFuncTraced(TEvPQ::TEvProposePartitionConfig, Handle);
563-
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
564-
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
565-
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
566-
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
567-
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
568-
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
569-
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
570-
HFuncTraced(TEvPQ::TEvProcessChangeOwnerRequests, Handle);
571-
default:
572-
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateWrite", ev));
573-
break;
574-
};
575-
}
576-
577511
private:
578512
enum class EProcessResult {
579513
Continue,
514+
Reply,
580515
Abort,
581516
Break
582517
};
@@ -600,7 +535,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
600535

601536
static void RemoveMessages(TMessageQueue& src, TMessageQueue& dst);
602537
void RemovePendingRequests(TMessageQueue& requests);
603-
void RemoveQuotaWaitingRequests();
538+
void RemoveMessagesToQueue(TMessageQueue& requests);
604539

605540
private:
606541
ui64 TabletID;
@@ -639,8 +574,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
639574

640575
TMessageQueue PendingRequests;
641576
TMessageQueue QuotaWaitingRequests;
642-
TMessageQueue Requests;
643-
TMessageQueue Responses;
644577

645578
THead Head;
646579
THead NewHead;
@@ -672,30 +605,57 @@ class TPartition : public TActorBootstrapped<TPartition> {
672605

673606
template <class T> void EnsureUserActionAndTransactionEventsFrontIs() const;
674607

675-
bool ProcessUserActionOrTransaction(TEvPQ::TEvSetClientInfo& event, const TActorContext& ctx);
676-
bool ProcessUserActionOrTransaction(const TEvPersQueue::TEvProposeTransaction& event, const TActorContext& ctx);
677-
bool ProcessUserActionOrTransaction(TTransaction& tx, const TActorContext& ctx);
608+
EProcessResult ProcessUserActionOrTransaction(TEvPQ::TEvSetClientInfo& event,
609+
TEvKeyValue::TEvRequest* request,
610+
const TActorContext& ctx);
611+
EProcessResult ProcessUserActionOrTransaction(const TEvPersQueue::TEvProposeTransaction& event,
612+
TEvKeyValue::TEvRequest* request,
613+
const TActorContext& ctx);
614+
EProcessResult ProcessUserActionOrTransaction(TTransaction& tx,
615+
TEvKeyValue::TEvRequest* request,
616+
const TActorContext& ctx);
617+
EProcessResult ProcessUserActionOrTransaction(TMessage& msg,
618+
TEvKeyValue::TEvRequest* request,
619+
const TActorContext& ctx);
620+
621+
bool FirstEvent = true;
622+
bool HaveWriteMsg = false;
623+
bool HaveData = false;
624+
bool HaveCheckDisk = false;
625+
bool HaveDrop = false;
626+
bool HeadCleared = false;
627+
TMaybe<TPartitionSourceManager::TModificationBatch> SourceIdBatch;
628+
TMaybe<ProcessParameters> Parameters;
629+
630+
void BeginHandleRequests(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
631+
void EndHandleRequests(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
632+
void BeginProcessWrites(const TActorContext& ctx);
633+
void EndProcessWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
634+
void BeginAppendHeadWithNewWrites(const TActorContext& ctx);
635+
void EndAppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
678636

679637
//
680638
// user actions and transactions
681639
//
682640
using TUserActionAndTransactionEvent =
683641
std::variant<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>, // user actions
684642
TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>, // immediate transaction
685-
TTransaction>; // distributed transaction or update config
643+
TTransaction, // distributed transaction or update config
644+
TMessage>;
686645
std::deque<TUserActionAndTransactionEvent> UserActionAndTransactionEvents;
687646
size_t ImmediateTxCount = 0;
688647
THashMap<TString, size_t> UserActCount;
689648
THashMap<TString, TUserInfoBase> PendingUsersInfo;
690649
TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies;
691650
THashSet<TString> AffectedUsers;
692-
bool UsersInfoWriteInProgress = false;
651+
bool KVWriteInProgress = false;
693652
bool TxInProgress = false;
694653
TMaybe<ui64> PlanStep;
695654
TMaybe<ui64> TxId;
696655
bool TxIdHasChanged = false;
697656
TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
698657
bool SendChangeConfigReply = true;
658+
TMessageQueue Responses;
699659
//
700660
//
701661
//

ydb/core/persqueue/partition_monitoring.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ void TPartition::HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorCo
6868
str = "State is StateInit";
6969
} else if (CurrentStateFunc() == &TThis::StateIdle) {
7070
str = "State is StateIdle";
71-
} else if (CurrentStateFunc() == &TThis::StateWrite) {
72-
str = "State is StateWrite";
7371
} else {
7472
Y_ABORT("");
7573
}

ydb/core/persqueue/partition_util.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@
44

55
namespace NKikimr::NPQ {
66

7-
static const ui64 SET_OFFSET_COOKIE = 1;
8-
97
class TKeyLevel {
108
public:
119
friend IOutputStream& operator <<(IOutputStream& out, const TKeyLevel& value);

0 commit comments

Comments
 (0)