Skip to content

Default values for feature flags EnablePQConfigTransactionsAtSchemeshard and EnableTopicServiceTx #8845

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
28 changes: 17 additions & 11 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ TString TPartition::LogPrefix() const {
} else {
state = "Unknown";
}
return TStringBuilder() << "[PQ: " << TabletID << ", Partition:" << Partition << ", State:" << state << "] ";
return TStringBuilder() << "[PQ: " << TabletID << ", Partition: " << Partition << ", State: " << state << "] ";
}

bool TPartition::IsActive() const {
Expand Down Expand Up @@ -2134,6 +2134,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)

void TPartition::CommitWriteOperations(TTransaction& t)
{
PQ_LOG_D("TPartition::CommitWriteOperations TxId: " << t.GetTxId());

Y_ABORT_UNLESS(PersistRequest);
Y_ABORT_UNLESS(!PartitionedBlob.IsInited());

Expand All @@ -2151,6 +2153,10 @@ void TPartition::CommitWriteOperations(TTransaction& t)
HaveWriteMsg = true;
}

PQ_LOG_D("t.WriteInfo->BodyKeys.size=" << t.WriteInfo->BodyKeys.size() <<
", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size());
PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead);

if (!t.WriteInfo->BodyKeys.empty()) {
PartitionedBlob = TPartitionedBlob(Partition,
NewHead.Offset,
Expand All @@ -2165,6 +2171,7 @@ void TPartition::CommitWriteOperations(TTransaction& t)
MaxBlobSize);

for (auto& k : t.WriteInfo->BodyKeys) {
PQ_LOG_D("add key " << k.Key.ToString());
auto write = PartitionedBlob.Add(k.Key, k.Size);
if (write && !write->Value.empty()) {
AddCmdWrite(write, PersistRequest.Get(), ctx);
Expand All @@ -2173,18 +2180,17 @@ void TPartition::CommitWriteOperations(TTransaction& t)
}
}

}

if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
RenameFormedBlobs(formedBlobs,
*Parameters,
curWrites,
PersistRequest.Get(),
ctx);
}
PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size());
if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) {
ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get());
RenameFormedBlobs(formedBlobs,
*Parameters,
curWrites,
PersistRequest.Get(),
ctx);
}

if (!t.WriteInfo->BodyKeys.empty()) {
const auto& last = t.WriteInfo->BodyKeys.back();

NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount());
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/persqueue/partition_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <util/system/types.h>
#include <util/digest/multi.h>
#include <util/str_stl.h>
#include <util/string/builder.h>

#include <functional>

Expand Down Expand Up @@ -51,6 +52,13 @@ class TPartitionId {
}
}

TString ToString() const
{
TStringBuilder s;
s << *this;
return s;
}

bool IsSupportivePartition() const
{
return WriteId.Defined();
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,16 +1064,16 @@ void TPartition::RenameFormedBlobs(const std::deque<TPartitionedBlob::TRenameFor
}
if (!DataKeysBody.empty() && CompactedKeys.empty()) {
Y_ABORT_UNLESS(DataKeysBody.back().Key.GetOffset() + DataKeysBody.back().Key.GetCount() <= x.NewKey.GetOffset(),
"PQ: %" PRIu64 ", Partition: %s, "
"LAST KEY %s, HeadOffset %lu, NEWKEY %s",
TabletID, Partition.ToString().c_str(),
DataKeysBody.back().Key.ToString().c_str(),
Head.Offset,
x.NewKey.ToString().c_str());
}
LOG_DEBUG_S(
ctx, NKikimrServices::PERSQUEUE,
"writing blob: topic '" << TopicName() << "' partition " << Partition
<< " " << x.OldKey.ToString() << " size " << x.Size << " WTime " << ctx.Now().MilliSeconds()
);
PQ_LOG_D("writing blob: topic '" << TopicName() << "' partition " << Partition <<
" old key " << x.OldKey.ToString() << " new key " << x.NewKey.ToString() <<
" size " << x.Size << " WTime " << ctx.Now().MilliSeconds());

CompactedKeys.emplace_back(x.NewKey, x.Size);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4667,6 +4667,8 @@ void TPersQueue::TryStartTransaction(const TActorContext& ctx)
Y_ABORT_UNLESS(next);

CheckTxState(ctx, *next);

TryWriteTxs(ctx);
}

