diff --git a/ydb/core/engine/mkql_keys.cpp b/ydb/core/engine/mkql_keys.cpp index 93d70dcfcbb3..d282ccf5f40f 100644 --- a/ydb/core/engine/mkql_keys.cpp +++ b/ydb/core/engine/mkql_keys.cpp @@ -51,14 +51,6 @@ NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType *type, bool &isOption } } - -template -TCell MakeCell(const NUdf::TUnboxedValuePod& value) { - static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell."); - const auto v = value.Get(); - return TCell(reinterpret_cast(&v), sizeof(v)); -} - THolder ExtractKeyTuple(const TTableId& tableId, TTupleLiteral* tuple, const TVector& columns, TKeyDesc::ERowOperation rowOperation, bool requireStaticKey, const TTypeEnvironment& env) { diff --git a/ydb/core/engine/mkql_keys.h b/ydb/core/engine/mkql_keys.h index 517120748a77..b51e789d0182 100644 --- a/ydb/core/engine/mkql_keys.h +++ b/ydb/core/engine/mkql_keys.h @@ -45,6 +45,13 @@ THolder ExtractTableKey(TCallable& callable, const TTableStrings& stri TVector> ExtractTableKeys(TExploringNodeVisitor& explorer, const TTypeEnvironment& env); TTableId ExtractTableId(const TRuntimeNode& node); +template +TCell MakeCell(const NUdf::TUnboxedValuePod& value) { + static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell."); + const auto v = value.Get(); + return TCell(reinterpret_cast(&v), sizeof(v)); +} + TCell MakeCell(NScheme::TTypeInfo type, const NUdf::TUnboxedValuePod& value, const TTypeEnvironment& env, bool copy = true, i32 typmod = -1, TMaybe* error = {}); diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 8f63dcf2c030..dbc4059ef033 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -24,8 +24,8 @@ namespace { constexpr i64 kInFlightMemoryLimitPerActor = 64_MB; - constexpr i64 kMemoryLimitPerMessage = 48_MB; - constexpr i64 kMaxBatchesPerMessage = 1; + constexpr i64 kMemoryLimitPerMessage = 64_MB; + constexpr i64 kMaxBatchesPerMessage = 8; struct TWriteActorBackoffSettings { TDuration StartRetryDelay = TDuration::MilliSeconds(250); @@ -81,12 +81,12 @@ namespace { namespace NKikimr { namespace NKqp { -class TKqpWriteActor : public TActorBootstrapped, public NYql::NDq::IDqComputeActorAsyncOutput { - using TBase = TActorBootstrapped; +class TKqpDirectWriteActor : public TActorBootstrapped, public NYql::NDq::IDqComputeActorAsyncOutput { + using TBase = TActorBootstrapped; class TResumeNotificationManager { public: - TResumeNotificationManager(TKqpWriteActor& writer) + TResumeNotificationManager(TKqpDirectWriteActor& writer) : Writer(writer) { CheckMemory(); } @@ -102,7 +102,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N } private: - TKqpWriteActor& Writer; + TKqpDirectWriteActor& Writer; i64 LastFreeMemory = std::numeric_limits::max(); }; @@ -127,7 +127,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N }; public: - TKqpWriteActor( + TKqpDirectWriteActor( NKikimrKqp::TKqpTableSinkSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args, TIntrusivePtr counters) @@ -137,6 +137,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N , Callbacks(args.Callback) , Counters(counters) , TypeEnv(args.TypeEnv) + , Alloc(args.Alloc) , TxId(args.TxId) , TableId( Settings.GetTable().GetOwnerId(), @@ -157,13 +158,13 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N void Bootstrap() { LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix; ResolveTable(); - Become(&TKqpWriteActor::StateFunc); + Become(&TKqpDirectWriteActor::StateFunc); } static constexpr char ActorName[] = "KQP_WRITE_ACTOR"; private: - virtual ~TKqpWriteActor() { + virtual ~TKqpDirectWriteActor() { } void CommitState(const NYql::NDqProto::TCheckpoint&) final {}; @@ -491,7 +492,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N << ", Cookie=" << ev->Cookie << ", LocksCount=" << ev->Get()->Record.GetTxLocks().size()); - PopShardBatch(ev->Get()->Record.GetOrigin(), ev->Cookie); + OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie); for (const auto& lock : ev->Get()->Record.GetTxLocks()) { LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock); @@ -500,7 +501,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N ProcessBatches(); } - void PopShardBatch(ui64 shardId, ui64 cookie) { + void OnMessageAcknowledged(ui64 shardId, ui64 cookie) { TResumeNotificationManager resumeNotificator(*this); const auto removedDataSize = ShardedWriteController->OnMessageAcknowledged(shardId, cookie); if (removedDataSize) { @@ -669,7 +670,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N void PassAway() override { Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); - TActorBootstrapped::PassAway(); + TActorBootstrapped::PassAway(); } void Prepare() { @@ -693,7 +694,8 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N : kMaxBatchesPerMessage), }, std::move(columnsMetadata), - TypeEnv); + TypeEnv, + Alloc); } catch (...) { RuntimeError( CurrentExceptionMessage(), @@ -721,7 +723,6 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N Callbacks->ResumeExecution(); } - NActors::TActorId TxProxyId = MakeTxProxyID(); NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false); TString LogPrefix; @@ -731,6 +732,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks * Callbacks = nullptr; TIntrusivePtr Counters; const NMiniKQL::TTypeEnvironment& TypeEnv; + std::shared_ptr Alloc; const NYql::NDq::TTxId TxId; const TTableId TableId; @@ -754,7 +756,7 @@ void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr< factory.RegisterSink( TString(NYql::KqpTableSinkName), [counters] (NKikimrKqp::TKqpTableSinkSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args) { - auto* actor = new TKqpWriteActor(std::move(settings), std::move(args), counters); + auto* actor = new TKqpDirectWriteActor(std::move(settings), std::move(args), counters); return std::make_pair(actor, actor); }); } diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index 0fc9add54970..21dc4b1f2734 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -1,7 +1,5 @@ #include "kqp_write_table.h" -#include -#include #include #include #include @@ -11,6 +9,7 @@ #include #include #include +#include #include namespace NKikimr { @@ -18,8 +17,44 @@ namespace NKqp { namespace { -constexpr ui64 MaxBatchBytes = 8_MB; -constexpr ui64 MaxUnshardedBatchBytes = 4_MB; +constexpr ui64 DataShardMaxOperationBytes = 8_MB; +constexpr ui64 ColumnShardMaxOperationBytes = 8_MB; +constexpr ui64 MaxUnshardedBatchBytes = 0_MB; + +class IPayloadSerializer : public TThrRefBase { +public: + class IBatch : public TThrRefBase { + public: + virtual TString SerializeToString() const = 0; + virtual i64 GetMemory() const = 0; + bool IsEmpty() const; + }; + + using IBatchPtr = TIntrusivePtr; + + virtual void AddData(NMiniKQL::TUnboxedValueBatch&& data) = 0; + virtual void AddBatch(const IBatchPtr& batch) = 0; + + virtual void Close() = 0; + + virtual bool IsClosed() = 0; + virtual bool IsEmpty() = 0; + virtual bool IsFinished() = 0; + + virtual NKikimrDataEvents::EDataFormat GetDataFormat() = 0; + virtual std::vector GetWriteColumnIds() = 0; + + using TBatches = THashMap>; + + virtual TBatches FlushBatchesForce() = 0; + + virtual IBatchPtr FlushBatch(ui64 shardId) = 0; + virtual const THashSet& GetShardIds() const = 0; + + virtual i64 GetMemory() = 0; +}; + +using IPayloadSerializerPtr = TIntrusivePtr; TVector BuildColumns(const TConstArrayRef inputColumns) { TVector result; @@ -169,6 +204,136 @@ TVector BuildKeyColumnTypes( return keyColumnTypes; } +struct TRowWithData { + TVector Cells; + NUdf::TStringValue Data; +}; + +class TRowBuilder { +private: + struct TCellInfo { + NScheme::TTypeInfo Type; + NUdf::TUnboxedValuePod Value; + TString PgBinaryValue; + }; + +public: + explicit TRowBuilder(size_t size) + : CellsInfo(size) { + } + + TRowBuilder& AddCell( + const size_t index, + const NScheme::TTypeInfo type, + const NUdf::TUnboxedValuePod& value, + const i32 typmod = -1) { + CellsInfo[index].Type = type; + CellsInfo[index].Value = value; + + if (type.GetTypeId() == NScheme::NTypeIds::Pg) { + const auto typeDesc = type.GetTypeDesc(); + if (typmod != -1 && NPg::TypeDescNeedsCoercion(typeDesc)) { + TMaybe err; + CellsInfo[index].PgBinaryValue = NYql::NCommon::PgValueCoerce(value, NPg::PgTypeIdFromTypeDesc(typeDesc), typmod, &err); + if (err) { + ythrow yexception() << "PgValueCoerce error: " << *err; + } + } else { + CellsInfo[index].PgBinaryValue = NYql::NCommon::PgValueToNativeBinary(value, NPg::PgTypeIdFromTypeDesc(typeDesc)); + } + } else { + CellsInfo[index].PgBinaryValue.clear(); + } + return *this; + } + + size_t DataSize() const { + size_t result = 0; + for (const auto& cellInfo : CellsInfo) { + result += GetCellSize(cellInfo); + } + return result; + } + + TRowWithData Build() { + TVector cells; + cells.reserve(CellsInfo.size()); + const auto size = DataSize(); + auto data = Allocate(size); + char* ptr = data.Data(); + + for (const auto& cellInfo : CellsInfo) { + cells.push_back(BuildCell(cellInfo, ptr)); + } + + AFL_ENSURE(ptr == data.Data() + size); + + return TRowWithData { + .Cells = std::move(cells), + .Data = std::move(data), + }; + } + +private: + TCell BuildCell(const TCellInfo& cellInfo, char*& dataPtr) { + if (!cellInfo.Value) { + return TCell(); + } + + switch(cellInfo.Type.GetTypeId()) { + #define MAKE_PRIMITIVE_TYPE_CELL_CASE(type, layout) \ + case NUdf::TDataType::Id: return NMiniKQL::MakeCell(cellInfo.Value); + KNOWN_FIXED_VALUE_TYPES(MAKE_PRIMITIVE_TYPE_CELL_CASE) + case NUdf::TDataType::Id: + { + auto intValue = cellInfo.Value.GetInt128(); + constexpr auto valueSize = sizeof(intValue); + + char* initialPtr = dataPtr; + std::memcpy(initialPtr, reinterpret_cast(&intValue), valueSize); + dataPtr += valueSize; + return TCell(initialPtr, valueSize); + } + } + + const auto ref = cellInfo.Type.GetTypeId() == NScheme::NTypeIds::Pg + ? NYql::NUdf::TStringRef(cellInfo.PgBinaryValue) + : cellInfo.Value.AsStringRef(); + + char* initialPtr = dataPtr; + std::memcpy(initialPtr, ref.Data(), ref.Size()); + dataPtr += ref.Size(); + return TCell(initialPtr, ref.Size()); + } + + size_t GetCellSize(const TCellInfo& cellInfo) const { + if (!cellInfo.Value) { + return 0; + } + + switch(cellInfo.Type.GetTypeId()) { + #define MAKE_PRIMITIVE_TYPE_CELL_CASE_SIZE(type, layout) \ + case NUdf::TDataType::Id: + KNOWN_FIXED_VALUE_TYPES(MAKE_PRIMITIVE_TYPE_CELL_CASE_SIZE) + return 0; + case NUdf::TDataType::Id: + return sizeof(cellInfo.Value.GetInt128()); + } + + if (cellInfo.Type.GetTypeId() == NScheme::NTypeIds::Pg) { + return cellInfo.PgBinaryValue.size(); + } + return cellInfo.Value.AsStringRef().Size(); + } + + NUdf::TStringValue Allocate(size_t size) { + Y_DEBUG_ABORT_UNLESS(NMiniKQL::TlsAllocState); + return NUdf::TStringValue(size); + } + + TVector CellsInfo; +}; + class TColumnShardPayloadSerializer : public IPayloadSerializer { using TRecordBatchPtr = std::shared_ptr; @@ -206,10 +371,8 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { public: TColumnShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, - const TConstArrayRef inputColumns, // key columns then value columns - const NMiniKQL::TTypeEnvironment& typeEnv) - : TypeEnv(typeEnv) - , Columns(BuildColumns(inputColumns)) + const TConstArrayRef inputColumns) // key columns then value columns + : Columns(BuildColumns(inputColumns)) , WriteIndex(BuildWriteIndex(schemeEntry, inputColumns)) , WriteColumnIds(BuildWriteColumnIds(inputColumns, WriteIndex)) , BatchBuilder(arrow::Compression::UNCOMPRESSED, BuildNotNullColumns(inputColumns)) { @@ -241,16 +404,13 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { return; } - TVector cells(Columns.size()); + TRowBuilder rowBuilder(Columns.size()); data.ForEachRow([&](const auto& row) { for (size_t index = 0; index < Columns.size(); ++index) { - cells[WriteIndex[index]] = MakeCell( - Columns[index].PType, - row.GetElement(index), - TypeEnv, - /* copy */ false); + rowBuilder.AddCell(WriteIndex[index], Columns[index].PType, row.GetElement(index)); } - BatchBuilder.AddRow(TConstArrayRef{cells.begin(), cells.end()}); + auto rowWithData = rowBuilder.Build(); + BatchBuilder.AddRow(TConstArrayRef{rowWithData.Cells.begin(), rowWithData.Cells.end()}); }); FlushUnsharded(false); @@ -265,7 +425,7 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { } void FlushUnsharded(bool force) { - if ((BatchBuilder.Bytes() > 0 && force) || BatchBuilder.Bytes() >= MaxUnshardedBatchBytes) { + if ((BatchBuilder.Bytes() > 0 && force) || BatchBuilder.Bytes() > MaxUnshardedBatchBytes) { const auto unshardedBatch = BatchBuilder.FlushBatch(true); YQL_ENSURE(unshardedBatch); ShardAndFlushBatch(unshardedBatch, force); @@ -289,7 +449,7 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { } void FlushUnpreparedBatch(const ui64 shardId, TUnpreparedBatch& unpreparedBatch, bool force) { - while (!unpreparedBatch.Batches.empty() && (unpreparedBatch.TotalDataSize >= MaxBatchBytes || force)) { + while (!unpreparedBatch.Batches.empty() && (unpreparedBatch.TotalDataSize >= ColumnShardMaxOperationBytes || force)) { std::vector toPrepare; i64 toPrepareSize = 0; while (!unpreparedBatch.Batches.empty()) { @@ -309,7 +469,7 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { for (i64 index = 0; index < batch->num_rows(); ++index) { i64 nextRowSize = rowCalculator.GetRowBytesSize(index); - if (toPrepareSize + nextRowSize >= (i64)MaxBatchBytes) { + if (toPrepareSize + nextRowSize >= (i64)ColumnShardMaxOperationBytes) { YQL_ENSURE(index > 0); toPrepare.push_back(batch->Slice(0, index)); @@ -413,7 +573,6 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { } private: - const NMiniKQL::TTypeEnvironment& TypeEnv; std::shared_ptr Sharding; const TVector Columns; @@ -434,7 +593,7 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { class TBatch : public IPayloadSerializer::IBatch { public: TString SerializeToString() const override { - return TSerializedCellMatrix::Serialize(Data, Rows, Columns); + return TSerializedCellMatrix::Serialize(Cells, Rows, Columns); } i64 GetMemory() const override { @@ -442,37 +601,94 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { } bool IsEmpty() const { - return Data.empty(); + return Cells.empty(); } - std::vector Extract() { + std::pair, std::vector> Extract() { Size = 0; Rows = 0; - return std::move(Data); + return {std::move(Cells), std::move(Data)}; } - TBatch(std::vector&& data, i64 size, ui32 rows, ui16 columns) - : Data(std::move(data)) + TBatch(std::vector&& cells, std::vector&& data, i64 size, ui32 rows, ui16 columns) + : Cells(std::move(cells)) + , Data(std::move(data)) , Size(size) , Rows(rows) , Columns(columns) { } private: - std::vector Data; + std::vector Cells; + std::vector Data; ui64 Size = 0; ui32 Rows = 0; ui16 Columns = 0; }; + class TRowsBatcher { + public: + explicit TRowsBatcher(ui16 columnCount, ui64 maxBytesPerBatch) + : ColumnCount(columnCount) + , MaxBytesPerBatch(maxBytesPerBatch) { + } + + bool IsEmpty() const { + return Batches.empty(); + } + + struct TBatch { + ui64 Memory = 0; + ui64 MemorySerialized = 0; + TVector Cells; + TVector Data; + }; + + TBatch Flush(bool force) { + TBatch res; + if ((!Batches.empty() && force) || Batches.size() > 1) { + res = std::move(Batches.front()); + Batches.pop_front(); + } + return res; + } + + ui64 AddRow(TRowWithData&& rowWithData) { + Y_ABORT_UNLESS(rowWithData.Cells.size() == ColumnCount); + ui64 newMemory = 0; + for (const auto& cell : rowWithData.Cells) { + newMemory += cell.Size(); + } + if (Batches.empty() || newMemory + GetCellHeaderSize() * ColumnCount + Batches.back().MemorySerialized > MaxBytesPerBatch) { + Batches.emplace_back(); + Batches.back().Memory = 0; + Batches.back().MemorySerialized = GetCellMatrixHeaderSize(); + } + + for (auto& cell : rowWithData.Cells) { + Batches.back().Cells.emplace_back(std::move(cell)); + } + Batches.back().Data.emplace_back(std::move(rowWithData.Data)); + + Batches.back().Memory += newMemory; + Batches.back().MemorySerialized += newMemory + GetCellHeaderSize() * ColumnCount; + + return newMemory; + } + + private: + std::deque Batches; + + ui16 ColumnCount; + ui64 MaxBytesPerBatch; + }; + public: TDataShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, NSchemeCache::TSchemeCacheRequest::TEntry&& partitionsEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv) - : TypeEnv(typeEnv) - , SchemeEntry(schemeEntry) + const TConstArrayRef inputColumns) + : SchemeEntry(schemeEntry) , KeyDescription(std::move(partitionsEntry.KeyDescription)) , Columns(BuildColumns(inputColumns)) , WriteIndex(BuildWriteIndexKeyFirst(SchemeEntry, inputColumns)) @@ -480,11 +696,11 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { , KeyColumnTypes(BuildKeyColumnTypes(SchemeEntry)) { } - void AddRow(TArrayRef row, const TKeyDesc& keyRange) { + void AddRow(TRowWithData&& row, const TKeyDesc& keyRange) { auto shardIter = std::lower_bound( std::begin(keyRange.GetPartitions()), std::end(keyRange.GetPartitions()), - TArrayRef(row.data(), KeyColumnTypes.size()), + TArrayRef(row.Cells.data(), KeyColumnTypes.size()), [this](const auto &partition, const auto& key) { const auto& range = *partition.Range; return 0 > CompareBorders(range.EndKeyPrefix.GetCells(), key, @@ -497,42 +713,39 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { if (batcherIter == std::end(Batchers)) { Batchers.emplace( shardIter->ShardId, - TCellsBatcher(Columns.size(), MaxBatchBytes)); + TRowsBatcher(Columns.size(), DataShardMaxOperationBytes)); } - Memory += Batchers.at(shardIter->ShardId).AddRow(row); + Memory += Batchers.at(shardIter->ShardId).AddRow(std::move(row)); ShardIds.insert(shardIter->ShardId); } void AddData(NMiniKQL::TUnboxedValueBatch&& data) override { YQL_ENSURE(!Closed); - TVector cells(Columns.size()); + TRowBuilder rowBuilder(Columns.size()); data.ForEachRow([&](const auto& row) { for (size_t index = 0; index < Columns.size(); ++index) { - // TODO: move to SerializedVector - cells[WriteIndex[index]] = MakeCell( - Columns[index].PType, - row.GetElement(index), - TypeEnv, - /* copy */ true); + rowBuilder.AddCell(WriteIndex[index], Columns[index].PType, row.GetElement(index)); } - AddRow(cells, GetKeyRange()); - - cells.resize(Columns.size()); + auto rowWithData = rowBuilder.Build(); + AddRow(std::move(rowWithData), GetKeyRange()); }); } void AddBatch(const IPayloadSerializer::IBatchPtr& batch) override { auto datashardBatch = dynamic_cast(batch.Get()); YQL_ENSURE(datashardBatch); - auto data = datashardBatch->Extract(); - const auto rows = data.size() / Columns.size(); - YQL_ENSURE(data.size() == rows * Columns.size()); + auto [cells, data] = datashardBatch->Extract(); + const auto rows = cells.size() / Columns.size(); + YQL_ENSURE(cells.size() == rows * Columns.size()); for (size_t rowIndex = 0; rowIndex < rows; ++rowIndex) { AddRow( - TArrayRef{&data[rowIndex * Columns.size()], Columns.size()}, + TRowWithData{ + TVector(cells.begin() + (rowIndex * Columns.size()), cells.begin() + (rowIndex * Columns.size()) + Columns.size()), + data[rowIndex], + }, GetKeyRange()); } } @@ -566,12 +779,13 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { return IsClosed() && IsEmpty(); } - IBatchPtr ExtractNextBatch(TCellsBatcher& batcher, bool force) { + IBatchPtr ExtractNextBatch(TRowsBatcher& batcher, bool force) { auto batchResult = batcher.Flush(force); Memory -= batchResult.Memory; - const ui32 rows = batchResult.Data.size() / Columns.size(); + const ui32 rows = batchResult.Cells.size() / Columns.size(); YQL_ENSURE(Columns.size() <= std::numeric_limits::max()); return MakeIntrusive( + std::move(batchResult.Cells), std::move(batchResult.Data), static_cast(batchResult.MemorySerialized), rows, @@ -610,7 +824,6 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { return *KeyDescription; } - const NMiniKQL::TTypeEnvironment& TypeEnv; const NSchemeCache::TSchemeCacheNavigate::TEntry SchemeEntry; THolder KeyDescription; @@ -619,7 +832,7 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { const std::vector WriteColumnIds; const TVector KeyColumnTypes; - THashMap Batchers; + THashMap Batchers; THashSet ShardIds; i64 Memory = 0; @@ -635,19 +848,17 @@ bool IPayloadSerializer::IBatch::IsEmpty() const { IPayloadSerializerPtr CreateColumnShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv) { + const TConstArrayRef inputColumns) { return MakeIntrusive( - schemeEntry, inputColumns, typeEnv); + schemeEntry, inputColumns); } IPayloadSerializerPtr CreateDataShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, NSchemeCache::TSchemeCacheRequest::TEntry&& partitionsEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv) { + const TConstArrayRef inputColumns) { return MakeIntrusive( - schemeEntry, std::move(partitionsEntry), inputColumns, typeEnv); + schemeEntry, std::move(partitionsEntry), inputColumns); } namespace { @@ -826,8 +1037,7 @@ class TShardedWriteController : public IShardedWriteController { BeforePartitioningChanged(); Serializer = CreateColumnShardPayloadSerializer( schemeEntry, - InputColumnsMetadata, - TypeEnv); + InputColumnsMetadata); AfterPartitioningChanged(); } @@ -838,8 +1048,7 @@ class TShardedWriteController : public IShardedWriteController { Serializer = CreateDataShardPayloadSerializer( schemeEntry, std::move(partitionsEntry), - InputColumnsMetadata, - TypeEnv); + InputColumnsMetadata); AfterPartitioningChanged(); } @@ -867,6 +1076,7 @@ class TShardedWriteController : public IShardedWriteController { YQL_ENSURE(!data.IsWide(), "Wide stream is not supported yet"); YQL_ENSURE(!Closed); + auto allocGuard = TypeEnv.BindAllocator(); YQL_ENSURE(Serializer); Serializer->AddData(std::move(data)); @@ -874,6 +1084,7 @@ class TShardedWriteController : public IShardedWriteController { } void Close() override { + auto allocGuard = TypeEnv.BindAllocator(); YQL_ENSURE(Serializer); Closed = true; Serializer->Close(); @@ -931,6 +1142,7 @@ class TShardedWriteController : public IShardedWriteController { } std::optional OnMessageAcknowledged(ui64 shardId, ui64 cookie) override { + auto allocGuard = TypeEnv.BindAllocator(); auto& shardInfo = ShardsInfo.GetShard(shardId); const auto removedDataSize = shardInfo.PopBatches(cookie); return removedDataSize; @@ -972,10 +1184,19 @@ class TShardedWriteController : public IShardedWriteController { TShardedWriteController( const TShardedWriteControllerSettings settings, TVector&& inputColumnsMetadata, - const NMiniKQL::TTypeEnvironment& typeEnv) + const NMiniKQL::TTypeEnvironment& typeEnv, + std::shared_ptr alloc) : Settings(settings) , InputColumnsMetadata(std::move(inputColumnsMetadata)) - , TypeEnv(typeEnv) { + , TypeEnv(typeEnv) + , Alloc(alloc) { + } + + ~TShardedWriteController() { + Y_ABORT_UNLESS(Alloc); + TGuard allocGuard(*Alloc); + ShardsInfo.Clear(); + Serializer = nullptr; } private: @@ -1019,6 +1240,7 @@ class TShardedWriteController : public IShardedWriteController { TShardedWriteControllerSettings Settings; TVector InputColumnsMetadata; const NMiniKQL::TTypeEnvironment& TypeEnv; + std::shared_ptr Alloc; TShardsInfo ShardsInfo; bool Closed = false; @@ -1032,8 +1254,10 @@ class TShardedWriteController : public IShardedWriteController { IShardedWriteControllerPtr CreateShardedWriteController( const TShardedWriteControllerSettings& settings, TVector&& inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv) { - return MakeIntrusive(settings, std::move(inputColumns), typeEnv); + const NMiniKQL::TTypeEnvironment& typeEnv, + std::shared_ptr alloc) { + return MakeIntrusive( + settings, std::move(inputColumns), typeEnv, alloc); } } diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h index 7846cb954cc6..46e5ac4f7308 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.h +++ b/ydb/core/kqp/runtime/kqp_write_table.h @@ -10,54 +10,6 @@ namespace NKikimr { namespace NKqp { -class IPayloadSerializer : public TThrRefBase { -public: - class IBatch : public TThrRefBase { - public: - virtual TString SerializeToString() const = 0; - virtual i64 GetMemory() const = 0; - bool IsEmpty() const; - }; - - using IBatchPtr = TIntrusivePtr; - - virtual void AddData(NMiniKQL::TUnboxedValueBatch&& data) = 0; - virtual void AddBatch(const IBatchPtr& batch) = 0; - - virtual void Close() = 0; - - virtual bool IsClosed() = 0; - virtual bool IsEmpty() = 0; - virtual bool IsFinished() = 0; - - virtual NKikimrDataEvents::EDataFormat GetDataFormat() = 0; - virtual std::vector GetWriteColumnIds() = 0; - - using TBatches = THashMap>; - - virtual TBatches FlushBatchesForce() = 0; - - virtual IBatchPtr FlushBatch(ui64 shardId) = 0; - virtual const THashSet& GetShardIds() const = 0; - - virtual i64 GetMemory() = 0; -}; - -using IPayloadSerializerPtr = TIntrusivePtr; - - -IPayloadSerializerPtr CreateColumnShardPayloadSerializer( - const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv); - -IPayloadSerializerPtr CreateDataShardPayloadSerializer( - const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, - NSchemeCache::TSchemeCacheRequest::TEntry&& partitionsEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv); - - class IShardedWriteController : public TThrRefBase { public: virtual void OnPartitioningChanged(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry) = 0; @@ -112,7 +64,8 @@ struct TShardedWriteControllerSettings { IShardedWriteControllerPtr CreateShardedWriteController( const TShardedWriteControllerSettings& settings, TVector&& inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv); + const NMiniKQL::TTypeEnvironment& typeEnv, + std::shared_ptr alloc); } } diff --git a/ydb/core/scheme/scheme_tablecell.cpp b/ydb/core/scheme/scheme_tablecell.cpp index 2ee86265b1a4..c2b541bb7984 100644 --- a/ydb/core/scheme/scheme_tablecell.cpp +++ b/ydb/core/scheme/scheme_tablecell.cpp @@ -307,46 +307,6 @@ bool TSerializedCellMatrix::DoTryParse(const TString& data) { return TryDeserializeCellMatrix(data, Buf, Cells, RowCount, ColCount); } -TCellsBatcher::TCellsBatcher(ui16 colCount, ui64 maxBytesPerBatch) - : ColCount(colCount) - , MaxBytesPerBatch(maxBytesPerBatch) { -} - -bool TCellsBatcher::IsEmpty() const { - return Batches.empty(); -} - -TCellsBatcher::TBatch TCellsBatcher::Flush(bool force) { - TBatch res; - if ((!Batches.empty() && force) || Batches.size() > 1) { - res = std::move(Batches.front()); - Batches.pop_front(); - } - return res; -} - -ui64 TCellsBatcher::AddRow(TArrayRef cells) { - Y_ABORT_UNLESS(cells.size() == ColCount); - ui64 newMemory = 0; - for (const auto& cell : cells) { - newMemory += cell.Size(); - } - if (Batches.empty() || newMemory + sizeof(TCellHeader) * ColCount + Batches.back().MemorySerialized > MaxBytesPerBatch) { - Batches.emplace_back(); - Batches.back().Memory = 0; - Batches.back().MemorySerialized = CellMatrixHeaderSize; - } - - for (auto& cell : cells) { - Batches.back().Data.emplace_back(std::move(cell)); - } - - Batches.back().Memory += newMemory; - Batches.back().MemorySerialized += newMemory + sizeof(TCellHeader) * ColCount; - - return newMemory; -} - void TCellsStorage::Reset(TArrayRef cells) { size_t cellsSize = cells.size(); @@ -499,5 +459,13 @@ TString DbgPrintTuple(const TDbTupleRef& row, const NScheme::TTypeRegistry& type return res; } +size_t GetCellMatrixHeaderSize() { + return CellMatrixHeaderSize; +} + +size_t GetCellHeaderSize() { + return sizeof(TCellHeader); +} + } // namespace NKikimr diff --git a/ydb/core/scheme/scheme_tablecell.h b/ydb/core/scheme/scheme_tablecell.h index b7635a424725..d2abd9f5d548 100644 --- a/ydb/core/scheme/scheme_tablecell.h +++ b/ydb/core/scheme/scheme_tablecell.h @@ -653,29 +653,6 @@ class TSerializedCellMatrix { ui16 ColCount; }; -class TCellsBatcher { -public: - explicit TCellsBatcher(ui16 colCount, ui64 maxBytesPerBatch); - - bool IsEmpty() const; - - struct TBatch { - ui64 Memory = 0; - ui64 MemorySerialized = 0; - TVector Data; - }; - - TBatch Flush(bool force); - - ui64 AddRow(TArrayRef cells); - -private: - std::deque Batches; - - ui16 ColCount; - ui64 MaxBytesPerBatch; -}; - class TCellsStorage { public: @@ -760,4 +737,7 @@ void DbgPrintValue(TString&, const TCell&, NScheme::TTypeInfo typeInfo); TString DbgPrintCell(const TCell& r, NScheme::TTypeInfo typeInfo, const NScheme::TTypeRegistry& typeRegistry); TString DbgPrintTuple(const TDbTupleRef& row, const NScheme::TTypeRegistry& typeRegistry); +size_t GetCellMatrixHeaderSize(); +size_t GetCellHeaderSize(); + } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 0122b27b150a..4f01a2d6a232 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -299,6 +299,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { const THashMap& TaskParams; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + std::shared_ptr Alloc; IRandomProvider *const RandomProvider; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index e87acb1f7b7f..d2ae5da446f7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1359,6 +1359,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .TaskParams = taskParams, .TypeEnv = typeEnv, .HolderFactory = holderFactory, + .Alloc = Alloc, .RandomProvider = randomProvider }); } catch (const std::exception& ex) {