Skip to content

Commit b70900d

Browse files
Completion of the transaction (#4407)
1 parent ce95b5e commit b70900d

File tree

12 files changed

+480
-27
lines changed

12 files changed

+480
-27
lines changed

.github/config/muted_ya.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ ydb/public/sdk/cpp/client/ydb_topic/ut BasicUsage.WriteRead
4949
ydb/public/sdk/cpp/client/ydb_topic/ut TSettingsValidation.TestDifferentDedupParams
5050
ydb/public/sdk/cpp/client/ydb_topic/ut [0/10]*
5151
ydb/public/sdk/cpp/client/ydb_topic/ut [6/10]*
52-
ydb/public/sdk/cpp/client/ydb_topic/ut TxUsage::WriteToTopic_Demo_*
5352
ydb/services/datastreams/ut DataStreams.TestGetRecordsStreamWithSingleShard
5453
ydb/services/datastreams/ut DataStreams.TestPutRecordsWithRead
5554
ydb/services/datastreams/ut DataStreams.TestReservedConsumersMetering

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2301,7 +2301,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23012301
!topicTxs.empty());
23022302

23032303
if (!locksMap.empty() || VolatileTx ||
2304-
Request.TopicOperations.HasReadOperations())
2304+
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
23052305
{
23062306
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);
23072307

@@ -2349,6 +2349,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23492349
}
23502350

23512351
if (auto tabletIds = Request.TopicOperations.GetReceivingTabletIds()) {
2352+
sendingShardsSet.insert(tabletIds.begin(), tabletIds.end());
23522353
receivingShardsSet.insert(tabletIds.begin(), tabletIds.end());
23532354
}
23542355

@@ -2395,6 +2396,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23952396
}
23962397
}
23972398

2399+
23982400
// Encode sending/receiving shards in tx bodies
23992401
if (needCommit) {
24002402
NProtoBuf::RepeatedField<ui64> sendingShards(sendingShardsSet.begin(), sendingShardsSet.end());

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -356,9 +356,7 @@ TSet<ui64> TTopicOperations::GetReceivingTabletIds() const
356356
{
357357
TSet<ui64> ids;
358358
for (auto& [_, operations] : Operations_) {
359-
if (operations.HasWriteOperations()) {
360-
ids.insert(operations.GetTabletId());
361-
}
359+
ids.insert(operations.GetTabletId());
362360
}
363361
return ids;
364362
}

ydb/core/persqueue/events/internal.h

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,12 +189,19 @@ struct TEvPQ {
189189
EvPartitionScaleStatusChanged,
190190
EvPartitionScaleRequestDone,
191191
EvBalanceConsumer,
192+
EvDeletePartition,
193+
EvDeletePartitionDone,
194+
EvTransactionCompleted,
192195
EvEnd
193196
};
194197

195198
struct TEvHandleWriteResponse : TEventLocal<TEvHandleWriteResponse, EvHandleWriteResponse> {
196-
TEvHandleWriteResponse()
197-
{}
199+
explicit TEvHandleWriteResponse(ui64 cookie) :
200+
Cookie(cookie)
201+
{
202+
}
203+
204+
ui64 Cookie = 0;
198205
};
199206

200207
struct TEvWrite : public TEventLocal<TEvWrite, EvWrite> {
@@ -1132,6 +1139,27 @@ struct TEvPQ {
11321139

11331140
TString ConsumerName;
11341141
};
1142+
1143+
struct TEvDeletePartition : TEventLocal<TEvDeletePartition, EvDeletePartition> {
1144+
};
1145+
1146+
struct TEvDeletePartitionDone : TEventLocal<TEvDeletePartitionDone, EvDeletePartitionDone> {
1147+
explicit TEvDeletePartitionDone(const NPQ::TPartitionId& partitionId) :
1148+
PartitionId(partitionId)
1149+
{
1150+
}
1151+
1152+
NPQ::TPartitionId PartitionId;
1153+
};
1154+
1155+
struct TEvTransactionCompleted : TEventLocal<TEvTransactionCompleted, EvTransactionCompleted> {
1156+
explicit TEvTransactionCompleted(TMaybe<ui64> writeId) :
1157+
WriteId(writeId)
1158+
{
1159+
}
1160+
1161+
TMaybe<ui64> WriteId;
1162+
};
11351163
};
11361164

11371165
} //NKikimr

ydb/core/persqueue/partition.cpp

Lines changed: 112 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -391,7 +391,6 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
391391
write->SetKey(ikey.Data(), ikey.Size());
392392
write->SetValue(out.c_str(), out.size());
393393
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
394-
395394
}
396395

397396
bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx) {
@@ -492,7 +491,8 @@ void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContex
492491
}
493492
}
494493

495-
void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
494+
void TPartition::DestroyActor(const TActorContext& ctx)
495+
{
496496
// Reply to all outstanding requests in order to destroy corresponding actors
497497

498498
TStringBuilder ss;
@@ -523,12 +523,19 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
523523
UsersInfoStorage->Clear(ctx);
524524
}
525525

