Skip to content

Commit 247c33a

Browse files
[-] левая граница диапазона ключей
1 parent 547a643 commit 247c33a

File tree

7 files changed

+154
-0
lines changed

7 files changed

+154
-0
lines changed

ydb/core/persqueue/key.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
#include "key.h"
2+
#include <ydb/library/dbgtrace/debug_trace.h>
23

34
namespace NKikimr::NPQ {
45

56
std::pair<TKeyPrefix, TKeyPrefix> MakeKeyPrefixRange(TKeyPrefix::EType type, const TPartitionId& partition)
67
{
8+
DBGTRACE("MakeKeyPrefixRange");
9+
DBGTRACE_LOG("type=" << (char)type << ", partition=" << partition);
710
TKeyPrefix from(type, partition);
811
TKeyPrefix to(type, TPartitionId(partition.OriginalPartitionId, partition.WriteId, partition.InternalPartitionId + 1));
912

ydb/core/persqueue/partition.cpp

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <util/folder/path.h>
2020
#include <util/string/escape.h>
2121
#include <util/system/byteorder.h>
22+
#include <ydb/library/dbgtrace/debug_trace.h>
2223

2324
namespace {
2425

@@ -364,6 +365,7 @@ void TPartition::HandleWakeup(const TActorContext& ctx) {
364365
}
365366

366367
void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
368+
DBGTRACE("TPartition::AddMetaKey");
367369
//Set Start Offset
368370
auto write = request->Record.AddCmdWrite();
369371
TKeyPrefix ikey(TKeyPrefix::TypeMeta, Partition);
@@ -978,6 +980,7 @@ void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext
978980

979981
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
980982
{
983+
DBGTRACE("TPartition::Handle(TEvPQ::TEvTxCommit)");
981984
EndTransaction(*ev->Get(), ctx);
982985

983986
TxInProgress = false;
@@ -987,6 +990,7 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
987990

988991
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
989992
{
993+
DBGTRACE("TPartition::Handle(TEvPQ::TEvTxRollback)");
990994
EndTransaction(*ev->Get(), ctx);
991995

992996
TxInProgress = false;
@@ -996,6 +1000,7 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
9961000

9971001
void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx)
9981002
{
1003+
DBGTRACE("TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse)");
9991004
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
10001005
"TEvGetWriteInfoResponse Cookie: " << ev->Get()->Cookie);
10011006

@@ -1017,6 +1022,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
10171022
WriteInfoResponse = ev->Release();
10181023

10191024
if (auto* t = TryGetCurrentImmediateTransaction()) {
1025+
DBGTRACE_LOG("immediate tx");
10201026
if (!WriteInfoResponse->BodyKeys.empty()) {
10211027
predicate = false;
10221028
}
@@ -1053,6 +1059,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
10531059

10541060
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx)
10551061
{
1062+
DBGTRACE("TPartition::Handle(TEvPQ::TEvGetWriteInfoError)");
10561063
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE,
10571064
"TEvGetWriteInfoError Cookie: " << ev->Get()->Cookie);
10581065

@@ -1061,6 +1068,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
10611068
WriteInfoResponse = nullptr;
10621069

10631070
if (auto* t = TryGetCurrentImmediateTransaction()) {
1071+
DBGTRACE_LOG("immediate tx");
10641072
ScheduleReplyPropose(t->Record,
10651073
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
10661074
NKikimrPQ::TError::BAD_REQUEST,
@@ -1648,7 +1656,9 @@ size_t TPartition::GetUserActCount(const TString& consumer) const
16481656

16491657
void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
16501658
{
1659+
DBGTRACE("TPartition::ProcessTxsAndUserActs");
16511660
if (KVWriteInProgress || TxInProgress) {
1661+
DBGTRACE_LOG("skip");
16521662
return;
16531663
}
16541664

@@ -1661,12 +1671,16 @@ void TPartition::ProcessTxsAndUserActs(const TActorContext& ctx)
16611671

16621672
void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
16631673
{
1674+
DBGTRACE("TPartition::ContinueProcessTxsAndUserActs");
1675+
DBGTRACE_LOG("Partition=" << Partition);
1676+
DBGTRACE_LOG("DeletePartitionState=" << (int)DeletePartitionState);
16641677
Y_ABORT_UNLESS(!KVWriteInProgress);
16651678
Y_ABORT_UNLESS(!TxInProgress);
16661679

16671680
THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);
16681681

16691682
if (DeletePartitionState == DELETION_INITED) {
1683+
DBGTRACE_LOG("deletion inited");
16701684
ScheduleNegativeReplies();
16711685
ScheduleDeletePartitionDone();
16721686

@@ -1682,6 +1696,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
16821696
HaveWriteMsg = false;
16831697

16841698
if (UserActionAndTransactionEvents.empty()) {
1699+
DBGTRACE_LOG("empty queue");
16851700
const auto now = ctx.Now();
16861701

16871702
if (ManageWriteTimestampEstimate) {
@@ -1697,6 +1712,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
16971712
ProcessReserveRequests(ctx);
16981713

16991714
if (haveChanges || TxIdHasChanged || !AffectedUsers.empty() || ChangeConfig) {
1715+
DBGTRACE_LOG("begin write");
17001716
AddCmdWriteTxMeta(request->Record);
17011717
AddCmdWriteUserInfos(request->Record);
17021718
AddCmdWriteConfig(request->Record);
@@ -1711,6 +1727,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
17111727
KVWriteInProgress = true;
17121728
HaveWriteMsg = true;
17131729
} else {
1730+
DBGTRACE_LOG("reply");
17141731
AnswerCurrentWrites(ctx);
17151732
AnswerCurrentReplies(ctx);
17161733
}
@@ -1746,6 +1763,7 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)
17461763
}
17471764

17481765
if (HaveWriteMsg) {
1766+
DBGTRACE_LOG("have write operation");
17491767
if (!DiskIsFull) {
17501768
EndAppendHeadWithNewWrites(request.Get(), ctx);
17511769
EndProcessWrites(request.Get(), ctx);
@@ -3075,6 +3093,7 @@ void TPartition::ScheduleNegativeReplies()
30753093

30763094
void TPartition::AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request)
30773095
{
3096+
DBGTRACE("TPartition::AddCmdDeleteRangeForAllKeys");
30783097
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeInfo, Partition);
30793098
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeData, Partition);
30803099
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTmpData, Partition);

ydb/core/persqueue/partition_init.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "partition.h"
22
#include "partition_util.h"
33
#include <memory>
4+
#include <ydb/library/dbgtrace/debug_trace.h>
45

56
namespace NKikimr::NPQ {
67

@@ -1009,6 +1010,8 @@ bool DiskIsFull(TEvKeyValue::TEvResponse::TPtr& ev) {
10091010

10101011
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, const TPartitionId& partitionId)
10111012
{
1013+
DBGTRACE("AddCmdDeleteRange");
1014+
DBGTRACE_LOG("partitionId=" << partitionId);
10121015
auto keyPrefixes = MakeKeyPrefixRange(c, partitionId);
10131016
const TKeyPrefix& from = keyPrefixes.first;
10141017
const TKeyPrefix& to = keyPrefixes.second;
@@ -1017,7 +1020,12 @@ void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, co
10171020
auto range = del->MutableRange();
10181021

10191022
range->SetFrom(from.Data(), from.Size());
1023+
range->SetIncludeFrom(true);
10201024
range->SetTo(to.Data(), to.Size());
1025+
range->SetIncludeTo(false);
1026+
1027+
DBGTRACE_LOG("from=" << TString(from.Data(), from.Size()));
1028+
DBGTRACE_LOG("to=" << TString(to.Data(), to.Size()));
10211029
}
10221030

10231031
static void RequestRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition,

ydb/core/persqueue/partition_write.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <util/folder/path.h>
2323
#include <util/string/escape.h>
2424
#include <util/system/byteorder.h>
25+
#include <ydb/library/dbgtrace/debug_trace.h>
2526

2627
namespace NKikimr::NPQ {
2728

@@ -1444,6 +1445,7 @@ void TPartition::BeginHandleRequests(TEvKeyValue::TEvRequest* request, const TAc
14441445

14451446
void TPartition::EndHandleRequests(TEvKeyValue::TEvRequest* request, const TActorContext& ctx)
14461447
{
1448+
DBGTRACE("TPartition::EndHandleRequests");
14471449
HaveDrop = CleanUp(request, ctx);
14481450

14491451
ProcessReserveRequests(ctx);

ydb/core/persqueue/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ PEERDIR(
7474
ydb/library/protobuf_printer
7575
ydb/public/lib/base
7676
ydb/public/sdk/cpp/client/ydb_persqueue_public
77+
ydb/library/dbgtrace
7778
)
7879

7980
END()

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
44
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
55
#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/ut_utils/ut_utils.h>
6+
#include <ydb/core/keyvalue/keyvalue_events.h>
67

78
#include <library/cpp/logger/stream.h>
89

910
#include <library/cpp/testing/unittest/registar.h>
11+
#include <ydb/library/dbgtrace/debug_trace.h>
1012

1113
namespace NYdb::NTopic::NTests {
1214

@@ -77,6 +79,13 @@ class TFixture : public NUnitTest::TBaseFixture {
7779
void WaitForAcks(const TString& topicPath,
7880
const TString& messageGroupId);
7981

82+
ui64 GetTopicTabletId(const TActorId& actorId,
83+
const TString& topicPath,
84+
ui32 partition);
85+
void PrintTopicTabletId(const TString& topicPath);
86+
THashSet<TString> GetTabletKeys(const TActorId& actorId,
87+
ui64 tabletId);
88+
8089
protected:
8190
const TDriver& GetDriver() const;
8291

@@ -569,6 +578,102 @@ void TFixture::WaitForAcks(const TString& topicPath, const TString& messageGroup
569578
UNIT_ASSERT(context.AckCount == context.WriteCount);
570579
}
571580

581+
ui64 TFixture::GetTopicTabletId(const TActorId& actorId, const TString& topicPath, ui32 partition)
582+
{
583+
auto navigate = std::make_unique<NSchemeCache::TSchemeCacheNavigate>();
584+
navigate->DatabaseName = "/Root";
585+
586+
NSchemeCache::TSchemeCacheNavigate::TEntry entry;
587+
entry.Path = SplitPath(topicPath);
588+
entry.SyncVersion = true;
589+
entry.ShowPrivatePath = true;
590+
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpList;
591+
592+
navigate->ResultSet.push_back(std::move(entry));
593+
//navigate->UserToken = "root@builtin";
594+
navigate->Cookie = 12345;
595+
596+
auto& runtime = Setup->GetRuntime();
597+
598+
runtime.Send(MakeSchemeCacheID(), actorId,
599+
new TEvTxProxySchemeCache::TEvNavigateKeySet(navigate.release()),
600+
0,
601+
true);
602+
auto response = runtime.GrabEdgeEvent<TEvTxProxySchemeCache::TEvNavigateKeySetResult>();
603+
604+
UNIT_ASSERT_VALUES_EQUAL(response->Request->Cookie, 12345);
605+
UNIT_ASSERT_VALUES_EQUAL(response->Request->ErrorCount, 0);
606+
607+
auto& front = response->Request->ResultSet.front();
608+
UNIT_ASSERT(front.PQGroupInfo);
609+
UNIT_ASSERT_GT(front.PQGroupInfo->Description.PartitionsSize(), 0);
610+
UNIT_ASSERT_LT(partition, front.PQGroupInfo->Description.PartitionsSize());
611+
612+
for (size_t i = 0; i < front.PQGroupInfo->Description.PartitionsSize(); ++i) {
613+
auto& p = front.PQGroupInfo->Description.GetPartitions(partition);
614+
if (p.GetPartitionId() == partition) {
615+
return p.GetTabletId();
616+
}
617+
}
618+
619+
UNIT_FAIL("unknown partition");
620+
621+
return Max<ui64>();
622+
}
623+
624+
THashSet<TString> TFixture::GetTabletKeys(const TActorId& actorId, ui64 tabletId)
625+
{
626+
using TEvKeyValue = NKikimr::TEvKeyValue;
627+
628+
auto request = std::make_unique<TEvKeyValue::TEvRequest>();
629+
request->Record.SetCookie(12345);
630+
631+
auto cmd = request->Record.AddCmdReadRange();
632+
TString from(1, '\x00');
633+
TString to(1, '\xFF');
634+
auto range = cmd->MutableRange();
635+
range->SetFrom(from);
636+
range->SetIncludeFrom(true);
637+
range->SetTo(to);
638+
range->SetIncludeTo(true);
639+
640+
auto& runtime = Setup->GetRuntime();
641+
642+
runtime.SendToPipe(tabletId, actorId, request.release());
643+
auto response = runtime.GrabEdgeEvent<TEvKeyValue::TEvResponse>();
644+
645+
UNIT_ASSERT(response->Record.HasCookie());
646+
UNIT_ASSERT_VALUES_EQUAL(response->Record.GetCookie(), 12345);
647+
UNIT_ASSERT_VALUES_EQUAL(response->Record.ReadRangeResultSize(), 1);
648+
649+
THashSet<TString> keys;
650+
651+
auto& result = response->Record.GetReadRangeResult(0);
652+
for (size_t i = 0; i < result.PairSize(); ++i) {
653+
auto& kv = result.GetPair(i);
654+
keys.insert(kv.GetKey());
655+
}
656+
657+
return keys;
658+
}
659+
660+
void TFixture::PrintTopicTabletId(const TString& topicPath)
661+
{
662+
DBGTRACE("TFixture::PrintTopicTabletId");
663+
auto& runtime = Setup->GetRuntime();
664+
TActorId edge = runtime.AllocateEdgeActor();
665+
666+
ui64 tabletId = GetTopicTabletId(edge, topicPath, 0);
667+
DBGTRACE_LOG("tabletId=" << tabletId);
668+
669+
auto keys = GetTabletKeys(edge, tabletId);
670+
DBGTRACE_LOG("keys.size=" << keys.size());
671+
672+
for (auto& key : keys) {
673+
DBGTRACE_LOG(key);
674+
}
675+
}
676+
572677
Y_UNIT_TEST_F(WriteToTopic_Demo_1, TFixture)
573678
{
574679
CreateTopic("topic_A");
@@ -966,6 +1071,21 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_10, TFixture)
9661071
}
9671072
}
9681073

1074+
Y_UNIT_TEST_F(WriteToTopic_Demo_11, TFixture)
1075+
{
1076+
DBGTRACE("WriteToTopic_Demo_11");
1077+
CreateTopic("topic_A");
1078+
PrintTopicTabletId("/Root/topic_A");
1079+
NTable::TSession tableSession = CreateTableSession();
1080+
NTable::TTransaction tx_1 = BeginTx(tableSession);
1081+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, "message #1", &tx_1);
1082+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
1083+
PrintTopicTabletId("/Root/topic_A");
1084+
CommitTx(tx_1, EStatus::SUCCESS);
1085+
Sleep(TDuration::Seconds(5));
1086+
PrintTopicTabletId("/Root/topic_A");
1087+
}
1088+
9691089
}
9701090

9711091
}

ydb/public/sdk/cpp/client/ydb_topic/ut/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ PEERDIR(
2929

3030
ydb/core/tx/schemeshard/ut_helpers
3131
ydb/core/persqueue/ut/common
32+
ydb/library/dbgtrace
3233
)
3334

3435
YQL_LAST_ABI_VERSION()

0 commit comments

Comments
 (0)