Skip to content

Commit 751da8a

Browse files
authored
Optimize CPU usage when read blob (always use count limit) (#12349)
1 parent 817dc99 commit 751da8a

File tree

5 files changed

+99
-63
lines changed

5 files changed

+99
-63
lines changed

ydb/core/persqueue/blob.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,7 @@ void TBatch::UnpackToType1(TVector<TClientBlob> *blobs) const {
466466
ui32 sourceIdCount = 0;
467467
TVector<TString> sourceIds;
468468

469-
static NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
469+
static const NScheme::TTypeCodecs ui32Codecs(NScheme::NTypeIds::Uint32), ui64Codecs(NScheme::NTypeIds::Uint64), stringCodecs(NScheme::NTypeIds::String);
470470
//read order
471471
{
472472
auto chunk = NScheme::IChunkDecoder::ReadChunk(GetChunk(data, dataEnd), &ui32Codecs);

ydb/core/persqueue/partition_read.cpp

+11-7
Original file line numberDiff line numberDiff line change
@@ -425,15 +425,16 @@ TReadAnswer TReadInfo::FormAnswer(
425425
size -= lastBlobSize;
426426
}
427427
lastBlobSize = 0;
428-
return (size >= Size || cnt >= Count);
428+
return cnt >= Count;
429429
}
430-
return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && (size >= Size || cnt >= Count);
430+
// For backward compatibility, we keep the behavior for older clients for non-FirstClassCitizen
431+
return !AppData()->PQConfig.GetTopicsAreFirstClassCitizen() && cnt >= Count;
431432
};
432433

433434
Y_ABORT_UNLESS(blobs.size() == Blobs.size());
434435
response->Check();
435436
bool needStop = false;
436-
for (ui32 pos = 0; pos < blobs.size() && !needStop; ++pos) {
437+
for (ui32 pos = 0; pos < blobs.size() && !needStop && size < Size; ++pos) {
437438
Y_ABORT_UNLESS(Blobs[pos].Offset == blobs[pos].Offset, "Mismatch %" PRIu64 " vs %" PRIu64, Blobs[pos].Offset, blobs[pos].Offset);
438439
Y_ABORT_UNLESS(Blobs[pos].Count == blobs[pos].Count, "Mismatch %" PRIu32 " vs %" PRIu32, Blobs[pos].Count, blobs[pos].Count);
439440

@@ -470,7 +471,7 @@ TReadAnswer TReadInfo::FormAnswer(
470471
Y_ABORT_UNLESS(offset < Offset || partNo <= PartNo);
471472
TKey key(TKeyPrefix::TypeData, TPartitionId(0), offset, partNo, count, internalPartsCount, false);
472473
ui64 firstHeaderOffset = GetFirstHeaderOffset(key, blobValue);
473-
for (TBlobIterator it(key, blobValue); it.IsValid(); it.Next()) {
474+
for (TBlobIterator it(key, blobValue); it.IsValid() && !needStop; it.Next()) {
474475
TBatch batch = it.GetBatch();
475476
auto& header = batch.Header;
476477
batch.Unpack();
@@ -489,8 +490,8 @@ TReadAnswer TReadInfo::FormAnswer(
489490
continue;
490491

491492

492-
PQ_LOG_D("FormAnswer processing batch offset "
493-
<< (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount() << " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());
493+
PQ_LOG_D("FormAnswer processing batch offset " << (offset - header.GetCount()) << " totakecount " << count << " count " << header.GetCount()
494+
<< " size " << header.GetPayloadSize() << " from pos " << pos << " cbcount " << batch.Blobs.size());
494495

495496
for (size_t i = pos; i < batch.Blobs.size(); ++i) {
496497
TClientBlob &res = batch.Blobs[i];
@@ -514,7 +515,10 @@ TReadAnswer TReadInfo::FormAnswer(
514515
++PartNo;
515516
}
516517

517-
needStop = updateUsage(res);
518+
if (updateUsage(res)) {
519+
needStop = true;
520+
break;
521+
}
518522
}
519523
}
520524
}

ydb/core/persqueue/ut/pq_ut.cpp

+19-39
Original file line numberDiff line numberDiff line change
@@ -1174,13 +1174,9 @@ Y_UNIT_TEST(TestWritePQBigMessage) {
11741174
CmdWrite(0, "sourceid0", data, tc, false, {}, true);
11751175
PQGetPartInfo(0, 27, tc);
11761176

1177-
Cerr << ">>>>> 1" << Endl << Flush;
1178-
CmdRead(0, 0, Max<i32>(), Max<i32>(), 13, false, tc);
1179-
Cerr << ">>>>> 2" << Endl << Flush;
1177+
CmdRead(0, 0, Max<i32>(), Max<i32>(), 1, false, tc);
11801178
CmdRead(0, 1, Max<i32>(), Max<i32>(), 25, false, tc);
1181-
Cerr << ">>>>> 3" << Endl << Flush;
11821179
CmdRead(0, 24, Max<i32>(), Max<i32>(), 2, false, tc);
1183-
Cerr << ">>>>> 4" << Endl << Flush;
11841180
CmdRead(0, 26, Max<i32>(), Max<i32>(), 1, false, tc);
11851181

11861182
activeZone = false;
@@ -1606,27 +1602,19 @@ Y_UNIT_TEST(TestPQRead) {
16061602
CmdRead(0, 26, Max<i32>(), Max<i32>(), 0, true, tc);
16071603

16081604
CmdRead(0, 0, Max<i32>(), Max<i32>(), 25, false, tc);
1609-
CmdRead(0, 0, 10, 100_MB, 15, false, tc);
1610-
CmdRead(0, 9, 1, 100_MB, 6, false, tc);
1605+
CmdRead(0, 0, 10, 100_MB, 10, false, tc);
1606+
CmdRead(0, 9, 1, 100_MB, 1, false, tc);
16111607
CmdRead(0, 23, 3, 100_MB, 3, false, tc);
16121608

1613-
Cerr << ">>>>> CmdRead 1" << Endl << Flush;
16141609
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
1615-
Cerr << ">>>>> CmdRead 2" << Endl << Flush;
16161610
CmdRead(0, 3, 1000, 511_KB, 4, false, tc);
1617-
Cerr << ">>>>> CmdRead 3" << Endl << Flush;
16181611
CmdRead(0, 3, 1000, 1_KB, 4, false, tc); //at least one message will be readed always
1619-
Cerr << ">>>>> CmdRead 4" << Endl << Flush;
16201612
CmdRead(0, 25, 1000, 1_KB, 1, false, tc); //at least one message will be readed always, from head
16211613

16221614
activeZone = true;
1623-
Cerr << ">>>>> CmdRead 5" << Endl << Flush;
16241615
CmdRead(0, 9, 1000, 3_MB, 6, false, tc);
1625-
Cerr << ">>>>> CmdRead 6" << Endl << Flush;
16261616
CmdRead(0, 9, 1000, 3_MB - 10_KB, 6, false, tc);
1627-
Cerr << ">>>>> CmdRead 7" << Endl << Flush;
16281617
CmdRead(0, 25, 1000, 512_KB, 1, false, tc); //from head
1629-
Cerr << ">>>>> CmdRead 8" << Endl << Flush;
16301618
CmdRead(0, 24, 1000, 512_KB, 1, false, tc); //from head
16311619

16321620
CmdRead(0, 23, 1000, 98_MB, 3, false, tc);
@@ -1715,16 +1703,16 @@ Y_UNIT_TEST(TestPQReadAhead) {
17151703
PQGetPartInfo(0, 22, tc);
17161704
activeZone = true;
17171705

1718-
Cerr << ">>>>> 1" << Endl << Flush;
1719-
CmdRead(0, 0, 1, 100_MB, 12, false, tc);
1720-
Cerr << ">>>>> 2" << Endl << Flush;
1721-
CmdRead(0, 1, 1, 100_MB, 11, false, tc);
1722-
Cerr << ">>>>> 3" << Endl << Flush;
1723-
CmdRead(0, 2, 1, 100_MB, 10, false, tc);
1724-
Cerr << ">>>>> 4" << Endl << Flush;
1725-
CmdRead(0, 3, 1, 100_MB, 9, false, tc);
1726-
Cerr << ">>>>> 5" << Endl << Flush;
1727-
CmdRead(0, 4, 10, 100_MB, 16, false, tc);
1706+
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
1707+
CmdRead(0, 1, 1, 100_MB, 1, false, tc);
1708+
CmdRead(0, 2, 1, 100_MB, 1, false, tc);
1709+
CmdRead(0, 3, 1, 100_MB, 1, false, tc);
1710+
CmdRead(0, 4, 10, 100_MB, 10, false, tc);
1711+
1712+
CmdRead(0, 0, Max<i32>(), 100_KB, 12, false, tc);
1713+
CmdRead(0, 1, Max<i32>(), 100_KB, 11, false, tc);
1714+
CmdRead(0, 2, Max<i32>(), 100_KB, 10, false, tc);
1715+
CmdRead(0, 3, Max<i32>(), 100_KB, 9, false, tc);
17281716
});
17291717
}
17301718

@@ -2040,7 +2028,7 @@ Y_UNIT_TEST(TestPQCacheSizeManagement) {
20402028

20412029
TAutoPtr<IEventHandle> handle;
20422030
for (ui32 i = 0; i < 10; ++i) {
2043-
CmdRead(0, 0, 1, 100_MB, 7, false, tc);
2031+
CmdRead(0, 0, 1, 100_MB, 1, false, tc);
20442032
PQTabletRestart(tc);
20452033
}
20462034
});
@@ -2091,28 +2079,20 @@ Y_UNIT_TEST(TestMaxTimeLagRewind) {
20912079
}
20922080
const auto ts = tc.Runtime->GetCurrentTime();
20932081

2094-
Cerr << ">>>>> 1" << Endl << Flush;
2095-
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {0});
2096-
Cerr << ">>>>> 2" << Endl << Flush;
2097-
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
2098-
Cerr << ">>>>> 3" << Endl << Flush;
2099-
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
2100-
Cerr << ">>>>> 4" << Endl << Flush;
2082+
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {0});
2083+
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, TDuration::Minutes(3).MilliSeconds());
2084+
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, TDuration::Minutes(3).MilliSeconds());
21012085
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 1000);
21022086

2103-
Cerr << ">>>>> 5" << Endl << Flush;
2104-
CmdRead(0, 0, 1, Max<i32>(), 7, false, tc, {21}, 0,
2087+
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {21}, 0,
21052088
(ts - TDuration::Minutes(3)).MilliSeconds());
2106-
Cerr << ">>>>> 6" << Endl << Flush;
2107-
CmdRead(0, 22, 1, Max<i32>(), 6, false, tc, {22}, 0,
2089+
CmdRead(0, 22, 1, Max<i32>(), 1, false, tc, {22}, 0,
21082090
(ts - TDuration::Minutes(3)).MilliSeconds());
2109-
Cerr << ">>>>> 7" << Endl << Flush;
21102091
CmdRead(0, 4, 1, Max<i32>(), 1, false, tc, {34}, 0,
21112092
(ts - TDuration::Seconds(1)).MilliSeconds());
21122093

21132094
PQTabletPrepare({.readFromTimestampsMs=(ts - TDuration::Seconds(1)).MilliSeconds()},
21142095
{{"aaa", true}}, tc);
2115-
Cerr << ">>>>> 8" << Endl << Flush;
21162096
CmdRead(0, 0, 1, Max<i32>(), 1, false, tc, {34});
21172097

21182098
});

