Skip to content

Commit d9b7f37

Browse files
Resharper transactions are hanging (#7676)
1 parent c66a55e commit d9b7f37

File tree

7 files changed

+149
-70
lines changed

7 files changed

+149
-70
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 91 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1340,6 +1340,7 @@ void TPersQueue::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&
13401340
EndWriteTabletState(resp, ctx);
13411341
break;
13421342
case WRITE_TX_COOKIE:
1343+
PQ_LOG_D("Handle TEvKeyValue::TEvResponse (WRITE_TX_COOKIE)");
13431344
EndWriteTxs(resp, ctx);
13441345
break;
13451346
default:
@@ -3459,7 +3460,7 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorCo
34593460
tx->OnReadSetAck(event);
34603461
tx->UnbindMsgsFromPipe(event.GetTabletConsumer());
34613462

3462-
if (tx->State == NKikimrPQ::TTransaction::EXECUTED) {
3463+
if (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) {
34633464
CheckTxState(ctx, *tx);
34643465

34653466
TryWriteTxs(ctx);
@@ -3803,20 +3804,34 @@ void TPersQueue::ProcessWriteTxs(const TActorContext& ctx,
38033804
void TPersQueue::ProcessDeleteTxs(const TActorContext& ctx,
38043805
NKikimrClient::TKeyValueRequest& request)
38053806
{
3806-
Y_ABORT_UNLESS(!WriteTxsInProgress);
3807+
Y_ABORT_UNLESS(!WriteTxsInProgress,
3808+
"PQ %" PRIu64,
3809+
TabletID());
38073810

38083811
for (ui64 txId : DeleteTxs) {
3809-
auto tx = GetTransaction(ctx, txId);
3810-
Y_ABORT_UNLESS(tx);
3812+
PQ_LOG_D("delete key for TxId " << txId);
3813+
AddCmdDeleteTx(request, txId);
38113814

3812-
tx->AddCmdDelete(request);
3813-
3814-
ChangedTxs.insert(tx->TxId);
3815+
auto tx = GetTransaction(ctx, txId);
3816+
if (tx) {
3817+
ChangedTxs.insert(txId);
3818+
}
38153819
}
38163820

38173821
DeleteTxs.clear();
38183822
}
38193823

3824+
// Appends a CmdDeleteRange to `request` that covers exactly one key: the key
// returned by GetTxKey(txId). The range is closed on both ends and its
// boundaries are equal.
void TPersQueue::AddCmdDeleteTx(NKikimrClient::TKeyValueRequest& request,
                                ui64 txId)
{
    const TString txKey = GetTxKey(txId);

    auto* keyRange = request.AddCmdDeleteRange()->MutableRange();

    // From == To with both ends inclusive, so the range holds a single key.
    keyRange->SetFrom(txKey);
    keyRange->SetTo(txKey);
    keyRange->SetIncludeFrom(true);
    keyRange->SetIncludeTo(true);
}
3834+
38203835
void TPersQueue::ProcessConfigTx(const TActorContext& ctx,
38213836
TEvKeyValue::TEvRequest* request)
38223837
{
@@ -3938,6 +3953,7 @@ void TPersQueue::SendEvReadSetToReceivers(const TActorContext& ctx,
39383953
void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx,
39393954
TDistributedTransaction& tx)
39403955
{
3956+
PQ_LOG_D("TPersQueue::SendEvReadSetAckToSenders");
39413957
for (auto& [target, event] : tx.ReadSetAcks) {
39423958
PQ_LOG_D("Send TEvTxProcessing::TEvReadSetAck " << event->ToString());
39433959
ctx.Send(target, event.release());
@@ -4307,42 +4323,87 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43074323
Y_ABORT_UNLESS(false);
43084324
}
43094325

4326+
WriteTx(tx, NKikimrPQ::TTransaction::EXECUTED);
4327+
43104328
tx.State = NKikimrPQ::TTransaction::EXECUTED;
43114329
PQ_LOG_D("TxId " << tx.TxId <<
43124330
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
4313-
} else {
4314-
break;
43154331
}
43164332

4317-
[[fallthrough]];
4333+
break;
43184334

43194335
case NKikimrPQ::TTransaction::EXECUTED:
4320-
PQ_LOG_D("HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive());
4321-
if (tx.HaveAllRecipientsReceive()) {
4322-
if (tx.WriteId.Defined()) {
4323-
BeginDeleteTx(tx);
4324-
} else {
4325-
DeleteTx(tx);
4326-
}
4336+
SendEvReadSetAckToSenders(ctx, tx);
4337+
4338+
tx.State = NKikimrPQ::TTransaction::WAIT_RS_ACKS;
4339+
PQ_LOG_D("TxId " << tx.TxId <<
4340+
", NewState " << NKikimrPQ::TTransaction_EState_Name(tx.State));
4341+
4342+
[[fallthrough]];
4343+
4344+
case NKikimrPQ::TTransaction::WAIT_RS_ACKS:
4345+
PQ_LOG_D("HaveAllRecipientsReceive " << tx.HaveAllRecipientsReceive() <<
4346+
", WriteIdIsDisabled " << WriteIdIsDisabled(tx.WriteId));
4347+
if (tx.HaveAllRecipientsReceive() && WriteIdIsDisabled(tx.WriteId)) {
4348+
DeleteTx(tx);
4349+
// implicitly switch to the state DELETING
43274350
}
43284351

43294352
break;
43304353

43314354
case NKikimrPQ::TTransaction::DELETING:
43324355
// The PQ tablet has persisted its state. Now it can delete the transaction and take the next one.
4333-
SendEvReadSetAckToSenders(ctx, tx);
43344356
if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) {
43354357
TxQueue.pop();
43364358
TryStartTransaction(ctx);
43374359
}
4360+
4361+
DeleteWriteId(tx.WriteId);
4362+
PQ_LOG_D("delete TxId " << tx.TxId);
43384363
Txs.erase(tx.TxId);
4364+
43394365
// If this was the last transaction, then you need to send responses to messages about changes
43404366
// in the status of the PQ tablet (if they came)
43414367
TryReturnTabletStateAll(ctx);
43424368
break;
43434369
}
43444370
}
43454371

4372+
bool TPersQueue::WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const
4373+
{
4374+
if (!writeId.Defined()) {
4375+
return true;
4376+
}
4377+
4378+
Y_ABORT_UNLESS(TxWrites.contains(*writeId),
4379+
"PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
4380+
TabletID(), writeId->NodeId, writeId->KeyId);
4381+
const TTxWriteInfo& writeInfo = TxWrites.at(*writeId);
4382+
4383+
bool disabled =
4384+
(writeInfo.LongTxSubscriptionStatus != NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) &&
4385+
writeInfo.Partitions.empty()
4386+
;
4387+
4388+
PQ_LOG_D("WriteId " << *writeId << " is " << (disabled ? "disabled" : "enabled"));
4389+
4390+
return disabled;
4391+
}
4392+
4393+
void TPersQueue::DeleteWriteId(const TMaybe<TWriteId>& writeId)
4394+
{
4395+
if (!writeId.Defined()) {
4396+
return;
4397+
}
4398+
4399+
Y_ABORT_UNLESS(TxWrites.contains(*writeId),
4400+
"PQ %" PRIu64 ", WriteId {%" PRIu64 ", %" PRIu64 "}",
4401+
TabletID(), writeId->NodeId, writeId->KeyId);
4402+
4403+
PQ_LOG_D("delete WriteId " << *writeId);
4404+
TxWrites.erase(*writeId);
4405+
}
4406+
43464407
void TPersQueue::WriteTx(TDistributedTransaction& tx, NKikimrPQ::TTransaction::EState state)
43474408
{
43484409
WriteTxs[tx.TxId] = state;
@@ -4375,7 +4436,9 @@ void TPersQueue::CheckChangedTxStates(const TActorContext& ctx)
43754436
{
43764437
for (ui64 txId : ChangedTxs) {
43774438
auto tx = GetTransaction(ctx, txId);
4378-
Y_ABORT_UNLESS(tx);
4439+
Y_ABORT_UNLESS(tx,
4440+
"PQ %" PRIu64 ", TxId %" PRIu64,
4441+
TabletID(), txId);
43794442

43804443
CheckTxState(ctx, *tx);
43814444
}
@@ -4698,7 +4761,9 @@ void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& e
46984761
PQ_LOG_D("delete write info for WriteId " << writeId << " and TxId " << txId);
46994762

47004763
auto* tx = GetTransaction(ctx, txId);
4701-
if (!tx || (tx->State == NKikimrPQ::TTransaction::EXECUTED)) {
4764+
if (!tx ||
4765+
(tx->State == NKikimrPQ::TTransaction::EXECUTED) ||
4766+
(tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS)) {
47024767
BeginDeletePartitions(writeInfo);
47034768
}
47044769
}
@@ -4754,11 +4819,11 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
47544819
UnsubscribeWriteId(writeId, ctx);
47554820
if (writeInfo.TxId.Defined()) {
47564821
if (auto tx = GetTransaction(ctx, *writeInfo.TxId); tx) {
4757-
DeleteTx(*tx);
4822+
if (tx->State == NKikimrPQ::TTransaction::WAIT_RS_ACKS) {
4823+
CheckTxState(ctx, *tx);
4824+
}
47584825
}
47594826
}
4760-
PQ_LOG_D("delete WriteId " << writeId);
4761-
TxWrites.erase(writeId);
47624827
}
47634828
TxWritesChanged = true;
47644829

@@ -4767,6 +4832,9 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
47674832

47684833
void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext&)
47694834
{
4835+
PQ_LOG_D("Handle TEvPQ::TEvTransactionCompleted" <<
4836+
" WriteId " << ev->Get()->WriteId);
4837+
47704838
auto* event = ev->Get();
47714839
if (!event->WriteId.Defined()) {
47724840
return;
@@ -4782,26 +4850,6 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
47824850
BeginDeletePartitions(writeInfo);
47834851
}
47844852

4785-
void TPersQueue::BeginDeleteTx(const TDistributedTransaction& tx)
4786-
{
4787-
Y_ABORT_UNLESS(tx.WriteId.Defined());
4788-
const TWriteId& writeId = *tx.WriteId;
4789-
PQ_LOG_D("begin delete write info for WriteId " << writeId);
4790-
if (!TxWrites.contains(writeId)) {
4791-
// the transaction has already been completed
4792-
PQ_LOG_D("unknown WriteId " << writeId);
4793-
return;
4794-
}
4795-
4796-
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
4797-
if (writeInfo.LongTxSubscriptionStatus == NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) {
4798-
PQ_LOG_D("wait for WriteId subscription status");
4799-
return;
4800-
}
4801-
4802-
BeginDeletePartitions(writeInfo);
4803-
}
4804-
48054853
void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
48064854
{
48074855
if (writeInfo.Deleting) {

ydb/core/persqueue/pq_impl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,6 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
499499
void Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx);
500500
void Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext& ctx);
501501

502-
void BeginDeleteTx(const TDistributedTransaction& tx);
503502
void BeginDeletePartitions(TTxWriteInfo& writeInfo);
504503

505504
bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
@@ -524,6 +523,12 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
524523

525524
void SendTransactionsReadRequest(const TString& fromKey, bool includeFrom,
526525
const TActorContext& ctx);
526+
527+
void AddCmdDeleteTx(NKikimrClient::TKeyValueRequest& request,
528+
ui64 txId);
529+
530+
bool WriteIdIsDisabled(const TMaybe<TWriteId>& writeId) const;
531+
void DeleteWriteId(const TMaybe<TWriteId>& writeId);
527532
};
528533

529534

ydb/core/persqueue/transaction.cpp

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ bool TDistributedTransaction::HaveParticipantsDecision() const
323323

324324
bool TDistributedTransaction::HaveAllRecipientsReceive() const
325325
{
326+
PQ_LOG_D("PredicateAcks: " << PredicateAcksCount << "/" << PredicateRecipients.size());
326327
return PredicateRecipients.size() == PredicateAcksCount;
327328
}
328329

@@ -389,18 +390,6 @@ void TDistributedTransaction::AddCmdWriteConfigTx(NKikimrPQ::TTransaction& tx)
389390
*tx.MutableBootstrapConfig() = BootstrapConfig;
390391
}
391392

392-
void TDistributedTransaction::AddCmdDelete(NKikimrClient::TKeyValueRequest& request)
393-
{
394-
TString key = GetKey();
395-
auto range = request.AddCmdDeleteRange()->MutableRange();
396-
range->SetFrom(key);
397-
range->SetIncludeFrom(true);
398-
range->SetTo(key);
399-
range->SetIncludeTo(true);
400-
401-
PQ_LOG_D("add CmdDeleteRange for key " << key);
402-
}
403-
404393
void TDistributedTransaction::SetDecision(NKikimrTx::TReadSetData::EDecision& var, NKikimrTx::TReadSetData::EDecision value)
405394
{
406395
if ((var == NKikimrTx::TReadSetData::DECISION_UNKNOWN) || (value == NKikimrTx::TReadSetData::DECISION_ABORT)) {

ydb/core/persqueue/transaction.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ struct TDistributedTransaction {
7575
bool HaveAllRecipientsReceive() const;
7676

7777
void AddCmdWrite(NKikimrClient::TKeyValueRequest& request, EState state);
78-
void AddCmdDelete(NKikimrClient::TKeyValueRequest& request);
7978

8079
static void SetDecision(NKikimrTx::TReadSetData::EDecision& var, NKikimrTx::TReadSetData::EDecision value);
8180

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -417,33 +417,36 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS
417417
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
418418
}
419419

420+
auto readSet = std::move(*tablet.ReadSet);
421+
tablet.ReadSet = Nothing();
422+
420423
if (matcher.Step.Defined()) {
421-
UNIT_ASSERT(tablet.ReadSet->HasStep());
422-
UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, tablet.ReadSet->GetStep());
424+
UNIT_ASSERT(readSet.HasStep());
425+
UNIT_ASSERT_VALUES_EQUAL(*matcher.Step, readSet.GetStep());
423426
}
424427
if (matcher.TxId.Defined()) {
425-
UNIT_ASSERT(tablet.ReadSet->HasTxId());
426-
UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, tablet.ReadSet->GetTxId());
428+
UNIT_ASSERT(readSet.HasTxId());
429+
UNIT_ASSERT_VALUES_EQUAL(*matcher.TxId, readSet.GetTxId());
427430
}
428431
if (matcher.Source.Defined()) {
429-
UNIT_ASSERT(tablet.ReadSet->HasTabletSource());
430-
UNIT_ASSERT_VALUES_EQUAL(*matcher.Source, tablet.ReadSet->GetTabletSource());
432+
UNIT_ASSERT(readSet.HasTabletSource());
433+
UNIT_ASSERT_VALUES_EQUAL(*matcher.Source, readSet.GetTabletSource());
431434
}
432435
if (matcher.Target.Defined()) {
433-
UNIT_ASSERT(tablet.ReadSet->HasTabletDest());
434-
UNIT_ASSERT_VALUES_EQUAL(*matcher.Target, tablet.ReadSet->GetTabletDest());
436+
UNIT_ASSERT(readSet.HasTabletDest());
437+
UNIT_ASSERT_VALUES_EQUAL(*matcher.Target, readSet.GetTabletDest());
435438
}
436439
if (matcher.Decision.Defined()) {
437-
UNIT_ASSERT(tablet.ReadSet->HasReadSet());
440+
UNIT_ASSERT(readSet.HasReadSet());
438441

439442
NKikimrTx::TReadSetData data;
440-
Y_ABORT_UNLESS(data.ParseFromString(tablet.ReadSet->GetReadSet()));
443+
Y_ABORT_UNLESS(data.ParseFromString(readSet.GetReadSet()));
441444

442445
UNIT_ASSERT_EQUAL(*matcher.Decision, data.GetDecision());
443446
}
444447
if (matcher.Producer.Defined()) {
445-
UNIT_ASSERT(tablet.ReadSet->HasTabletProducer());
446-
UNIT_ASSERT_VALUES_EQUAL(*matcher.Producer, tablet.ReadSet->GetTabletProducer());
448+
UNIT_ASSERT(readSet.HasTabletProducer());
449+
UNIT_ASSERT_VALUES_EQUAL(*matcher.Producer, readSet.GetTabletProducer());
447450
}
448451
}
449452

ydb/core/protos/pqconfig.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1097,7 +1097,8 @@ message TTransaction {
10971097
CALCULATED = 6;
10981098
WAIT_RS = 7; // persist
10991099
EXECUTING = 8;
1100-
EXECUTED = 9;
1100+
EXECUTED = 9; // persist
1101+
WAIT_RS_ACKS = 11;
11011102
DELETING = 10;
11021103
};
11031104

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1846,6 +1846,40 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_26, TFixture)
18461846
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 3);
18471847
}
18481848

