Skip to content

Commit 852f11e

Browse files
Merge e434c12 into c4e0dc4
2 parents c4e0dc4 + e434c12 commit 852f11e

File tree

7 files changed

+190
-11
lines changed

7 files changed

+190
-11
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,19 @@ struct TEvPQ {
189189
EvPartitionScaleStatusChanged,
190190
EvPartitionScaleRequestDone,
191191
EvBalanceConsumer,
192+
EvDeletePartition,
193+
EvDeletePartitionDone,
194+
EvTransactionCompleted,
192195
EvEnd
193196
};
194197

195198
struct TEvHandleWriteResponse : TEventLocal<TEvHandleWriteResponse, EvHandleWriteResponse> {
196-
TEvHandleWriteResponse()
197-
{}
199+
explicit TEvHandleWriteResponse(ui64 cookie) :
200+
Cookie(cookie)
201+
{
202+
}
203+
204+
ui64 Cookie = 0;
198205
};
199206

200207
struct TEvWrite : public TEventLocal<TEvWrite, EvWrite> {
@@ -1132,6 +1139,35 @@ struct TEvPQ {
11321139

11331140
TString ConsumerName;
11341141
};
1142+
1143+
struct TEvDeletePartition : TEventLocal<TEvDeletePartition, EvDeletePartition> {
1144+
explicit TEvDeletePartition(ui64 cookie) :
1145+
Cookie(cookie)
1146+
{
1147+
}
1148+
1149+
ui64 Cookie;
1150+
};
1151+
1152+
struct TEvDeletePartitionDone : TEventLocal<TEvDeletePartitionDone, EvDeletePartitionDone> {
1153+
TEvDeletePartitionDone(const NPQ::TPartitionId& partitionId, ui64 cookie) :
1154+
PartitionId(partitionId),
1155+
Cookie(cookie)
1156+
{
1157+
}
1158+
1159+
NPQ::TPartitionId PartitionId;
1160+
ui64 Cookie;
1161+
};
1162+
1163+
struct TEvTransactionCompleted : TEventLocal<TEvTransactionCompleted, EvTransactionCompleted> {
1164+
explicit TEvTransactionCompleted(ui64 writeId) :
1165+
WriteId(writeId)
1166+
{
1167+
}
1168+
1169+
ui64 WriteId = 0;
1170+
};
11351171
};
11361172

11371173
} //NKikimr

ydb/core/persqueue/partition.cpp

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1574,13 +1574,11 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
15741574

15751575
const auto writeDuration = ctx.Now() - WriteStartTime;
15761576
const auto minWriteLatency = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs());
1577+
15771578
if (writeDuration > minWriteLatency) {
1578-
KVWriteInProgress = false;
1579-
OnProcessTxsAndUserActsWriteComplete(ctx);
1580-
HandleWriteResponse(ctx);
1581-
ProcessTxsAndUserActs(ctx);
1579+
OnHandleWriteResponse(response.GetCookie(), ctx);
15821580
} else {
1583-
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse());
1581+
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse(response.GetCookie()));
15841582
}
15851583
}
15861584

@@ -1658,6 +1656,18 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
16581656

16591657
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
16601658

1659+
if (DeletePartitionCookie) {
1660+
ScheduleNegativeReplies();
1661+
ScheduleDeletePartitionDone();
1662+
1663+
request->Record.SetCookie(DELETE_PARTITION_COOKIE);
1664+
AddCmdDeleteRangeForAllKeys(*request);
1665+
1666+
ctx.Send(Tablet, request.Release());
1667+
1668+
return;
1669+
}
1670+
16611671
HaveWriteMsg = false;
16621672

16631673
if (UserActionAndTransactionEvents.empty()) {
@@ -2629,6 +2639,12 @@ void TPartition::SchedulePartitionConfigChanged()
26292639
MakeHolder<TEvPQ::TEvPartitionConfigChanged>(Partition).Release());
26302640
}
26312641

2642+
void TPartition::ScheduleDeletePartitionDone()
2643+
{
2644+
Replies.emplace_back(Tablet,
2645+
MakeHolder<TEvPQ::TEvDeletePartitionDone>(Partition, DeletePartitionCookie).Release());
2646+
}
2647+
26322648
void TPartition::AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
26332649
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated)
26342650
{
@@ -3013,6 +3029,67 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
30133029
Send(ev->Sender, response.Release());
30143030
}
30153031

