Skip to content

Commit 4a7936a

Browse files
provide codecs via singleton map (#3070)
1 parent a76df4a commit 4a7936a

File tree

6 files changed

+69
-14
lines changed

6 files changed

+69
-14
lines changed

ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/persqueue.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,15 @@
1111

1212
namespace NYdb::NPersQueue {
1313

14+
class TCommonCodecsProvider {
15+
public:
16+
TCommonCodecsProvider() {
17+
NYdb::NTopic::TCodecMap::GetTheCodecMap().Set((ui32)NYdb::NPersQueue::ECodec::GZIP, MakeHolder<NYdb::NTopic::TGzipCodec>());
18+
NYdb::NTopic::TCodecMap::GetTheCodecMap().Set((ui32)NYdb::NPersQueue::ECodec::ZSTD, MakeHolder<NYdb::NTopic::TZstdCodec>());
19+
}
20+
};
21+
TCommonCodecsProvider COMMON_CODECS_PROVIDER;
22+
1423
const TVector<ECodec>& GetDefaultCodecs() {
1524
static const TVector<ECodec> codecs = {};
1625
return codecs;

ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/read_session.ipp

+7-11
Original file line numberDiff line numberDiff line change
@@ -2569,23 +2569,19 @@ void TDataDecompressionInfo<UseMigrationProtocol>::TDecompressionTask::operator(
25692569
&& data.codec() != Ydb::PersQueue::V1::CODEC_RAW
25702570
&& data.codec() != Ydb::PersQueue::V1::CODEC_UNSPECIFIED
25712571
) {
2572-
if (auto session = Parent->CbContext->LockShared()) {
2573-
const NYdb::NTopic::ICodec* codecImpl = session->GetCodecImplOrThrow(static_cast<ECodec>(data.codec()));
2574-
TString decompressed = codecImpl->Decompress(data.data());
2575-
data.set_data(decompressed);
2576-
data.set_codec(Ydb::PersQueue::V1::CODEC_RAW);
2577-
}
2572+
const NYdb::NTopic::ICodec* codecImpl = NYdb::NTopic::TCodecMap::GetTheCodecMap().GetOrThrow(static_cast<ui32>(data.codec()));
2573+
TString decompressed = codecImpl->Decompress(data.data());
2574+
data.set_data(decompressed);
2575+
data.set_codec(Ydb::PersQueue::V1::CODEC_RAW);
25782576
}
25792577
} else {
25802578
if (Parent->DoDecompress
25812579
&& static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_RAW
25822580
&& static_cast<Ydb::Topic::Codec>(batch.codec()) != Ydb::Topic::CODEC_UNSPECIFIED
25832581
) {
2584-
if (auto session = Parent->CbContext->LockShared()) {
2585-
const NYdb::NTopic::ICodec* codecImpl = session->GetCodecImplOrThrow(static_cast<NTopic::ECodec>(batch.codec()));
2586-
TString decompressed = codecImpl->Decompress(data.data());
2587-
data.set_data(decompressed);
2588-
}
2582+
const NYdb::NTopic::ICodec* codecImpl = NYdb::NTopic::TCodecMap::GetTheCodecMap().GetOrThrow(static_cast<ui32>(batch.codec()));
2583+
TString decompressed = codecImpl->Decompress(data.data());
2584+
data.set_data(decompressed);
25892585
}
25902586
}
25912587

ydb/public/sdk/cpp/client/ydb_persqueue_core/impl/write_session_impl.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -892,7 +892,8 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) {
892892

893893
TBuffer CompressBuffer(std::shared_ptr<TPersQueueClient::TImpl> client, TVector<TStringBuf>& data, ECodec codec, i32 level) {
894894
TBuffer result;
895-
THolder<IOutputStream> coder = client->GetCodecImplOrThrow(codec)->CreateCoder(result, level);
895+
Y_UNUSED(client);
896+
THolder<IOutputStream> coder = NYdb::NTopic::TCodecMap::GetTheCodecMap().GetOrThrow((ui32)codec)->CreateCoder(result, level);
896897
for (auto& buffer : data) {
897898
coder->Write(buffer.data(), buffer.size());
898899
}

ydb/public/sdk/cpp/client/ydb_topic/codecs/codecs.h

+40-1
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,9 @@
88
#include "util/stream/str.h"
99
#include <util/stream/zlib.h>
1010
#include <util/stream/output.h>
11+
#include <util/system/spinlock.h>
1112

12-
#define CODECS_ALREADY_DEFINED
13+
#include <unordered_map>
1314

1415
namespace NYdb::NTopic {
1516

@@ -78,4 +79,42 @@ class TUnsupportedCodec final : public ICodec {
7879
}
7980
};
8081

82+
class TCodecMap {
83+
public:
84+
static TCodecMap& GetTheCodecMap() {
85+
static TCodecMap instance;
86+
return instance;
87+
}
88+
89+
void Set(ui32 codecId, THolder<ICodec>&& codecImpl) {
90+
with_lock(Lock) {
91+
Codecs[codecId] = std::move(codecImpl);
92+
}
93+
}
94+
95+
const ICodec* GetOrThrow(ui32 codecId) const {
96+
with_lock(Lock) {
97+
if (!Codecs.contains(codecId)) {
98+
throw yexception() << "codec with id " << ui32(codecId) << " not provided";
99+
}
100+
return Codecs.at(codecId).Get();
101+
}
102+
}
103+
104+
105+
TCodecMap(const TCodecMap&) = delete;
106+
TCodecMap(TCodecMap&&) = delete;
107+
TCodecMap& operator=(const TCodecMap&) = delete;
108+
TCodecMap& operator=(TCodecMap&&) = delete;
109+
110+
private:
111+
TCodecMap() = default;
112+
113+
private:
114+
std::unordered_map<ui32, THolder<ICodec>> Codecs;
115+
TAdaptiveLock Lock;
116+
};
117+
118+
#define CODEC_MAP_ALREADY_PROVIDED
119+
81120
} // namespace NYdb::NTopic

ydb/public/sdk/cpp/client/ydb_topic/impl/topic.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@
1212

1313
namespace NYdb::NTopic {
1414

15+
class TCommonCodecsProvider {
16+
public:
17+
TCommonCodecsProvider() {
18+
NYdb::NTopic::TCodecMap::GetTheCodecMap().Set((ui32)NYdb::NPersQueue::ECodec::GZIP, MakeHolder<NYdb::NTopic::TGzipCodec>());
19+
NYdb::NTopic::TCodecMap::GetTheCodecMap().Set((ui32)NYdb::NPersQueue::ECodec::ZSTD, MakeHolder<NYdb::NTopic::TZstdCodec>());
20+
}
21+
};
22+
TCommonCodecsProvider COMMON_CODECS_PROVIDER;
23+
1524
TDescribeTopicResult::TDescribeTopicResult(TStatus&& status, Ydb::Topic::DescribeTopicResult&& result)
1625
: TStatus(std::move(status))
1726
, TopicDescription_(std::move(result))

ydb/public/sdk/cpp/client/ydb_topic/impl/write_session_impl.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -1073,7 +1073,8 @@ TMemoryUsageChange TWriteSessionImpl::OnMemoryUsageChangedImpl(i64 diff) {
10731073

10741074
TBuffer CompressBuffer(std::shared_ptr<TTopicClient::TImpl> client, TVector<TStringBuf>& data, ECodec codec, i32 level) {
10751075
TBuffer result;
1076-
THolder<IOutputStream> coder = client->GetCodecImplOrThrow(codec)->CreateCoder(result, level);
1076+
Y_UNUSED(client);
1077+
THolder<IOutputStream> coder = NYdb::NTopic::TCodecMap::GetTheCodecMap().GetOrThrow((ui32)codec)->CreateCoder(result, level);
10771078
for (auto& buffer : data) {
10781079
coder->Write(buffer.data(), buffer.size());
10791080
}

0 commit comments

Comments
 (0)