ydb/services/datastreams/datastreams_ut.cpp

+60
Original file line numberDiff line numberDiff line change
@@ -2353,6 +2353,66 @@ Y_UNIT_TEST_SUITE(DataStreams) {
23532353

23542354
}
23552355

2356+
Y_UNIT_TEST(TestGetRecordsWithCount) {
2357+
TInsecureDatastreamsTestServer testServer;
2358+
const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;
2359+
{
2360+
auto result = testServer.DataStreamsClient->CreateStream(streamName,
2361+
NYDS_V1::TCreateStreamSettings().ShardCount(1)).ExtractValueSync();
2362+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2363+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2364+
}
2365+
2366+
const ui32 recordsCount = 16;
2367+
std::vector<ui64> timestamps;
2368+
{
2369+
std::string data;
2370+
data.resize(1_MB); // big messages. compaction must will be completed.
2371+
std::iota(data.begin(), data.end(), 1);
2372+
std::random_device rd;
2373+
std::mt19937 generator{rd()};
2374+
2375+
for (ui32 i = 1; i <= recordsCount; ++i) {
2376+
std::shuffle(data.begin(), data.end(), generator);
2377+
{
2378+
TString id = Sprintf("%04u", i);
2379+
NYDS_V1::TDataRecord dataRecord{{data.begin(), data.end()}, id, ""};
2380+
//
2381+
// we make sure that the value of WriteTimestampMs is between neighboring timestamps
2382+
//
2383+
timestamps.push_back(TInstant::Now().MilliSeconds());
2384+
Sleep(TDuration::MilliSeconds(500));
2385+
auto result = testServer.DataStreamsClient->PutRecord(streamName, dataRecord).ExtractValueSync();
2386+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2387+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2388+
}
2389+
Sleep(TDuration::MilliSeconds(500));
2390+
}
2391+
}
2392+
2393+
for (ui32 i = 0; i < recordsCount; ++i) {
2394+
TString shardIterator;
2395+
2396+
{
2397+
auto result = testServer.DataStreamsClient->GetShardIterator(streamName, "shard-000000",
2398+
YDS_V1::ShardIteratorType::AT_TIMESTAMP,
2399+
NYDS_V1::TGetShardIteratorSettings().Timestamp(timestamps[i])).ExtractValueSync();
2400+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2401+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2402+
shardIterator = result.GetResult().shard_iterator();
2403+
}
2404+
2405+
{
2406+
auto result = testServer.DataStreamsClient->GetRecords(shardIterator,
2407+
NYDS_V1::TGetRecordsSettings().Limit(1)).ExtractValueSync();
2408+
UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
2409+
UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString());
2410+
UNIT_ASSERT_VALUES_EQUAL(result.GetResult().records().size(), 1);
2411+
UNIT_ASSERT_VALUES_EQUAL(std::stoi(result.GetResult().records().begin()->sequence_number()), i);
2412+
}
2413+
}
2414+
}
2415+
23562416
Y_UNIT_TEST(TestGetRecordsStreamWithMultipleShards) {
23572417
TInsecureDatastreamsTestServer testServer;
23582418
const TString streamName = TStringBuilder() << "stream_" << Y_UNIT_TEST_NAME;

ydb/services/persqueue_v1/persqueue_ut.cpp

+8-16
Original file line numberDiff line numberDiff line change
@@ -1161,14 +1161,12 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
11611161
ui32 totalMsg = 0;
11621162
ui64 nextReadId = 1;
11631163
Sleep(TDuration::Seconds(3));
1164-
Cerr << ">>>>> 1" << Endl << Flush;
11651164
setup.DoWrite(pqClient->GetDriver(), "acc/topic1", 1_MB, 50);
11661165

11671166
Cerr << "First read\n";
1168-
setup.DoRead(assignId, nextReadId, totalMsg, 40);
1167+
setup.DoRead(assignId, nextReadId, totalMsg, 43);
11691168
setup.DoRead(assignId, nextReadId, totalMsg, 42);
11701169

1171-
Cerr << ">>>>> 3" << Endl << Flush;
11721170
Topic::StreamReadMessage::FromClient req;
11731171
req.mutable_read_request()->set_bytes_size(40_MB);
11741172
if (!setup.ControlStream->Write(req)) {
@@ -1177,14 +1175,11 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
11771175
Cerr << "Second read\n";
11781176
setup.DoRead(assignId, nextReadId, totalMsg, 50);
11791177

1180-
Cerr << ">>>>> 5" << Endl << Flush;
11811178
Sleep(TDuration::Seconds(1));
1182-
Cerr << ">>>>> 6" << Endl << Flush;
11831179
auto cachedData = RequestCacheData(runtime, new TEvPQ::TEvGetFullDirectReadData());
11841180
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.size(), 1);
11851181
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.StagedReads.size(), 0);
11861182
UNIT_ASSERT_VALUES_EQUAL(cachedData->Data.begin()->second.Reads.size(), 0);
1187-
Cerr << ">>>>> 7" << Endl << Flush;
11881183
}
11891184

