Skip to content

Commit 3b75e88

Browse files
Merge 8b94486 into 4072389
2 parents 4072389 + 8b94486 commit 3b75e88

File tree

7 files changed

+222
-11
lines changed

7 files changed

+222
-11
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,19 @@ struct TEvPQ {
189189
EvPartitionScaleStatusChanged,
190190
EvPartitionScaleRequestDone,
191191
EvBalanceConsumer,
192+
EvDeletePartition,
193+
EvDeletePartitionDone,
194+
EvTransactionCompleted,
192195
EvEnd
193196
};
194197

195198
struct TEvHandleWriteResponse : TEventLocal<TEvHandleWriteResponse, EvHandleWriteResponse> {
196-
TEvHandleWriteResponse()
197-
{}
199+
explicit TEvHandleWriteResponse(ui64 cookie) :
200+
Cookie(cookie)
201+
{
202+
}
203+
204+
ui64 Cookie = 0;
198205
};
199206

200207
struct TEvWrite : public TEventLocal<TEvWrite, EvWrite> {
@@ -1132,6 +1139,37 @@ struct TEvPQ {
11321139

11331140
TString ConsumerName;
11341141
};
1142+
1143+
struct TEvDeletePartition : TEventLocal<TEvDeletePartition, EvDeletePartition> {
1144+
explicit TEvDeletePartition(ui64 cookie) :
1145+
Cookie(cookie)
1146+
{
1147+
}
1148+
1149+
ui64 Cookie;
1150+
};
1151+
1152+
struct TEvDeletePartitionDone : TEventLocal<TEvDeletePartitionDone, EvDeletePartitionDone> {
1153+
TEvDeletePartitionDone(const NPQ::TPartitionId& partitionId, ui64 cookie) :
1154+
PartitionId(partitionId),
1155+
Cookie(cookie)
1156+
{
1157+
}
1158+
1159+
NPQ::TPartitionId PartitionId;
1160+
ui64 Cookie;
1161+
};
1162+
1163+
struct TEvTransactionCompleted : TEventLocal<TEvTransactionCompleted, EvTransactionCompleted> {
1164+
TEvTransactionCompleted(const NPQ::TPartitionId& partitionId, ui64 writeId) :
1165+
PartitionId(partitionId),
1166+
WriteId(writeId)
1167+
{
1168+
}
1169+
1170+
NPQ::TPartitionId PartitionId;
1171+
ui64 WriteId = 0;
1172+
};
11351173
};
11361174

11371175
} //NKikimr

ydb/core/persqueue/partition.cpp

Lines changed: 93 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1020,6 +1020,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
10201020
--ImmediateTxCount;
10211021

10221022
ProcessImmediateTx(tx, predicate, ctx);
1023+
ScheduleTransactionCompleted(tx);
10231024
TxInProgress = false;
10241025
ContinueProcessTxsAndUserActs(ctx);
10251026

@@ -1056,6 +1057,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
10561057
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
10571058
NKikimrPQ::TError::BAD_REQUEST,
10581059
ev->Get()->Message);
1060+
ScheduleTransactionCompleted(t->Record);
10591061

10601062
UserActionAndTransactionEvents.pop_front();
10611063
--ImmediateTxCount;
@@ -1574,13 +1576,11 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
15741576

15751577
const auto writeDuration = ctx.Now() - WriteStartTime;
15761578
const auto minWriteLatency = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs());
1579+
15771580
if (writeDuration > minWriteLatency) {
1578-
KVWriteInProgress = false;
1579-
OnProcessTxsAndUserActsWriteComplete(ctx);
1580-
HandleWriteResponse(ctx);
1581-
ProcessTxsAndUserActs(ctx);
1581+
OnHandleWriteResponse(response.GetCookie(), ctx);
15821582
} else {
1583-
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse());
1583+
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse(response.GetCookie()));
15841584
}
15851585
}
15861586

@@ -1658,6 +1658,18 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
16581658

16591659
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
16601660

1661+
if (DeletePartitionCookie) {
1662+
ScheduleNegativeReplies();
1663+
ScheduleDeletePartitionDone();
1664+
1665+
request->Record.SetCookie(DELETE_PARTITION_COOKIE);
1666+
AddCmdDeleteRangeForAllKeys(*request);
1667+
1668+
ctx.Send(Tablet, request.Release());
1669+
1670+
return;
1671+
}
1672+
16611673
HaveWriteMsg = false;
16621674

16631675
if (UserActionAndTransactionEvents.empty()) {
@@ -2629,6 +2641,12 @@ void TPartition::SchedulePartitionConfigChanged()
26292641
MakeHolder<TEvPQ::TEvPartitionConfigChanged>(Partition).Release());
26302642
}
26312643

2644+
void TPartition::ScheduleDeletePartitionDone()
2645+
{
2646+
Replies.emplace_back(Tablet,
2647+
MakeHolder<TEvPQ::TEvDeletePartitionDone>(Partition, DeletePartitionCookie).Release());
2648+
}
2649+
26322650
void TPartition::AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
26332651
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated)
26342652
{
@@ -3013,6 +3031,76 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
30133031
Send(ev->Sender, response.Release());
30143032
}
30153033

