Skip to content

Commit 8bc2d2c

Browse files
Fix cloudId for YMQ (#9088) and tx changes (#8845) (#9175)
Co-authored-by: Alek5andr-Kotov <[email protected]>
1 parent d50240f commit 8bc2d2c

File tree

8 files changed

+144
-25
lines changed

8 files changed

+144
-25
lines changed

ydb/core/http_proxy/http_req.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -277,7 +277,7 @@ namespace NKikimr::NHttpProxy {
277277
"' iam token size: " << HttpContext.IamToken.size());
278278
TMap<TString, TString> peerMetadata {
279279
{NYmq::V1::FOLDER_ID, FolderId},
280-
{NYmq::V1::CLOUD_ID, HttpContext.UserName ? HttpContext.UserName : CloudId},
280+
{NYmq::V1::CLOUD_ID, CloudId ? CloudId : HttpContext.UserName },
281281
{NYmq::V1::USER_SID, UserSid},
282282
{NYmq::V1::REQUEST_ID, HttpContext.RequestId},
283283
{NYmq::V1::SECURITY_TOKEN, HttpContext.SecurityToken},
@@ -408,7 +408,7 @@ namespace NKikimr::NHttpProxy {
408408
ctx,
409409
NKikimrServices::HTTP_PROXY,
410410
"Not retrying GRPC response."
411-
<< " Code: " << get<1>(errorAndCode)
411+
<< " Code: " << get<1>(errorAndCode)
412412
<< ", Error: " << get<0>(errorAndCode);
413413
);
414414

@@ -434,7 +434,7 @@ namespace NKikimr::NHttpProxy {
434434
ctx,
435435
NKikimrServices::HTTP_PROXY,
436436
TStringBuilder() << "Got cloud auth response."
437-
<< " FolderId: " << ev->Get()->FolderId
437+
<< " FolderId: " << ev->Get()->FolderId
438438
<< " CloudId: " << ev->Get()->CloudId
439439
<< " UserSid: " << ev->Get()->Sid;
440440
);
@@ -553,7 +553,7 @@ namespace NKikimr::NHttpProxy {
553553
.Counters = nullptr,
554554
.AWSSignature = std::move(HttpContext.GetSignature()),
555555
.IAMToken = HttpContext.IamToken,
556-
.FolderID = ""
556+
.FolderID = ""
557557
};
558558

559559
auto authRequestProxy = MakeHolder<NSQS::THttpProxyAuthRequestProxy>(

ydb/core/persqueue/partition.cpp

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ TString TPartition::LogPrefix() const {
8282
} else {
8383
state = "Unknown";
8484
}
85-
return TStringBuilder() << "[PQ: " << TabletID << ", Partition:" << Partition << ", State:" << state << "] ";
85+
return TStringBuilder() << "[PQ: " << TabletID << ", Partition: " << Partition << ", State: " << state << "] ";
8686
}
8787

8888
bool TPartition::IsActive() const {
@@ -2134,6 +2134,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
21342134

21352135
void TPartition::CommitWriteOperations(TTransaction& t)
21362136
{
2137+
PQ_LOG_D("TPartition::CommitWriteOperations TxId: " << t.GetTxId());
2138+
21372139
Y_ABORT_UNLESS(PersistRequest);
21382140
Y_ABORT_UNLESS(!PartitionedBlob.IsInited());
21392141

@@ -2151,6 +2153,10 @@ void TPartition::CommitWriteOperations(TTransaction& t)
21512153
HaveWriteMsg = true;
21522154
}
21532155

2156+
PQ_LOG_D("t.WriteInfo->BodyKeys.size=" << t.WriteInfo->BodyKeys.size() <<
2157+
", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size());
2158+
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);
2159+
21542160
if (!t.WriteInfo->BodyKeys.empty()) {
21552161
PartitionedBlob = TPartitionedBlob(Partition,
21562162
NewHead.Offset,
@@ -2165,6 +2171,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
21652171
MaxBlobSize);
21662172

21672173
for (auto& k : t.WriteInfo->BodyKeys) {
2174+
PQ_LOG_D("add key " << k.Key.ToString());
21682175
auto write = PartitionedBlob.Add(k.Key, k.Size);
21692176
if (write && !write->Value.empty()) {
21702177
AddCmdWrite(write, PersistRequest.Get(), ctx);
@@ -2173,18 +2180,17 @@ void TPartition::CommitWriteOperations(TTransaction& t)
21732180
}
21742181
}
21752182

2176-
}
21772183

2178-
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
2179-
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
2180-
RenameFormedBlobs(formedBlobs,
2181-
*Parameters,
2182-
curWrites,
2183-
PersistRequest.Get(),
2184-
ctx);
2185-
}
2184+
PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
2185+
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
2186+
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
2187+
RenameFormedBlobs(formedBlobs,
2188+
*Parameters,
2189+
curWrites,
2190+
PersistRequest.Get(),
2191+
ctx);
2192+
}
21862193

2187-
if (!t.WriteInfo->BodyKeys.empty()) {
21882194
const auto& last = t.WriteInfo->BodyKeys.back();
21892195

21902196
NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());

ydb/core/persqueue/partition_id.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#include <util/system/types.h>
88
#include <util/digest/multi.h>
99
#include <util/str_stl.h>
10+
#include <util/string/builder.h>
1011

1112
#include <functional>
1213

@@ -51,6 +52,13 @@ class TPartitionId {
5152
}
5253
}
5354