526-
Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());
527-
Send(WriteQuotaTrackerActor, new TEvents::TEvPoisonPill());
526+
if (!IsSupportive()) {
527+
Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());
528+
Send(WriteQuotaTrackerActor, new TEvents::TEvPoisonPill());
529+
}
528530

529531
Die(ctx);
530532
}
531533

534+
void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
535+
{
536+
DestroyActor(ctx);
537+
}
538+
532539
bool CheckDiskStatus(const TStorageStatusFlags status) {
533540
return !status.Check(NKikimrBlobStorage::StatusDiskSpaceYellowStop);
534541
}
@@ -1020,6 +1027,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
10201027
--ImmediateTxCount;
10211028

10221029
ProcessImmediateTx(tx, predicate, ctx);
1030+
ScheduleTransactionCompleted(tx);
10231031
TxInProgress = false;
10241032
ContinueProcessTxsAndUserActs(ctx);
10251033

@@ -1056,6 +1064,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
10561064
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
10571065
NKikimrPQ::TError::BAD_REQUEST,
10581066
ev->Get()->Message);
1067+
ScheduleTransactionCompleted(t->Record);
10591068

10601069
UserActionAndTransactionEvents.pop_front();
10611070
--ImmediateTxCount;
@@ -1574,13 +1583,11 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
15741583

15751584
const auto writeDuration = ctx.Now() - WriteStartTime;
15761585
const auto minWriteLatency = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs());
1586+
15771587
if (writeDuration > minWriteLatency) {
1578-
KVWriteInProgress = false;
1579-
OnProcessTxsAndUserActsWriteComplete(ctx);
1580-
HandleWriteResponse(ctx);
1581-
ProcessTxsAndUserActs(ctx);
1588+
OnHandleWriteResponse(ctx);
15821589
} else {
1583-
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse());
1590+
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse(response.GetCookie()));
15841591
}
15851592
}
15861593

@@ -1658,6 +1665,20 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
16581665

16591666
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
16601667

1668+
if (DeletePartitionState == DELETION_INITED) {
1669+
ScheduleNegativeReplies();
1670+
ScheduleDeletePartitionDone();
1671+
1672+
AddCmdDeleteRangeForAllKeys(*request);
1673+
1674+
ctx.Send(Tablet, request.Release());
1675+
1676+
DeletePartitionState = DELETION_IN_PROCESS;
1677+
KVWriteInProgress = true;
1678+
1679+
return;
1680+
}
1681+
16611682
HaveWriteMsg = false;
16621683

16631684
if (UserActionAndTransactionEvents.empty()) {
@@ -2629,6 +2650,12 @@ void TPartition::SchedulePartitionConfigChanged()
26292650
MakeHolder<TEvPQ::TEvPartitionConfigChanged>(Partition).Release());
26302651
}
26312652

2653+
void TPartition::ScheduleDeletePartitionDone()
2654+
{
2655+
Replies.emplace_back(Tablet,
2656+
MakeHolder<TEvPQ::TEvDeletePartitionDone>(Partition).Release());
2657+
}
2658+
26322659
void TPartition::AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
26332660
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated)
26342661
{
@@ -3013,6 +3040,82 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
30133040
Send(ev->Sender, response.Release());
30143041
}
30153042

