Skip to content

Commit 87b4d78

Browse files
Merge 1b4b028 into 418f880
2 parents 418f880 + 1b4b028 commit 87b4d78

File tree

7 files changed

+239
-14
lines changed

7 files changed

+239
-14
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,19 @@ struct TEvPQ {
189189
EvPartitionScaleStatusChanged,
190190
EvPartitionScaleRequestDone,
191191
EvBalanceConsumer,
192+
EvDeletePartition,
193+
EvDeletePartitionDone,
194+
EvTransactionCompleted,
192195
EvEnd
193196
};
194197

195198
struct TEvHandleWriteResponse : TEventLocal<TEvHandleWriteResponse, EvHandleWriteResponse> {
196-
TEvHandleWriteResponse()
197-
{}
199+
explicit TEvHandleWriteResponse(ui64 cookie) :
200+
Cookie(cookie)
201+
{
202+
}
203+
204+
ui64 Cookie = 0;
198205
};
199206

200207
struct TEvWrite : public TEventLocal<TEvWrite, EvWrite> {
@@ -1132,6 +1139,27 @@ struct TEvPQ {
11321139

11331140
TString ConsumerName;
11341141
};
1142+
1143+
struct TEvDeletePartition : TEventLocal<TEvDeletePartition, EvDeletePartition> {
1144+
};
1145+
1146+
struct TEvDeletePartitionDone : TEventLocal<TEvDeletePartitionDone, EvDeletePartitionDone> {
1147+
explicit TEvDeletePartitionDone(const NPQ::TPartitionId& partitionId) :
1148+
PartitionId(partitionId)
1149+
{
1150+
}
1151+
1152+
NPQ::TPartitionId PartitionId;
1153+
};
1154+
1155+
struct TEvTransactionCompleted : TEventLocal<TEvTransactionCompleted, EvTransactionCompleted> {
1156+
explicit TEvTransactionCompleted(TMaybe<ui64> writeId) :
1157+
WriteId(writeId)
1158+
{
1159+
}
1160+
1161+
TMaybe<ui64> WriteId;
1162+
};
11351163
};
11361164

11371165
} //NKikimr

ydb/core/persqueue/partition.cpp

Lines changed: 111 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,8 @@ void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContex
492492
}
493493
}
494494

495-
void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
495+
void TPartition::DestroyActor(const TActorContext& ctx)
496+
{
496497
// Reply to all outstanding requests in order to destroy corresponding actors
497498

498499
TStringBuilder ss;
@@ -523,12 +524,19 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
523524
UsersInfoStorage->Clear(ctx);
524525
}
525526

526-
Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());
527-
Send(WriteQuotaTrackerActor, new TEvents::TEvPoisonPill());
527+
if (!IsSupportive()) {
528+
Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());
529+
Send(WriteQuotaTrackerActor, new TEvents::TEvPoisonPill());
530+
}
528531

529532
Die(ctx);
530533
}
531534

535+
void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
536+
{
537+
DestroyActor(ctx);
538+
}
539+
532540
bool CheckDiskStatus(const TStorageStatusFlags status) {
533541
return !status.Check(NKikimrBlobStorage::StatusDiskSpaceYellowStop);
534542
}
@@ -1020,6 +1028,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
10201028
--ImmediateTxCount;
10211029

10221030
ProcessImmediateTx(tx, predicate, ctx);
1031+
ScheduleTransactionCompleted(tx);
10231032
TxInProgress = false;
10241033
ContinueProcessTxsAndUserActs(ctx);
10251034

@@ -1056,6 +1065,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
10561065
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
10571066
NKikimrPQ::TError::BAD_REQUEST,
10581067
ev->Get()->Message);
1068+
ScheduleTransactionCompleted(t->Record);
10591069

10601070
UserActionAndTransactionEvents.pop_front();
10611071
--ImmediateTxCount;
@@ -1574,13 +1584,11 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
15741584

