diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index eac6a6670e00..1a2b90167313 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -4,7 +4,7 @@ #include "common/validation.h" #include "merging_sorted_input_stream.h" #include "permutations.h" -#include "serializer/batch_only.h" +#include "serializer/native.h" #include "serializer/abstract.h" #include "serializer/stream.h" #include "simple_arrays_cache.h" @@ -106,7 +106,7 @@ std::shared_ptr DeserializeSchema(const TString& str) { } TString SerializeBatch(const std::shared_ptr& batch, const arrow::ipc::IpcWriteOptions& options) { - return NSerialization::TBatchPayloadSerializer(options).Serialize(batch); + return NSerialization::TNativeSerializer(options).SerializePayload(batch); } TString SerializeBatchNoCompression(const std::shared_ptr& batch) { @@ -117,7 +117,7 @@ TString SerializeBatchNoCompression(const std::shared_ptr& b std::shared_ptr DeserializeBatch(const TString& blob, const std::shared_ptr& schema) { - auto result = NSerialization::TBatchPayloadDeserializer(schema).Deserialize(blob); + auto result = NSerialization::TNativeSerializer().Deserialize(blob, schema); if (result.ok()) { return *result; } else { diff --git a/ydb/core/formats/arrow/compression/CMakeLists.darwin-arm64.txt b/ydb/core/formats/arrow/compression/CMakeLists.darwin-arm64.txt deleted file mode 100644 index 0f1b4b72adce..000000000000 --- a/ydb/core/formats/arrow/compression/CMakeLists.darwin-arm64.txt +++ /dev/null @@ -1,23 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(formats-arrow-compression) -target_link_libraries(formats-arrow-compression PUBLIC - contrib-libs-cxxsupp - yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow - ydb-library-conclusion -) -target_sources(formats-arrow-compression PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp -) diff --git a/ydb/core/formats/arrow/compression/CMakeLists.darwin-x86_64.txt b/ydb/core/formats/arrow/compression/CMakeLists.darwin-x86_64.txt deleted file mode 100644 index 0f1b4b72adce..000000000000 --- a/ydb/core/formats/arrow/compression/CMakeLists.darwin-x86_64.txt +++ /dev/null @@ -1,23 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. 
- - - -add_library(formats-arrow-compression) -target_link_libraries(formats-arrow-compression PUBLIC - contrib-libs-cxxsupp - yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow - ydb-library-conclusion -) -target_sources(formats-arrow-compression PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp -) diff --git a/ydb/core/formats/arrow/compression/CMakeLists.linux-aarch64.txt b/ydb/core/formats/arrow/compression/CMakeLists.linux-aarch64.txt deleted file mode 100644 index 80620e94bb4b..000000000000 --- a/ydb/core/formats/arrow/compression/CMakeLists.linux-aarch64.txt +++ /dev/null @@ -1,24 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(formats-arrow-compression) -target_link_libraries(formats-arrow-compression PUBLIC - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow - ydb-library-conclusion -) -target_sources(formats-arrow-compression PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp -) diff --git a/ydb/core/formats/arrow/compression/CMakeLists.linux-x86_64.txt b/ydb/core/formats/arrow/compression/CMakeLists.linux-x86_64.txt deleted file mode 100644 index 80620e94bb4b..000000000000 --- a/ydb/core/formats/arrow/compression/CMakeLists.linux-x86_64.txt +++ /dev/null @@ -1,24 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(formats-arrow-compression) -target_link_libraries(formats-arrow-compression PUBLIC - contrib-libs-linux-headers - contrib-libs-cxxsupp - yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow - ydb-library-conclusion -) -target_sources(formats-arrow-compression PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp -) diff --git a/ydb/core/formats/arrow/compression/CMakeLists.txt b/ydb/core/formats/arrow/compression/CMakeLists.txt deleted file mode 100644 index d863ebd18067..000000000000 --- a/ydb/core/formats/arrow/compression/CMakeLists.txt +++ /dev/null @@ -1,19 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. 
Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - -if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-x86_64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) - include(CMakeLists.linux-aarch64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") - include(CMakeLists.darwin-x86_64.txt) -elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "arm64") - include(CMakeLists.darwin-arm64.txt) -elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) - include(CMakeLists.windows-x86_64.txt) -endif() diff --git a/ydb/core/formats/arrow/compression/CMakeLists.windows-x86_64.txt b/ydb/core/formats/arrow/compression/CMakeLists.windows-x86_64.txt deleted file mode 100644 index 0f1b4b72adce..000000000000 --- a/ydb/core/formats/arrow/compression/CMakeLists.windows-x86_64.txt +++ /dev/null @@ -1,23 +0,0 @@ - -# This file was generated by the build system used internally in the Yandex monorepo. -# Only simple modifications are allowed (adding source-files to targets, adding simple properties -# like target_include_directories). These modifications will be ported to original -# ya.make files by maintainers. Any complex modifications which can't be ported back to the -# original buildsystem will not be accepted. - - - -add_library(formats-arrow-compression) -target_link_libraries(formats-arrow-compression PUBLIC - contrib-libs-cxxsupp - yutil - libs-apache-arrow - ydb-core-protos - core-formats-arrow - ydb-library-conclusion -) -target_sources(formats-arrow-compression PRIVATE - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/diff.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/object.cpp - ${CMAKE_SOURCE_DIR}/ydb/core/formats/arrow/compression/parsing.cpp -) diff --git a/ydb/core/formats/arrow/compression/diff.cpp b/ydb/core/formats/arrow/compression/diff.cpp deleted file mode 100644 index 0659cba5e2ab..000000000000 --- a/ydb/core/formats/arrow/compression/diff.cpp +++ /dev/null @@ -1,77 +0,0 @@ -#include "diff.h" -#include "object.h" -#include "parsing.h" -#include - -namespace NKikimr::NArrow { - -NKikimrSchemeOp::TCompressionOptions TCompressionDiff::SerializeToProto() const { - NKikimrSchemeOp::TCompressionOptions result; - if (Level) { - result.SetCompressionLevel(*Level); - } - if (Codec) { - result.SetCompressionCodec(CompressionToProto(*Codec)); - } - return result; -} - -TConclusionStatus TCompressionDiff::DeserializeFromRequestFeatures(NYql::TFeaturesExtractor& features) { - { - auto fValue = features.Extract("COMPRESSION.TYPE"); - if (fValue) { - Codec = NArrow::CompressionFromString(*fValue); - if (!Codec) { - return TConclusionStatus::Fail("cannot parse COMPRESSION.TYPE as arrow::Compression"); - } - } - } - { - auto fValue = features.Extract("COMPRESSION.LEVEL"); - if (fValue) { - ui32 level; - if (!TryFromString(*fValue, level)) { - return TConclusionStatus::Fail("cannot parse COMPRESSION.LEVEL as ui32"); - } - Level = level; - } - } - return TConclusionStatus::Success(); -} - -bool TCompressionDiff::DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto) { - if (proto.HasCompressionLevel()) { - Level = proto.GetCompressionLevel(); - } - if (proto.HasCompressionCodec()) { - Codec = CompressionFromProto(proto.GetCompressionCodec()); - if (!Codec) { - return false; - } - 
} - return true; -} - -NKikimr::TConclusionStatus TCompressionDiff::Apply(std::optional& settings) const { - if (IsEmpty()) { - return TConclusionStatus::Success(); - } - TCompression merged; - if (!!settings) { - merged = *settings; - } - if (Codec) { - merged.Codec = *Codec; - } - if (Level) { - merged.Level = *Level; - } - auto validation = merged.Validate(); - if (!validation) { - return validation; - } - settings = merged; - return TConclusionStatus::Success(); -} - -} diff --git a/ydb/core/formats/arrow/compression/diff.h b/ydb/core/formats/arrow/compression/diff.h deleted file mode 100644 index 53cdd5bae788..000000000000 --- a/ydb/core/formats/arrow/compression/diff.h +++ /dev/null @@ -1,34 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -namespace NKikimr::NArrow { - -class TCompression; - -class TCompressionDiff { -private: - std::optional Codec; - std::optional Level; - bool IsEmpty() const { - return !Level && !Codec; - } -public: - NKikimrSchemeOp::TCompressionOptions SerializeToProto() const; - bool DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto); - TConclusionStatus DeserializeFromRequestFeatures(NYql::TFeaturesExtractor& features); - const std::optional& GetCodec() const { - return Codec; - } - const std::optional& GetLevel() const { - return Level; - } - TConclusionStatus Apply(std::optional& settings) const; -}; -} diff --git a/ydb/core/formats/arrow/compression/object.cpp b/ydb/core/formats/arrow/compression/object.cpp deleted file mode 100644 index 7f58d2618e90..000000000000 --- a/ydb/core/formats/arrow/compression/object.cpp +++ /dev/null @@ -1,70 +0,0 @@ -#include "object.h" -#include "parsing.h" -#include -#include - -namespace NKikimr::NArrow { - -TConclusionStatus NKikimr::NArrow::TCompression::Validate() const { - if (Level) { - auto codec = TStatusValidator::GetValid(arrow::util::Codec::Create(Codec)); - const int levelMin = codec->minimum_compression_level(); - const int levelMax = codec->maximum_compression_level(); - if (Level && (*Level < levelMin || levelMax < *Level)) { - return TConclusionStatus::Fail( - TStringBuilder() << "incorrect level for codec. 
have to be: [" << levelMin << ":" << levelMax << "]" - ); - } - } - return TConclusionStatus::Success(); -} - -TConclusionStatus TCompression::DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& compression) { - if (compression.HasCompressionCodec()) { - auto codecOpt = NArrow::CompressionFromProto(compression.GetCompressionCodec()); - if (!codecOpt) { - return TConclusionStatus::Fail("cannot parse codec type from proto"); - } - Codec = *codecOpt; - } - if (compression.HasCompressionLevel()) { - Level = compression.GetCompressionLevel(); - } - return Validate(); -} - -NKikimrSchemeOp::TCompressionOptions TCompression::SerializeToProto() const { - NKikimrSchemeOp::TCompressionOptions result; - result.SetCompressionCodec(NArrow::CompressionToProto(Codec)); - if (Level) { - result.SetCompressionLevel(*Level); - } - return result; -} - -TString TCompression::DebugString() const { - TStringBuilder sb; - sb << arrow::util::Codec::GetCodecAsString(Codec) << ":" << Level.value_or(arrow::util::kUseDefaultCompressionLevel); - return sb; -} - -std::unique_ptr TCompression::BuildArrowCodec() const { - return NArrow::TStatusValidator::GetValid( - arrow::util::Codec::Create( - Codec, Level.value_or(arrow::util::kUseDefaultCompressionLevel))); -} - -NKikimr::TConclusion TCompression::BuildFromProto(const NKikimrSchemeOp::TCompressionOptions& compression) { - TCompression result; - auto resultStatus = result.DeserializeFromProto(compression); - if (!resultStatus) { - return resultStatus; - } - return result; -} - -std::unique_ptr TCompression::BuildDefaultCodec() { - return *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME); -} - -} diff --git a/ydb/core/formats/arrow/compression/object.h b/ydb/core/formats/arrow/compression/object.h deleted file mode 100644 index 18c32b592212..000000000000 --- a/ydb/core/formats/arrow/compression/object.h +++ /dev/null @@ -1,39 +0,0 @@ -#pragma once - -#include -#include -#include "diff.h" - -namespace NKikimr::NArrow { - -class TCompression { -private: - arrow::Compression::type Codec = arrow::Compression::LZ4_FRAME; - std::optional Level; - TCompression() = default; - - TConclusionStatus Validate() const; - friend class TCompressionDiff; -public: - - static std::unique_ptr BuildDefaultCodec(); - - TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& compression); - - NKikimrSchemeOp::TCompressionOptions SerializeToProto() const; - - static TConclusion BuildFromProto(const NKikimrSchemeOp::TCompressionOptions& compression); - - explicit TCompression(const arrow::Compression::type codec, std::optional level = {}) - : Codec(codec) - , Level(level) - { - - } - - TString DebugString() const; - std::unique_ptr BuildArrowCodec() const; - -}; - -} diff --git a/ydb/core/formats/arrow/compression/ya.make b/ydb/core/formats/arrow/compression/ya.make deleted file mode 100644 index 420b028e3f29..000000000000 --- a/ydb/core/formats/arrow/compression/ya.make +++ /dev/null @@ -1,16 +0,0 @@ -LIBRARY() - -SRCS( - diff.cpp - object.cpp - parsing.cpp -) - -PEERDIR( - contrib/libs/apache/arrow - ydb/core/protos - ydb/core/formats/arrow - ydb/library/conclusion -) - -END() diff --git a/ydb/core/formats/arrow/serializer/abstract.cpp b/ydb/core/formats/arrow/serializer/abstract.cpp index 13ee3d731bb6..77b4b7fb5bca 100644 --- a/ydb/core/formats/arrow/serializer/abstract.cpp +++ b/ydb/core/formats/arrow/serializer/abstract.cpp @@ -1,11 +1,28 @@ #include "abstract.h" +#include "native.h" namespace NKikimr::NArrow::NSerialization { 
-arrow::Result> IDeserializer::Deserialize(const TString& data) const { - if (!data) { - return nullptr; +NKikimr::TConclusionStatus TSerializerContainer::DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto) { + NKikimrSchemeOp::TOlapColumn::TSerializer serializerProto; + serializerProto.SetClassName(NArrow::NSerialization::TNativeSerializer::GetClassNameStatic()); + *serializerProto.MutableArrowCompression() = proto; + AFL_VERIFY(Initialize(NArrow::NSerialization::TNativeSerializer::GetClassNameStatic())); + return GetObjectPtr()->DeserializeFromProto(serializerProto); +} + +NKikimr::TConclusionStatus TSerializerContainer::DeserializeFromRequest(NYql::TFeaturesExtractor& features) { + const std::optional className = features.Extract("SERIALIZER.CLASS_NAME"); + if (!className) { + return TConclusionStatus::Success(); + } + if (!TBase::Initialize(*className)) { + return TConclusionStatus::Fail("dont know anything about class_name=" + *className); } - return DoDeserialize(data); + return TBase::GetObjectPtr()->DeserializeFromRequest(features); +} + +std::shared_ptr TSerializerContainer::GetDefaultSerializer() { + return std::make_shared(); } } diff --git a/ydb/core/formats/arrow/serializer/abstract.h b/ydb/core/formats/arrow/serializer/abstract.h index f21ac1d58f74..6051f9fc90ce 100644 --- a/ydb/core/formats/arrow/serializer/abstract.h +++ b/ydb/core/formats/arrow/serializer/abstract.h @@ -1,38 +1,106 @@ #pragma once +#include +#include +#include +#include + #include #include #include +#include namespace NKikimr::NArrow::NSerialization { class ISerializer { protected: - virtual TString DoSerialize(const std::shared_ptr& batch) const = 0; + virtual TString DoSerializeFull(const std::shared_ptr& batch) const = 0; + virtual TString DoSerializePayload(const std::shared_ptr& batch) const = 0; + virtual arrow::Result> DoDeserialize(const TString& data) const = 0; + virtual arrow::Result> DoDeserialize(const TString& data, const std::shared_ptr& schema) const = 0; + virtual TString DoDebugString() const { + return ""; + } + + virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) = 0; + + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& proto) = 0; + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const = 0; public: using TPtr = std::shared_ptr; + using TFactory = NObjectFactory::TObjectFactory; + using TProto = NKikimrSchemeOp::TOlapColumn::TSerializer; virtual ~ISerializer() = default; - TString Serialize(const std::shared_ptr& batch) const { - return DoSerialize(batch); + TConclusionStatus DeserializeFromRequest(NYql::TFeaturesExtractor& features) { + return DoDeserializeFromRequest(features); + } + + TString DebugString() const { + return TStringBuilder() << "{class_name=" << GetClassName() << ";details={" << DoDebugString() << "}}"; + } + + TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& proto) { + return DoDeserializeFromProto(proto); + } + + void SerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const { + return DoSerializeToProto(proto); + } + + TString SerializeFull(const std::shared_ptr& batch) const { + return DoSerializeFull(batch); + } + + TString SerializePayload(const std::shared_ptr& batch) const { + return DoSerializePayload(batch); + } + + arrow::Result> Deserialize(const TString& data) const { + if (!data) { + return nullptr; + } + return DoDeserialize(data); + } + + arrow::Result> 
Deserialize(const TString& data, const std::shared_ptr& schema) const { + if (!data) { + return nullptr; + } + return DoDeserialize(data, schema); } virtual bool IsHardPacker() const = 0; + + virtual TString GetClassName() const = 0; }; -class IDeserializer { -protected: - virtual arrow::Result> DoDeserialize(const TString& data) const = 0; - virtual TString DoDebugString() const = 0; +class TSerializerContainer: public NBackgroundTasks::TInterfaceProtoContainer { +private: + using TBase = NBackgroundTasks::TInterfaceProtoContainer; public: - using TPtr = std::shared_ptr; - virtual ~IDeserializer() = default; + using TBase::TBase; + + TSerializerContainer(const std::shared_ptr& object) + : TBase(object) + { + + } TString DebugString() const { - return DoDebugString(); + if (GetObjectPtr()) { + return GetObjectPtr()->DebugString(); + } else { + return "NO_OBJECT"; + } } + using TBase::DeserializeFromProto; + + static std::shared_ptr GetDefaultSerializer(); + + TConclusionStatus DeserializeFromProto(const NKikimrSchemeOp::TCompressionOptions& proto); - arrow::Result> Deserialize(const TString& data) const; + TConclusionStatus DeserializeFromRequest(NYql::TFeaturesExtractor& features); }; } diff --git a/ydb/core/formats/arrow/serializer/batch_only.cpp b/ydb/core/formats/arrow/serializer/batch_only.cpp deleted file mode 100644 index c93f97b3f559..000000000000 --- a/ydb/core/formats/arrow/serializer/batch_only.cpp +++ /dev/null @@ -1,58 +0,0 @@ -#include "batch_only.h" -#include "stream.h" -#include -#include -#include -#include -#include -#include -#include -#include -namespace NKikimr::NArrow::NSerialization { - -arrow::Result> TBatchPayloadDeserializer::DoDeserialize(const TString& data) const { - arrow::ipc::DictionaryMemo dictMemo; - auto options = arrow::ipc::IpcReadOptions::Defaults(); - options.use_threads = false; - - std::shared_ptr buffer(std::make_shared(data)); - arrow::io::BufferReader reader(buffer); - AFL_TRACE(NKikimrServices::ARROW_HELPER)("event", "parsing")("size", data.size())("columns", Schema->num_fields()); - auto batchResult = arrow::ipc::ReadRecordBatch(Schema, &dictMemo, options, &reader); - if (!batchResult.ok()) { - return batchResult; - } - std::shared_ptr batch = *batchResult; - if (!batch) { - return arrow::Status(arrow::StatusCode::SerializationError, "empty batch"); - } - auto validation = batch->Validate(); - if (!validation.ok()) { - return arrow::Status(arrow::StatusCode::SerializationError, "batch is not valid: " + validation.ToString()); - } - return batch; -} - -TString TBatchPayloadSerializer::DoSerialize(const std::shared_ptr& batch) const { - arrow::ipc::IpcPayload payload; - // Build payload. Compression if set up performed here. - TStatusValidator::Validate(arrow::ipc::GetRecordBatchPayload(*batch, Options, &payload)); - - int32_t metadata_length = 0; - arrow::io::MockOutputStream mock; - // Process prepared payload through mock stream. Fast and efficient. - TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &mock, &metadata_length)); - - TString str; - str.resize(mock.GetExtentBytesWritten()); - - TFixedStringOutputStream out(&str); - // Write prepared payload into the resultant string. No extra allocation will be made. 
- TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &out, &metadata_length)); - Y_ABORT_UNLESS(out.GetPosition() == str.size()); - Y_DEBUG_ABORT_UNLESS(TBatchPayloadDeserializer(batch->schema()).Deserialize(str).ok()); - AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "serialize")("size", str.size())("columns", batch->schema()->num_fields()); - return str; -} - -} diff --git a/ydb/core/formats/arrow/serializer/batch_only.h b/ydb/core/formats/arrow/serializer/batch_only.h deleted file mode 100644 index b3dfda9e7ffb..000000000000 --- a/ydb/core/formats/arrow/serializer/batch_only.h +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once -#include "abstract.h" -#include -#include - -namespace NKikimr::NArrow::NSerialization { - -class TBatchPayloadSerializer: public ISerializer { -private: - const arrow::ipc::IpcWriteOptions Options; -protected: - virtual TString DoSerialize(const std::shared_ptr& batch) const override; -public: - virtual bool IsHardPacker() const override { - return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3; - } - TBatchPayloadSerializer(const arrow::ipc::IpcWriteOptions& options) - : Options(options) { - - } -}; - -class TBatchPayloadDeserializer: public IDeserializer { -private: - const std::shared_ptr Schema; -protected: - virtual arrow::Result> DoDeserialize(const TString& data) const override; - virtual TString DoDebugString() const override { - return "type=BATCH_PAYLOAD;"; - } -public: - TBatchPayloadDeserializer(const std::shared_ptr schema) - : Schema(schema) { - - } -}; - -} diff --git a/ydb/core/formats/arrow/serializer/full.cpp b/ydb/core/formats/arrow/serializer/full.cpp deleted file mode 100644 index 9f830c3662cb..000000000000 --- a/ydb/core/formats/arrow/serializer/full.cpp +++ /dev/null @@ -1,54 +0,0 @@ -#include "full.h" -#include "stream.h" -#include -#include -#include -#include -#include -#include -#include -namespace NKikimr::NArrow::NSerialization { - -arrow::Result> TFullDataDeserializer::DoDeserialize(const TString& data) const { - arrow::ipc::DictionaryMemo dictMemo; - auto options = arrow::ipc::IpcReadOptions::Defaults(); - options.use_threads = false; - - std::shared_ptr buffer(std::make_shared(data)); - arrow::io::BufferReader readerStream(buffer); - auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&readerStream)); - - std::shared_ptr batch; - auto readResult = reader->ReadNext(&batch); - if (!readResult.ok()) { - return readResult; - } - if (!batch) { - return arrow::Status(arrow::StatusCode::SerializationError, "null batch"); - } - auto validation = batch->Validate(); - if (!validation.ok()) { - return arrow::Status(arrow::StatusCode::SerializationError, "validation error: " + validation.ToString()); - } - return batch; -} - -TString TFullDataSerializer::DoSerialize(const std::shared_ptr& batch) const { - TString result; - { - arrow::io::MockOutputStream mock; - auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&mock, batch->schema(), Options)); - TStatusValidator::Validate(writer->WriteRecordBatch(*batch)); - result.reserve(mock.GetExtentBytesWritten()); - } - { - TStringOutputStream stream(&result); - auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&stream, batch->schema(), Options)); - TStatusValidator::Validate(writer->WriteRecordBatch(*batch)); - TStatusValidator::Validate(writer->Close()); - Y_ABORT_UNLESS(stream.GetPosition() == result.size()); - } - return result; -} - -} 
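The deletions above and the new native.cpp/native.h further down fold TFullDataSerializer, TBatchPayloadSerializer and their deserializer counterparts into the single TNativeSerializer. A minimal migration sketch for call sites, not itself part of the patch, using only calls visible in this diff (arrow_helpers.cpp, ut_dictionary.cpp); batch is assumed to be a std::shared_ptr<arrow::RecordBatch> and the snippet is illustrative rather than compiled:

    using namespace NKikimr::NArrow::NSerialization;

    // Old: TFullDataSerializer(options).Serialize(batch) / TBatchPayloadSerializer(options).Serialize(batch)
    TNativeSerializer serializer(arrow::ipc::IpcWriteOptions::Defaults());
    const TString full = serializer.SerializeFull(batch);        // self-describing stream: schema + data
    const TString payload = serializer.SerializePayload(batch);  // record-batch payload only, schema kept elsewhere

    // Old: TFullDataDeserializer().Deserialize(full) / TBatchPayloadDeserializer(schema).Deserialize(payload)
    auto fromFull = TNativeSerializer().Deserialize(full);                        // schema is read back from the stream
    auto fromPayload = TNativeSerializer().Deserialize(payload, batch->schema()); // schema must be supplied by the caller

At the schema level the serializer is now carried by TSerializerContainer (see the abstract.h hunk above), and ALTER_COLUMN requests select it with `SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER` plus `COMPRESSION.TYPE`/`COMPRESSION.LEVEL`, as exercised in the kqp_olap_ut.cpp change further down.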
diff --git a/ydb/core/formats/arrow/serializer/full.h b/ydb/core/formats/arrow/serializer/full.h deleted file mode 100644 index 56761a3d752e..000000000000 --- a/ydb/core/formats/arrow/serializer/full.h +++ /dev/null @@ -1,38 +0,0 @@ -#pragma once - -#include "abstract.h" -#include -#include -#include - -namespace NKikimr::NArrow::NSerialization { - -class TFullDataSerializer: public ISerializer { -private: - const arrow::ipc::IpcWriteOptions Options; -protected: - virtual TString DoSerialize(const std::shared_ptr& batch) const override; -public: - virtual bool IsHardPacker() const override { - return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3; - } - - TFullDataSerializer(const arrow::ipc::IpcWriteOptions& options) - : Options(options) { - - } -}; - -class TFullDataDeserializer: public IDeserializer { -protected: - virtual arrow::Result> DoDeserialize(const TString& data) const override; - virtual TString DoDebugString() const override { - return "type=FULL_DATA;"; - } -public: - TFullDataDeserializer() { - - } -}; - -} diff --git a/ydb/core/formats/arrow/serializer/native.cpp b/ydb/core/formats/arrow/serializer/native.cpp new file mode 100644 index 000000000000..5a6819ee270a --- /dev/null +++ b/ydb/core/formats/arrow/serializer/native.cpp @@ -0,0 +1,185 @@ +#include "native.h" +#include "stream.h" +#include "parsing.h" +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace NKikimr::NArrow::NSerialization { + +arrow::Result> TNativeSerializer::DoDeserialize(const TString& data) const { + arrow::ipc::DictionaryMemo dictMemo; + auto options = arrow::ipc::IpcReadOptions::Defaults(); + options.use_threads = false; + + std::shared_ptr buffer(std::make_shared(data)); + arrow::io::BufferReader readerStream(buffer); + auto reader = TStatusValidator::GetValid(arrow::ipc::RecordBatchStreamReader::Open(&readerStream)); + + std::shared_ptr batch; + auto readResult = reader->ReadNext(&batch); + if (!readResult.ok()) { + return readResult; + } + if (!batch) { + return arrow::Status(arrow::StatusCode::SerializationError, "null batch"); + } + auto validation = batch->Validate(); + if (!validation.ok()) { + return arrow::Status(arrow::StatusCode::SerializationError, "validation error: " + validation.ToString()); + } + return batch; +} + +TString TNativeSerializer::DoSerializeFull(const std::shared_ptr& batch) const { + TString result; + { + arrow::io::MockOutputStream mock; + auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&mock, batch->schema(), Options)); + TStatusValidator::Validate(writer->WriteRecordBatch(*batch)); + result.reserve(mock.GetExtentBytesWritten()); + } + { + TStringOutputStream stream(&result); + auto writer = TStatusValidator::GetValid(arrow::ipc::MakeStreamWriter(&stream, batch->schema(), Options)); + TStatusValidator::Validate(writer->WriteRecordBatch(*batch)); + TStatusValidator::Validate(writer->Close()); + Y_ABORT_UNLESS(stream.GetPosition() == result.size()); + } + return result; +} + +arrow::Result> TNativeSerializer::DoDeserialize(const TString& data, const std::shared_ptr& schema) const { + arrow::ipc::DictionaryMemo dictMemo; + auto options = arrow::ipc::IpcReadOptions::Defaults(); + options.use_threads = false; + + std::shared_ptr buffer(std::make_shared(data)); + arrow::io::BufferReader reader(buffer); + AFL_TRACE(NKikimrServices::ARROW_HELPER)("event", "parsing")("size", data.size())("columns", schema->num_fields()); + auto 
batchResult = arrow::ipc::ReadRecordBatch(schema, &dictMemo, options, &reader); + if (!batchResult.ok()) { + return batchResult; + } + std::shared_ptr batch = *batchResult; + if (!batch) { + return arrow::Status(arrow::StatusCode::SerializationError, "empty batch"); + } + auto validation = batch->Validate(); + if (!validation.ok()) { + return arrow::Status(arrow::StatusCode::SerializationError, "batch is not valid: " + validation.ToString()); + } + return batch; +} + +TString TNativeSerializer::DoSerializePayload(const std::shared_ptr& batch) const { + arrow::ipc::IpcPayload payload; + // Build payload. Compression if set up performed here. + TStatusValidator::Validate(arrow::ipc::GetRecordBatchPayload(*batch, Options, &payload)); + + int32_t metadata_length = 0; + arrow::io::MockOutputStream mock; + // Process prepared payload through mock stream. Fast and efficient. + TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &mock, &metadata_length)); + + TString str; + str.resize(mock.GetExtentBytesWritten()); + + TFixedStringOutputStream out(&str); + // Write prepared payload into the resultant string. No extra allocation will be made. + TStatusValidator::Validate(arrow::ipc::WriteIpcPayload(payload, Options, &out, &metadata_length)); + Y_ABORT_UNLESS(out.GetPosition() == str.size()); + Y_DEBUG_ABORT_UNLESS(Deserialize(str, batch->schema()).ok()); + AFL_DEBUG(NKikimrServices::ARROW_HELPER)("event", "serialize")("size", str.size())("columns", batch->schema()->num_fields()); + return str; +} + +NKikimr::TConclusion> TNativeSerializer::BuildCodec(const arrow::Compression::type& cType, const std::optional level) const { + auto codec = NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(cType)); + if (!codec) { + return std::shared_ptr(); + } + const int levelDef = level.value_or(codec->default_compression_level()); + const int levelMin = codec->minimum_compression_level(); + const int levelMax = codec->maximum_compression_level(); + if (levelDef < levelMin || levelMax < levelDef) { + return TConclusionStatus::Fail( + TStringBuilder() << "incorrect level for codec. 
have to be: [" << levelMin << ":" << levelMax << "]" + ); + } + std::shared_ptr codecPtr = std::move(NArrow::TStatusValidator::GetValid(arrow::util::Codec::Create(cType, levelDef))); + return codecPtr; +} + +NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) { + std::optional codec; + std::optional level; + { + auto fValue = features.Extract("COMPRESSION.TYPE"); + if (!fValue) { + return TConclusionStatus::Fail("not defined COMPRESSION.TYPE as arrow::Compression"); + } + codec = NArrow::CompressionFromString(*fValue); + if (!codec) { + return TConclusionStatus::Fail("cannot parse COMPRESSION.TYPE as arrow::Compression"); + } + } + { + auto fValue = features.Extract("COMPRESSION.LEVEL"); + if (fValue) { + ui32 levelLocal; + if (!TryFromString(*fValue, levelLocal)) { + return TConclusionStatus::Fail("cannot parse COMPRESSION.LEVEL as ui32"); + } + level = levelLocal; + } + } + auto codecPtrStatus = BuildCodec(codec.value_or(Options.codec->compression_type()), level); + if (!codecPtrStatus) { + return codecPtrStatus.GetError(); + } + Options.codec = *codecPtrStatus; + return TConclusionStatus::Success(); +} + +NKikimr::TConclusionStatus TNativeSerializer::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& proto) { + if (!proto.HasArrowCompression()) { + return TConclusionStatus::Fail("no arrow serializer data in proto"); + } + auto compression = proto.GetArrowCompression(); + if (!compression.HasCodec()) { + Options = GetDefaultOptions(); + return TConclusionStatus::Success(); + } + std::optional codec = NArrow::CompressionFromProto(compression.GetCodec()); + if (!codec) { + return TConclusionStatus::Fail("cannot parse codec type from proto"); + } + std::optional level; + if (compression.HasLevel()) { + level = compression.GetLevel(); + } + + Options.use_threads = false; + auto result = BuildCodec(*codec, level); + if (!result) { + return result.GetError(); + } + Options.codec = *result; + return TConclusionStatus::Success(); +} + +void TNativeSerializer::DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const { + proto.MutableArrowCompression()->SetCodec(NArrow::CompressionToProto(Options.codec->compression_type())); + proto.MutableArrowCompression()->SetLevel(Options.codec->compression_level()); +} + +} diff --git a/ydb/core/formats/arrow/serializer/native.h b/ydb/core/formats/arrow/serializer/native.h new file mode 100644 index 000000000000..ece3e9e34fc9 --- /dev/null +++ b/ydb/core/formats/arrow/serializer/native.h @@ -0,0 +1,72 @@ +#pragma once + +#include "abstract.h" + +#include + +#include +#include + +#include +#include + +namespace NKikimr::NArrow::NSerialization { + +class TNativeSerializer: public ISerializer { +public: + static TString GetClassNameStatic() { + return "ARROW_SERIALIZER"; + } +private: + arrow::ipc::IpcWriteOptions Options; + + TConclusion> BuildCodec(const arrow::Compression::type& cType, const std::optional level) const; + static const inline TFactory::TRegistrator Registrator = TFactory::TRegistrator(GetClassNameStatic()); +protected: + virtual TString DoSerializeFull(const std::shared_ptr& batch) const override; + virtual TString DoSerializePayload(const std::shared_ptr& batch) const override; + virtual arrow::Result> DoDeserialize(const TString& data) const override; + virtual arrow::Result> DoDeserialize(const TString& data, const std::shared_ptr& schema) const override; + + virtual TConclusionStatus DoDeserializeFromRequest(NYql::TFeaturesExtractor& features) 
override; + + static arrow::ipc::IpcOptions BuildDefaultOptions() { + arrow::ipc::IpcWriteOptions options; + options.use_threads = false; + options.codec = *arrow::util::Codec::Create(arrow::Compression::LZ4_FRAME); + return options; + } + + virtual TConclusionStatus DoDeserializeFromProto(const NKikimrSchemeOp::TOlapColumn::TSerializer& proto) override; + + virtual void DoSerializeToProto(NKikimrSchemeOp::TOlapColumn::TSerializer& proto) const override; + +public: + virtual TString GetClassName() const override { + return GetClassNameStatic(); + } + + virtual bool IsHardPacker() const override { + return Options.codec && Options.codec->compression_type() == arrow::Compression::ZSTD && Options.codec->compression_level() > 3; + } + + static arrow::ipc::IpcOptions GetDefaultOptions() { + static arrow::ipc::IpcWriteOptions options = BuildDefaultOptions(); + return options; + } + + TNativeSerializer(const arrow::Compression::type compressionType) { + Options.use_threads = false; + auto r = arrow::util::Codec::Create(compressionType); + AFL_VERIFY(r.ok()); + Options.codec = std::move(*r); + } + + TNativeSerializer(const arrow::ipc::IpcWriteOptions& options = GetDefaultOptions()) + : Options(options) { + Options.use_threads = false; + + } +}; + +} diff --git a/ydb/core/formats/arrow/compression/parsing.cpp b/ydb/core/formats/arrow/serializer/parsing.cpp similarity index 100% rename from ydb/core/formats/arrow/compression/parsing.cpp rename to ydb/core/formats/arrow/serializer/parsing.cpp diff --git a/ydb/core/formats/arrow/compression/parsing.h b/ydb/core/formats/arrow/serializer/parsing.h similarity index 100% rename from ydb/core/formats/arrow/compression/parsing.h rename to ydb/core/formats/arrow/serializer/parsing.h diff --git a/ydb/core/formats/arrow/serializer/ya.make b/ydb/core/formats/arrow/serializer/ya.make index 558825873a2c..bf7e091ab4bf 100644 --- a/ydb/core/formats/arrow/serializer/ya.make +++ b/ydb/core/formats/arrow/serializer/ya.make @@ -3,15 +3,16 @@ LIBRARY() PEERDIR( contrib/libs/apache/arrow ydb/core/formats/arrow/common + ydb/services/metadata/abstract ydb/library/actors/core ydb/core/protos ) SRCS( abstract.cpp - full.cpp - batch_only.cpp + GLOBAL native.cpp stream.cpp + parsing.cpp ) END() diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp index 0006b339464a..b84fa44799c6 100644 --- a/ydb/core/formats/arrow/special_keys.cpp +++ b/ydb/core/formats/arrow/special_keys.cpp @@ -1,7 +1,7 @@ #include "special_keys.h" #include "permutations.h" #include "reader/read_filter_merger.h" -#include +#include #include #include @@ -11,7 +11,7 @@ bool TSpecialKeys::DeserializeFromString(const TString& data) { if (!data) { return false; } - Data = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(data)); + Data = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()->Deserialize(data)); return !!Data; } @@ -25,7 +25,7 @@ NKikimr::NArrow::TReplaceKey TSpecialKeys::GetKeyByIndex(const ui32 position, co } TString TSpecialKeys::SerializeToString() const { - return NArrow::NSerialization::TFullDataSerializer(arrow::ipc::IpcWriteOptions::Defaults()).Serialize(Data); + return NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()->SerializeFull(Data); } TString TSpecialKeys::SerializeToStringDataOnlyNoCompression() const { diff --git a/ydb/core/formats/arrow/ut/ut_dictionary.cpp b/ydb/core/formats/arrow/ut/ut_dictionary.cpp index 
c723eb1957c6..c3df2c6a30f0 100644 --- a/ydb/core/formats/arrow/ut/ut_dictionary.cpp +++ b/ydb/core/formats/arrow/ut/ut_dictionary.cpp @@ -1,7 +1,6 @@ #include #include -#include -#include +#include #include #include #include @@ -13,13 +12,13 @@ Y_UNIT_TEST_SUITE(Dictionary) { ui64 Test(NConstruction::IArrayBuilder::TPtr column, const arrow::ipc::IpcWriteOptions& options, const ui32 bSize) { std::shared_ptr batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(bSize); - const TString data = NSerialization::TFullDataSerializer(options).Serialize(batch); - auto deserializedBatch = *NSerialization::TFullDataDeserializer().Deserialize(data); + const TString data = NSerialization::TNativeSerializer(options).SerializeFull(batch); + auto deserializedBatch = *NSerialization::TNativeSerializer().Deserialize(data); Y_ABORT_UNLESS(!!deserializedBatch); auto originalBatchTransformed = DictionaryToArray(batch); auto roundBatchTransformed = DictionaryToArray(deserializedBatch); - const TString roundUnpacked = NSerialization::TFullDataSerializer(options).Serialize(roundBatchTransformed); - const TString roundTransformed = NSerialization::TFullDataSerializer(options).Serialize(originalBatchTransformed); + const TString roundUnpacked = NSerialization::TNativeSerializer(options).SerializeFull(roundBatchTransformed); + const TString roundTransformed = NSerialization::TNativeSerializer(options).SerializeFull(originalBatchTransformed); Y_ABORT_UNLESS(roundBatchTransformed->num_rows() == originalBatchTransformed->num_rows()); Y_ABORT_UNLESS(roundUnpacked == roundTransformed); return data.size(); @@ -154,8 +153,8 @@ Y_UNIT_TEST_SUITE(Dictionary) { "field", NConstruction::TStringPoolFiller(pSize, strLen) ); std::shared_ptr batch = NConstruction::TRecordBatchConstructor({ column }).BuildBatch(bSize); - const TString dataFull = NSerialization::TFullDataSerializer(options).Serialize(batch); - const TString dataPayload = NSerialization::TBatchPayloadSerializer(options).Serialize(batch); + const TString dataFull = NSerialization::TNativeSerializer(options).SerializeFull(batch); + const TString dataPayload = NSerialization::TNativeSerializer(options).SerializePayload(batch); bytesFull = dataFull.size(); bytesPayload = dataPayload.size(); } diff --git a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp index 4a8413dcd5fc..24d2c52d9217 100644 --- a/ydb/core/formats/arrow/ut/ut_size_calcer.cpp +++ b/ydb/core/formats/arrow/ut/ut_size_calcer.cpp @@ -1,7 +1,5 @@ #include #include -#include -#include #include #include #include diff --git a/ydb/core/grpc_services/rpc_log_store.cpp b/ydb/core/grpc_services/rpc_log_store.cpp index 2e3453d5df96..10d9ac929f5f 100644 --- a/ydb/core/grpc_services/rpc_log_store.cpp +++ b/ydb/core/grpc_services/rpc_log_store.cpp @@ -42,16 +42,16 @@ bool ConvertCompressionFromPublicToInternal(const Ydb::LogStore::Compression& fr error = "LogStores with no compression are disabled."; return false; case Ydb::LogStore::Compression::CODEC_LZ4: - to.SetCompressionCodec(NKikimrSchemeOp::ColumnCodecLZ4); + to.SetCodec(NKikimrSchemeOp::ColumnCodecLZ4); break; case Ydb::LogStore::Compression::CODEC_ZSTD: - to.SetCompressionCodec(NKikimrSchemeOp::ColumnCodecZSTD); + to.SetCodec(NKikimrSchemeOp::ColumnCodecZSTD); break; default: break; } if (from.compression_level()) { - to.SetCompressionLevel(from.compression_level()); + to.SetLevel(from.compression_level()); } return true; } @@ -60,7 +60,7 @@ void ConvertCompressionFromInternalToPublic(const 
NKikimrSchemeOp::TCompressionO Ydb::LogStore::Compression& to) { to.set_compression_codec(Ydb::LogStore::Compression::CODEC_LZ4); // LZ4 if not set - switch (from.GetCompressionCodec()) { + switch (from.GetCodec()) { case NKikimrSchemeOp::ColumnCodecPlain: to.set_compression_codec(Ydb::LogStore::Compression::CODEC_PLAIN); break; @@ -73,7 +73,7 @@ void ConvertCompressionFromInternalToPublic(const NKikimrSchemeOp::TCompressionO default: break; } - to.set_compression_level(from.GetCompressionLevel()); + to.set_compression_level(from.GetLevel()); } bool ConvertSchemaFromPublicToInternal(const Ydb::LogStore::Schema& from, NKikimrSchemeOp::TColumnTableSchema& to, diff --git a/ydb/core/grpc_services/rpc_long_tx.cpp b/ydb/core/grpc_services/rpc_long_tx.cpp index 59dc26c2990d..38a29ce77520 100644 --- a/ydb/core/grpc_services/rpc_long_tx.cpp +++ b/ydb/core/grpc_services/rpc_long_tx.cpp @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include diff --git a/ydb/core/kqp/compute_actor/kqp_compute_events.h b/ydb/core/kqp/compute_actor/kqp_compute_events.h index 5d57f94fa409..f31f946d28fd 100644 --- a/ydb/core/kqp/compute_actor/kqp_compute_events.h +++ b/ydb/core/kqp/compute_actor/kqp_compute_events.h @@ -6,7 +6,6 @@ #include #include -#include namespace NKikimr::NKqp { diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp index e7706d1093c0..c3b65e981194 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.cpp @@ -17,9 +17,9 @@ TConclusionStatus TAlterColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl } } { - auto result = CompressionDiff.DeserializeFromRequestFeatures(features); - if (!result) { - return TConclusionStatus::Fail(result.GetErrorMessage()); + auto status = Serializer.DeserializeFromRequest(features); + if (!status) { + return status; } } return TConclusionStatus::Success(); @@ -28,7 +28,9 @@ TConclusionStatus TAlterColumnOperation::DoDeserialize(NYql::TObjectSettingsImpl void TAlterColumnOperation::DoSerializeScheme(NKikimrSchemeOp::TAlterColumnTableSchema& schemaData) const { auto* column = schemaData.AddAlterColumns(); column->SetName(ColumnName); - *column->MutableCompression() = CompressionDiff.SerializeToProto(); + if (!!Serializer) { + Serializer.SerializeToProto(*column->MutableSerializer()); + } *column->MutableDictionaryEncoding() = DictionaryEncodingDiff.SerializeToProto(); } diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h index 803ffd5464f9..81c0e362be3d 100644 --- a/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/alter_column.h @@ -1,5 +1,5 @@ #include "abstract.h" -#include +#include #include namespace NKikimr::NKqp::NColumnshard { @@ -14,7 +14,7 @@ class TAlterColumnOperation : public ITableStoreOperation { TString ColumnName; - NArrow::TCompressionDiff CompressionDiff; + NArrow::NSerialization::TSerializerContainer Serializer; NArrow::NDictionary::TEncodingDiff DictionaryEncodingDiff; public: TConclusionStatus DoDeserialize(NYql::TObjectSettingsImpl::TFeaturesExtractor& features) override; diff --git a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make index 08467f55d565..3301f543b8f6 100644 --- 
a/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make +++ b/ydb/core/kqp/gateway/behaviour/tablestore/operations/ya.make @@ -11,7 +11,7 @@ SRCS( PEERDIR( ydb/services/metadata/manager - ydb/core/formats/arrow/compression + ydb/core/formats/arrow/serializer ydb/core/kqp/gateway/utils ydb/core/protos ) diff --git a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp index 55c8adf074d6..9ba5c0fb5647 100644 --- a/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_opt_build.cpp @@ -654,7 +654,7 @@ bool ExploreTx(TExprBase node, TExprContext& ctx, const TKiDataSink& dataSink, T { txRes.Ops.insert(node.Raw()); bool result = ExploreTx(TExprBase(node.Ref().ChildPtr(0)), ctx, dataSink, txRes, tablesData, types); - Cerr << KqpExprToPrettyString(*node.Raw(), ctx) << Endl; +// Cerr << KqpExprToPrettyString(*node.Raw(), ctx) << Endl; txRes.AddResult(node); return result; } diff --git a/ydb/core/kqp/ut/common/columnshard.cpp b/ydb/core/kqp/ut/common/columnshard.cpp index 99a10e26ecad..085415d36c59 100644 --- a/ydb/core/kqp/ut/common/columnshard.cpp +++ b/ydb/core/kqp/ut/common/columnshard.cpp @@ -1,5 +1,4 @@ #include "columnshard.h" -#include #include namespace NKikimr { diff --git a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp index 6f53286948f2..707a363f244f 100644 --- a/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp +++ b/ydb/core/kqp/ut/olap/kqp_olap_ut.cpp @@ -29,7 +29,6 @@ #include #include -#include namespace NKikimr { namespace NKqp { @@ -1367,7 +1366,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) { CompareYson(result, R"([[1u;]])"); } - AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < 0.15 * csController->GetIndexesSkippingOnSelect().Val()); + AFL_VERIFY(csController->GetIndexesApprovedOnSelect().Val() < 0.20 * csController->GetIndexesSkippingOnSelect().Val()); } @@ -3793,6 +3792,12 @@ Y_UNIT_TEST_SUITE(KqpOlap) { Y_ABORT_UNLESS(d.GetMaxCount() - d.GetMinCount() <= 2); } } + { + auto alterQuery = TStringBuilder() << "ALTER OBJECT `/Root/olapStore` (TYPE TABLESTORE) SET (ACTION=ALTER_COLUMN, NAME=field, `SERIALIZER.CLASS_NAME`=`ARROW_SERIALIZER`, `COMPRESSION.TYPE`=`zstd`);"; + auto session = tableClient.CreateSession().GetValueSync().GetSession(); + auto alterResult = session.ExecuteSchemeQuery(alterQuery).GetValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(alterResult.GetStatus(), EStatus::SUCCESS, alterResult.GetIssues().ToString()); + } } const ui64 rawBytesUnpack = rawBytesUnpack1PK - rawBytesPK1; const ui64 bytesUnpack = bytesUnpack1PK - bytesPK1; diff --git a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp index d09a196b7af5..3ba8d515e8c5 100644 --- a/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_constraints_ut.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include diff --git a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp index 1f53b50a8337..3c13d8c27b4d 100644 --- a/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp +++ b/ydb/core/kqp/ut/scheme/kqp_scheme_ut.cpp @@ -7,7 +7,6 @@ #include #include #include -#include #include #include diff --git a/ydb/core/protos/flat_scheme_op.proto b/ydb/core/protos/flat_scheme_op.proto index 895e1240cbdc..ca8629e2504a 100644 --- a/ydb/core/protos/flat_scheme_op.proto +++ b/ydb/core/protos/flat_scheme_op.proto @@ -397,14 +397,24 @@ message TDictionaryEncodingSettings { } message TCompressionOptions { - optional EColumnCodec CompressionCodec 
= 2; // LZ4 (in arrow LZ4_FRAME variant) if not set - optional int32 CompressionLevel = 3; // Use default compression level if not set (0 != not set) + optional EColumnCodec Codec = 2; // LZ4 (in arrow LZ4_FRAME variant) if not set + optional int32 Level = 3; // Use default compression level if not set (0 != not set) +} + +message TOlapColumn { + + message TSerializer { + optional string ClassName = 1; + oneof Implementation { + TCompressionOptions ArrowCompression = 40; + } + } } message TOlapColumnDiff { optional string Name = 1; - optional TCompressionOptions Compression = 2; optional TDictionaryEncodingSettings DictionaryEncoding = 4; + optional TOlapColumn.TSerializer Serializer = 5; } message TOlapColumnDescription { @@ -419,8 +429,9 @@ message TOlapColumnDescription { optional NKikimrProto.TTypeInfo TypeInfo = 6; optional bool NotNull = 7; - optional TCompressionOptions Compression = 8; + optional TCompressionOptions Compression = 8[deprecated = true]; optional TDictionaryEncodingSettings DictionaryEncoding = 9; + optional TOlapColumn.TSerializer Serializer = 10; } message TRequestedBloomFilter { diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/columnshard_ut_common.cpp index 8df44d7df997..407aed03b43d 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/columnshard_ut_common.cpp @@ -375,10 +375,10 @@ NMetadata::NFetcher::ISnapshot::TPtr TTestSchema::BuildSnapshot(const TTableSpec cProto.SetName(tier.Name); *cProto.MutableObjectStorage() = tier.S3; if (tier.Codec) { - cProto.MutableCompression()->SetCompressionCodec(tier.GetCodecId()); + cProto.MutableCompression()->SetCodec(tier.GetCodecId()); } if (tier.CompressionLevel) { - cProto.MutableCompression()->SetCompressionLevel(*tier.CompressionLevel); + cProto.MutableCompression()->SetLevel(*tier.CompressionLevel); } NColumnShard::NTiers::TTierConfig tConfig(tier.Name, cProto); cs->MutableTierConfigs().emplace(tConfig.GetTierName(), tConfig); diff --git a/ydb/core/tx/columnshard/columnshard_ut_common.h b/ydb/core/tx/columnshard/columnshard_ut_common.h index 0ea35595f338..58849aa342e1 100644 --- a/ydb/core/tx/columnshard/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/columnshard_ut_common.h @@ -236,10 +236,10 @@ struct TTestSchema { } if (specials.HasCodec()) { - schema->MutableDefaultCompression()->SetCompressionCodec(specials.GetCodecId()); + schema->MutableDefaultCompression()->SetCodec(specials.GetCodecId()); } if (specials.CompressionLevel) { - schema->MutableDefaultCompression()->SetCompressionLevel(*specials.CompressionLevel); + schema->MutableDefaultCompression()->SetLevel(*specials.CompressionLevel); } } diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index c68100010934..f91fcb099f42 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -3,6 +3,7 @@ #include #include #include +#include namespace NKikimr::NOlap { @@ -116,9 +117,9 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont continue; } if (b->num_rows() < 100) { - SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::UNCOMPRESSED)); + SaverContext.SetExternalSerializer(NArrow::NSerialization::TSerializerContainer(std::make_shared(arrow::Compression::type::UNCOMPRESSED))); } else { - 
SaverContext.SetExternalCompression(NArrow::TCompression(arrow::Compression::type::LZ4_FRAME)); + SaverContext.SetExternalSerializer(NArrow::NSerialization::TSerializerContainer(std::make_shared(arrow::Compression::type::LZ4_FRAME))); } auto portions = MakeAppendedPortions(b, pathId, maxSnapshot, nullptr, context); Y_ABORT_UNLESS(portions.size()); diff --git a/ydb/core/tx/columnshard/engines/changes/ttl.cpp b/ydb/core/tx/columnshard/engines/changes/ttl.cpp index 80117f34f331..cfba0a994064 100644 --- a/ydb/core/tx/columnshard/engines/changes/ttl.cpp +++ b/ydb/core/tx/columnshard/engines/changes/ttl.cpp @@ -43,8 +43,8 @@ std::optional TTTLColumnEngineChanges::UpdateEvictedPorti auto* tiering = Tiering.FindPtr(evictFeatures.PathId); Y_ABORT_UNLESS(tiering); - auto compression = tiering->GetCompression(evictFeatures.TargetTierName); - if (!compression) { + auto serializer = tiering->GetSerializer(evictFeatures.TargetTierName); + if (!serializer) { // Nothing to recompress. We have no other kinds of evictions yet. evictFeatures.DataChanges = false; auto result = TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs); @@ -58,7 +58,7 @@ std::optional TTTLColumnEngineChanges::UpdateEvictedPorti AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("portion_for_eviction", portionInfo.DebugString()); TSaverContext saverContext(evictFeatures.StorageOperator, SaverContext.GetStoragesManager()); - saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalCompression(compression); + saverContext.SetTierName(evictFeatures.TargetTierName).SetExternalSerializer(*serializer); auto withBlobs = TPortionInfoWithBlobs::RestorePortion(portionInfo, srcBlobs); withBlobs.GetPortionInfo().InitOperator(evictFeatures.StorageOperator, true); withBlobs.GetPortionInfo().MutableMeta().SetTierName(evictFeatures.TargetTierName); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp index e52f769a8172..3ba21e93aef1 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/source.cpp @@ -3,7 +3,6 @@ #include "fetched_data.h" #include "plain_read_data.h" #include "constructor.h" -#include #include #include #include diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.cpp b/ydb/core/tx/columnshard/engines/scheme/column_features.cpp index a416509e4878..c234314d16b3 100644 --- a/ydb/core/tx/columnshard/engines/scheme/column_features.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/column_features.cpp @@ -1,7 +1,6 @@ #include "column_features.h" #include "index_info.h" -#include -#include +#include #include namespace NKikimr::NOlap { @@ -23,26 +22,19 @@ NArrow::NTransformation::ITransformer::TPtr TColumnFeatures::GetLoadTransformer( } void TColumnFeatures::InitLoader(const TIndexInfo& info) { - NArrow::NTransformation::ITransformer::TPtr transformer = GetLoadTransformer(); auto schema = info.GetColumnSchema(ColumnId); - if (!transformer) { - Loader = std::make_shared(transformer, - std::make_shared(schema), - schema, ColumnId); - } else { - Loader = std::make_shared(transformer, - std::make_shared(), - schema, ColumnId); - } + Loader = std::make_shared(GetLoadTransformer(), Serializer, schema, ColumnId); } std::optional TColumnFeatures::BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const TIndexInfo& indexInfo) { const ui32 columnId = columnInfo.GetId(); TColumnFeatures result(columnId); - if (columnInfo.HasCompression()) { - 
auto settings = NArrow::TCompression::BuildFromProto(columnInfo.GetCompression()); - Y_ABORT_UNLESS(settings.IsSuccess()); - result.Compression = *settings; + if (columnInfo.HasSerializer()) { + AFL_VERIFY(result.Serializer.DeserializeFromProto(columnInfo.GetSerializer())); + } else if (columnInfo.HasCompression()) { + AFL_VERIFY(result.Serializer.DeserializeFromProto(columnInfo.GetCompression())); + } else { + result.Serializer = NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); } if (columnInfo.HasDictionaryEncoding()) { auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnInfo.GetDictionaryEncoding()); @@ -53,20 +45,19 @@ std::optional TColumnFeatures::BuildFromProto(c return result; } -std::unique_ptr TColumnFeatures::GetCompressionCodec() const { - if (Compression) { - return Compression->BuildArrowCodec(); - } else { - return nullptr; - } -} - NKikimr::NOlap::TColumnFeatures TColumnFeatures::BuildFromIndexInfo(const ui32 columnId, const TIndexInfo& indexInfo) { TColumnFeatures result(columnId); result.InitLoader(indexInfo); return result; } +TColumnFeatures::TColumnFeatures(const ui32 columnId) + : ColumnId(columnId) + , Serializer(NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()) +{ + +} + TString TColumnLoader::DebugString() const { TStringBuilder result; if (ExpectedSchema) { @@ -75,8 +66,8 @@ TString TColumnLoader::DebugString() const { if (Transformer) { result << "transformer:" << Transformer->DebugString() << ";"; } - if (Deserializer) { - result << "deserializer:" << Deserializer->DebugString() << ";"; + if (Serializer) { + result << "serializer:" << Serializer->DebugString() << ";"; } return result; } diff --git a/ydb/core/tx/columnshard/engines/scheme/column_features.h b/ydb/core/tx/columnshard/engines/scheme/column_features.h index 11415ecc20de..7e99af80eb72 100644 --- a/ydb/core/tx/columnshard/engines/scheme/column_features.h +++ b/ydb/core/tx/columnshard/engines/scheme/column_features.h @@ -1,5 +1,4 @@ #pragma once -#include #include #include #include @@ -14,7 +13,7 @@ namespace NKikimr::NOlap { class TSaverContext { private: TString TierName; - std::optional ExternalCompression; + std::optional ExternalSerializer; YDB_READONLY_DEF(std::shared_ptr, StorageOperator); YDB_READONLY_DEF(std::shared_ptr, StoragesManager); public: @@ -24,11 +23,12 @@ class TSaverContext { } - const std::optional& GetExternalCompression() const { - return ExternalCompression; + const std::optional& GetExternalSerializer() const { + return ExternalSerializer; } - TSaverContext& SetExternalCompression(const std::optional& value) { - ExternalCompression = value; + TSaverContext& SetExternalSerializer(const std::optional& value) { + AFL_VERIFY(!!value); + ExternalSerializer = value; return *this; } const TString& GetTierName() const { @@ -43,17 +43,17 @@ class TSaverContext { class TColumnSaver { private: NArrow::NTransformation::ITransformer::TPtr Transformer; - NArrow::NSerialization::ISerializer::TPtr Serializer; + NArrow::NSerialization::TSerializerContainer Serializer; public: TColumnSaver() = default; - TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::ISerializer::TPtr serializer) + TColumnSaver(NArrow::NTransformation::ITransformer::TPtr transformer, const NArrow::NSerialization::TSerializerContainer serializer) : Transformer(transformer) , Serializer(serializer) { Y_ABORT_UNLESS(Serializer); } bool IsHardPacker() const { - return Serializer && Serializer->IsHardPacker(); + return 
Serializer->IsHardPacker(); } TString Apply(std::shared_ptr data, std::shared_ptr field) const { @@ -65,9 +65,9 @@ class TColumnSaver { TString Apply(const std::shared_ptr& data) const { Y_ABORT_UNLESS(Serializer); if (Transformer) { - return Serializer->Serialize(Transformer->Transform(data)); + return Serializer->SerializeFull(Transformer->Transform(data)); } else { - return Serializer->Serialize(data); + return Serializer->SerializePayload(data); } } }; @@ -75,22 +75,22 @@ class TColumnSaver { class TColumnLoader { private: NArrow::NTransformation::ITransformer::TPtr Transformer; - NArrow::NSerialization::IDeserializer::TPtr Deserializer; + NArrow::NSerialization::TSerializerContainer Serializer; std::shared_ptr ExpectedSchema; const ui32 ColumnId; public: TString DebugString() const; - TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, NArrow::NSerialization::IDeserializer::TPtr deserializer, + TColumnLoader(NArrow::NTransformation::ITransformer::TPtr transformer, const NArrow::NSerialization::TSerializerContainer& serializer, const std::shared_ptr& expectedSchema, const ui32 columnId) : Transformer(transformer) - , Deserializer(deserializer) + , Serializer(serializer) , ExpectedSchema(expectedSchema) , ColumnId(columnId) { Y_ABORT_UNLESS(ExpectedSchema); auto fieldsCountStr = ::ToString(ExpectedSchema->num_fields()); Y_ABORT_UNLESS(ExpectedSchema->num_fields() == 1, "%s", fieldsCountStr.data()); - Y_ABORT_UNLESS(Deserializer); + Y_ABORT_UNLESS(Serializer); } ui32 GetColumnId() const { @@ -106,8 +106,9 @@ class TColumnLoader { } arrow::Result> Apply(const TString& data) const { - Y_ABORT_UNLESS(Deserializer); - arrow::Result> columnArray = Deserializer->Deserialize(data); + Y_ABORT_UNLESS(Serializer); + arrow::Result> columnArray = + Transformer ? Serializer->Deserialize(data) : Serializer->Deserialize(data, ExpectedSchema); if (!columnArray.ok()) { return columnArray; } @@ -134,21 +135,19 @@ struct TIndexInfo; class TColumnFeatures { private: ui32 ColumnId; - std::optional Compression; + YDB_READONLY_DEF(NArrow::NSerialization::TSerializerContainer, Serializer); std::optional DictionaryEncoding; std::shared_ptr Loader; NArrow::NTransformation::ITransformer::TPtr GetLoadTransformer() const; void InitLoader(const TIndexInfo& info); - TColumnFeatures(const ui32 columnId) - : ColumnId(columnId) { - } + TColumnFeatures(const ui32 columnId); public: TString DebugString() const { TStringBuilder sb; - sb << "compression=" << (Compression ? Compression->DebugString() : "NO") << ";"; + sb << "serializer=" << (Serializer ? Serializer->DebugString() : "NO") << ";"; sb << "encoding=" << (DictionaryEncoding ? DictionaryEncoding->DebugString() : "NO") << ";"; sb << "loader=" << (Loader ? 
Loader->DebugString() : "NO") << ";"; return sb; @@ -158,8 +157,6 @@ class TColumnFeatures { static std::optional BuildFromProto(const NKikimrSchemeOp::TOlapColumnDescription& columnInfo, const TIndexInfo& indexInfo); static TColumnFeatures BuildFromIndexInfo(const ui32 columnId, const TIndexInfo& indexInfo); - std::unique_ptr GetCompressionCodec() const; - const std::shared_ptr& GetLoader() const { AFL_VERIFY(Loader); return Loader; diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 6bac1a0af456..e60785485be2 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -3,9 +3,8 @@ #include #include #include -#include +#include #include -#include #include namespace NKikimr::NOlap { @@ -296,32 +295,23 @@ bool TIndexInfo::AllowTtlOverColumn(const TString& name) const { } TColumnSaver TIndexInfo::GetColumnSaver(const ui32 columnId, const TSaverContext& context) const { - arrow::ipc::IpcWriteOptions options; - options.use_threads = false; - NArrow::NTransformation::ITransformer::TPtr transformer; - std::unique_ptr columnCodec; + NArrow::NSerialization::TSerializerContainer serializer; { auto it = ColumnFeatures.find(columnId); AFL_VERIFY(it != ColumnFeatures.end()); transformer = it->second.GetSaveTransformer(); - columnCodec = it->second.GetCompressionCodec(); - } - - if (context.GetExternalCompression()) { - options.codec = context.GetExternalCompression()->BuildArrowCodec(); - } else if (columnCodec) { - options.codec = std::move(columnCodec); - } else if (DefaultCompression) { - options.codec = DefaultCompression->BuildArrowCodec(); - } else { - options.codec = NArrow::TCompression::BuildDefaultCodec(); + serializer = it->second.GetSerializer(); } - if (!transformer) { - return TColumnSaver(transformer, std::make_shared(options)); + if (!!context.GetExternalSerializer()) { + return TColumnSaver(transformer, *context.GetExternalSerializer()); + } else if (!!serializer) { + return TColumnSaver(transformer, serializer); + } else if (DefaultSerializer) { + return TColumnSaver(transformer, DefaultSerializer); } else { - return TColumnSaver(transformer, std::make_shared(options)); + return TColumnSaver(transformer, NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()); } } @@ -390,8 +380,7 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& const ui32 id = col.GetId(); const TString& name = col.GetName(); const bool notNull = col.HasNotNull() ? col.GetNotNull() : false; - auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), - col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); + auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(col.GetTypeId(), col.HasTypeInfo() ? 
&col.GetTypeInfo() : nullptr); Columns[id] = NTable::TColumn(name, id, typeInfoMod.TypeInfo, typeInfoMod.TypeMod, notNull); ColumnNames[name] = id; } @@ -413,12 +402,12 @@ bool TIndexInfo::DeserializeFromProto(const NKikimrSchemeOp::TColumnTableSchema& } if (schema.HasDefaultCompression()) { - auto result = NArrow::TCompression::BuildFromProto(schema.GetDefaultCompression()); - if (!result) { - AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", result.GetErrorMessage()); + NArrow::NSerialization::TSerializerContainer container; + if (!container.DeserializeFromProto(schema.GetDefaultCompression())) { + AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("event", "cannot_parse_index_info")("reason", "cannot_parse_default_serializer"); return false; } - DefaultCompression = *result; + DefaultSerializer = container; } Version = schema.GetVersion(); return true; diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.h b/ydb/core/tx/columnshard/engines/scheme/index_info.h index 44d15cc375d9..7409e33bd910 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.h @@ -246,7 +246,7 @@ struct TIndexInfo : public NTable::TScheme::TTableSchema { std::shared_ptr ExtendedKey; // Extend PK with snapshot columns to allow old shapshot reads THashSet RequiredColumns; THashSet MinMaxIdxColumnsIds; - std::optional DefaultCompression; + NArrow::NSerialization::TSerializerContainer DefaultSerializer; }; std::shared_ptr MakeArrowSchema(const NTable::TScheme::TTableSchema::TColumns& columns, const std::vector& ids, bool withSpecials = false); diff --git a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp index 77e9430ed953..cf87cf941d8f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/abstract/meta.cpp @@ -4,7 +4,7 @@ #include #include #include -#include +#include #include namespace NKikimr::NOlap::NIndexes { @@ -14,6 +14,7 @@ void TPortionIndexChunk::DoAddIntoPortion(const TBlobRange& bRange, TPortionInfo } std::shared_ptr TIndexByColumns::DoBuildIndex(const ui32 indexId, std::map>>& data, const TIndexInfo& indexInfo) const { + AFL_VERIFY(Serializer); AFL_VERIFY(data.size()); std::vector columnReaders; for (auto&& i : ColumnIds) { @@ -27,12 +28,12 @@ std::shared_ptr TIndexByColumns::DoBuildIndex } TChunkedBatchReader reader(std::move(columnReaders)); std::shared_ptr indexBatch = DoBuildIndexImpl(reader); - const TString indexData = TColumnSaver(nullptr, Serializer).Apply(indexBatch); + const TString indexData = Serializer->SerializeFull(indexBatch); return std::make_shared(indexId, recordsCount, NArrow::GetBatchDataSize(indexBatch), indexData); } bool TIndexByColumns::DoDeserializeFromProto(const NKikimrSchemeOp::TOlapIndexDescription& /*proto*/) { - Serializer = std::make_shared(arrow::ipc::IpcWriteOptions::Defaults()); + Serializer = NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); return true; } @@ -40,7 +41,7 @@ TIndexByColumns::TIndexByColumns(const ui32 indexId, const TString& indexName, c : TBase(indexId, indexName) , ColumnIds(columnIds) { - Serializer = std::make_shared(arrow::ipc::IpcWriteOptions::Defaults()); + Serializer = NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); } NKikimr::TConclusionStatus TIndexByColumns::CheckSameColumnsForModification(const IIndexMeta& newMeta) const { diff --git 
a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp index 368589b11c31..1ec8aede916d 100644 --- a/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/indexes/bloom/checker.cpp @@ -1,5 +1,5 @@ #include "checker.h" -#include +#include #include #include #include @@ -14,7 +14,7 @@ void TBloomFilterChecker::DoSerializeToProtoImpl(NKikimrSSA::TProgram::TOlapInde bool TBloomFilterChecker::DoCheckImpl(const std::vector& blobs) const { for (auto&& blob : blobs) { - auto rb = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TFullDataDeserializer().Deserialize(blob)); + auto rb = NArrow::TStatusValidator::GetValid(NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer()->Deserialize(blob)); AFL_VERIFY(rb); AFL_VERIFY(rb->schema()->num_fields() == 1); AFL_VERIFY(rb->schema()->field(0)->type()->id() == arrow::Type::BOOL); diff --git a/ydb/core/tx/columnshard/engines/scheme/tier_info.h b/ydb/core/tx/columnshard/engines/scheme/tier_info.h index d9808ca9b919..395752201f8d 100644 --- a/ydb/core/tx/columnshard/engines/scheme/tier_info.h +++ b/ydb/core/tx/columnshard/engines/scheme/tier_info.h @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -18,7 +17,7 @@ class TTierInfo { YDB_READONLY_DEF(TDuration, EvictDuration); ui32 TtlUnitsInSecond; - YDB_READONLY_DEF(std::optional, Compression); + YDB_READONLY_DEF(std::optional, Serializer); public: TTierInfo(const TString& tierName, TDuration evictDuration, const TString& column, ui32 unitsInSecond = 0) : Name(tierName) @@ -34,8 +33,8 @@ class TTierInfo { return now - EvictDuration; } - TTierInfo& SetCompression(const NArrow::TCompression& value) { - Compression = value; + TTierInfo& SetSerializer(const NArrow::NSerialization::TSerializerContainer& value) { + Serializer = value; return *this; } @@ -51,9 +50,9 @@ class TTierInfo { TString GetDebugString() const { TStringBuilder sb; - sb << "name=" << Name << ";duration=" << EvictDuration << ";column=" << EvictColumnName << ";compression="; - if (Compression) { - sb << Compression->DebugString(); + sb << "name=" << Name << ";duration=" << EvictDuration << ";column=" << EvictColumnName << ";serializer="; + if (Serializer) { + sb << Serializer->DebugString(); } else { sb << "NOT_SPECIFIED(Default)"; } @@ -133,11 +132,11 @@ class TTiering { return {}; } - std::optional GetCompression(const TString& name) const { + std::optional GetSerializer(const TString& name) const { auto it = TierByName.find(name); if (it != TierByName.end()) { Y_ABORT_UNLESS(!name.empty()); - return it->second->GetCompression(); + return it->second->GetSerializer(); } return {}; } diff --git a/ydb/core/tx/columnshard/engines/ya.make b/ydb/core/tx/columnshard/engines/ya.make index 4b058ca9bbca..8e80cb730b69 100644 --- a/ydb/core/tx/columnshard/engines/ya.make +++ b/ydb/core/tx/columnshard/engines/ya.make @@ -29,7 +29,6 @@ PEERDIR( ydb/core/tx/columnshard/engines/insert_table ydb/core/tx/columnshard/engines/changes ydb/core/tx/columnshard/engines/portions - ydb/core/formats/arrow/compression ydb/core/tx/program # for NYql::NUdf alloc stuff used in binary_json diff --git a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp index 3c5b881b4bb7..72aa5539f6b2 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp +++ b/ydb/core/tx/columnshard/splitter/ut/ut_splitter.cpp @@ -1,10 +1,9 @@ #include 
#include #include -#include #include #include -#include +#include #include Y_UNIT_TEST_SUITE(Splitter) { @@ -23,7 +22,7 @@ Y_UNIT_TEST_SUITE(Splitter) { } virtual NKikimr::NOlap::TColumnSaver GetColumnSaver(const ui32 columnId) const override { - return NKikimr::NOlap::TColumnSaver(nullptr, std::make_shared(arrow::ipc::IpcWriteOptions::Defaults())); + return NKikimr::NOlap::TColumnSaver(nullptr, std::make_shared(arrow::ipc::IpcWriteOptions::Defaults())); } virtual std::optional GetColumnSerializationStats(const ui32 /*columnId*/) const override { @@ -36,7 +35,7 @@ Y_UNIT_TEST_SUITE(Splitter) { NKikimr::NOlap::TColumnLoader GetColumnLoader(const ui32 columnId) const { arrow::FieldVector v = {std::make_shared(GetColumnName(columnId), std::make_shared())}; auto schema = std::make_shared(v); - return NKikimr::NOlap::TColumnLoader(nullptr, std::make_shared(), schema, columnId); + return NKikimr::NOlap::TColumnLoader(nullptr, NSerialization::TSerializerContainer::GetDefaultSerializer(), schema, columnId); } virtual std::shared_ptr GetField(const ui32 columnId) const override { diff --git a/ydb/core/tx/columnshard/splitter/ut/ya.make b/ydb/core/tx/columnshard/splitter/ut/ya.make index e242c0b6b57b..4d9e9af04665 100644 --- a/ydb/core/tx/columnshard/splitter/ut/ya.make +++ b/ydb/core/tx/columnshard/splitter/ut/ya.make @@ -7,7 +7,6 @@ PEERDIR( ydb/library/arrow_kernels ydb/core/tx/columnshard/counters - ydb/core/formats/arrow/compression ydb/core/tx/columnshard/engines/portions ydb/core/kqp/common ydb/library/yql/parser/pg_wrapper diff --git a/ydb/core/tx/schemeshard/olap/columns/update.cpp b/ydb/core/tx/schemeshard/olap/columns/update.cpp index a6d602b5fa4b..a0427414ca75 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.cpp +++ b/ydb/core/tx/schemeshard/olap/columns/update.cpp @@ -2,6 +2,7 @@ #include #include #include +#include namespace NKikimr::NSchemeShard { @@ -13,13 +14,13 @@ namespace NKikimr::NSchemeShard { Name = columnSchema.GetName(); NotNullFlag = columnSchema.GetNotNull(); TypeName = columnSchema.GetType(); - if (columnSchema.HasCompression()) { - auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression()); - if (!compression) { - errors.AddError("Cannot parse compression info: " + compression.GetErrorMessage()); + if (columnSchema.HasSerializer()) { + NArrow::NSerialization::TSerializerContainer serializer; + if (!serializer.DeserializeFromProto(columnSchema.GetSerializer())) { + errors.AddError("Cannot parse serializer info"); return false; } - Compression = *compression; + Serializer = serializer; } if (columnSchema.HasDictionaryEncoding()) { auto settings = NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); @@ -72,10 +73,14 @@ namespace NKikimr::NSchemeShard { columnSchema.GetTypeId(), nullptr) .TypeInfo; } - if (columnSchema.HasCompression()) { - auto compression = NArrow::TCompression::BuildFromProto(columnSchema.GetCompression()); - Y_ABORT_UNLESS(compression.IsSuccess(), "%s", compression.GetErrorMessage().data()); - Compression = *compression; + if (columnSchema.HasSerializer()) { + NArrow::NSerialization::TSerializerContainer serializer; + AFL_VERIFY(serializer.DeserializeFromProto(columnSchema.GetSerializer())); + Serializer = serializer; + } else if (columnSchema.HasCompression()) { + NArrow::NSerialization::TSerializerContainer serializer; + AFL_VERIFY(serializer.DeserializeFromProto(columnSchema.GetCompression())); + Serializer = serializer; } if (columnSchema.HasDictionaryEncoding()) { auto settings 
= NArrow::NDictionary::TEncodingSettings::BuildFromProto(columnSchema.GetDictionaryEncoding()); @@ -89,8 +94,8 @@ namespace NKikimr::NSchemeShard { columnSchema.SetName(Name); columnSchema.SetType(TypeName); columnSchema.SetNotNull(NotNullFlag); - if (Compression) { - *columnSchema.MutableCompression() = Compression->SerializeToProto(); + if (Serializer) { + Serializer->SerializeToProto(*columnSchema.MutableSerializer()); } if (DictionaryEncoding) { *columnSchema.MutableDictionaryEncoding() = DictionaryEncoding->SerializeToProto(); @@ -105,12 +110,8 @@ namespace NKikimr::NSchemeShard { bool TOlapColumnAdd::ApplyDiff(const TOlapColumnDiff& diffColumn, IErrorCollector& errors) { Y_ABORT_UNLESS(GetName() == diffColumn.GetName()); - { - auto result = diffColumn.GetCompression().Apply(Compression); - if (!result) { - errors.AddError("Cannot merge compression info: " + result.GetErrorMessage()); - return false; - } + if (diffColumn.GetSerializer()) { + Serializer = diffColumn.GetSerializer(); } { auto result = diffColumn.GetDictionaryEncoding().Apply(DictionaryEncoding); diff --git a/ydb/core/tx/schemeshard/olap/columns/update.h b/ydb/core/tx/schemeshard/olap/columns/update.h index 45fce0c99094..26eb18a971af 100644 --- a/ydb/core/tx/schemeshard/olap/columns/update.h +++ b/ydb/core/tx/schemeshard/olap/columns/update.h @@ -1,19 +1,18 @@ #pragma once -#include #include #include #include #include #include -#include #include +#include namespace NKikimr::NSchemeShard { class TOlapColumnDiff { private: YDB_READONLY_DEF(TString, Name); - YDB_READONLY_DEF(NArrow::TCompressionDiff, Compression); + YDB_READONLY_DEF(NArrow::NSerialization::TSerializerContainer, Serializer); YDB_READONLY_DEF(NArrow::NDictionary::TEncodingDiff, DictionaryEncoding); public: bool ParseFromRequest(const NKikimrSchemeOp::TOlapColumnDiff& columnSchema, IErrorCollector& errors) { @@ -22,9 +21,11 @@ class TOlapColumnDiff { errors.AddError("empty field name"); return false; } - if (!Compression.DeserializeFromProto(columnSchema.GetCompression())) { - errors.AddError("cannot parse compression diff from proto"); - return false; + if (columnSchema.HasSerializer()) { + if (!Serializer.DeserializeFromProto(columnSchema.GetSerializer())) { + errors.AddError("cannot parse serializer diff from proto"); + return false; + } } if (!DictionaryEncoding.DeserializeFromProto(columnSchema.GetDictionaryEncoding())) { errors.AddError("cannot parse dictionary encoding diff from proto"); @@ -41,7 +42,7 @@ class TOlapColumnAdd { YDB_READONLY_DEF(TString, TypeName); YDB_READONLY_DEF(NScheme::TTypeInfo, Type); YDB_FLAG_ACCESSOR(NotNull, false); - YDB_READONLY_DEF(std::optional, Compression); + YDB_READONLY_DEF(std::optional, Serializer); YDB_READONLY_DEF(std::optional, DictionaryEncoding); public: TOlapColumnAdd(const std::optional& keyOrder) diff --git a/ydb/core/tx/schemeshard/olap/columns/ya.make b/ydb/core/tx/schemeshard/olap/columns/ya.make index 44971d7eeb22..94c1c4c0febe 100644 --- a/ydb/core/tx/schemeshard/olap/columns/ya.make +++ b/ydb/core/tx/schemeshard/olap/columns/ya.make @@ -8,7 +8,8 @@ SRCS( PEERDIR( ydb/core/protos ydb/core/formats/arrow/dictionary - ydb/core/formats/arrow/compression + ydb/core/formats/arrow/serializer + ydb/core/tx/schemeshard/olap/common ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/tx/schemeshard/olap/common/ya.make b/ydb/core/tx/schemeshard/olap/common/ya.make index 58bbf1cb442a..aea04768253b 100644 --- a/ydb/core/tx/schemeshard/olap/common/ya.make +++ b/ydb/core/tx/schemeshard/olap/common/ya.make @@ -7,6 +7,7 @@ 
SRCS( PEERDIR( ydb/library/ydb_issue ydb/core/base + ydb/library/aclib ) END() diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index f049575c2c9c..8cbf4b637d3c 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -244,7 +244,6 @@ PEERDIR( ydb/core/engine/minikql ydb/core/external_sources ydb/core/filestore/core - ydb/core/formats/arrow/compression ydb/core/kesus/tablet ydb/core/metering ydb/core/persqueue diff --git a/ydb/core/tx/tiering/manager.cpp b/ydb/core/tx/tiering/manager.cpp index 583d06149fa9..aca16e6c4db3 100644 --- a/ydb/core/tx/tiering/manager.cpp +++ b/ydb/core/tx/tiering/manager.cpp @@ -106,10 +106,16 @@ TManager::TManager(const ui64 tabletId, const NActors::TActorId& tabletActorId, { } -NArrow::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression) { - auto out = NArrow::TCompression::BuildFromProto(compression); - Y_ABORT_UNLESS(out, "%s", out.GetErrorMessage().data()); - return *out; +NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compressionProto) { + NArrow::NSerialization::TSerializerContainer container; + AFL_VERIFY(container.DeserializeFromProto(compressionProto)); + return container; +} + +NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSchemeOp::TOlapColumn::TSerializer& serializerProto) { + NArrow::NSerialization::TSerializerContainer container; + AFL_VERIFY(container.DeserializeFromProto(serializerProto)); + return container; } } @@ -208,7 +214,7 @@ THashMap TTiersManager::GetTiering() const { for (auto& [name, tier] : pathTiering.GetTierByName()) { auto it = tierConfigs.find(name); if (it != tierConfigs.end()) { - tier->SetCompression(NTiers::ConvertCompression(it->second.GetCompression())); + tier->SetSerializer(NTiers::ConvertCompression(it->second.GetCompression())); } } } diff --git a/ydb/core/tx/tiering/manager.h b/ydb/core/tx/tiering/manager.h index 6bb6c6194391..727d2b372264 100644 --- a/ydb/core/tx/tiering/manager.h +++ b/ydb/core/tx/tiering/manager.h @@ -15,7 +15,8 @@ namespace NKikimr::NColumnShard { namespace NTiers { -NArrow::TCompression ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compression); +NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSchemeOp::TOlapColumn::TSerializer& serializerProto); +NArrow::NSerialization::TSerializerContainer ConvertCompression(const NKikimrSchemeOp::TCompressionOptions& compressionProto); class TManager { private: diff --git a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h index 6f09b9e5ffab..c5b4d78e2be4 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h +++ b/ydb/library/yql/providers/generic/connector/libcpp/ut_helpers/connector_client_mock.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include #include @@ -677,10 +677,10 @@ namespace NYql::NConnector::NTest { TBuilder& AddResponse( const std::shared_ptr& recordBatch, const NApi::TError& error) { - NKikimr::NArrow::NSerialization::TFullDataSerializer ser(arrow::ipc::IpcWriteOptions::Defaults()); + NKikimr::NArrow::NSerialization::TSerializerContainer ser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); auto& response = this->Result_->Responses().emplace_back(); response.mutable_error()->CopyFrom(error); - 
response.set_arrow_ipc_streaming(ser.Serialize(recordBatch)); + response.set_arrow_ipc_streaming(ser->SerializeFull(recordBatch)); return static_cast(*this); } diff --git a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp index 2f8b0a1c9c1a..cde710e529e7 100644 --- a/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp +++ b/ydb/library/yql/providers/generic/connector/libcpp/utils.cpp @@ -4,8 +4,7 @@ #include #include #include -#include -#include +#include #include #include @@ -76,8 +75,8 @@ namespace NYql::NConnector { } std::shared_ptr ArrowIPCStreamingToArrowRecordBatch(const TProtoStringType dump) { - NKikimr::NArrow::NSerialization::TFullDataDeserializer deser; - auto result = deser.Deserialize(dump); + NKikimr::NArrow::NSerialization::TSerializerContainer deser = NKikimr::NArrow::NSerialization::TSerializerContainer::GetDefaultSerializer(); + auto result = deser->Deserialize(dump); if (!result.ok()) { ythrow yexception() << result.status().ToString(); } diff --git a/ydb/services/bg_tasks/abstract/interface.h b/ydb/services/bg_tasks/abstract/interface.h index f6cd594e49ff..273224c968cf 100644 --- a/ydb/services/bg_tasks/abstract/interface.h +++ b/ydb/services/bg_tasks/abstract/interface.h @@ -111,6 +111,12 @@ class TCommonInterfaceContainer { : Object(object) { } + template + TCommonInterfaceContainer(std::shared_ptr object) + : Object(object) { + static_assert(std::is_base_of::value); + } + bool Initialize(const TString& className) { AFL_VERIFY(!Object)("problem", "initialize for not-empty-object"); Object.reset(TFactory::Construct(className)); @@ -169,6 +175,10 @@ class TCommonInterfaceContainer { return !Object; } + operator bool() const { + return !!Object; + } + }; class TStringContainerProcessor { diff --git a/ydb/services/metadata/abstract/request_features.h b/ydb/services/metadata/abstract/request_features.h index c985276ceb72..01d525aded8b 100644 --- a/ydb/services/metadata/abstract/request_features.h +++ b/ydb/services/metadata/abstract/request_features.h @@ -40,5 +40,6 @@ class TFeaturesExtractor: TNonCopyable { } std::optional Extract(const TString& paramName); + }; } diff --git a/ydb/services/metadata/abstract/ya.make b/ydb/services/metadata/abstract/ya.make index 1058cebaa794..7ba40379add6 100644 --- a/ydb/services/metadata/abstract/ya.make +++ b/ydb/services/metadata/abstract/ya.make @@ -19,7 +19,6 @@ PEERDIR( ydb/library/actors/core ydb/library/yql/core/expr_nodes ydb/public/api/protos - ydb/services/metadata/request ) END()
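Usage sketch (not part of the patch): the hunks above replace per-column TCompression settings with a single TSerializerContainer abstraction. The C++ sketch below shows the call pattern this patch relies on: GetDefaultSerializer(), an explicitly constructed LZ4 serializer for tier eviction, SerializeFull/SerializePayload on write, and Deserialize(blob[, schema]) on read. Header paths, the TNativeSerializer-from-codec constructor, and namespace spellings are assumptions inferred from the call sites and may not match the real ydb/core/formats/arrow/serializer API exactly.

#include <ydb/core/formats/arrow/serializer/abstract.h>  // assumed to declare TSerializerContainer
#include <ydb/core/formats/arrow/serializer/native.h>    // assumed to declare TNativeSerializer
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>
#include <util/generic/string.h>

using NKikimr::NArrow::NSerialization::TSerializerContainer;
using NKikimr::NArrow::NSerialization::TNativeSerializer;

// Serialize a batch either with the default serializer or with an LZ4-compressing
// one, mirroring SetExternalSerializer(...) in the tiering/eviction changes above.
TString SerializeBatchSketch(const std::shared_ptr<arrow::RecordBatch>& batch, bool lz4) {
    TSerializerContainer serializer = lz4
        ? TSerializerContainer(std::make_shared<TNativeSerializer>(arrow::Compression::type::LZ4_FRAME)) // assumed ctor
        : TSerializerContainer::GetDefaultSerializer();
    // SerializeFull embeds the schema in the blob; SerializePayload writes only the
    // batch body, so the reader must supply the schema again (see TColumnLoader::Apply).
    return serializer->SerializeFull(batch);
}

// Restore a batch: self-describing blobs need no schema, payload-only blobs do.
std::shared_ptr<arrow::RecordBatch> DeserializeBatchSketch(
        const TString& blob, const std::shared_ptr<arrow::Schema>& schema) {
    TSerializerContainer serializer = TSerializerContainer::GetDefaultSerializer();
    arrow::Result<std::shared_ptr<arrow::RecordBatch>> result =
        schema ? serializer->Deserialize(blob, schema) : serializer->Deserialize(blob);
    return result.ok() ? *result : nullptr;
}

The same container is what TColumnFeatures now stores per column and what TIndexInfo::GetColumnSaver chooses between: the external serializer from the saver context, the per-column serializer, the schema default, and finally GetDefaultSerializer().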