1849+
// Runs the same scenario twice against three topics: one message is written
// to topic_A and one to topic_B outside of a transaction; then, inside a
// single transaction, each of them is read and re-written to topic_C. After a
// successful commit, two messages are expected to be readable from topic_C.
// The keys of every topic's PQ tablet are dumped at the end of each round.
Y_UNIT_TEST_F(WriteToTopic_Demo_27, TFixture)
{
    CreateTopic("topic_A", TEST_CONSUMER);
    CreateTopic("topic_B", TEST_CONSUMER);
    CreateTopic("topic_C", TEST_CONSUMER);

    for (size_t round = 0; round != 2; ++round) {
        // Source messages are written without a transaction.
        WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", nullptr, 0);
        WriteToTopic("topic_B", TEST_MESSAGE_GROUP_ID, "message #2", nullptr, 0);

        NTable::TSession session = CreateTableSession();
        NTable::TTransaction tx = BeginTx(session);

        // topic_A -> topic_C within the transaction.
        auto fromA = ReadFromTopic("topic_A", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0);
        UNIT_ASSERT_VALUES_EQUAL(fromA.size(), 1);
        WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, fromA[0], &tx, 0);
        WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID);

        // topic_B -> topic_C within the same transaction.
        auto fromB = ReadFromTopic("topic_B", TEST_CONSUMER, TDuration::Seconds(2), &tx, 0);
        UNIT_ASSERT_VALUES_EQUAL(fromB.size(), 1);
        WriteToTopic("topic_C", TEST_MESSAGE_GROUP_ID, fromB[0], &tx, 0);
        WaitForAcks("topic_C", TEST_MESSAGE_GROUP_ID);

        CommitTx(tx, EStatus::SUCCESS);

        // Both forwarded messages must be readable after the commit.
        auto fromC = ReadFromTopic("topic_C", TEST_CONSUMER, TDuration::Seconds(2), nullptr, 0);
        UNIT_ASSERT_VALUES_EQUAL(fromC.size(), 2);

        DumpPQTabletKeys("topic_A");
        DumpPQTabletKeys("topic_B");
        DumpPQTabletKeys("topic_C");
    }
}
1882+
18491883
}
18501884

18511885
}

0 commit comments

Comments
 (0)