15751585
const auto writeDuration = ctx.Now() - WriteStartTime;
15761586
const auto minWriteLatency = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs());
1587+
15771588
if (writeDuration > minWriteLatency) {
1578-
KVWriteInProgress = false;
1579-
OnProcessTxsAndUserActsWriteComplete(ctx);
1580-
HandleWriteResponse(ctx);
1581-
ProcessTxsAndUserActs(ctx);
1589+
OnHandleWriteResponse(ctx);
15821590
} else {
1583-
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse());
1591+
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse(response.GetCookie()));
15841592
}
15851593
}
15861594

@@ -1658,6 +1666,19 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
16581666

16591667
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
16601668

1669+
if (DeletePartitionState == DELETION_INITED) {
1670+
ScheduleNegativeReplies();
1671+
ScheduleDeletePartitionDone();
1672+
1673+
AddCmdDeleteRangeForAllKeys(*request);
1674+
1675+
ctx.Send(Tablet, request.Release());
1676+
1677+
DeletePartitionState = DELETION_IN_PROCESS;
1678+
1679+
return;
1680+
}
1681+
16611682
HaveWriteMsg = false;
16621683

16631684
if (UserActionAndTransactionEvents.empty()) {
@@ -2629,6 +2650,12 @@ void TPartition::SchedulePartitionConfigChanged()
26292650
MakeHolder<TEvPQ::TEvPartitionConfigChanged>(Partition).Release());
26302651
}
26312652

2653+
void TPartition::ScheduleDeletePartitionDone()
2654+
{
2655+
Replies.emplace_back(Tablet,
2656+
MakeHolder<TEvPQ::TEvDeletePartitionDone>(Partition).Release());
2657+
}
2658+
26322659
void TPartition::AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
26332660
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated)
26342661
{
@@ -3013,6 +3040,82 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
30133040
Send(ev->Sender, response.Release());
30143041
}
30153042

