Skip to content

Commit f0d7ba3

Browse files
[+] тест для удаления служебной партиции
1 parent 247c33a commit f0d7ba3

File tree

11 files changed

+154
-79
lines changed

11 files changed

+154
-79
lines changed

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2304,7 +2304,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23042304
!topicTxs.empty());
23052305

23062306
if (!locksMap.empty() || VolatileTx ||
2307-
Request.TopicOperations.HasReadOperations())
2307+
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
23082308
{
23092309
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);
23102310

@@ -2352,6 +2352,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23522352
}
23532353

23542354
if (auto tabletIds = Request.TopicOperations.GetReceivingTabletIds()) {
2355+
sendingShardsSet.insert(tabletIds.begin(), tabletIds.end());
23552356
receivingShardsSet.insert(tabletIds.begin(), tabletIds.end());
23562357
}
23572358

@@ -2398,6 +2399,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
23982399
}
23992400
}
24002401

2402+
24012403
// Encode sending/receiving shards in tx bodies
24022404
if (needCommit) {
24032405
NProtoBuf::RepeatedField<ui64> sendingShards(sendingShardsSet.begin(), sendingShardsSet.end());

ydb/core/kqp/topics/kqp_topics.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -356,9 +356,7 @@ TSet<ui64> TTopicOperations::GetReceivingTabletIds() const
356356
{
357357
TSet<ui64> ids;
358358
for (auto& [_, operations] : Operations_) {
359-
if (operations.HasWriteOperations()) {
360-
ids.insert(operations.GetTabletId());
361-
}
359+
ids.insert(operations.GetTabletId());
362360
}
363361
return ids;
364362
}

ydb/core/persqueue/key.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
#include "key.h"
2-
#include <ydb/library/dbgtrace/debug_trace.h>
32

43
namespace NKikimr::NPQ {
54

65
std::pair<TKeyPrefix, TKeyPrefix> MakeKeyPrefixRange(TKeyPrefix::EType type, const TPartitionId& partition)
76
{
8-
DBGTRACE("MakeKeyPrefixRange");
9-
DBGTRACE_LOG("type=" << (char)type << ", partition=" << partition);
107
TKeyPrefix from(type, partition);
118
TKeyPrefix to(type, TPartitionId(partition.OriginalPartitionId, partition.WriteId, partition.InternalPartitionId + 1));
129

ydb/core/persqueue/partition.cpp

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
#include <util/folder/path.h>
2020
#include <util/string/escape.h>
2121
#include <util/system/byteorder.h>
22-
#include <ydb/library/dbgtrace/debug_trace.h>
2322

2423
namespace {
2524

@@ -365,7 +364,6 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
365364
}
366365

367366
void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
368-
DBGTRACE("TPartition::AddMetaKey");
369367
//Set Start Offset
370368
auto write = request->Record.AddCmdWrite();
371369
TKeyPrefix ikey(TKeyPrefix::TypeMeta, Partition);
@@ -393,7 +391,6 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
393391
write->SetKey(ikey.Data(), ikey.Size());
394392
write->SetValue(out.c_str(), out.size());
395393
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);
396-
397394
}
398395

399396
bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx) {
@@ -980,7 +977,6 @@ void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext
980977

981978
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
982979
{
983-
DBGTRACE("TPartition::Handle(TEvPQ::TEvTxCommit)");
984980
EndTransaction(*ev->Get(), ctx);
985981

986982
TxInProgress = false;
@@ -990,7 +986,6 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
990986

991987
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
992988
{
993-
DBGTRACE("TPartition::Handle(TEvPQ::TEvTxRollback)");
994989
EndTransaction(*ev->Get(), ctx);
995990

996991
TxInProgress = false;
@@ -1000,7 +995,6 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
1000995

1001996
void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx)
1002997
{
1003-
DBGTRACE("TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse)");
1004998
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
1005999
"TEvGetWriteInfoResponse Cookie: " << ev->Get()->Cookie);
10061000

@@ -1022,7 +1016,6 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
10221016
WriteInfoResponse = ev->Release();
10231017

10241018
if (auto* t = TryGetCurrentImmediateTransaction()) {
1025-
DBGTRACE_LOG("immediate tx");
10261019
if (!WriteInfoResponse->BodyKeys.empty()) {
10271020
predicate = false;
10281021
}
@@ -1059,7 +1052,6 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
10591052

10601053
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx)
10611054
{
1062-
DBGTRACE("TPartition::Handle(TEvPQ::TEvGetWriteInfoError)");
10631055
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
10641056
"TEvGetWriteInfoError Cookie: " << ev->Get()->Cookie);
10651057

@@ -1068,7 +1060,6 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
10681060
WriteInfoResponse = nullptr;
10691061

10701062
if (auto* t = TryGetCurrentImmediateTransaction()) {
1071-
DBGTRACE_LOG("immediate tx");
10721063
ScheduleReplyPropose(t->Record,
10731064
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
10741065
NKikimrPQ::TError::BAD_REQUEST,
@@ -1656,9 +1647,7 @@ size_t TPartition::GetUserActCount(const TString& consumer) const
16561647

16571648
void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
16581649
{
1659-
DBGTRACE("TPartition::ProcessTxsAndUserActs");
16601650
if (KVWriteInProgress || TxInProgress) {
1661-
DBGTRACE_LOG("skip");
16621651
return;
16631652
}
16641653

@@ -1671,16 +1660,12 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
16711660

16721661
void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
16731662
{
1674-
DBGTRACE("TPartition::ContinueProcessTxsAndUserActs");
1675-
DBGTRACE_LOG("Partition=" << Partition);
1676-
DBGTRACE_LOG("DeletePartitionState=" << (int)DeletePartitionState);
16771663
Y_ABORT_UNLESS(!KVWriteInProgress);
16781664
Y_ABORT_UNLESS(!TxInProgress);
16791665

16801666
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
16811667

16821668
if (DeletePartitionState == DELETION_INITED) {
1683-
DBGTRACE_LOG("deletion inited");
16841669
ScheduleNegativeReplies();
16851670
ScheduleDeletePartitionDone();
16861671

@@ -1689,14 +1674,14 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
16891674
ctx.Send(Tablet, request.Release());
16901675

16911676
DeletePartitionState = DELETION_IN_PROCESS;
1677+
KVWriteInProgress = true;
16921678

16931679
return;
16941680
}
16951681

16961682
HaveWriteMsg = false;
16971683

16981684
if (UserActionAndTransactionEvents.empty()) {
1699-
DBGTRACE_LOG("empty queue");
17001685
const auto now = ctx.Now();
17011686

17021687
if (ManageWriteTimestampEstimate) {
@@ -1712,7 +1697,6 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
17121697
ProcessReserveRequests(ctx);
17131698

17141699
if (haveChanges || TxIdHasChanged || !AffectedUsers.empty() || ChangeConfig) {
1715-
DBGTRACE_LOG("begin write");
17161700
AddCmdWriteTxMeta(request->Record);
17171701
AddCmdWriteUserInfos(request->Record);
17181702
AddCmdWriteConfig(request->Record);
@@ -1727,7 +1711,6 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
17271711
KVWriteInProgress = true;
17281712
HaveWriteMsg = true;
17291713
} else {
1730-
DBGTRACE_LOG("reply");
17311714
AnswerCurrentWrites(ctx);
17321715
AnswerCurrentReplies(ctx);
17331716
}
@@ -1763,7 +1746,6 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
17631746
}
17641747

17651748
if (HaveWriteMsg) {
1766-
DBGTRACE_LOG("have write operation");
17671749
if (!DiskIsFull) {
17681750
EndAppendHeadWithNewWrites(request.Get(), ctx);
17691751
EndProcessWrites(request.Get(), ctx);
@@ -3093,7 +3075,6 @@ void TPartition::ScheduleNegativeReplies()
30933075

30943076
void TPartition::AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request)
30953077
{
3096-
DBGTRACE("TPartition::AddCmdDeleteRangeForAllKeys");
30973078
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeInfo, Partition);
30983079
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeData, Partition);
30993080
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTmpData, Partition);

ydb/core/persqueue/partition_init.cpp

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
#include "partition.h"
22
#include "partition_util.h"
33
#include <memory>
4-
#include <ydb/library/dbgtrace/debug_trace.h>
54

65
namespace NKikimr::NPQ {
76

@@ -1010,8 +1009,6 @@ bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev) {
10101009

10111010
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, const TPartitionId& partitionId)
10121011
{
1013-
DBGTRACE("AddCmdDeleteRange");
1014-
DBGTRACE_LOG("partitionId=" << partitionId);
10151012
auto keyPrefixes = MakeKeyPrefixRange(c, partitionId);
10161013
const TKeyPrefix& from = keyPrefixes.first;
10171014
const TKeyPrefix& to = keyPrefixes.second;
@@ -1023,9 +1020,6 @@ void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, co
10231020
range->SetIncludeFrom(true);
10241021
range->SetTo(to.Data(), to.Size());
10251022
range->SetIncludeTo(false);
1026-
1027-
DBGTRACE_LOG("from=" << TString(from.Data(), from.Size()));
1028-
DBGTRACE_LOG("to=" << TString(to.Data(), to.Size()));
10291023
}
10301024

10311025
static void RequestRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition,

ydb/core/persqueue/partition_write.cpp

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
#include <util/folder/path.h>
2323
#include <util/string/escape.h>
2424
#include <util/system/byteorder.h>
25-
#include <ydb/library/dbgtrace/debug_trace.h>
2625

2726
namespace NKikimr::NPQ {
2827

@@ -1445,7 +1444,6 @@ void TPartition::BeginHandleRequests(TEvKeyValue::TEvRequest* request, const TAc
14451444

14461445
void TPartition::EndHandleRequests(TEvKeyValue::TEvRequest* request, const TActorContext& ctx)
14471446
{
1448-
DBGTRACE("TPartition::EndHandleRequests");
14491447
HaveDrop = CleanUp(request, ctx);
14501448

14511449
ProcessReserveRequests(ctx);

ydb/core/persqueue/pq_impl.cpp

Lines changed: 55 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3173,12 +3173,13 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
31733173
return;
31743174
}
31753175

3176+
31763177
if (txBody.GetImmediate()) {
31773178
TPartitionId originalPartitionId(txBody.GetOperations(0).GetPartitionId());
31783179
const TPartitionInfo& partition = Partitions.at(originalPartitionId);
31793180

31803181
if (txBody.HasWriteId()) {
3181-
Y_ABORT_UNLESS(Partitions.contains(*partitionId));
3182+
// the condition `Partition.contains(*partitioned)` is checked above
31823183
const TPartitionInfo& partition = Partitions.at(*partitionId);
31833184
ActorIdToProto(partition.Actor, event.MutableSupportivePartitionActor());
31843185
}
@@ -3843,7 +3844,6 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
38433844
result->Record.SetTxId(tx.TxId);
38443845
result->Record.SetStep(tx.Step);
38453846

3846-
38473847
ctx.Send(tx.SourceActor, std::move(result));
38483848
}
38493849

@@ -4088,7 +4088,11 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
40884088
case NKikimrPQ::TTransaction::EXECUTED:
40894089
PQ_LOG_T("TxId="<< tx.TxId << ", State=EXECUTED, tx.HaveAllRecipientsReceive()=" << tx.HaveAllRecipientsReceive());
40904090
if (tx.HaveAllRecipientsReceive()) {
4091-
DeleteTx(tx);
4091+
if (tx.WriteId.Defined()) {
4092+
BeginDeleteTx(tx);
4093+
} else {
4094+
DeleteTx(tx);
4095+
}
40924096
}
40934097

40944098
break;
@@ -4407,13 +4411,26 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
44074411
}
44084412
}
44094413

4410-
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& /*ctx*/)
4414+
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext&)
44114415
{
44124416
auto& record = ev->Get()->Record;
44134417
ui64 writeId = record.GetLockId();
4414-
if (TxWrites.contains(writeId)) {
4415-
TTxWriteInfo& txWrite = TxWrites.at(writeId);
4416-
txWrite.LongTxSubscriptionStatus = record.GetStatus();
4418+
4419+
if (!TxWrites.contains(writeId)) {
4420+
// the transaction has already been completed
4421+
return;
4422+
}
4423+
4424+
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
4425+
writeInfo.LongTxSubscriptionStatus = record.GetStatus();
4426+
4427+
if (writeInfo.LongTxSubscriptionStatus == NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) {
4428+
return;
4429+
}
4430+
4431+
if (!writeInfo.TxId.Defined()) {
4432+
// the message TEvProposeTransaction will not come anymore
4433+
BeginDeletePartitions(writeInfo);
44174434
}
44184435
}
44194436

@@ -4449,6 +4466,11 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
44494466
writeInfo.Partitions.erase(partitionId.OriginalPartitionId);
44504467
if (writeInfo.Partitions.empty()) {
44514468
UnsubscribeWriteId(writeId, ctx);
4469+
if (writeInfo.TxId.Defined()) {
4470+
if (auto tx = GetTransaction(ctx, *writeInfo.TxId); tx) {
4471+
DeleteTx(*tx);
4472+
}
4473+
}
44524474
TxWrites.erase(writeId);
44534475
}
44544476

@@ -4467,11 +4489,37 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
44674489
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
44684490
Y_ABORT_UNLESS(writeInfo.Partitions.size() == 1);
44694491

4492+
BeginDeletePartitions(writeInfo);
4493+
}
4494+
4495+
void TPersQueue::BeginDeleteTx(const TDistributedTransaction& tx)
4496+
{
4497+
Y_ABORT_UNLESS(tx.WriteId.Defined());
4498+
const ui64 writeId = *tx.WriteId;
4499+
if (!TxWrites.contains(writeId)) {
4500+
// the transaction has already been completed
4501+
return;
4502+
}
4503+
4504+
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
4505+
if (writeInfo.LongTxSubscriptionStatus == NKikimrLongTxService::TEvLockStatus::STATUS_SUBSCRIBED) {
4506+
return;
4507+
}
4508+
4509+
BeginDeletePartitions(writeInfo);
4510+
}
4511+
4512+
void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
4513+
{
4514+
if (writeInfo.Deleting) {
4515+
return;
4516+
}
44704517
for (auto& [_, partitionId] : writeInfo.Partitions) {
44714518
Y_ABORT_UNLESS(Partitions.contains(partitionId));
44724519
const TPartitionInfo& partition = Partitions.at(partitionId);
44734520
Send(partition.Actor, new TEvPQ::TEvDeletePartition);
44744521
}
4522+
writeInfo.Deleting = true;
44754523
}
44764524

44774525
TString TPersQueue::LogPrefix() const {

ydb/core/persqueue/pq_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
201201
THashMap<ui32, TPartitionId> Partitions;
202202
TMaybe<ui64> TxId;
203203
NKikimrLongTxService::TEvLockStatus::EStatus LongTxSubscriptionStatus = NKikimrLongTxService::TEvLockStatus::STATUS_UNSPECIFIED;
204+
bool Deleting = false;
204205
};
205206

206207
THashMap<ui64, TTxWriteInfo> TxWrites;
@@ -490,6 +491,9 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
490491
void Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx);
491492
void Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorContext& ctx);
492493
void Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorContext& ctx);
494+
495+
void BeginDeleteTx(const TDistributedTransaction& tx);
496+
void BeginDeletePartitions(TTxWriteInfo& writeInfo);
493497
};
494498

495499

ydb/core/persqueue/ya.make

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ PEERDIR(
7474
ydb/library/protobuf_printer
7575
ydb/public/lib/base
7676
ydb/public/sdk/cpp/client/ydb_persqueue_public
77-
ydb/library/dbgtrace
7877
)
7978

8079
END()

0 commit comments

Comments
 (0)