55+
TString ToString() const
56+
{
57+
TStringBuilder s;
58+
s << *this;
59+
return s;
60+
}
61+
5462
bool IsSupportivePartition() const
5563
{
5664
return WriteId.Defined();

ydb/core/persqueue/partition_write.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,16 +1064,16 @@ void TPartition::RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFor
10641064
}
10651065
if (!DataKeysBody.empty() && CompactedKeys.empty()) {
10661066
Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.NewKey.GetOffset(),
1067+
"PQ: %" PRIu64 ", Partition: %s, "
10671068
"LAST KEY %s, HeadOffset %lu, NEWKEY %s",
1069+
TabletID, Partition.ToString().c_str(),
10681070
DataKeysBody.back().Key.ToString().c_str(),
10691071
Head.Offset,
10701072
x.NewKey.ToString().c_str());
10711073
}
1072-
LOG_DEBUG_S(
1073-
ctx, NKikimrServices::PERSQUEUE,
1074-
"writing blob: topic '" << TopicName() << "' partition " << Partition
1075-
<< " " << x.OldKey.ToString() << " size " << x.Size << " WTime " << ctx.Now().MilliSeconds()
1076-
);
1074+
PQ_LOG_D("writing blob: topic '" << TopicName() << "' partition " << Partition <<
1075+
" old key " << x.OldKey.ToString() << " new key " << x.NewKey.ToString() <<
1076+
" size " << x.Size << " WTime " << ctx.Now().MilliSeconds());
10771077

10781078
CompactedKeys.emplace_back(x.NewKey, x.Size);
10791079
}

ydb/core/persqueue/pq_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4667,6 +4667,8 @@ void TPersQueue::TryStartTransaction(const TActorContext& ctx)
46674667
Y_ABORT_UNLESS(next);
46684668

46694669
CheckTxState(ctx, *next);
4670+
4671+
TryWriteTxs(ctx);
46704672
}
46714673

46724674
void TPersQueue::OnInitComplete(const TActorContext& ctx)

ydb/core/tx/datashard/datashard_ut_change_exchange.cpp

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1247,6 +1247,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
12471247

12481248
// get records
12491249
{
1250+
WaitForDataRecords(client, shardIt);
1251+
12501252
auto res = client.GetRecords(shardIt).ExtractValueSync();
12511253
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
12521254
UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size());
@@ -1268,6 +1270,19 @@ Y_UNIT_TEST_SUITE(Cdc) {
12681270
}
12691271
}
12701272

1273+
static void WaitForDataRecords(TDataStreamsClient& client, const TString& shardIt) {
1274+
int n = 0;
1275+
for (; n < 100; ++n) {
1276+
auto res = client.GetRecords(shardIt).ExtractValueSync();
1277+
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
1278+
if (res.GetResult().records().size()) {
1279+
break;
1280+
}
1281+
Sleep(TDuration::MilliSeconds(100));
1282+
}
1283+
UNIT_ASSERT_VALUES_UNEQUAL(n, 100);
1284+
}
1285+
12711286
static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) {
12721287
TTestYdsEnv env(tableDesc, streamDesc);
12731288

ydb/core/tx/schemeshard/ut_base/ut_base.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6343,6 +6343,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
63436343
"PartitionPerTablet: 10 "
63446344
"PQTabletConfig: {PartitionConfig { LifetimeSeconds : 10}}"
63456345
);
6346+
env.TestWaitNotification(runtime, txId);
63466347