3032+
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
3033+
{
3034+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
3035+
}
3036+
3037+
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx)
3038+
{
3039+
Y_ABORT_UNLESS(IsSupportive());
3040+
Y_ABORT_UNLESS(DeletePartitionCookie == 0);
3041+
3042+
DeletePartitionCookie = ev->Get()->Cookie;
3043+
ProcessTxsAndUserActs(ctx);
3044+
}
3045+
3046+
void TPartition::ScheduleNegativeReplies()
3047+
{
3048+
for (auto& event : UserActionAndTransactionEvents) {
3049+
auto visitor = [this](auto& event) {
3050+
using T = std::decay_t<decltype(event)>;
3051+
if constexpr (TIsSimpleSharedPtr<T>::value) {
3052+
return this->ScheduleNegativeReply(*event);
3053+
} else {
3054+
return this->ScheduleNegativeReply(event);
3055+
}
3056+
};
3057+
3058+
std::visit(visitor, event);
3059+
}
3060+
3061+
UserActionAndTransactionEvents.clear();
3062+
}
3063+
3064+
void TPartition::AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request)
3065+
{
3066+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeInfo, Partition);
3067+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeData, Partition);
3068+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTmpData, Partition);
3069+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeMeta, Partition);
3070+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTxMeta, Partition);
3071+
}
3072+
3073+
void TPartition::ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo&)
3074+
{
3075+
Y_ABORT_UNLESS(false, "The supportive partition does not accept read operations");
3076+
}
3077+
3078+
void TPartition::ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction&)
3079+
{
3080+
Y_ABORT_UNLESS(false, "The supportive partition does not accept immediate transactions");
3081+
}
3082+
3083+
void TPartition::ScheduleNegativeReply(const TTransaction&)
3084+
{
3085+
Y_ABORT_UNLESS(false, "The supportive partition does not accept distribute transactions");
3086+
}
3087+
3088+
void TPartition::ScheduleNegativeReply(const TMessage& msg)
3089+
{
3090+
ScheduleReplyError(msg.GetCookie(), NPersQueue::NErrorCode::ERROR, "The transaction is completed");
3091+
}
3092+
30163093
const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
30173094
{
30183095
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);

ydb/core/persqueue/partition.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
9191
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
9292
static const ui32 SCALE_REQUEST_REPEAT_MIN_SECONDS = 60;
9393

94+
enum ECookie : ui64 {
95+
DELETE_PARTITION_COOKIE = 6,
96+
};
97+
9498
private:
9599
struct THasDataReq;
96100
struct THasDataDeadline;
@@ -374,6 +378,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
374378

375379
void CommitWriteOperations(const TActorContext& ctx);
376380

381+
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
382+
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
383+
377384
public:
378385
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
379386
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -457,6 +464,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
457464
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
458465
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle);
459466
HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle);
467+
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
460468
default:
461469
if (!Initializer.Handle(ev)) {
462470
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
@@ -519,6 +527,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
519527
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
520528
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
521529
HFuncTraced(TEvPQ::TEvProcessChangeOwnerRequests, Handle);
530+
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
522531
default:
523532
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
524533
break;
@@ -795,6 +804,19 @@ class TPartition : public TActorBootstrapped<TPartition> {
795804
bool ClosedInternalPartition = false;
796805

797806
bool IsSupportive() const;
807+
808+
ui64 DeletePartitionCookie = 0;
809+
810+
void ScheduleDeletePartitionDone();
811+
void ScheduleNegativeReplies();
812+
void AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request);
813+
814+
void ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo& event);
815+
void ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction& event);
816+
void ScheduleNegativeReply(const TTransaction& tx);
817+
void ScheduleNegativeReply(const TMessage& msg);
818+
819+
void OnHandleWriteResponse(ui64 cookie, const TActorContext& ctx);
798820
};
799821

800822
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_util.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,5 +132,8 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels);
132132
NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i);
133133
bool IsQuotingEnabled(const NKikimrPQ::TPQConfig& pqConfig,
134134
bool isLocalDC);
135+
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request,
136+
TKeyPrefix::EType c,
137+
const TPartitionId& partitionId);
135138