void TPersQueue::OnInitComplete(const TActorContext& ctx)
Expand Down
15 changes: 15 additions & 0 deletions ydb/core/tx/datashard/datashard_ut_change_exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,8 @@ Y_UNIT_TEST_SUITE(Cdc) {

// get records
{
WaitForDataRecords(client, shardIt);

auto res = client.GetRecords(shardIt).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
UNIT_ASSERT_VALUES_EQUAL(res.GetResult().records().size(), records.size());
Expand All @@ -1267,6 +1269,19 @@ Y_UNIT_TEST_SUITE(Cdc) {
}
}

static void WaitForDataRecords(TDataStreamsClient& client, const TString& shardIt) {
int n = 0;
for (; n < 100; ++n) {
auto res = client.GetRecords(shardIt).ExtractValueSync();
UNIT_ASSERT_C(res.IsSuccess(), res.GetIssues().ToString());
if (res.GetResult().records().size()) {
break;
}
Sleep(TDuration::MilliSeconds(100));
}
UNIT_ASSERT_VALUES_UNEQUAL(n, 100);
}

static void Write(const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) {
TTestYdsEnv env(tableDesc, streamDesc);

Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/ut_base/ut_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6343,6 +6343,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
"PartitionPerTablet: 10 "
"PQTabletConfig: {PartitionConfig { LifetimeSeconds : 10}}"
);
env.TestWaitNotification(runtime, txId);

TestDescribeResult(DescribePath(runtime, "/MyRoot/DirA/PQGroup_1", true),
{NLs::CheckPartCount("PQGroup_1", 100, 10, 10, 100),
Expand Down Expand Up @@ -6865,7 +6866,7 @@ Y_UNIT_TEST_SUITE(TSchemeShardTest) {
AsyncForceDropUnsafe(runtime, ++txId, pVer.PathId.LocalPathId);

TestModificationResult(runtime, txId-2, NKikimrScheme::StatusAccepted);
TestModificationResult(runtime, txId-1, NKikimrScheme::StatusAccepted);
TestModificationResults(runtime, txId-1, {NKikimrScheme::StatusAccepted, NKikimrScheme::StatusMultipleModifications});
TestModificationResult(runtime, txId, NKikimrScheme::StatusAccepted);

TActorId sender = runtime.AllocateEdgeActor();
Expand Down
95 changes: 91 additions & 4 deletions ydb/public/sdk/cpp/client/ydb_topic/ut/topic_to_table_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class TFixture : public NUnitTest::TBaseFixture {

void TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params);

void WriteMessagesInTx(size_t big, size_t small);

const TDriver& GetDriver() const;

void CheckTabletKeys(const TString& topicName);
Expand Down Expand Up @@ -1595,21 +1597,22 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)

for (size_t i = 0; i < params.OldHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'));
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++oldHeadMsgCount;
}

for (size_t i = 0; i < params.BigBlobsCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'900'000, 'x'), &tx);
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++bigBlobMsgCount;
}

for (size_t i = 0; i < params.NewHeadCount; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(100'000, 'x'), &tx);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
++newHeadMsgCount;
}

WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);

if (params.RestartMode == ERestartBeforeCommit) {
RestartPQTablet("topic_A", 0);
}
Expand Down Expand Up @@ -1638,7 +1641,7 @@ void TFixture::TestTxWithBigBlobs(const TTestTxWithBigBlobsParams& params)
start += oldHeadMsgCount;

for (size_t i = 0; i < bigBlobMsgCount; ++i) {
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'900'000);
UNIT_ASSERT_VALUES_EQUAL(messages[start + i].size(), 7'000'000);
}
start += bigBlobMsgCount;

Expand Down Expand Up @@ -1903,6 +1906,90 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_28, TFixture)
UNIT_ASSERT_VALUES_EQUAL(messages.size(), 2);
}

void TFixture::WriteMessagesInTx(size_t big, size_t small)
{
CreateTopic("topic_A", TEST_CONSUMER);

NTable::TSession tableSession = CreateTableSession();
NTable::TTransaction tx = BeginTx(tableSession);

for (size_t i = 0; i < big; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(7'000'000, 'x'), &tx, 0);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
}

for (size_t i = 0; i < small; ++i) {
WriteToTopic("topic_A", TEST_MESSAGE_GROUP_ID, TString(16'384, 'x'), &tx, 0);
WaitForAcks("topic_A", TEST_MESSAGE_GROUP_ID);
}

CommitTx(tx, EStatus::SUCCESS);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_29, TFixture)
{
WriteMessagesInTx(1, 0);
WriteMessagesInTx(1, 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_30, TFixture)
{
WriteMessagesInTx(1, 0);
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_31, TFixture)
{
WriteMessagesInTx(1, 0);
WriteMessagesInTx(1, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_32, TFixture)
{
WriteMessagesInTx(0, 1);
WriteMessagesInTx(1, 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_33, TFixture)
{
WriteMessagesInTx(0, 1);
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_34, TFixture)
{
WriteMessagesInTx(0, 1);
WriteMessagesInTx(1, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_35, TFixture)
{
WriteMessagesInTx(1, 1);
WriteMessagesInTx(1, 0);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_36, TFixture)
{
WriteMessagesInTx(1, 1);
WriteMessagesInTx(0, 1);
}

Y_UNIT_TEST_F(WriteToTopic_Demo_37, TFixture)
{
WriteMessagesInTx(1, 1);
WriteMessagesInTx(1, 1);
}


Y_UNIT_TEST_F(WriteToTopic_Demo_38, TFixture)
{
WriteMessagesInTx(2, 202);
WriteMessagesInTx(2, 200);
WriteMessagesInTx(0, 1);
WriteMessagesInTx(4, 0);
WriteMessagesInTx(0, 1);
}

}

}
Loading