@@ -1041,16 +1041,21 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,
1041
1041
for (size_t i = 0 ; i != info.TxWritesSize (); ++i) {
1042
1042
auto & txWrite = info.GetTxWrites (i);
1043
1043
const TWriteId writeId = GetWriteId (txWrite);
1044
- ui32 partitionId = txWrite.GetOriginalPartitionId ();
1045
- TPartitionId shadowPartitionId (partitionId, writeId, txWrite.GetInternalPartitionId ());
1046
1044
1047
- TxWrites[writeId].Partitions .emplace (partitionId, shadowPartitionId);
1045
+ TTxWriteInfo& writeInfo = TxWrites[writeId];
1046
+ if (txWrite.HasOriginalPartitionId ()) {
1047
+ ui32 partitionId = txWrite.GetOriginalPartitionId ();
1048
+ TPartitionId shadowPartitionId (partitionId, writeId, txWrite.GetInternalPartitionId ());
1048
1049
1049
- AddSupportivePartition (shadowPartitionId);
1050
- CreateSupportivePartitionActor (shadowPartitionId, ctx);
1051
- SubscribeWriteId (writeId, ctx);
1050
+ writeInfo.Partitions .emplace (partitionId, shadowPartitionId);
1051
+
1052
+ AddSupportivePartition (shadowPartitionId);
1053
+ CreateSupportivePartitionActor (shadowPartitionId, ctx);
1054
+
1055
+ NextSupportivePartitionId = Max (NextSupportivePartitionId, shadowPartitionId.InternalPartitionId + 1 );
1056
+ }
1052
1057
1053
- NextSupportivePartitionId = Max (NextSupportivePartitionId, shadowPartitionId. InternalPartitionId + 1 );
1058
+ SubscribeWriteId (writeId, ctx );
1054
1059
}
1055
1060
1056
1061
NewSupportivePartitions.clear ();
@@ -3281,7 +3286,7 @@ bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& ope
3281
3286
TPartitionId partitionId (operation.GetPartitionId (),
3282
3287
writeId,
3283
3288
operation.GetSupportivePartition ());
3284
- PQ_LOG_D (" partitionId= " << partitionId);
3289
+ PQ_LOG_D (" PartitionId " << partitionId << " for WriteId " << writeId );
3285
3290
return Partitions.contains (partitionId);
3286
3291
}
3287
3292
@@ -3292,7 +3297,6 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
3292
3297
}
3293
3298
3294
3299
const TWriteId writeId = GetWriteId (txBody);
3295
- PQ_LOG_D (" writeId=" << writeId);
3296
3300
3297
3301
for (auto & operation : txBody.GetOperations ()) {
3298
3302
auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) {
@@ -3318,7 +3322,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
3318
3322
const NKikimrPQ::TDataTransaction& txBody = event.GetData ();
3319
3323
3320
3324
if (TabletState != NKikimrPQ::ENormal) {
3321
- PQ_LOG_D (" invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name (TabletState) << " )" );
3325
+ PQ_LOG_D (" TxId " << event. GetTxId () << " invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name (TabletState) << " )" );
3322
3326
SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
3323
3327
event.GetTxId (),
3324
3328
NKikimrPQ::TError::ERROR,
@@ -3332,7 +3336,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
3332
3336
//
3333
3337
3334
3338
if (txBody.OperationsSize () <= 0 ) {
3335
- PQ_LOG_D (" empty list of operations" );
3339
+ PQ_LOG_D (" TxId " << event. GetTxId () << " empty list of operations" );
3336
3340
SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
3337
3341
event.GetTxId (),
3338
3342
NKikimrPQ::TError::BAD_REQUEST,
@@ -3342,7 +3346,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
3342
3346
}
3343
3347
3344
3348
if (!CheckTxWriteOperations (txBody)) {
3345
- PQ_LOG_D (" invalid WriteId " << txBody.GetWriteId ());
3349
+ PQ_LOG_D (" TxId " << event. GetTxId () << " invalid WriteId " << txBody.GetWriteId ());
3346
3350
SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
3347
3351
event.GetTxId (),
3348
3352
NKikimrPQ::TError::BAD_REQUEST,
@@ -3351,9 +3355,36 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
3351
3355
return ;
3352
3356
}
3353
3357
3358
+ if (txBody.HasWriteId ()) {
3359
+ const TWriteId writeId = GetWriteId (txBody);
3360
+ if (!TxWrites.contains (writeId)) {
3361
+ PQ_LOG_D (" TxId " << event.GetTxId () << " unknown WriteId " << writeId);
3362
+ SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
3363
+ event.GetTxId (),
3364
+ NKikimrPQ::TError::BAD_REQUEST,
3365
+ " unknown WriteId" ,
3366
+ ctx);
3367
+ return ;
3368
+ }
3369
+
3370
+ TTxWriteInfo& writeInfo = TxWrites.at (writeId);
3371
+ if (writeInfo.Deleting ) {
3372
+ PQ_LOG_W (" TxId " << event.GetTxId () << " WriteId " << writeId << " will be deleted" );
3373
+ SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
3374
+ event.GetTxId (),
3375
+ NKikimrPQ::TError::BAD_REQUEST,
3376
+ " WriteId will be deleted" ,
3377
+ ctx);
3378
+ return ;
3379
+ }
3380
+
3381
+ writeInfo.TxId = event.GetTxId ();
3382
+ PQ_LOG_D (" TxId " << event.GetTxId () << " has WriteId " << writeId);
3383
+ }
3384
+
3354
3385
TMaybe<TPartitionId> partitionId = FindPartitionId (txBody);
3355
3386
if (!partitionId.Defined ()) {
3356
- PQ_LOG_D ( " unknown partition for WriteId " << txBody.GetWriteId ());
3387
+ PQ_LOG_W ( " TxId " << event. GetTxId () << " unknown partition for WriteId " << txBody.GetWriteId ());
3357
3388
SendProposeTransactionAbort (ActorIdFromProto (event.GetSourceActor ()),
3358
3389
event.GetTxId (),
3359
3390
NKikimrPQ::TError::INTERNAL,
@@ -3566,13 +3597,15 @@ bool TPersQueue::CanProcessTxWrites() const
3566
3597
void TPersQueue::SubscribeWriteId (const TWriteId& writeId,
3567
3598
const TActorContext& ctx)
3568
3599
{
3600
+ PQ_LOG_D (" send TEvSubscribeLock for WriteId " << writeId);
3569
3601
ctx.Send (NLongTxService::MakeLongTxServiceID (writeId.NodeId ),
3570
3602
new NLongTxService::TEvLongTxService::TEvSubscribeLock (writeId.KeyId , writeId.NodeId ));
3571
3603
}
3572
3604
3573
3605
void TPersQueue::UnsubscribeWriteId (const TWriteId& writeId,
3574
3606
const TActorContext& ctx)
3575
3607
{
3608
+ PQ_LOG_D (" send TEvUnsubscribeLock for WriteId " << writeId);
3576
3609
ctx.Send (NLongTxService::MakeLongTxServiceID (writeId.NodeId ),
3577
3610
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock (writeId.KeyId , writeId.NodeId ));
3578
3611
}
@@ -3874,11 +3907,16 @@ void TPersQueue::SavePlanStep(NKikimrPQ::TTabletTxInfo& info)
3874
3907
void TPersQueue::SaveTxWrites (NKikimrPQ::TTabletTxInfo& info)
3875
3908
{
3876
3909
for (auto & [writeId, write ] : TxWrites) {
3877
- for ( auto [partitionId, shadowPartitionId] : write .Partitions ) {
3910
+ if ( write .Partitions . empty () ) {
3878
3911
auto * txWrite = info.MutableTxWrites ()->Add ();
3879
3912
SetWriteId (*txWrite, writeId);
3880
- txWrite->SetOriginalPartitionId (partitionId);
3881
- txWrite->SetInternalPartitionId (shadowPartitionId.InternalPartitionId );
3913
+ } else {
3914
+ for (auto [partitionId, shadowPartitionId] : write .Partitions ) {
3915
+ auto * txWrite = info.MutableTxWrites ()->Add ();
3916
+ SetWriteId (*txWrite, writeId);
3917
+ txWrite->SetOriginalPartitionId (partitionId);
3918
+ txWrite->SetInternalPartitionId (shadowPartitionId.InternalPartitionId );
3919
+ }
3882
3920
}
3883
3921
}
3884
3922
@@ -4323,6 +4361,9 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
4323
4361
4324
4362
WriteTx (tx, NKikimrPQ::TTransaction::EXECUTED);
4325
4363
4364
+ PQ_LOG_D (" delete partitions for TxId " << tx.TxId );
4365
+ BeginDeletePartitions (tx);
4366
+
4326
4367
tx.State = NKikimrPQ::TTransaction::EXECUTED;
4327
4368
PQ_LOG_D (" TxId " << tx.TxId <<
4328
4369
" , NewState " << NKikimrPQ::TTransaction_EState_Name (tx.State ));
@@ -4341,8 +4382,8 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
4341
4382
4342
4383
case NKikimrPQ::TTransaction::WAIT_RS_ACKS:
4343
4384
PQ_LOG_D (" HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive () <<
4344
- " , WriteIdIsDisabled " << WriteIdIsDisabled (tx.WriteId ));
4345
- if (tx.HaveAllRecipientsReceive () && WriteIdIsDisabled (tx.WriteId )) {
4385
+ " , AllSupportivePartitionsHaveBeenDeleted " << AllSupportivePartitionsHaveBeenDeleted (tx.WriteId ));
4386
+ if (tx.HaveAllRecipientsReceive () && AllSupportivePartitionsHaveBeenDeleted (tx.WriteId )) {
4346
4387
DeleteTx (tx);
4347
4388
// implicitly switch to the state DELETING
4348
4389
}
@@ -4367,7 +4408,7 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
4367
4408
}
4368
4409
}
4369
4410
4370
- bool TPersQueue::WriteIdIsDisabled (const TMaybe<TWriteId>& writeId) const
4411
+ bool TPersQueue::AllSupportivePartitionsHaveBeenDeleted (const TMaybe<TWriteId>& writeId) const
4371
4412
{
4372
4413
if (!writeId.Defined ()) {
4373
4414
return true ;
@@ -4378,26 +4419,21 @@ bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
4378
4419
TabletID (), writeId->NodeId , writeId->KeyId );
4379
4420
const TTxWriteInfo& writeInfo = TxWrites.at (*writeId);
4380
4421
4381
- bool disabled =
4382
- (writeInfo.LongTxSubscriptionStatus != NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) &&
4422
+ PQ_LOG_D (" WriteId " << *writeId <<
4423
+ " Partitions.size=" << writeInfo.Partitions .size ());
4424
+ bool deleted =
4383
4425
writeInfo.Partitions .empty ()
4384
4426
;
4385
4427
4386
- PQ_LOG_D (" WriteId " << *writeId << " is " << (disabled ? " disabled" : " enabled" ));
4387
-
4388
- return disabled;
4428
+ return deleted;
4389
4429
}
4390
4430
4391
4431
void TPersQueue::DeleteWriteId (const TMaybe<TWriteId>& writeId)
4392
4432
{
4393
- if (!writeId.Defined ()) {
4433
+ if (!writeId.Defined () || !TxWrites. contains (*writeId) ) {
4394
4434
return ;
4395
4435
}
4396
4436
4397
- Y_ABORT_UNLESS (TxWrites.contains (*writeId),
4398
- " PQ %" PRIu64 " , WriteId {%" PRIu64 " , %" PRIu64 " }" ,
4399
- TabletID (), writeId->NodeId , writeId->KeyId );
4400
-
4401
4437
PQ_LOG_D (" delete WriteId " << *writeId);
4402
4438
TxWrites.erase (*writeId);
4403
4439
}
@@ -4727,7 +4763,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
4727
4763
}
4728
4764
}
4729
4765
4730
- void TPersQueue::Handle (NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx )
4766
+ void TPersQueue::Handle (NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev)
4731
4767
{
4732
4768
PQ_LOG_D (" Handle TEvLongTxService::TEvLockStatus " << ev->Get ()->Record .ShortDebugString ());
4733
4769
@@ -4748,22 +4784,14 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
4748
4784
return ;
4749
4785
}
4750
4786
4751
- if (!writeInfo.TxId .Defined ()) {
4752
- PQ_LOG_D (" delete write info for WriteId " << writeId);
4753
- // the message TEvProposeTransaction will not come anymore
4754
- BeginDeletePartitions (writeInfo);
4787
+ if (writeInfo.TxId .Defined ()) {
4788
+ // the message `TEvProposeTransaction` has already arrived
4789
+ PQ_LOG_D (" there is already a transaction TxId " << writeInfo.TxId << " for WriteId " << writeId);
4755
4790
return ;
4756
4791
}
4757
4792
4758
- ui64 txId = *writeInfo.TxId ;
4759
- PQ_LOG_D (" delete write info for WriteId " << writeId << " and TxId " << txId);
4760
-
4761
- auto * tx = GetTransaction (ctx, txId);
4762
- if (!tx ||
4763
- (tx->State == NKikimrPQ::TTransaction::EXECUTED) ||
4764
- (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS)) {
4765
- BeginDeletePartitions (writeInfo);
4766
- }
4793
+ PQ_LOG_D (" delete partitions for WriteId " << writeId);
4794
+ BeginDeletePartitions (writeInfo);
4767
4795
}
4768
4796
4769
4797
void TPersQueue::Handle (TEvPQ::TEvReadingPartitionStatusRequest::TPtr& ev, const TActorContext& ctx)
@@ -4863,6 +4891,16 @@ void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
4863
4891
writeInfo.Deleting = true ;
4864
4892
}
4865
4893
4894
+ void TPersQueue::BeginDeletePartitions (const TDistributedTransaction& tx)
4895
+ {
4896
+ if (!tx.WriteId .Defined () || !TxWrites.contains (*tx.WriteId )) {
4897
+ return ;
4898
+ }
4899
+
4900
+ TTxWriteInfo& writeInfo = TxWrites.at (*tx.WriteId );
4901
+ BeginDeletePartitions (writeInfo);
4902
+ }
4903
+
4866
4904
TString TPersQueue::LogPrefix () const {
4867
4905
return TStringBuilder () << " [PQ: " << TabletID () << " ] " ;
4868
4906
}
@@ -4917,7 +4955,7 @@ bool TPersQueue::HandleHook(STFUNC_SIG)
4917
4955
HFuncTraced (TEvMediatorTimecast::TEvRegisterTabletResult, Handle );
4918
4956
HFuncTraced (TEvPQ::TEvCheckPartitionStatusRequest, Handle );
4919
4957
HFuncTraced (TEvPQ::TEvPartitionScaleStatusChanged, Handle );
4920
- HFuncTraced (NLongTxService::TEvLongTxService::TEvLockStatus, Handle );
4958
+ hFuncTraced (NLongTxService::TEvLongTxService::TEvLockStatus, Handle );
4921
4959
HFuncTraced (TEvPQ::TEvReadingPartitionStatusRequest, Handle );
4922
4960
HFuncTraced (TEvPQ::TEvDeletePartitionDone, Handle );
4923
4961
HFuncTraced (TEvPQ::TEvTransactionCompleted, Handle );
0 commit comments