Skip to content

Commit 9d9b5d6

Browse files
authored
Replace codec to raw if not specified to 24 3 (#10382)
1 parent f6729a6 commit 9d9b5d6

File tree

4 files changed

+15
-1
lines changed

4 files changed

+15
-1
lines changed

ydb/core/persqueue/dread_cache_service/caching_service.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,10 @@ class TPQDirectReadCacheService : public TActorBootstrapped<TPQDirectReadCacheSe
475475
continue; //TODO - no such chunks must be on prod
476476
}
477477

478+
if (!proto.has_codec()) {
479+
proto.set_codec(NPersQueueCommon::RAW);
480+
}
481+
478482
TString sourceId;
479483
if (!r.GetSourceId().empty()) {
480484
sourceId = NPQ::NSourceIdEncoding::Decode(r.GetSourceId());

ydb/services/deprecated/persqueue_v0/grpc_pq_read_actor.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -2128,6 +2128,11 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
21282128
if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) {
21292129
continue; //TODO - no such chunks must be on prod
21302130
}
2131+
2132+
if (!proto.has_codec()) {
2133+
proto.set_codec(NPersQueueCommon::RAW);
2134+
}
2135+
21312136
TString sourceId = "";
21322137
if (!r.GetSourceId().empty()) {
21332138
if (!NPQ::NSourceIdEncoding::IsValidEncoded(r.GetSourceId())) {

ydb/services/persqueue_v1/actors/partition_actor.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,11 @@ bool FillBatchedData(
419419
hasOffset = true;
420420

421421
auto proto(GetDeserializedData(r.GetData()));
422+
423+
if (!proto.has_codec()) {
424+
proto.set_codec(NPersQueueCommon::RAW);
425+
}
426+
422427
if (proto.GetChunkType() != NKikimrPQClient::TDataChunk::REGULAR) {
423428
continue; //TODO - no such chunks must be on prod
424429
}

ydb/services/persqueue_v1/actors/read_session_actor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1773,6 +1773,7 @@ i64 TFormedReadResponse<TServerMessage>::ApplyResponse(TServerMessage&& resp) {
17731773
return ByteSize - prev;
17741774
}
17751775

1776+
17761777
template <typename TServerMessage>
17771778
i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEvDirectReadResponse::TPtr& ev) {
17781779

@@ -1789,7 +1790,6 @@ i64 TFormedReadResponse<TServerMessage>::ApplyDirectReadResponse(TEvPQProxy::TEv
17891790
return diff;
17901791
}
17911792

1792-
17931793
template <bool UseMigrationProtocol>
17941794
void TReadSessionActor<UseMigrationProtocol>::Handle(typename TEvReadResponse::TPtr& ev, const TActorContext& ctx) {
17951795
if (!ActualPartitionActors.contains(ev->Sender)) {

0 commit comments

Comments
 (0)