3043+
void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
3044+
{
3045+
PendingEvents.emplace_back(ev->ReleaseBase().Release());
3046+
}
3047+
3048+
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
3049+
{
3050+
Y_ABORT_UNLESS(IsSupportive());
3051+
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
3052+
3053+
DeletePartitionState = DELETION_INITED;
3054+
3055+
ProcessTxsAndUserActs(ctx);
3056+
}
3057+
3058+
void TPartition::ScheduleNegativeReplies()
3059+
{
3060+
for (auto& event : UserActionAndTransactionEvents) {
3061+
auto visitor = [this](auto& event) {
3062+
using T = std::decay_t<decltype(event)>;
3063+
if constexpr (TIsSimpleSharedPtr<T>::value) {
3064+
return this->ScheduleNegativeReply(*event);
3065+
} else {
3066+
return this->ScheduleNegativeReply(event);
3067+
}
3068+
};
3069+
3070+
std::visit(visitor, event);
3071+
}
3072+
3073+
UserActionAndTransactionEvents.clear();
3074+
}
3075+
3076+
void TPartition::AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request)
3077+
{
3078+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeInfo, Partition);
3079+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeData, Partition);
3080+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTmpData, Partition);
3081+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeMeta, Partition);
3082+
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTxMeta, Partition);
3083+
}
3084+
3085+
void TPartition::ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo&)
3086+
{
3087+
Y_ABORT("The supportive partition does not accept read operations");
3088+
}
3089+
3090+
void TPartition::ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction&)
3091+
{
3092+
Y_ABORT("The supportive partition does not accept immediate transactions");
3093+
}
3094+
3095+
void TPartition::ScheduleNegativeReply(const TTransaction&)
3096+
{
3097+
Y_ABORT("The supportive partition does not accept distribute transactions");
3098+
}
3099+
3100+
void TPartition::ScheduleNegativeReply(const TMessage& msg)
3101+
{
3102+
ScheduleReplyError(msg.GetCookie(), NPersQueue::NErrorCode::ERROR, "The transaction is completed");
3103+
}
3104+
3105+
void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransaction& tx)
3106+
{
3107+
Y_ABORT_UNLESS(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
3108+
Y_ABORT_UNLESS(tx.HasData());
3109+
3110+
TMaybe<ui64> writeId;
3111+
if (tx.GetData().HasWriteId()) {
3112+
writeId = tx.GetData().GetWriteId();
3113+
}
3114+
3115+
Replies.emplace_back(Tablet,
3116+
MakeHolder<TEvPQ::TEvTransactionCompleted>(writeId).Release());
3117+
}
3118+
30163119
const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
30173120
{
30183121
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);

ydb/core/persqueue/partition.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,12 @@ class TPartition : public TActorBootstrapped<TPartition> {
9191
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
9292
static const ui32 SCALE_REQUEST_REPEAT_MIN_SECONDS = 60;
9393

94+
enum EDeletePartitionState {
95+
DELETION_NOT_INITED = 0,
96+
DELETION_INITED = 1,
97+
DELETION_IN_PROCESS = 2,
98+
};
99+
94100
private:
95101
struct THasDataReq;
96102
struct THasDataDeadline;
@@ -374,6 +380,9 @@ class TPartition : public TActorBootstrapped<TPartition> {
374380

375381
void CommitWriteOperations(const TActorContext& ctx);
376382

383+
void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
384+
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
385+
377386
public:
378387
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
379388
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
@@ -457,6 +466,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
457466
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
458467
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle);
459468
HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle);
469+
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
460470
default:
461471
if (!Initializer.Handle(ev)) {
462472
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
@@ -519,6 +529,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
519529
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
520530
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
521531
HFuncTraced(TEvPQ::TEvProcessChangeOwnerRequests, Handle);
532+
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
522533
default:
523534
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
524535
break;
@@ -795,6 +806,23 @@ class TPartition : public TActorBootstrapped<TPartition> {
795806
bool ClosedInternalPartition = false;
796807

797808
bool IsSupportive() const;
809+
810+
EDeletePartitionState DeletePartitionState = DELETION_NOT_INITED;
811+
812+
void ScheduleDeletePartitionDone();
813+
void ScheduleNegativeReplies();
814+
void AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request);
815+
816+
void ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo& event);
817+
void ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction& event);
818+
void ScheduleNegativeReply(const TTransaction& tx);
819+
void ScheduleNegativeReply(const TMessage& msg);
820+
821+
void OnHandleWriteResponse(const TActorContext& ctx);
822+
823+
void ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransaction& tx);
824+
825+
void DestroyActor(const TActorContext& ctx);
798826
};
799827

800828
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_init.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1017,7 +1017,9 @@ void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, co
10171017
auto range = del->MutableRange();
10181018

10191019
range->SetFrom(from.Data(), from.Size());
1020+
range->SetIncludeFrom(true);
10201021
range->SetTo(to.Data(), to.Size());
1022+
range->SetIncludeTo(false);
10211023
}
10221024

10231025
static void RequestRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition,

ydb/core/persqueue/partition_util.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,5 +132,8 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels);
132132
NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i);
133133
bool IsQuotingEnabled(const NKikimrPQ::TPQConfig& pqConfig,
134134
bool isLocalDC);
135+
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request,
136+
TKeyPrefix::EType c,
137+
const TPartitionId& partitionId);
135138

136139
} // namespace NKikimr::NPQ

ydb/core/persqueue/partition_write.cpp

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ static const ui32 MAX_INLINE_SIZE = 1000;
3030

3131
static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_INACTIVE;
3232

33-
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, const TPartitionId& partitionId);
34-
3533
void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
3634
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);
3735

@@ -439,12 +437,22 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
439437
UpdateUserInfoEndOffset(ctx.Now());
440438
}
441439

442-
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) {
443-
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
440+
void TPartition::OnHandleWriteResponse(const TActorContext& ctx)
441+
{
444442
KVWriteInProgress = false;
445443
OnProcessTxsAndUserActsWriteComplete(ctx);
446444
HandleWriteResponse(ctx);
447445
ProcessTxsAndUserActs(ctx);
446+
447+
if (DeletePartitionState == DELETION_IN_PROCESS) {
448+
DestroyActor(ctx);
449+
}
450+
}
451+
452+
void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx)
453+
{
454+
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
455+
OnHandleWriteResponse(ctx);
448456
}
449457

450458
void TPartition::UpdateAfterWriteCounters(bool writeComplete) {

0 commit comments

Comments
 (0)