11901185
Y_UNIT_TEST(DirectReadBadCases) {
@@ -2822,7 +2817,7 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
28222817
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", i}, value);
28232818

28242819
Cerr << ">>>>> 1" << Endl << Flush;
2825-
auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
2820+
auto info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
28262821
Cerr << ">>>>> 2" << Endl << Flush;
28272822
auto info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
28282823

@@ -2834,9 +2829,9 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
28342829
server.AnnoyingClient->WriteToPQ({DEFAULT_TOPIC_NAME, 0, "source1", 32+i}, value);
28352830

28362831
Cerr << ">>>>> 3" << Endl << Flush;
2837-
info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 23);
2832+
info0 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 16, "user"}, 16);
28382833
Cerr << ">>>>> 4" << Endl << Flush;
2839-
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 22);
2834+
info16 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 16, 16, "user"}, 16);
28402835

28412836
ui32 fromDisk = info0.BlobsFromDisk + info16.BlobsFromDisk;
28422837
ui32 fromCache = info0.BlobsFromCache + info16.BlobsFromCache;
@@ -2896,16 +2891,13 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
28962891
server.AnnoyingClient->WriteToPQ({secondTopic, 0, "source1", i}, mb);
28972892
}
28982893

2899-
Cerr << ">>>>> 1" << Endl << Flush;
2900-
auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 7);
2901-
Cerr << ">>>>> 2" << Endl << Flush;
2902-
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 7);
2903-
Cerr << ">>>>> 3" << Endl << Flush;
2894+
auto info1 = server.AnnoyingClient->ReadFromPQ({DEFAULT_TOPIC_NAME, 0, 0, 1, "user1"}, 1);
2895+
auto info2 = server.AnnoyingClient->ReadFromPQ({secondTopic, 0, 0, 1, "user1"}, 1);
29042896

29052897
UNIT_ASSERT_VALUES_EQUAL(info1.BlobsFromCache, 1);
29062898
UNIT_ASSERT_VALUES_EQUAL(info2.BlobsFromCache, 1);
2907-
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 7);
2908-
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 7);
2899+
UNIT_ASSERT_VALUES_EQUAL(info1.Values.size(), 1);
2900+
UNIT_ASSERT_VALUES_EQUAL(info2.Values.size(), 1);
29092901
UNIT_ASSERT_VALUES_EQUAL(info1.Values[0].size(), valueSize);
29102902
UNIT_ASSERT_VALUES_EQUAL(info2.Values[0].size(), valueSize);
29112903
UNIT_ASSERT(info1.Values[0] == value1);

0 commit comments

Comments
 (0)