Skip to content

Commit 1c102f7

Browse files
Merge ab19a34 into ec02547
2 parents ec02547 + ab19a34 commit 1c102f7

File tree

5 files changed

+44
-45
lines changed

5 files changed

+44
-45
lines changed

ydb/services/deprecated/persqueue_v0/grpc_pq_actor.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -726,7 +726,7 @@ class TReadSessionActor : public TActorBootstrapped<TReadSessionActor> {
726726
static ui32 NormalizeMaxReadSize(ui32 sourceValue);
727727
static ui32 NormalizeMaxReadPartitionsCount(ui32 sourceValue);
728728

729-
static bool RemoveEmptyMessages(NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are nonempty messages
729+
static bool HasMessages(const NPersQueue::TReadResponse::TBatchedData& data); // returns true if there are any messages
730730

731731
private:
732732
IReadSessionHandlerRef Handler;

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

+9-20
Original file line numberDiff line numberDiff line change
@@ -1525,7 +1525,7 @@ bool TReadSessionActor::ProcessAnswer(const TActorContext& ctx, TFormedReadRespo
15251525

15261526
Y_ABORT_UNLESS(formedResponse->RequestsInfly == 0);
15271527
i64 diff = formedResponse->Response.ByteSize();
1528-
const bool hasMessages = RemoveEmptyMessages(*formedResponse->Response.MutableBatchedData());
1528+
const bool hasMessages = HasMessages(formedResponse->Response.GetBatchedData());
15291529
if (hasMessages) {
15301530
LOG_DEBUG_S(ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " assign read id " << ReadIdToResponse << " to read request " << formedResponse->Guid);
15311531
formedResponse->Response.MutableBatchedData()->SetCookie(ReadIdToResponse);
@@ -1758,26 +1758,15 @@ void TReadSessionActor::HandleWakeup(const TActorContext& ctx) {
17581758
}
17591759
}
17601760

1761-
bool TReadSessionActor::RemoveEmptyMessages(TReadResponse::TBatchedData& data) {
1762-
bool hasNonEmptyMessages = false;
1763-
auto isMessageEmpty = [&](TReadResponse::TBatchedData::TMessageData& message) -> bool {
1764-
if (message.GetData().empty()) {
1765-
return true;
1766-
} else {
1767-
hasNonEmptyMessages = true;
1768-
return false;
1761+
bool TReadSessionActor::HasMessages(const TReadResponse::TBatchedData& data) {
1762+
for (const auto& partData : data.GetPartitionData()) {
1763+
for (const auto& batch : partData.GetBatch()) {
1764+
if (batch.MessageDataSize() > 0) {
1765+
return true;
1766+
}
17691767
}
1770-
};
1771-
auto batchRemover = [&](TReadResponse::TBatchedData::TBatch& batch) -> bool {
1772-
NProtoBuf::RemoveRepeatedFieldItemIf(batch.MutableMessageData(), isMessageEmpty);
1773-
return batch.MessageDataSize() == 0;
1774-
};
1775-
auto partitionDataRemover = [&](TReadResponse::TBatchedData::TPartitionData& partition) -> bool {
1776-
NProtoBuf::RemoveRepeatedFieldItemIf(partition.MutableBatch(), batchRemover);
1777-
return partition.BatchSize() == 0;
1778-
};
1779-
NProtoBuf::RemoveRepeatedFieldItemIf(data.MutablePartitionData(), partitionDataRemover);
1780-
return hasNonEmptyMessages;
1768+
}
1769+
return false;
17811770
}
17821771

17831772

ydb/services/persqueue_v1/actors/helpers.cpp

+30-20
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,39 @@
44

55
namespace NKikimr::NGRpcProxy::V1 {
66

7-
bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data) {
8-
auto batchRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::Batch& batch) -> bool {
9-
return batch.message_data_size() == 0;
10-
};
11-
auto partitionDataRemover = [&](PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch::PartitionData& partition) -> bool {
12-
NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover);
13-
return partition.batches_size() == 0;
14-
};
15-
NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover);
16-
return !data.partition_data().empty();
7+
/*
8+
for (const auto& partData : data.GetPartitionData()) {
9+
for (const auto& batch : partData.GetBatch()) {
10+
if (batch.MessageDataSize() > 0) {
11+
return true;
12+
}
13+
}
14+
}
15+
return false;
16+
17+
*/
18+
19+
bool HasEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data) {
20+
for (const auto& partData : data.partition_data()) {
21+
for (const auto& batch : partData.batches()) {
22+
if (batch.message_data_size() > 0) {
23+
return true;
24+
}
25+
}
26+
}
27+
return false;
1728
}
1829

1930
// TODO: remove after refactor
20-
bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data) {
21-
auto batchRemover = [&](Topic::StreamReadMessage::ReadResponse::Batch& batch) -> bool {
22-
return batch.message_data_size() == 0;
23-
};
24-
auto partitionDataRemover = [&](Topic::StreamReadMessage::ReadResponse::PartitionData& partition) -> bool {
25-
NProtoBuf::RemoveRepeatedFieldItemIf(partition.mutable_batches(), batchRemover);
26-
return partition.batches_size() == 0;
27-
};
28-
NProtoBuf::RemoveRepeatedFieldItemIf(data.mutable_partition_data(), partitionDataRemover);
29-
return !data.partition_data().empty();
31+
bool HasMessages(Topic::StreamReadMessage::ReadResponse& data) {
32+
for (const auto& partData : data.partition_data()) {
33+
for (const auto& batch : partData.batches()) {
34+
if (batch.message_data_size() > 0) {
35+
return true;
36+
}
37+
}
38+
}
39+
return false;
3040
}
3141

3242
}

ydb/services/persqueue_v1/actors/helpers.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ static constexpr ui64 READ_BLOCK_SIZE = 8_KB; // metering
1313

1414
using namespace Ydb;
1515

16-
bool RemoveEmptyMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data);
16+
bool HasMessages(PersQueue::V1::MigrationStreamingReadServerMessage::DataBatch& data);
1717

18-
bool RemoveEmptyMessages(Topic::StreamReadMessage::ReadResponse& data);
18+
bool HasMessages(Topic::StreamReadMessage::ReadResponse& data);
1919

2020
}

ydb/services/persqueue_v1/actors/read_session_actor.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -1913,9 +1913,9 @@ ui64 TReadSessionActor<UseMigrationProtocol>::PrepareResponse(typename TFormedRe
19131913
formedResponse->ByteSizeBeforeFiltering = formedResponse->Response.ByteSize();
19141914

19151915
if constexpr (UseMigrationProtocol) {
1916-
formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_data_batch());
1916+
formedResponse->HasMessages = HasMessages(*formedResponse->Response.mutable_data_batch());
19171917
} else {
1918-
formedResponse->HasMessages = RemoveEmptyMessages(*formedResponse->Response.mutable_read_response());
1918+
formedResponse->HasMessages = HasMessages(*formedResponse->Response.mutable_read_response());
19191919
}
19201920

19211921
return formedResponse->HasMessages ? formedResponse->Response.ByteSize() : 0;

0 commit comments

Comments
 (0)