3034+
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
3035+
{
3036+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
3037+
}
3038+
3039+
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx)
3040+
{
3041+
Y_ABORT_UNLESS(IsSupportive());
3042+
Y_ABORT_UNLESS(DeletePartitionCookie == 0);
3043+
3044+
DeletePartitionCookie = ev->Get()->Cookie;
3045+
ProcessTxsAndUserActs(ctx);
3046+
}
3047+
3048+
void TPartition::ScheduleNegativeReplies()
3049+
{
3050+
for (auto& event : UserActionAndTransactionEvents) {
3051+
auto visitor = [this](auto& event) {
3052+
using T = std::decay_t<decltype(event)>;
3053+
if constexpr (TIsSimpleSharedPtr<T>::value) {
3054+
return this->ScheduleNegativeReply(*event);
3055+
} else {
3056+
return this->ScheduleNegativeReply(event);
3057+
}
3058+
};
3059+
3060+
std::visit(visitor, event);
3061+
}
3062+
3063+
UserActionAndTransactionEvents.clear();
3064+
}
3065+
3066+
void TPartition::AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request)
3067+
{
3068+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeInfo, Partition);
3069+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeData, Partition);
3070+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTmpData, Partition);
3071+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeMeta, Partition);
3072+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTxMeta, Partition);
3073+
}
3074+
3075+
void TPartition::ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo&)
3076+
{
3077+
Y_ABORT("The supportive partition does not accept read operations");
3078+
}
3079+
3080+
void TPartition::ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction&)
3081+
{
3082+
Y_ABORT("The supportive partition does not accept immediate transactions");
3083+
}
3084+
3085+
void TPartition::ScheduleNegativeReply(const TTransaction&)
3086+
{
3087+
Y_ABORT("The supportive partition does not accept distribute transactions");
3088+
}
3089+
3090+
void TPartition::ScheduleNegativeReply(const TMessage& msg)
3091+
{
3092+
ScheduleReplyError(msg.GetCookie(), NPersQueue::NErrorCode::ERROR, "The transaction is completed");
3093+
}
3094+
3095+
void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransaction& tx)
3096+
{
3097+
Y_ABORT_UNLESS(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
3098+
Y_ABORT_UNLESS(tx.HasData());
3099+
3100+
Replies.emplace_back(Tablet,
3101+
MakeHolder<TEvPQ::TEvTransactionCompleted>(Partition, tx.GetData().GetWriteId()).Release());
3102+
}
3103+
30163104
const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
30173105
{
30183106
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);

ydb/core/persqueue/partition.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ class TPartition : public TActorBootstrapped<TPartition> {
9191
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
9292
static const ui32 SCALE_REQUEST_REPEAT_MIN_SECONDS = 60;
9393

94+
enum ECookie : ui64 {
95+
DELETE_PARTITION_COOKIE = 6,
96+
};
97+
9498
private:
9599
struct THasDataReq;
96100
struct THasDataDeadline;
@@ -374,6 +378,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
374378

375379
void CommitWriteOperations(const TActorContext& ctx);
376380

381+
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
382+
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
383+
377384
public:
378385
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
379386
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -457,6 +464,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
457464
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
458465
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle);
459466
HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle);
467+
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
460468
default:
461469
if (!Initializer.Handle(ev)) {
462470
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
@@ -519,6 +527,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
519527
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
520528
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
521529
HFuncTraced(TEvPQ::TEvProcessChangeOwnerRequests, Handle);
530+
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
522531
default:
523532
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
524533
break;
@@ -795,6 +804,21 @@ class TPartition : public TActorBootstrapped<TPartition> {
795804
bool ClosedInternalPartition = false;
796805

797806
bool IsSupportive() const;
807+
808+
ui64 DeletePartitionCookie = 0;
809+
810+
void ScheduleDeletePartitionDone();
811+
void ScheduleNegativeReplies();
812+
void AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request);
813+
814+
void ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo& event);
815+
void ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction& event);
816+
void ScheduleNegativeReply(const TTransaction& tx);
817+
void ScheduleNegativeReply(const TMessage& msg);
818+
819+
void OnHandleWriteResponse(ui64 cookie, const TActorContext& ctx);
820+
821+
void ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransaction& tx);
798822
};
799823

800824
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_util.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,5 +132,8 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels);
132132
NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i);
133133
bool IsQuotingEnabled(const NKikimrPQ::TPQConfig& pqConfig,
134134
bool isLocalDC);
135+
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request,
136+
TKeyPrefix::EType c,
137+
const TPartitionId& partitionId);
135138

136139
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_write.cpp

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ static const ui32 MAX_INLINE_SIZE = 1000;
3030

3131
static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_INACTIVE;
3232

33-
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, const TPartitionId& partitionId);
34-
3533
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
3634
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);
3735

@@ -439,14 +437,23 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
439437
UpdateUserInfoEndOffset(ctx.Now());
440438
}
441439