63476348
TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA/PQGroup_1", true),
63486349
{NLs::CheckPartCount("PQGroup_1", 100, 10, 10, 100),
@@ -6865,7 +6866,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
68656866
AsyncForceDropUnsafe(runtime, ++txId, pVer.PathId.LocalPathId);
68666867

68676868
TestModificationResult(runtime, txId-2, NKikimrScheme::StatusAccepted);
6868-
TestModificationResult(runtime, txId-1, NKikimrScheme::StatusAccepted);
6869+
TestModificationResults(runtime, txId-1, {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications});
68696870
TestModificationResult(runtime, txId, NKikimrScheme::StatusAccepted);
68706871

68716872
TActorId sender = runtime.AllocateEdgeActor();

ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ class TFixture : public NUnitTest::TBaseFixture {
146146

147147
void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params);
148148

149+
void WriteMessagesInTx(size_t big, size_t small);
150+
149151
const TDriver& GetDriver() const;
150152

151153
void CheckTabletKeys(const TString& topicName);
@@ -1595,21 +1597,22 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
15951597

15961598
for (size_t i = 0; i < params.OldHeadCount; ++i) {
15971599
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'));
1600+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
15981601
++oldHeadMsgCount;
15991602
}
16001603

16011604
for (size_t i = 0; i < params.BigBlobsCount; ++i) {
1602-
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'900'000, 'x'), &tx);
1605+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx);
1606+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
16031607
++bigBlobMsgCount;
16041608
}
16051609

16061610
for (size_t i = 0; i < params.NewHeadCount; ++i) {
16071611
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx);
1612+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
16081613
++newHeadMsgCount;
16091614
}
16101615

1611-
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
1612-
16131616
if (params.RestartMode == ERestartBeforeCommit) {
16141617
RestartPQTablet("topic_A", 0);
16151618
}
@@ -1638,7 +1641,7 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
16381641
start += oldHeadMsgCount;
16391642

16401643
for (size_t i = 0; i < bigBlobMsgCount; ++i) {
1641-
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'900'000);
1644+
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'000'000);
16421645
}
16431646
start += bigBlobMsgCount;
16441647

@@ -1903,6 +1906,90 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
19031906
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
19041907
}
19051908

1909+
void TFixture::WriteMessagesInTx(size_t big, size_t small)
1910+
{
1911+
CreateTopic("topic_A", TEST_CONSUMER);
1912+
1913+
NTable::TSession tableSession = CreateTableSession();
1914+
NTable::TTransaction tx = BeginTx(tableSession);
1915+
1916+
for (size_t i = 0; i < big; ++i) {
1917+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0);
1918+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
1919+
}
1920+
1921+
for (size_t i = 0; i < small; ++i) {
1922+
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0);
1923+
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
1924+
}
1925+
1926+
CommitTx(tx, EStatus::SUCCESS);
1927+
}
1928+
1929+
Y_UNIT_TEST_F(WriteToTopic_Demo_29, TFixture)
1930+
{
1931+
WriteMessagesInTx(1, 0);
1932+
WriteMessagesInTx(1, 0);
1933+
}
1934+
1935+
Y_UNIT_TEST_F(WriteToTopic_Demo_30, TFixture)
1936+
{
1937+
WriteMessagesInTx(1, 0);
1938+
WriteMessagesInTx(0, 1);
1939+
}
1940+
1941+
Y_UNIT_TEST_F(WriteToTopic_Demo_31, TFixture)
1942+
{
1943+
WriteMessagesInTx(1, 0);
1944+
WriteMessagesInTx(1, 1);
1945+
}
1946+
1947+
Y_UNIT_TEST_F(WriteToTopic_Demo_32, TFixture)
1948+
{
1949+
WriteMessagesInTx(0, 1);
1950+
WriteMessagesInTx(1, 0);
1951+
}
1952+
1953+
Y_UNIT_TEST_F(WriteToTopic_Demo_33, TFixture)
1954+
{
1955+
WriteMessagesInTx(0, 1);
1956+
WriteMessagesInTx(0, 1);
1957+
}
1958+
1959+
Y_UNIT_TEST_F(WriteToTopic_Demo_34, TFixture)
1960+
{
1961+
WriteMessagesInTx(0, 1);
1962+
WriteMessagesInTx(1, 1);
1963+
}
1964+
1965+
Y_UNIT_TEST_F(WriteToTopic_Demo_35, TFixture)
1966+
{
1967+
WriteMessagesInTx(1, 1);
1968+
WriteMessagesInTx(1, 0);
1969+
}
1970+
1971+
Y_UNIT_TEST_F(WriteToTopic_Demo_36, TFixture)
1972+
{
1973+
WriteMessagesInTx(1, 1);
1974+
WriteMessagesInTx(0, 1);
1975+
}
1976+
1977+
Y_UNIT_TEST_F(WriteToTopic_Demo_37, TFixture)
1978+
{
1979+
WriteMessagesInTx(1, 1);
1980+
WriteMessagesInTx(1, 1);
1981+
}
1982+
1983+
1984+
Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
1985+
{
1986+
WriteMessagesInTx(2, 202);
1987+
WriteMessagesInTx(2, 200);
1988+
WriteMessagesInTx(0, 1);
1989+
WriteMessagesInTx(4, 0);
1990+
WriteMessagesInTx(0, 1);
1991+
}
1992+
19061993
}
19071994

19081995
}

0 commit comments

Comments
 (0)