From 9d3d8702b65a0110fc35b04665a7756fa4f173d0 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 20 Jun 2024 15:08:21 +0300 Subject: [PATCH 01/10] fix --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 22 ++++++++++--------- ydb/core/kqp/runtime/kqp_write_table.cpp | 4 ++-- .../kqp/session_actor/kqp_session_actor.cpp | 2 ++ 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 8f63dcf2c030..3635e7a1a6d0 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -81,12 +81,12 @@ namespace { namespace NKikimr { namespace NKqp { -class TKqpWriteActor : public TActorBootstrapped, public NYql::NDq::IDqComputeActorAsyncOutput { - using TBase = TActorBootstrapped; +class TKqpDirectWriteActor : public TActorBootstrapped, public NYql::NDq::IDqComputeActorAsyncOutput { + using TBase = TActorBootstrapped; class TResumeNotificationManager { public: - TResumeNotificationManager(TKqpWriteActor& writer) + TResumeNotificationManager(TKqpDirectWriteActor& writer) : Writer(writer) { CheckMemory(); } @@ -102,7 +102,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N } private: - TKqpWriteActor& Writer; + TKqpDirectWriteActor& Writer; i64 LastFreeMemory = std::numeric_limits::max(); }; @@ -127,7 +127,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N }; public: - TKqpWriteActor( + TKqpDirectWriteActor( NKikimrKqp::TKqpTableSinkSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args, TIntrusivePtr counters) @@ -157,13 +157,13 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N void Bootstrap() { LogPrefix = TStringBuilder() << "SelfId: " << this->SelfId() << ", " << LogPrefix; ResolveTable(); - Become(&TKqpWriteActor::StateFunc); + Become(&TKqpDirectWriteActor::StateFunc); } static constexpr char ActorName[] = "KQP_WRITE_ACTOR"; private: - virtual ~TKqpWriteActor() { + virtual ~TKqpDirectWriteActor() { } void CommitState(const NYql::NDqProto::TCheckpoint&) final {}; @@ -669,7 +669,7 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N void PassAway() override { Send(PipeCacheId, new TEvPipeCache::TEvUnlink(0)); - TActorBootstrapped::PassAway(); + TActorBootstrapped::PassAway(); } void Prepare() { @@ -721,7 +721,6 @@ class TKqpWriteActor : public TActorBootstrapped, public NYql::N Callbacks->ResumeExecution(); } - NActors::TActorId TxProxyId = MakeTxProxyID(); NActors::TActorId PipeCacheId = NKikimr::MakePipePerNodeCacheID(false); TString LogPrefix; @@ -754,8 +753,11 @@ void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr< factory.RegisterSink( TString(NYql::KqpTableSinkName), [counters] (NKikimrKqp::TKqpTableSinkSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args) { - auto* actor = new TKqpWriteActor(std::move(settings), std::move(args), counters); + // for inconsistent Tx & olap + auto* actor = new TKqpDirectWriteActor(std::move(settings), std::move(args), counters); return std::make_pair(actor, actor); + // for oltp txs + // retunr TKqpForwardWriteActor(...) }); } diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index 0fc9add54970..f062af1d15fc 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -19,7 +19,7 @@ namespace NKqp { namespace { constexpr ui64 MaxBatchBytes = 8_MB; -constexpr ui64 MaxUnshardedBatchBytes = 4_MB; +constexpr ui64 MaxUnshardedBatchBytes = 0_MB; TVector BuildColumns(const TConstArrayRef inputColumns) { TVector result; @@ -265,7 +265,7 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { } void FlushUnsharded(bool force) { - if ((BatchBuilder.Bytes() > 0 && force) || BatchBuilder.Bytes() >= MaxUnshardedBatchBytes) { + if ((BatchBuilder.Bytes() > 0 && force) || BatchBuilder.Bytes() > MaxUnshardedBatchBytes) { const auto unshardedBatch = BatchBuilder.FlushBatch(true); YQL_ENSURE(unshardedBatch); ShardAndFlushBatch(unshardedBatch, force); diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index b66c28e20be1..d42d06ed37c4 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -588,6 +588,8 @@ class TKqpSessionActor : public TActorBootstrapped { return; } + + Cerr << "COMPILED " << QueryState->CompileResult->PreparedQuery->GetPhysicalQuery().GetQueryAst() << Endl; OnSuccessCompileRequest(); } From 52b8a85b1d95fd8d888b809014acd86fd616506e Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 2 Jul 2024 10:27:41 +0300 Subject: [PATCH 02/10] fix --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 4 ++-- ydb/core/kqp/runtime/kqp_write_table.cpp | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 3635e7a1a6d0..f9b0217fad79 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -24,8 +24,8 @@ namespace { constexpr i64 kInFlightMemoryLimitPerActor = 64_MB; - constexpr i64 kMemoryLimitPerMessage = 48_MB; - constexpr i64 kMaxBatchesPerMessage = 1; + constexpr i64 kMemoryLimitPerMessage = 64_MB; + constexpr i64 kMaxBatchesPerMessage = 8; struct TWriteActorBackoffSettings { TDuration StartRetryDelay = TDuration::MilliSeconds(250); diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index f062af1d15fc..263aa700b475 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -18,7 +18,8 @@ namespace NKqp { namespace { -constexpr ui64 MaxBatchBytes = 8_MB; +constexpr ui64 DataShardMaxOperationBytes = 8_MB; +constexpr ui64 ColumnShardMaxOperationBytes = 8_MB; constexpr ui64 MaxUnshardedBatchBytes = 0_MB; TVector BuildColumns(const TConstArrayRef inputColumns) { @@ -289,7 +290,7 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { } void FlushUnpreparedBatch(const ui64 shardId, TUnpreparedBatch& unpreparedBatch, bool force) { - while (!unpreparedBatch.Batches.empty() && (unpreparedBatch.TotalDataSize >= MaxBatchBytes || force)) { + while (!unpreparedBatch.Batches.empty() && (unpreparedBatch.TotalDataSize >= ColumnShardMaxOperationBytes || force)) { std::vector toPrepare; i64 toPrepareSize = 0; while (!unpreparedBatch.Batches.empty()) { @@ -309,7 +310,7 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { for (i64 index = 0; index < batch->num_rows(); ++index) { i64 nextRowSize = rowCalculator.GetRowBytesSize(index); - if (toPrepareSize + nextRowSize >= (i64)MaxBatchBytes) { + if (toPrepareSize + nextRowSize >= (i64)ColumnShardMaxOperationBytes) { YQL_ENSURE(index > 0); toPrepare.push_back(batch->Slice(0, index)); @@ -497,7 +498,7 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { if (batcherIter == std::end(Batchers)) { Batchers.emplace( shardIter->ShardId, - TCellsBatcher(Columns.size(), MaxBatchBytes)); + TCellsBatcher(Columns.size(), DataShardMaxOperationBytes)); } Memory += Batchers.at(shardIter->ShardId).AddRow(row); From bf358113b55f6727b685abda768c86eec9393a09 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Tue, 2 Jul 2024 18:12:08 +0300 Subject: [PATCH 03/10] fix --- ydb/core/engine/mkql_keys.cpp | 8 -- ydb/core/engine/mkql_keys.h | 7 ++ ydb/core/kqp/runtime/kqp_write_table.cpp | 151 +++++++++++++++++++++-- 3 files changed, 149 insertions(+), 17 deletions(-) diff --git a/ydb/core/engine/mkql_keys.cpp b/ydb/core/engine/mkql_keys.cpp index 93d70dcfcbb3..d282ccf5f40f 100644 --- a/ydb/core/engine/mkql_keys.cpp +++ b/ydb/core/engine/mkql_keys.cpp @@ -51,14 +51,6 @@ NScheme::TTypeInfo UnpackTypeInfo(NKikimr::NMiniKQL::TType *type, bool &isOption } } - -template -TCell MakeCell(const NUdf::TUnboxedValuePod& value) { - static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell."); - const auto v = value.Get(); - return TCell(reinterpret_cast(&v), sizeof(v)); -} - THolder ExtractKeyTuple(const TTableId& tableId, TTupleLiteral* tuple, const TVector& columns, TKeyDesc::ERowOperation rowOperation, bool requireStaticKey, const TTypeEnvironment& env) { diff --git a/ydb/core/engine/mkql_keys.h b/ydb/core/engine/mkql_keys.h index 517120748a77..b51e789d0182 100644 --- a/ydb/core/engine/mkql_keys.h +++ b/ydb/core/engine/mkql_keys.h @@ -45,6 +45,13 @@ THolder ExtractTableKey(TCallable& callable, const TTableStrings& stri TVector> ExtractTableKeys(TExploringNodeVisitor& explorer, const TTypeEnvironment& env); TTableId ExtractTableId(const TRuntimeNode& node); +template +TCell MakeCell(const NUdf::TUnboxedValuePod& value) { + static_assert(TCell::CanInline(sizeof(T)), "Can't inline data in cell."); + const auto v = value.Get(); + return TCell(reinterpret_cast(&v), sizeof(v)); +} + TCell MakeCell(NScheme::TTypeInfo type, const NUdf::TUnboxedValuePod& value, const TTypeEnvironment& env, bool copy = true, i32 typmod = -1, TMaybe* error = {}); diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index 263aa700b475..8fa17adfc877 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -1,7 +1,5 @@ #include "kqp_write_table.h" -#include -#include #include #include #include @@ -11,6 +9,7 @@ #include #include #include +#include #include namespace NKikimr { @@ -170,6 +169,140 @@ TVector BuildKeyColumnTypes( return keyColumnTypes; } +struct TRowWithData { + TVector Cells; + NUdf::TStringValue Data; +}; + +class TRowBuilder { +private: + struct TCellInfo { + NScheme::TTypeInfo Type; + NUdf::TUnboxedValuePod Value; + TString PgBinaryValue; + }; + +public: + explicit TRowBuilder(size_t size) + : CellsInfo(size) { + } + + TRowBuilder& AddCell( + const size_t index, + const NScheme::TTypeInfo type, + const NUdf::TUnboxedValuePod& value, + const i32 typmod = -1) { + CellsInfo[index].Type = type; + CellsInfo[index].Value = value; + + if (type.GetTypeId() == NScheme::NTypeIds::Pg) { + const auto typeDesc = type.GetTypeDesc(); + if (typmod != -1 && NPg::TypeDescNeedsCoercion(typeDesc)) { + TMaybe err; + CellsInfo[index].PgBinaryValue = NYql::NCommon::PgValueCoerce(value, NPg::PgTypeIdFromTypeDesc(typeDesc), typmod, &err); + if (err) { + ythrow yexception() << "PgValueCoerce error: " << *err; + } + } else { + CellsInfo[index].PgBinaryValue = NYql::NCommon::PgValueToNativeBinary(value, NPg::PgTypeIdFromTypeDesc(typeDesc)); + } + } + return *this; + } + + size_t DataSize() const { + size_t result = 0; + for (const auto& cellInfo : CellsInfo) { + result += GetCellSize(cellInfo); + } + return result; + } + + TRowWithData Build() { + TVector cells; + cells.reserve(CellsInfo.size()); + const auto size = DataSize(); + auto data = Allocate(size); + char* ptr = data.Data(); + + for (const auto& cellInfo : CellsInfo) { + cells.push_back(BuildCell(cellInfo, ptr)); + } + + AFL_ENSURE(ptr == data.Data() + size); + + Clear(); + + return TRowWithData { + .Cells = std::move(cells), + .Data = std::move(data), + }; + } + + void Clear() { + CellsInfo.clear(); + } + +private: + TCell BuildCell(const TCellInfo& cellInfo, char*& dataPtr) { + if (!cellInfo.Value) { + return TCell(); + } + + switch(cellInfo.Type.GetTypeId()) { + #define MAKE_PRIMITIVE_TYPE_CELL_CASE(type, layout) \ + case NUdf::TDataType::Id: return NMiniKQL::MakeCell(cellInfo.Value); + KNOWN_FIXED_VALUE_TYPES(MAKE_PRIMITIVE_TYPE_CELL_CASE) + case NUdf::TDataType::Id: + { + auto intValue = cellInfo.Value.GetInt128(); + constexpr auto valueSize = sizeof(intValue); + + char* initialPtr = dataPtr; + std::memcpy(initialPtr, reinterpret_cast(&intValue), valueSize); + dataPtr += valueSize; + return TCell(initialPtr, valueSize); + } + } + + const auto ref = cellInfo.Type.GetTypeId() == NScheme::NTypeIds::Pg + ? NYql::NUdf::TStringRef(cellInfo.PgBinaryValue) + : cellInfo.Value.AsStringRef(); + + char* initialPtr = dataPtr; + std::memcpy(initialPtr, ref.Data(), ref.Size()); + dataPtr += ref.Size(); + return TCell(initialPtr, ref.Size()); + } + + size_t GetCellSize(const TCellInfo& cellInfo) const { + if (!cellInfo.Value) { + return 0; + } + + switch(cellInfo.Type.GetTypeId()) { + #define MAKE_PRIMITIVE_TYPE_CELL_CASE_SIZE(type, layout) \ + case NUdf::TDataType::Id: + KNOWN_FIXED_VALUE_TYPES(MAKE_PRIMITIVE_TYPE_CELL_CASE_SIZE) + return 0; + case NUdf::TDataType::Id: + return sizeof(cellInfo.Value.GetInt128()); + } + + if (cellInfo.Type.GetTypeId() == NScheme::NTypeIds::Pg) { + return cellInfo.PgBinaryValue.size(); + } + return cellInfo.Value.AsStringRef().Size(); + } + + NUdf::TStringValue Allocate(size_t size) { + Y_DEBUG_ABORT_UNLESS(NMiniKQL::TlsAllocState); + return NUdf::TStringValue(size); + } + + TVector CellsInfo; +}; + class TColumnShardPayloadSerializer : public IPayloadSerializer { using TRecordBatchPtr = std::shared_ptr; @@ -242,16 +375,16 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { return; } - TVector cells(Columns.size()); + Y_UNUSED(TypeEnv); + + TRowBuilder rowBuilder(Columns.size()); data.ForEachRow([&](const auto& row) { + TRowBuilder rowBuilder(Columns.size()); for (size_t index = 0; index < Columns.size(); ++index) { - cells[WriteIndex[index]] = MakeCell( - Columns[index].PType, - row.GetElement(index), - TypeEnv, - /* copy */ false); + rowBuilder.AddCell(WriteIndex[index], Columns[index].PType, row.GetElement(index)); } - BatchBuilder.AddRow(TConstArrayRef{cells.begin(), cells.end()}); + auto rowWithData = rowBuilder.Build(); + BatchBuilder.AddRow(TConstArrayRef{rowWithData.Cells.begin(), rowWithData.Cells.end()}); }); FlushUnsharded(false); From 33073df55d33dc68740dd19250bda44605bb1740 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 3 Jul 2024 13:20:21 +0300 Subject: [PATCH 04/10] fix --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 4 +- ydb/core/kqp/runtime/kqp_write_table.cpp | 164 ++++++++++++++++++----- ydb/core/kqp/runtime/kqp_write_table.h | 48 ------- ydb/core/scheme/scheme_tablecell.cpp | 48 ++----- ydb/core/scheme/scheme_tablecell.h | 26 +--- 5 files changed, 140 insertions(+), 150 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index f9b0217fad79..fdf6c2cb18a9 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -491,7 +491,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu << ", Cookie=" << ev->Cookie << ", LocksCount=" << ev->Get()->Record.GetTxLocks().size()); - PopShardBatch(ev->Get()->Record.GetOrigin(), ev->Cookie); + OnMessageAcknowledged(ev->Get()->Record.GetOrigin(), ev->Cookie); for (const auto& lock : ev->Get()->Record.GetTxLocks()) { LocksInfo[ev->Get()->Record.GetOrigin()].AddAndCheckLock(lock); @@ -500,7 +500,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu ProcessBatches(); } - void PopShardBatch(ui64 shardId, ui64 cookie) { + void OnMessageAcknowledged(ui64 shardId, ui64 cookie) { TResumeNotificationManager resumeNotificator(*this); const auto removedDataSize = ShardedWriteController->OnMessageAcknowledged(shardId, cookie); if (removedDataSize) { diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index 8fa17adfc877..61df929741bb 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -21,6 +21,41 @@ constexpr ui64 DataShardMaxOperationBytes = 8_MB; constexpr ui64 ColumnShardMaxOperationBytes = 8_MB; constexpr ui64 MaxUnshardedBatchBytes = 0_MB; +class IPayloadSerializer : public TThrRefBase { +public: + class IBatch : public TThrRefBase { + public: + virtual TString SerializeToString() const = 0; + virtual i64 GetMemory() const = 0; + bool IsEmpty() const; + }; + + using IBatchPtr = TIntrusivePtr; + + virtual void AddData(NMiniKQL::TUnboxedValueBatch&& data) = 0; + virtual void AddBatch(const IBatchPtr& batch) = 0; + + virtual void Close() = 0; + + virtual bool IsClosed() = 0; + virtual bool IsEmpty() = 0; + virtual bool IsFinished() = 0; + + virtual NKikimrDataEvents::EDataFormat GetDataFormat() = 0; + virtual std::vector GetWriteColumnIds() = 0; + + using TBatches = THashMap>; + + virtual TBatches FlushBatchesForce() = 0; + + virtual IBatchPtr FlushBatch(ui64 shardId) = 0; + virtual const THashSet& GetShardIds() const = 0; + + virtual i64 GetMemory() = 0; +}; + +using IPayloadSerializerPtr = TIntrusivePtr; + TVector BuildColumns(const TConstArrayRef inputColumns) { TVector result; result.reserve(inputColumns.size()); @@ -194,6 +229,7 @@ class TRowBuilder { const i32 typmod = -1) { CellsInfo[index].Type = type; CellsInfo[index].Value = value; + CellsInfo[index].PgBinaryValue.clear(); if (type.GetTypeId() == NScheme::NTypeIds::Pg) { const auto typeDesc = type.GetTypeDesc(); @@ -231,18 +267,12 @@ class TRowBuilder { AFL_ENSURE(ptr == data.Data() + size); - Clear(); - return TRowWithData { .Cells = std::move(cells), .Data = std::move(data), }; } - void Clear() { - CellsInfo.clear(); - } - private: TCell BuildCell(const TCellInfo& cellInfo, char*& dataPtr) { if (!cellInfo.Value) { @@ -340,7 +370,7 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { public: TColumnShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, - const TConstArrayRef inputColumns, // key columns then value columns + const TConstArrayRef inputColumns, // key columns then value columns const NMiniKQL::TTypeEnvironment& typeEnv) : TypeEnv(typeEnv) , Columns(BuildColumns(inputColumns)) @@ -376,10 +406,9 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { } Y_UNUSED(TypeEnv); - + //auto allocGuard = TypeEnv.BindAllocator(); TRowBuilder rowBuilder(Columns.size()); data.ForEachRow([&](const auto& row) { - TRowBuilder rowBuilder(Columns.size()); for (size_t index = 0; index < Columns.size(); ++index) { rowBuilder.AddCell(WriteIndex[index], Columns[index].PType, row.GetElement(index)); } @@ -568,7 +597,7 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { class TBatch : public IPayloadSerializer::IBatch { public: TString SerializeToString() const override { - return TSerializedCellMatrix::Serialize(Data, Rows, Columns); + return TSerializedCellMatrix::Serialize(Cells, Rows, Columns); } i64 GetMemory() const override { @@ -576,29 +605,88 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { } bool IsEmpty() const { - return Data.empty(); + return Cells.empty(); } - std::vector Extract() { + std::pair, std::vector> Extract() { Size = 0; Rows = 0; - return std::move(Data); + return {std::move(Cells), std::move(Data)}; } - TBatch(std::vector&& data, i64 size, ui32 rows, ui16 columns) - : Data(std::move(data)) + TBatch(std::vector&& cells, std::vector&& data, i64 size, ui32 rows, ui16 columns) + : Cells(std::move(cells)) + , Data(std::move(data)) , Size(size) , Rows(rows) , Columns(columns) { } private: - std::vector Data; + std::vector Cells; + std::vector Data; ui64 Size = 0; ui32 Rows = 0; ui16 Columns = 0; }; + class TRowsBatcher { + public: + explicit TRowsBatcher(ui16 columnCount, ui64 maxBytesPerBatch) + : ColumnCount(columnCount) + , MaxBytesPerBatch(maxBytesPerBatch) { + } + + bool IsEmpty() const { + return Batches.empty(); + } + + struct TBatch { + ui64 Memory = 0; + ui64 MemorySerialized = 0; + TVector Cells; + TVector Data; + }; + + TBatch Flush(bool force) { + TBatch res; + if ((!Batches.empty() && force) || Batches.size() > 1) { + res = std::move(Batches.front()); + Batches.pop_front(); + } + return res; + } + + ui64 AddRow(TRowWithData&& rowWithData) { + Y_ABORT_UNLESS(rowWithData.Cells.size() == ColumnCount); + ui64 newMemory = 0; + for (const auto& cell : rowWithData.Cells) { + newMemory += cell.Size(); + } + if (Batches.empty() || newMemory + GetCellHeaderSize() * ColumnCount + Batches.back().MemorySerialized > MaxBytesPerBatch) { + Batches.emplace_back(); + Batches.back().Memory = 0; + Batches.back().MemorySerialized = GetCellMatrixHeaderSize(); + } + + for (auto& cell : rowWithData.Cells) { + Batches.back().Cells.emplace_back(std::move(cell)); + } + Batches.back().Data.emplace_back(std::move(rowWithData.Data)); + + Batches.back().Memory += newMemory; + Batches.back().MemorySerialized += newMemory + GetCellHeaderSize() * ColumnCount; + + return newMemory; + } + + private: + std::deque Batches; + + ui16 ColumnCount; + ui64 MaxBytesPerBatch; + }; + public: TDataShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, @@ -614,11 +702,11 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { , KeyColumnTypes(BuildKeyColumnTypes(SchemeEntry)) { } - void AddRow(TArrayRef row, const TKeyDesc& keyRange) { + void AddRow(TRowWithData&& row, const TKeyDesc& keyRange) { auto shardIter = std::lower_bound( std::begin(keyRange.GetPartitions()), std::end(keyRange.GetPartitions()), - TArrayRef(row.data(), KeyColumnTypes.size()), + TArrayRef(row.Cells.data(), KeyColumnTypes.size()), [this](const auto &partition, const auto& key) { const auto& range = *partition.Range; return 0 > CompareBorders(range.EndKeyPrefix.GetCells(), key, @@ -631,42 +719,41 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { if (batcherIter == std::end(Batchers)) { Batchers.emplace( shardIter->ShardId, - TCellsBatcher(Columns.size(), DataShardMaxOperationBytes)); + TRowsBatcher(Columns.size(), DataShardMaxOperationBytes)); } - Memory += Batchers.at(shardIter->ShardId).AddRow(row); + Memory += Batchers.at(shardIter->ShardId).AddRow(std::move(row)); ShardIds.insert(shardIter->ShardId); } void AddData(NMiniKQL::TUnboxedValueBatch&& data) override { YQL_ENSURE(!Closed); - TVector cells(Columns.size()); + Y_UNUSED(TypeEnv); + //auto allocGuard = TypeEnv.BindAllocator(); + TRowBuilder rowBuilder(Columns.size()); data.ForEachRow([&](const auto& row) { for (size_t index = 0; index < Columns.size(); ++index) { - // TODO: move to SerializedVector - cells[WriteIndex[index]] = MakeCell( - Columns[index].PType, - row.GetElement(index), - TypeEnv, - /* copy */ true); + rowBuilder.AddCell(WriteIndex[index], Columns[index].PType, row.GetElement(index)); } - AddRow(cells, GetKeyRange()); - - cells.resize(Columns.size()); + auto rowWithData = rowBuilder.Build(); + AddRow(std::move(rowWithData), GetKeyRange()); }); } void AddBatch(const IPayloadSerializer::IBatchPtr& batch) override { auto datashardBatch = dynamic_cast(batch.Get()); YQL_ENSURE(datashardBatch); - auto data = datashardBatch->Extract(); - const auto rows = data.size() / Columns.size(); - YQL_ENSURE(data.size() == rows * Columns.size()); + auto [cells, data] = datashardBatch->Extract(); + const auto rows = cells.size() / Columns.size(); + YQL_ENSURE(cells.size() == rows * Columns.size()); for (size_t rowIndex = 0; rowIndex < rows; ++rowIndex) { AddRow( - TArrayRef{&data[rowIndex * Columns.size()], Columns.size()}, + TRowWithData{ + TVector(cells.begin() + (rowIndex * Columns.size()), cells.begin() + (rowIndex * Columns.size()) + Columns.size()), + data[rowIndex], + }, GetKeyRange()); } } @@ -700,12 +787,13 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { return IsClosed() && IsEmpty(); } - IBatchPtr ExtractNextBatch(TCellsBatcher& batcher, bool force) { + IBatchPtr ExtractNextBatch(TRowsBatcher& batcher, bool force) { auto batchResult = batcher.Flush(force); Memory -= batchResult.Memory; - const ui32 rows = batchResult.Data.size() / Columns.size(); + const ui32 rows = batchResult.Cells.size() / Columns.size(); YQL_ENSURE(Columns.size() <= std::numeric_limits::max()); return MakeIntrusive( + std::move(batchResult.Cells), std::move(batchResult.Data), static_cast(batchResult.MemorySerialized), rows, @@ -753,7 +841,7 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { const std::vector WriteColumnIds; const TVector KeyColumnTypes; - THashMap Batchers; + THashMap Batchers; THashSet ShardIds; i64 Memory = 0; @@ -1001,6 +1089,7 @@ class TShardedWriteController : public IShardedWriteController { YQL_ENSURE(!data.IsWide(), "Wide stream is not supported yet"); YQL_ENSURE(!Closed); + auto allocGuard = TypeEnv.BindAllocator(); YQL_ENSURE(Serializer); Serializer->AddData(std::move(data)); @@ -1065,6 +1154,7 @@ class TShardedWriteController : public IShardedWriteController { } std::optional OnMessageAcknowledged(ui64 shardId, ui64 cookie) override { + auto allocGuard = TypeEnv.BindAllocator(); auto& shardInfo = ShardsInfo.GetShard(shardId); const auto removedDataSize = shardInfo.PopBatches(cookie); return removedDataSize; diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h index 7846cb954cc6..9672bb25a606 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.h +++ b/ydb/core/kqp/runtime/kqp_write_table.h @@ -10,54 +10,6 @@ namespace NKikimr { namespace NKqp { -class IPayloadSerializer : public TThrRefBase { -public: - class IBatch : public TThrRefBase { - public: - virtual TString SerializeToString() const = 0; - virtual i64 GetMemory() const = 0; - bool IsEmpty() const; - }; - - using IBatchPtr = TIntrusivePtr; - - virtual void AddData(NMiniKQL::TUnboxedValueBatch&& data) = 0; - virtual void AddBatch(const IBatchPtr& batch) = 0; - - virtual void Close() = 0; - - virtual bool IsClosed() = 0; - virtual bool IsEmpty() = 0; - virtual bool IsFinished() = 0; - - virtual NKikimrDataEvents::EDataFormat GetDataFormat() = 0; - virtual std::vector GetWriteColumnIds() = 0; - - using TBatches = THashMap>; - - virtual TBatches FlushBatchesForce() = 0; - - virtual IBatchPtr FlushBatch(ui64 shardId) = 0; - virtual const THashSet& GetShardIds() const = 0; - - virtual i64 GetMemory() = 0; -}; - -using IPayloadSerializerPtr = TIntrusivePtr; - - -IPayloadSerializerPtr CreateColumnShardPayloadSerializer( - const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv); - -IPayloadSerializerPtr CreateDataShardPayloadSerializer( - const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, - NSchemeCache::TSchemeCacheRequest::TEntry&& partitionsEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv); - - class IShardedWriteController : public TThrRefBase { public: virtual void OnPartitioningChanged(const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry) = 0; diff --git a/ydb/core/scheme/scheme_tablecell.cpp b/ydb/core/scheme/scheme_tablecell.cpp index 2ee86265b1a4..c2b541bb7984 100644 --- a/ydb/core/scheme/scheme_tablecell.cpp +++ b/ydb/core/scheme/scheme_tablecell.cpp @@ -307,46 +307,6 @@ bool TSerializedCellMatrix::DoTryParse(const TString& data) { return TryDeserializeCellMatrix(data, Buf, Cells, RowCount, ColCount); } -TCellsBatcher::TCellsBatcher(ui16 colCount, ui64 maxBytesPerBatch) - : ColCount(colCount) - , MaxBytesPerBatch(maxBytesPerBatch) { -} - -bool TCellsBatcher::IsEmpty() const { - return Batches.empty(); -} - -TCellsBatcher::TBatch TCellsBatcher::Flush(bool force) { - TBatch res; - if ((!Batches.empty() && force) || Batches.size() > 1) { - res = std::move(Batches.front()); - Batches.pop_front(); - } - return res; -} - -ui64 TCellsBatcher::AddRow(TArrayRef cells) { - Y_ABORT_UNLESS(cells.size() == ColCount); - ui64 newMemory = 0; - for (const auto& cell : cells) { - newMemory += cell.Size(); - } - if (Batches.empty() || newMemory + sizeof(TCellHeader) * ColCount + Batches.back().MemorySerialized > MaxBytesPerBatch) { - Batches.emplace_back(); - Batches.back().Memory = 0; - Batches.back().MemorySerialized = CellMatrixHeaderSize; - } - - for (auto& cell : cells) { - Batches.back().Data.emplace_back(std::move(cell)); - } - - Batches.back().Memory += newMemory; - Batches.back().MemorySerialized += newMemory + sizeof(TCellHeader) * ColCount; - - return newMemory; -} - void TCellsStorage::Reset(TArrayRef cells) { size_t cellsSize = cells.size(); @@ -499,5 +459,13 @@ TString DbgPrintTuple(const TDbTupleRef& row, const NScheme::TTypeRegistry& type return res; } +size_t GetCellMatrixHeaderSize() { + return CellMatrixHeaderSize; +} + +size_t GetCellHeaderSize() { + return sizeof(TCellHeader); +} + } // namespace NKikimr diff --git a/ydb/core/scheme/scheme_tablecell.h b/ydb/core/scheme/scheme_tablecell.h index b7635a424725..d2abd9f5d548 100644 --- a/ydb/core/scheme/scheme_tablecell.h +++ b/ydb/core/scheme/scheme_tablecell.h @@ -653,29 +653,6 @@ class TSerializedCellMatrix { ui16 ColCount; }; -class TCellsBatcher { -public: - explicit TCellsBatcher(ui16 colCount, ui64 maxBytesPerBatch); - - bool IsEmpty() const; - - struct TBatch { - ui64 Memory = 0; - ui64 MemorySerialized = 0; - TVector Data; - }; - - TBatch Flush(bool force); - - ui64 AddRow(TArrayRef cells); - -private: - std::deque Batches; - - ui16 ColCount; - ui64 MaxBytesPerBatch; -}; - class TCellsStorage { public: @@ -760,4 +737,7 @@ void DbgPrintValue(TString&, const TCell&, NScheme::TTypeInfo typeInfo); TString DbgPrintCell(const TCell& r, NScheme::TTypeInfo typeInfo, const NScheme::TTypeRegistry& typeRegistry); TString DbgPrintTuple(const TDbTupleRef& row, const NScheme::TTypeRegistry& typeRegistry); +size_t GetCellMatrixHeaderSize(); +size_t GetCellHeaderSize(); + } From 446d4c73bbdfe561c9d1f8f04df6db3acba204c3 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 3 Jul 2024 15:22:03 +0300 Subject: [PATCH 05/10] fix_ctas --- ydb/core/kqp/host/kqp_statement_rewrite.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/kqp/host/kqp_statement_rewrite.cpp b/ydb/core/kqp/host/kqp_statement_rewrite.cpp index bf15ab483261..3ab1fc45789b 100644 --- a/ydb/core/kqp/host/kqp_statement_rewrite.cpp +++ b/ydb/core/kqp/host/kqp_statement_rewrite.cpp @@ -188,6 +188,7 @@ namespace { ? tableName : (TStringBuilder() << CanonizePath(AppData()->TenantName) + << sessionCtx->GetDatabase() << "/.tmp/sessions/" << sessionCtx->GetSessionId() << CanonizePath(tmpTableName)); From 54bd9707971190acf26f8aec32e1f37ed321f744 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 3 Jul 2024 15:24:01 +0300 Subject: [PATCH 06/10] style --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index fdf6c2cb18a9..371e7f98e313 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -753,11 +753,8 @@ void RegisterKqpWriteActor(NYql::NDq::TDqAsyncIoFactory& factory, TIntrusivePtr< factory.RegisterSink( TString(NYql::KqpTableSinkName), [counters] (NKikimrKqp::TKqpTableSinkSettings&& settings, NYql::NDq::TDqAsyncIoFactory::TSinkArguments&& args) { - // for inconsistent Tx & olap auto* actor = new TKqpDirectWriteActor(std::move(settings), std::move(args), counters); return std::make_pair(actor, actor); - // for oltp txs - // retunr TKqpForwardWriteActor(...) }); } From 191367c194b988092e33820418dccfb60559ec8b Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Wed, 3 Jul 2024 15:32:45 +0300 Subject: [PATCH 07/10] fix --- ydb/core/kqp/runtime/kqp_write_table.cpp | 37 ++++++------------- .../kqp/session_actor/kqp_session_actor.cpp | 2 - 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index 61df929741bb..ee90dfcd8659 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -229,7 +229,6 @@ class TRowBuilder { const i32 typmod = -1) { CellsInfo[index].Type = type; CellsInfo[index].Value = value; - CellsInfo[index].PgBinaryValue.clear(); if (type.GetTypeId() == NScheme::NTypeIds::Pg) { const auto typeDesc = type.GetTypeDesc(); @@ -242,6 +241,8 @@ class TRowBuilder { } else { CellsInfo[index].PgBinaryValue = NYql::NCommon::PgValueToNativeBinary(value, NPg::PgTypeIdFromTypeDesc(typeDesc)); } + } else { + CellsInfo[index].PgBinaryValue.clear(); } return *this; } @@ -370,10 +371,8 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { public: TColumnShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, - const TConstArrayRef inputColumns, // key columns then value columns - const NMiniKQL::TTypeEnvironment& typeEnv) - : TypeEnv(typeEnv) - , Columns(BuildColumns(inputColumns)) + const TConstArrayRef inputColumns) // key columns then value columns + : Columns(BuildColumns(inputColumns)) , WriteIndex(BuildWriteIndex(schemeEntry, inputColumns)) , WriteColumnIds(BuildWriteColumnIds(inputColumns, WriteIndex)) , BatchBuilder(arrow::Compression::UNCOMPRESSED, BuildNotNullColumns(inputColumns)) { @@ -405,8 +404,6 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { return; } - Y_UNUSED(TypeEnv); - //auto allocGuard = TypeEnv.BindAllocator(); TRowBuilder rowBuilder(Columns.size()); data.ForEachRow([&](const auto& row) { for (size_t index = 0; index < Columns.size(); ++index) { @@ -576,7 +573,6 @@ class TColumnShardPayloadSerializer : public IPayloadSerializer { } private: - const NMiniKQL::TTypeEnvironment& TypeEnv; std::shared_ptr Sharding; const TVector Columns; @@ -691,10 +687,8 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { TDataShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, NSchemeCache::TSchemeCacheRequest::TEntry&& partitionsEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv) - : TypeEnv(typeEnv) - , SchemeEntry(schemeEntry) + const TConstArrayRef inputColumns) + : SchemeEntry(schemeEntry) , KeyDescription(std::move(partitionsEntry.KeyDescription)) , Columns(BuildColumns(inputColumns)) , WriteIndex(BuildWriteIndexKeyFirst(SchemeEntry, inputColumns)) @@ -729,8 +723,6 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { void AddData(NMiniKQL::TUnboxedValueBatch&& data) override { YQL_ENSURE(!Closed); - Y_UNUSED(TypeEnv); - //auto allocGuard = TypeEnv.BindAllocator(); TRowBuilder rowBuilder(Columns.size()); data.ForEachRow([&](const auto& row) { for (size_t index = 0; index < Columns.size(); ++index) { @@ -832,7 +824,6 @@ class TDataShardPayloadSerializer : public IPayloadSerializer { return *KeyDescription; } - const NMiniKQL::TTypeEnvironment& TypeEnv; const NSchemeCache::TSchemeCacheNavigate::TEntry SchemeEntry; THolder KeyDescription; @@ -857,19 +848,17 @@ bool IPayloadSerializer::IBatch::IsEmpty() const { IPayloadSerializerPtr CreateColumnShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv) { + const TConstArrayRef inputColumns) { return MakeIntrusive( - schemeEntry, inputColumns, typeEnv); + schemeEntry, inputColumns); } IPayloadSerializerPtr CreateDataShardPayloadSerializer( const NSchemeCache::TSchemeCacheNavigate::TEntry& schemeEntry, NSchemeCache::TSchemeCacheRequest::TEntry&& partitionsEntry, - const TConstArrayRef inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv) { + const TConstArrayRef inputColumns) { return MakeIntrusive( - schemeEntry, std::move(partitionsEntry), inputColumns, typeEnv); + schemeEntry, std::move(partitionsEntry), inputColumns); } namespace { @@ -1048,8 +1037,7 @@ class TShardedWriteController : public IShardedWriteController { BeforePartitioningChanged(); Serializer = CreateColumnShardPayloadSerializer( schemeEntry, - InputColumnsMetadata, - TypeEnv); + InputColumnsMetadata); AfterPartitioningChanged(); } @@ -1060,8 +1048,7 @@ class TShardedWriteController : public IShardedWriteController { Serializer = CreateDataShardPayloadSerializer( schemeEntry, std::move(partitionsEntry), - InputColumnsMetadata, - TypeEnv); + InputColumnsMetadata); AfterPartitioningChanged(); } diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index d42d06ed37c4..b66c28e20be1 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -588,8 +588,6 @@ class TKqpSessionActor : public TActorBootstrapped { return; } - - Cerr << "COMPILED " << QueryState->CompileResult->PreparedQuery->GetPhysicalQuery().GetQueryAst() << Endl; OnSuccessCompileRequest(); } From fa99968eba7c49dd61ccb3e9da2ca4f098bf7b36 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 4 Jul 2024 09:34:49 +0300 Subject: [PATCH 08/10] fix --- ydb/core/kqp/runtime/kqp_write_table.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index ee90dfcd8659..a07c33120355 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -1084,6 +1084,7 @@ class TShardedWriteController : public IShardedWriteController { } void Close() override { + auto allocGuard = TypeEnv.BindAllocator(); YQL_ENSURE(Serializer); Closed = true; Serializer->Close(); @@ -1189,6 +1190,12 @@ class TShardedWriteController : public IShardedWriteController { , TypeEnv(typeEnv) { } + ~TShardedWriteController() { + auto allocGuard = TypeEnv.BindAllocator(); + ShardsInfo.Clear(); + Serializer = nullptr; + } + private: void FlushSerializer(bool force) { if (force) { From dba67caeb2b52ce48fd8dd0293e01b6ca06e5582 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Thu, 4 Jul 2024 19:13:08 +0300 Subject: [PATCH 09/10] fix --- ydb/core/kqp/runtime/kqp_write_actor.cpp | 5 ++++- ydb/core/kqp/runtime/kqp_write_table.cpp | 16 +++++++++++----- ydb/core/kqp/runtime/kqp_write_table.h | 3 ++- .../actors/compute/dq_compute_actor_async_io.h | 1 + .../dq/actors/compute/dq_compute_actor_impl.h | 1 + 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/ydb/core/kqp/runtime/kqp_write_actor.cpp b/ydb/core/kqp/runtime/kqp_write_actor.cpp index 371e7f98e313..dbc4059ef033 100644 --- a/ydb/core/kqp/runtime/kqp_write_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_write_actor.cpp @@ -137,6 +137,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu , Callbacks(args.Callback) , Counters(counters) , TypeEnv(args.TypeEnv) + , Alloc(args.Alloc) , TxId(args.TxId) , TableId( Settings.GetTable().GetOwnerId(), @@ -693,7 +694,8 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu : kMaxBatchesPerMessage), }, std::move(columnsMetadata), - TypeEnv); + TypeEnv, + Alloc); } catch (...) { RuntimeError( CurrentExceptionMessage(), @@ -730,6 +732,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped, pu NYql::NDq::IDqComputeActorAsyncOutput::ICallbacks * Callbacks = nullptr; TIntrusivePtr Counters; const NMiniKQL::TTypeEnvironment& TypeEnv; + std::shared_ptr Alloc; const NYql::NDq::TTxId TxId; const TTableId TableId; diff --git a/ydb/core/kqp/runtime/kqp_write_table.cpp b/ydb/core/kqp/runtime/kqp_write_table.cpp index a07c33120355..21dc4b1f2734 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.cpp +++ b/ydb/core/kqp/runtime/kqp_write_table.cpp @@ -1184,14 +1184,17 @@ class TShardedWriteController : public IShardedWriteController { TShardedWriteController( const TShardedWriteControllerSettings settings, TVector&& inputColumnsMetadata, - const NMiniKQL::TTypeEnvironment& typeEnv) + const NMiniKQL::TTypeEnvironment& typeEnv, + std::shared_ptr alloc) : Settings(settings) , InputColumnsMetadata(std::move(inputColumnsMetadata)) - , TypeEnv(typeEnv) { + , TypeEnv(typeEnv) + , Alloc(alloc) { } ~TShardedWriteController() { - auto allocGuard = TypeEnv.BindAllocator(); + Y_ABORT_UNLESS(Alloc); + TGuard allocGuard(*Alloc); ShardsInfo.Clear(); Serializer = nullptr; } @@ -1237,6 +1240,7 @@ class TShardedWriteController : public IShardedWriteController { TShardedWriteControllerSettings Settings; TVector InputColumnsMetadata; const NMiniKQL::TTypeEnvironment& TypeEnv; + std::shared_ptr Alloc; TShardsInfo ShardsInfo; bool Closed = false; @@ -1250,8 +1254,10 @@ class TShardedWriteController : public IShardedWriteController { IShardedWriteControllerPtr CreateShardedWriteController( const TShardedWriteControllerSettings& settings, TVector&& inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv) { - return MakeIntrusive(settings, std::move(inputColumns), typeEnv); + const NMiniKQL::TTypeEnvironment& typeEnv, + std::shared_ptr alloc) { + return MakeIntrusive( + settings, std::move(inputColumns), typeEnv, alloc); } } diff --git a/ydb/core/kqp/runtime/kqp_write_table.h b/ydb/core/kqp/runtime/kqp_write_table.h index 9672bb25a606..46e5ac4f7308 100644 --- a/ydb/core/kqp/runtime/kqp_write_table.h +++ b/ydb/core/kqp/runtime/kqp_write_table.h @@ -64,7 +64,8 @@ struct TShardedWriteControllerSettings { IShardedWriteControllerPtr CreateShardedWriteController( const TShardedWriteControllerSettings& settings, TVector&& inputColumns, - const NMiniKQL::TTypeEnvironment& typeEnv); + const NMiniKQL::TTypeEnvironment& typeEnv, + std::shared_ptr alloc); } } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h index 0122b27b150a..4f01a2d6a232 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h @@ -299,6 +299,7 @@ struct IDqAsyncIoFactory : public TThrRefBase { const THashMap& TaskParams; const NKikimr::NMiniKQL::TTypeEnvironment& TypeEnv; const NKikimr::NMiniKQL::THolderFactory& HolderFactory; + std::shared_ptr Alloc; IRandomProvider *const RandomProvider; }; diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index e87acb1f7b7f..d2ae5da446f7 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -1359,6 +1359,7 @@ class TDqComputeActorBase : public NActors::TActorBootstrapped .TaskParams = taskParams, .TypeEnv = typeEnv, .HolderFactory = holderFactory, + .Alloc = Alloc, .RandomProvider = randomProvider }); } catch (const std::exception& ex) { From 6a53c2398fbddcc13779ba2976819bf997a8c4dd Mon Sep 17 00:00:00 2001 From: Nikita Vasilev Date: Fri, 5 Jul 2024 13:14:46 +0300 Subject: [PATCH 10/10] fix --- ydb/core/kqp/host/kqp_statement_rewrite.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/kqp/host/kqp_statement_rewrite.cpp b/ydb/core/kqp/host/kqp_statement_rewrite.cpp index 3ab1fc45789b..bf15ab483261 100644 --- a/ydb/core/kqp/host/kqp_statement_rewrite.cpp +++ b/ydb/core/kqp/host/kqp_statement_rewrite.cpp @@ -188,7 +188,6 @@ namespace { ? tableName : (TStringBuilder() << CanonizePath(AppData()->TenantName) - << sessionCtx->GetDatabase() << "/.tmp/sessions/" << sessionCtx->GetSessionId() << CanonizePath(tmpTableName));