136139
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_write.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ static const ui32 MAX_INLINE_SIZE = 1000;
3030

3131
static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_INACTIVE;
3232

33-
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, const TPartitionId& partitionId);
34-
3533
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
3634
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);
3735

@@ -439,14 +437,23 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
439437
UpdateUserInfoEndOffset(ctx.Now());
440438
}
441439

442-
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) {
443-
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
440+
void TPartition::OnHandleWriteResponse(ui64 cookie, const TActorContext& ctx)
441+
{
444442
KVWriteInProgress = false;
443+
if (cookie == DELETE_PARTITION_COOKIE) {
444+
DeletePartitionCookie = 0;
445+
}
445446
OnProcessTxsAndUserActsWriteComplete(ctx);
446447
HandleWriteResponse(ctx);
447448
ProcessTxsAndUserActs(ctx);
448449
}
449450

451+
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr& ev, const TActorContext& ctx)
452+
{
453+
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
454+
OnHandleWriteResponse(ev->Get()->Cookie, ctx);
455+
}
456+
450457
void TPartition::UpdateAfterWriteCounters(bool writeComplete) {
451458
if (IsSupportive() == writeComplete) {
452459
// If supportive - update counters only prior to write, otherwise - only after writes;

ydb/core/persqueue/pq_impl.cpp

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3366,6 +3366,13 @@ void TPersQueue::SubscribeWriteId(ui64 writeId,
33663366
new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId, ctx.SelfID.NodeId()));
33673367
}
33683368

3369+
void TPersQueue::UnsubscribeWriteId(ui64 writeId,
3370+
const TActorContext& ctx)
3371+
{
3372+
ctx.Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()),
3373+
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId, ctx.SelfID.NodeId()));
3374+
}
3375+
33693376
void TPersQueue::CreateSupportivePartitionActors(const TActorContext& ctx)
33703377
{
33713378
for (auto& partitionId : PendingSupportivePartitions) {
@@ -4424,6 +4431,30 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const T
44244431
}
44254432
}
44264433

4434+
void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx)
4435+
{
4436+
auto* event = ev->Get();
4437+
const ui64 writeId = event->Cookie;
4438+
Y_ABORT_UNLESS(TxWrites.contains(writeId));
4439+
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
4440+
Y_ABORT_UNLESS(writeInfo.Partitions.contains(event->PartitionId.OriginalPartitionId));
4441+
const TPartitionId partitionId = writeInfo.Partitions.at(event->PartitionId.OriginalPartitionId);
4442+
Y_ABORT_UNLESS(partitionId == event->PartitionId);
4443+
Y_ABORT_UNLESS(partitionId.IsSupportivePartition());
4444+
Y_ABORT_UNLESS(Partitions.contains(partitionId));
4445+
const TPartitionInfo& partition = Partitions.at(partitionId);
4446+
4447+
Send(partition.Actor, new TEvents::TEvPoisonPill());
4448+
Partitions.erase(partitionId);
4449+
4450+
writeInfo.Partitions.erase(partitionId.OriginalPartitionId);
4451+
if (writeInfo.Partitions.empty()) {
4452+
UnsubscribeWriteId(writeId, ctx);
4453+
TxWrites.erase(writeId);
4454+
}
4455+
TryWriteTxs(ctx);
4456+
}
4457+
44274458
TString TPersQueue::LogPrefix() const {
44284459
return TStringBuilder() << SelfId() << " ";
44294460
}
@@ -4480,6 +4511,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
44804511
HFuncTraced(TEvPQ::TEvPartitionScaleStatusChanged, Handle);
44814512
HFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
44824513
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
4514+
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
44834515
default:
44844516
return false;
44854517
}

ydb/core/persqueue/pq_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,12 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
483483
void CreateSupportivePartitionActor(const TPartitionId& shadowPartitionId, const TActorContext& ctx);
484484
NKikimrPQ::TPQTabletConfig MakeSupportivePartitionConfig() const;
485485
void SubscribeWriteId(ui64 writeId, const TActorContext& ctx);
486+
void UnsubscribeWriteId(ui64 writeId, const TActorContext& ctx);
486487

487488
bool AllOriginalPartitionsInited() const;
488489

489490
void Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx);
491+
void Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx);
490492
};
491493

492494

0 commit comments

Comments
 (0)