3043+
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
3044+
{
3045+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
3046+
}
3047+
3048+
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
3049+
{
3050+
Y_ABORT_UNLESS(IsSupportive());
3051+
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
3052+
3053+
DeletePartitionState = DELETION_INITED;
3054+
3055+
ProcessTxsAndUserActs(ctx);
3056+
}
3057+
3058+
void TPartition::ScheduleNegativeReplies()
3059+
{
3060+
for (auto& event : UserActionAndTransactionEvents) {
3061+
auto visitor = [this](auto& event) {
3062+
using T = std::decay_t<decltype(event)>;
3063+
if constexpr (TIsSimpleSharedPtr<T>::value) {
3064+
return this->ScheduleNegativeReply(*event);
3065+
} else {
3066+
return this->ScheduleNegativeReply(event);
3067+
}
3068+
};
3069+
3070+
std::visit(visitor, event);
3071+
}
3072+
3073+
UserActionAndTransactionEvents.clear();
3074+
}
3075+
3076+
void TPartition::AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request)
3077+
{
3078+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeInfo, Partition);
3079+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeData, Partition);
3080+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTmpData, Partition);
3081+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeMeta, Partition);
3082+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTxMeta, Partition);
3083+
}
3084+
3085+
void TPartition::ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo&)
3086+
{
3087+
Y_ABORT("The supportive partition does not accept read operations");
3088+
}
3089+
3090+
void TPartition::ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction&)
3091+
{
3092+
Y_ABORT("The supportive partition does not accept immediate transactions");
3093+
}
3094+
3095+
void TPartition::ScheduleNegativeReply(const TTransaction&)
3096+
{
3097+
Y_ABORT("The supportive partition does not accept distribute transactions");
3098+
}
3099+
3100+
void TPartition::ScheduleNegativeReply(const TMessage& msg)
3101+
{
3102+
ScheduleReplyError(msg.GetCookie(), NPersQueue::NErrorCode::ERROR, "The transaction is completed");
3103+
}
3104+
3105+
void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransaction& tx)
3106+
{
3107+
Y_ABORT_UNLESS(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
3108+
Y_ABORT_UNLESS(tx.HasData());
3109+
3110+
TMaybe<ui64> writeId;
3111+
if (tx.GetData().HasWriteId()) {
3112+
writeId = tx.GetData().GetWriteId();
3113+
}
3114+
3115+
Replies.emplace_back(Tablet,
3116+
MakeHolder<TEvPQ::TEvTransactionCompleted>(writeId).Release());
3117+
}
3118+
30163119
const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
30173120
{
30183121
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);

ydb/core/persqueue/partition.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ class TPartition : public TActorBootstrapped<TPartition> {
9191
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
9292
static const ui32 SCALE_REQUEST_REPEAT_MIN_SECONDS = 60;
9393

94+
enum EDeletePartitionState {
95+
DELETION_NOT_INITED = 0,
96+
DELETION_INITED = 1,
97+
DELETION_IN_PROCESS = 2,
98+
};
99+
94100
private:
95101
struct THasDataReq;
96102
struct THasDataDeadline;
@@ -374,6 +380,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
374380

375381
void CommitWriteOperations(const TActorContext& ctx);
376382

383+
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
384+
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
385+
377386
public:
378387
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
379388
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -457,6 +466,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
457466
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
458467
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle);
459468
HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle);
469+
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
460470
default:
461471
if (!Initializer.Handle(ev)) {
462472
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
@@ -519,6 +529,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
519529
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
520530
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
521531
HFuncTraced(TEvPQ::TEvProcessChangeOwnerRequests, Handle);
532+
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
522533
default:
523534
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
524535
break;
@@ -795,6 +806,23 @@ class TPartition : public TActorBootstrapped<TPartition> {
795806
bool ClosedInternalPartition = false;
796807

797808
bool IsSupportive() const;
809+
810+
EDeletePartitionState DeletePartitionState = DELETION_NOT_INITED;
811+
812+
void ScheduleDeletePartitionDone();
813+
void ScheduleNegativeReplies();
814+
void AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request);
815+
816+
void ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo& event);
817+
void ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction& event);
818+
void ScheduleNegativeReply(const TTransaction& tx);
819+
void ScheduleNegativeReply(const TMessage& msg);
820+
821+
void OnHandleWriteResponse(const TActorContext& ctx);
822+
823+
void ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransaction& tx);
824+
825+
void DestroyActor(const TActorContext& ctx);
798826
};
799827

800828
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_util.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,5 +132,8 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels);
132132
NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i);
133133
bool IsQuotingEnabled(const NKikimrPQ::TPQConfig& pqConfig,
134134
bool isLocalDC);
135+
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request,
136+
TKeyPrefix::EType c,
137+
const TPartitionId& partitionId);
135138

136139
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_write.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ static const ui32 MAX_INLINE_SIZE = 1000;
3030

3131
static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_INACTIVE;
3232

33-
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, const TPartitionId& partitionId);
34-
3533
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
3634
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);
3735

@@ -439,12 +437,22 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
439437
UpdateUserInfoEndOffset(ctx.Now());
440438
}
441439

442-
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) {
443-
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
440+
void TPartition::OnHandleWriteResponse(const TActorContext& ctx)
441+
{
444442
KVWriteInProgress = false;
445443
OnProcessTxsAndUserActsWriteComplete(ctx);
446444
HandleWriteResponse(ctx);
447445
ProcessTxsAndUserActs(ctx);
446+
447+
if (DeletePartitionState == DELETION_IN_PROCESS) {
448+
DestroyActor(ctx);
449+
}
450+
}
451+
452+
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx)
453+
{
454+
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
455+
OnHandleWriteResponse(ctx);
448456
}
449457

450458
void TPartition::UpdateAfterWriteCounters(bool writeComplete) {

0 commit comments

Comments
 (0)