Skip to content

Commit 7b0e72f

Browse files
authored
Merge 8de431e into 56e8de9
2 parents 56e8de9 + 8de431e commit 7b0e72f

File tree

5 files changed

+20
-3
lines changed

5 files changed

+20
-3
lines changed

ydb/core/persqueue/dread_cache_service/caching_service.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,10 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
475475
continue; //TODO - no such chunks must be on prod
476476
}
477477

478+
if (!proto.has_codec()) {
479+
proto.set_codec(NPersQueueCommon::RAW);
480+
}
481+
478482
TString sourceId;
479483
if (!r.GetSourceId().empty()) {
480484
sourceId = NPQ::NSourceIdEncoding::Decode(r.GetSourceId());

ydb/services/datastreams/datastreams_proxy.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1669,9 +1669,12 @@ namespace NKikimr::NDataStreams::V1 {
16691669
record->set_approximate_arrival_timestamp(r.GetCreateTimestampMS());
16701670
record->set_partition_key(r.GetPartitionKey());
16711671
record->set_sequence_number(std::to_string(r.GetOffset()).c_str());
1672-
if (proto.GetCodec() > 0) {
1673-
record->set_codec(proto.GetCodec() + 1);
1672+
1673+
if (!proto.has_codec()) {
1674+
proto.set_codec(NPersQueueCommon::RAW);
16741675
}
1676+
1677+
record->set_codec(proto.GetCodec() + 1);
16751678
}
16761679
if (!results.empty()) {
16771680
auto last = results.rbegin();

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2126,6 +2126,11 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
21262126
if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) {
21272127
continue; //TODO - no such chunks must be on prod
21282128
}
2129+
2130+
if (!proto.has_codec()) {
2131+
proto.set_codec(NPersQueueCommon::RAW);
2132+
}
2133+
21292134
TString sourceId = "";
21302135
if (!r.GetSourceId().empty()) {
21312136
if (!NPQ::NSourceIdEncoding::IsValidEncoded(r.GetSourceId())) {

ydb/services/persqueue_v1/actors/partition_actor.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,11 @@ bool FillBatchedData(
419419
hasOffset = true;
420420

421421
auto proto(GetDeserializedData(r.GetData()));
422+
423+
if (!proto.has_codec()) {
424+
proto.set_codec(NPersQueueCommon::RAW);
425+
}
426+
422427
if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) {
423428
continue; //TODO - no such chunks must be on prod
424429
}

ydb/services/persqueue_v1/actors/read_session_actor.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1739,6 +1739,7 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
17391739
return ByteSize - prev;
17401740
}
17411741

1742+
17421743
template <typename TServerMessage>
17431744
i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) {
17441745

@@ -1755,7 +1756,6 @@ i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEv
17551756
return diff;
17561757
}
17571758

1758-
17591759
template <bool UseMigrationProtocol>
17601760
void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::TPtr& ev, const TActorContext& ctx) {
17611761
if (!ActualPartitionActors.contains(ev->Sender)) {

0 commit comments

Comments
 (0)