442-
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) {
443-
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
440+
void TPartition::OnHandleWriteResponse(ui64 cookie, const TActorContext& ctx)
441+
{
444442
KVWriteInProgress = false;
443+
if (cookie == DELETE_PARTITION_COOKIE) {
444+
DeletePartitionCookie = 0;
445+
}
445446
OnProcessTxsAndUserActsWriteComplete(ctx);
446447
HandleWriteResponse(ctx);
447448
ProcessTxsAndUserActs(ctx);
448449
}
449450

451+
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr& ev, const TActorContext& ctx)
452+
{
453+
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
454+
OnHandleWriteResponse(ev->Get()->Cookie, ctx);
455+
}
456+
450457
void TPartition::UpdateAfterWriteCounters(bool writeComplete) {
451458
if (IsSupportive() == writeComplete) {
452459
// If supportive - update counters only prior to write, otherwise - only after writes;

ydb/core/persqueue/pq_impl.cpp

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3366,6 +3366,13 @@ void TPersQueue::SubscribeWriteId(ui64 writeId,
33663366
new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId, ctx.SelfID.NodeId()));
33673367
}
33683368

3369+
void TPersQueue::UnsubscribeWriteId(ui64 writeId,
3370+
const TActorContext& ctx)
3371+
{
3372+
ctx.Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()),
3373+
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId, ctx.SelfID.NodeId()));
3374+
}
3375+
33693376
void TPersQueue::CreateSupportivePartitionActors(const TActorContext& ctx)
33703377
{
33713378
for (auto& partitionId : PendingSupportivePartitions) {
@@ -4424,6 +4431,45 @@ void TPersQueue::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const T
44244431
}
44254432
}
44264433

4434+
void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx)
4435+
{
4436+
auto* event = ev->Get();
4437+
const ui64 writeId = event->Cookie;
4438+
Y_ABORT_UNLESS(TxWrites.contains(writeId));
4439+
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
4440+
Y_ABORT_UNLESS(writeInfo.Partitions.contains(event->PartitionId.OriginalPartitionId));
4441+
const TPartitionId partitionId = writeInfo.Partitions.at(event->PartitionId.OriginalPartitionId);
4442+
Y_ABORT_UNLESS(partitionId == event->PartitionId);
4443+
Y_ABORT_UNLESS(partitionId.IsSupportivePartition());
4444+
Y_ABORT_UNLESS(Partitions.contains(partitionId));
4445+
const TPartitionInfo& partition = Partitions.at(partitionId);
4446+
4447+
Send(partition.Actor, new TEvents::TEvPoisonPill());
4448+
Partitions.erase(partitionId);
4449+
4450+
writeInfo.Partitions.erase(partitionId.OriginalPartitionId);
4451+
if (writeInfo.Partitions.empty()) {
4452+
UnsubscribeWriteId(writeId, ctx);
4453+
TxWrites.erase(writeId);
4454+
}
4455+
TryWriteTxs(ctx);
4456+
}
4457+
4458+
void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext&)
4459+
{
4460+
auto* event = ev->Get();
4461+
ui64 writeId = event->WriteId;
4462+
Y_ABORT_UNLESS(TxWrites.contains(writeId));
4463+
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
4464+
Y_ABORT_UNLESS(writeInfo.Partitions.size() == 1);
4465+
4466+
for (auto& [_, partitionId] : writeInfo.Partitions) {
4467+
Y_ABORT_UNLESS(Partitions.contains(partitionId));
4468+
const TPartitionInfo& partition = Partitions.at(partitionId);
4469+
Send(partition.Actor, new TEvPQ::TEvDeletePartition(writeId));
4470+
}
4471+
}
4472+
44274473
TString TPersQueue::LogPrefix() const {
44284474
return TStringBuilder() << SelfId() << " ";
44294475
}
@@ -4480,6 +4526,8 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
44804526
HFuncTraced(TEvPQ::TEvPartitionScaleStatusChanged, Handle);
44814527
HFuncTraced(NLongTxService::TEvLongTxService::TEvLockStatus, Handle);
44824528
HFuncTraced(TEvPQ::TEvReadingPartitionStatusRequest, Handle);
4529+
HFuncTraced(TEvPQ::TEvDeletePartitionDone, Handle);
4530+
HFuncTraced(TEvPQ::TEvTransactionCompleted, Handle);
44834531
default:
44844532
return false;
44854533
}

ydb/core/persqueue/pq_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -483,10 +483,13 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
483483
void CreateSupportivePartitionActor(const TPartitionId& shadowPartitionId, const TActorContext& ctx);
484484
NKikimrPQ::TPQTabletConfig MakeSupportivePartitionConfig() const;
485485
void SubscribeWriteId(ui64 writeId, const TActorContext& ctx);
486+
void UnsubscribeWriteId(ui64 writeId, const TActorContext& ctx);
486487

487488
bool AllOriginalPartitionsInited() const;
488489

489490
void Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx);
491+
void Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx);
492+
void Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext& ctx);
490493
};
491494

492495

0 commit comments

Comments
 (0)