diff --git a/ydb/core/formats/arrow/arrow_helpers.cpp b/ydb/core/formats/arrow/arrow_helpers.cpp index 70ed133e6366..443bb091090c 100644 --- a/ydb/core/formats/arrow/arrow_helpers.cpp +++ b/ydb/core/formats/arrow/arrow_helpers.cpp @@ -922,25 +922,44 @@ std::vector> SliceToRecordBatches(const std: } std::sort(positions.begin(), positions.end()); positions.erase(std::unique(positions.begin(), positions.end()), positions.end()); - + AFL_VERIFY(positions.size() > 1)("size", positions.size())("positions", JoinSeq(",", positions)); std::vector>> slicedData; slicedData.resize(positions.size() - 1); - { - for (auto&& i : t->columns()) { - for (ui32 idx = 0; idx + 1 < positions.size(); ++idx) { - auto slice = i->Slice(positions[idx], positions[idx + 1] - positions[idx]); - AFL_VERIFY(slice->num_chunks() == 1); - slicedData[idx].emplace_back(slice->chunks().front()); + for (auto&& i : t->columns()) { + ui32 currentPosition = 0; + auto it = i->chunks().begin(); + ui32 length = 0; + const auto initializeIt = [&length, &it, &i]() { + for (; it != i->chunks().end() && !(*it)->length(); ++it) { + } + if (it != i->chunks().end()) { + length = (*it)->length(); + } + }; + initializeIt(); + for (ui32 idx = 0; idx + 1 < positions.size(); ++idx) { + AFL_VERIFY(it != i->chunks().end()); + AFL_VERIFY(positions[idx + 1] - currentPosition <= length)("length", length)("idx+1", positions[idx + 1])("pos", currentPosition); + auto chunk = (*it)->Slice(positions[idx] - currentPosition, positions[idx + 1] - positions[idx]); + AFL_VERIFY_DEBUG(chunk->length() == positions[idx + 1] - positions[idx])("length", chunk->length())("expect", positions[idx + 1] - positions[idx]); + if (positions[idx + 1] - currentPosition == length) { + ++it; + initializeIt(); + currentPosition = positions[idx + 1]; } + slicedData[idx].emplace_back(chunk); } } std::vector> result; ui32 count = 0; for (auto&& i : slicedData) { + AFL_VERIFY(i.size()); + AFL_VERIFY(i.front()->length()); result.emplace_back(arrow::RecordBatch::Make(t->schema(), i.front()->length(), i)); count += result.back()->num_rows(); } - AFL_VERIFY(count == t->num_rows())("count", count)("t", t->num_rows()); + AFL_VERIFY(count == t->num_rows())("count", count)("t", t->num_rows())("sd_size", slicedData.size())("columns", t->num_columns())( + "schema", t->schema()->ToString()); return result; } diff --git a/ydb/core/formats/arrow/program.cpp b/ydb/core/formats/arrow/program.cpp index e07f76ed3b49..60e59749bb7a 100644 --- a/ydb/core/formats/arrow/program.cpp +++ b/ydb/core/formats/arrow/program.cpp @@ -88,7 +88,7 @@ class TConstFunction : public IStepFunction { using TBase = IStepFunction; public: using TBase::TBase; - arrow::Result Call(const TAssign& assign, const TDatumBatch& batch) const override { + arrow::Result Call(const TAssign& assign, const TDatumBatch& batch) const override { Y_UNUSED(batch); return assign.GetConstant(); } @@ -531,7 +531,7 @@ class TFilterVisitor : public arrow::ArrayVisitor { arrow::Status TDatumBatch::AddColumn(const std::string& name, arrow::Datum&& column) { - if (Schema->GetFieldIndex(name) != -1) { + if (HasColumn(name)) { return arrow::Status::Invalid("Trying to add duplicate column '" + name + "'"); } @@ -543,20 +543,27 @@ arrow::Status TDatumBatch::AddColumn(const std::string& name, arrow::Datum&& col return arrow::Status::Invalid("Wrong column length."); } - Schema = *Schema->AddField(Schema->num_fields(), field); + NewColumnIds.emplace(name, NewColumnsPtr.size()); + NewColumnsPtr.emplace_back(field); + Datums.emplace_back(column); return arrow::Status::OK(); } arrow::Result TDatumBatch::GetColumnByName(const std::string& name) const { - auto i = Schema->GetFieldIndex(name); + auto it = NewColumnIds.find(name); + if (it != NewColumnIds.end()) { + AFL_VERIFY(SchemaBase->num_fields() + it->second < Datums.size()); + return Datums[SchemaBase->num_fields() + it->second]; + } + auto i = SchemaBase->GetFieldIndex(name); if (i < 0) { return arrow::Status::Invalid("Not found column '" + name + "' or duplicate"); } return Datums[i]; } -std::shared_ptr TDatumBatch::ToTable() const { +std::shared_ptr TDatumBatch::ToTable() { std::vector> columns; columns.reserve(Datums.size()); for (auto col : Datums) { @@ -576,10 +583,10 @@ std::shared_ptr TDatumBatch::ToTable() const { AFL_VERIFY(false); } } - return arrow::Table::Make(Schema, columns, Rows); + return arrow::Table::Make(GetSchema(), columns, Rows); } -std::shared_ptr TDatumBatch::ToRecordBatch() const { +std::shared_ptr TDatumBatch::ToRecordBatch() { std::vector> columns; columns.reserve(Datums.size()); for (auto col : Datums) { @@ -594,7 +601,7 @@ std::shared_ptr TDatumBatch::ToRecordBatch() const { AFL_VERIFY(false); } } - return arrow::RecordBatch::Make(Schema, Rows, columns); + return arrow::RecordBatch::Make(GetSchema(), Rows, columns); } std::shared_ptr TDatumBatch::FromRecordBatch(const std::shared_ptr& batch) { @@ -603,12 +610,7 @@ std::shared_ptr TDatumBatch::FromRecordBatch(const std::shared_ptr< for (int64_t i = 0; i < batch->num_columns(); ++i) { datums.push_back(arrow::Datum(batch->column(i))); } - return std::make_shared( - TProgramStep::TDatumBatch{ - .Schema = std::make_shared(*batch->schema()), - .Datums = std::move(datums), - .Rows = batch->num_rows() - }); + return std::make_shared(std::make_shared(*batch->schema()), std::move(datums), batch->num_rows()); } std::shared_ptr TDatumBatch::FromTable(const std::shared_ptr& batch) { @@ -617,12 +619,15 @@ std::shared_ptr TDatumBatch::FromTable(const std::shared_ptrnum_columns(); ++i) { datums.push_back(arrow::Datum(batch->column(i))); } - return std::make_shared( - TProgramStep::TDatumBatch{ - .Schema = std::make_shared(*batch->schema()), - .Datums = std::move(datums), - .Rows = batch->num_rows() - }); + return std::make_shared(std::make_shared(*batch->schema()), std::move(datums), batch->num_rows()); +} + +TDatumBatch::TDatumBatch(const std::shared_ptr& schema, std::vector&& datums, const i64 rows) + : SchemaBase(schema) + , Rows(rows) + , Datums(std::move(datums)) { + AFL_VERIFY(SchemaBase); + AFL_VERIFY(Datums.size() == (ui32)SchemaBase->num_fields()); } TAssign TAssign::MakeTimestamp(const TColumnInfo& column, ui64 value) { @@ -680,7 +685,7 @@ arrow::Status TProgramStep::ApplyAssignes(TDatumBatch& batch, arrow::compute::Ex } batch.Datums.reserve(batch.Datums.size() + Assignes.size()); for (auto& assign : Assignes) { - if (batch.GetColumnByName(assign.GetName()).ok()) { + if (batch.HasColumn(assign.GetName())) { return arrow::Status::Invalid("Assign to existing column '" + assign.GetName() + "'."); } @@ -703,8 +708,9 @@ arrow::Status TProgramStep::ApplyAggregates(TDatumBatch& batch, arrow::compute:: } ui32 numResultColumns = GroupBy.size() + GroupByKeys.size(); - TDatumBatch res; - res.Datums.reserve(numResultColumns); + std::vector datums; + datums.reserve(numResultColumns); + std::optional resultRecordsCount; arrow::FieldVector fields; fields.reserve(numResultColumns); @@ -715,13 +721,13 @@ arrow::Status TProgramStep::ApplyAggregates(TDatumBatch& batch, arrow::compute:: if (!funcResult.ok()) { return funcResult.status(); } - res.Datums.push_back(*funcResult); - fields.emplace_back(std::make_shared(assign.GetName(), res.Datums.back().type())); + datums.push_back(*funcResult); + fields.emplace_back(std::make_shared(assign.GetName(), datums.back().type())); } - res.Rows = 1; + resultRecordsCount = 1; } else { CH::GroupByOptions funcOpts; - funcOpts.schema = batch.Schema; + funcOpts.schema = batch.GetSchema(); funcOpts.assigns.reserve(numResultColumns); funcOpts.has_nullable_key = false; @@ -759,19 +765,18 @@ arrow::Status TProgramStep::ApplyAggregates(TDatumBatch& batch, arrow::compute:: return arrow::Status::Invalid("No expected column in GROUP BY result."); } fields.emplace_back(std::make_shared(assign.result_column, column->type())); - res.Datums.push_back(column); + datums.push_back(column); } - res.Rows = gbBatch->num_rows(); + resultRecordsCount = gbBatch->num_rows(); } - - res.Schema = std::make_shared(std::move(fields)); - batch = std::move(res); + AFL_VERIFY(resultRecordsCount); + batch = TDatumBatch(std::make_shared(std::move(fields)), std::move(datums), *resultRecordsCount); return arrow::Status::OK(); } arrow::Status TProgramStep::MakeCombinedFilter(TDatumBatch& batch, NArrow::TColumnFilter& result) const { - TFilterVisitor filterVisitor(batch.Rows); + TFilterVisitor filterVisitor(batch.GetRecordsCount()); for (auto& colName : Filters) { auto column = batch.GetColumnByName(colName.GetColumnName()); if (!column.ok()) { @@ -821,13 +826,13 @@ arrow::Status TProgramStep::ApplyFilters(TDatumBatch& batch) const { } } std::vector filterDatums; - for (int64_t i = 0; i < batch.Schema->num_fields(); ++i) { - if (batch.Datums[i].is_arraylike() && (allColumns || neededColumns.contains(batch.Schema->field(i)->name()))) { + for (int64_t i = 0; i < batch.GetSchema()->num_fields(); ++i) { + if (batch.Datums[i].is_arraylike() && (allColumns || neededColumns.contains(batch.GetSchema()->field(i)->name()))) { filterDatums.emplace_back(&batch.Datums[i]); } } - bits.Apply(batch.Rows, filterDatums); - batch.Rows = bits.GetFilteredCount().value_or(batch.Rows); + bits.Apply(batch.GetRecordsCount(), filterDatums); + batch.SetRecordsCount(bits.GetFilteredCount().value_or(batch.GetRecordsCount())); return arrow::Status::OK(); } @@ -838,15 +843,14 @@ arrow::Status TProgramStep::ApplyProjection(TDatumBatch& batch) const { std::vector> newFields; std::vector newDatums; for (size_t i = 0; i < Projection.size(); ++i) { - int schemaFieldIndex = batch.Schema->GetFieldIndex(Projection[i].GetColumnName()); + int schemaFieldIndex = batch.GetSchema()->GetFieldIndex(Projection[i].GetColumnName()); if (schemaFieldIndex == -1) { return arrow::Status::Invalid("Could not find column " + Projection[i].GetColumnName() + " in record batch schema."); } - newFields.push_back(batch.Schema->field(schemaFieldIndex)); + newFields.push_back(batch.GetSchema()->field(schemaFieldIndex)); newDatums.push_back(batch.Datums[schemaFieldIndex]); } - batch.Schema = std::make_shared(std::move(newFields)); - batch.Datums = std::move(newDatums); + batch = TDatumBatch(std::make_shared(std::move(newFields)), std::move(newDatums), batch.GetRecordsCount()); return arrow::Status::OK(); } @@ -919,14 +923,10 @@ std::set TProgramStep::GetColumnsInUsage(const bool originalOnly/* } arrow::Result> TProgramStep::BuildFilter(const std::shared_ptr& t) const { - return BuildFilter(t->BuildTableVerified(GetColumnsInUsage(true))); -} - -arrow::Result> TProgramStep::BuildFilter(const std::shared_ptr& t) const { if (Filters.empty()) { return nullptr; } - std::vector> batches = NArrow::SliceToRecordBatches(t); + std::vector> batches = NArrow::SliceToRecordBatches(t->BuildTableVerified(GetColumnsInUsage(true))); NArrow::TColumnFilter fullLocal = NArrow::TColumnFilter::BuildAllowFilter(); for (auto&& rb : batches) { auto datumBatch = TDatumBatch::FromRecordBatch(rb); @@ -938,7 +938,7 @@ arrow::Result> TProgramStep::BuildFilter( } NArrow::TColumnFilter local = NArrow::TColumnFilter::BuildAllowFilter(); NArrow::TStatusValidator::Validate(MakeCombinedFilter(*datumBatch, local)); - AFL_VERIFY(local.Size() == datumBatch->Rows)("local", local.Size())("datum", datumBatch->Rows); + AFL_VERIFY(local.Size() == datumBatch->GetRecordsCount())("local", local.Size())("datum", datumBatch->GetRecordsCount()); fullLocal.Append(local); } AFL_VERIFY(fullLocal.Size() == t->num_rows())("filter", fullLocal.Size())("t", t->num_rows()); diff --git a/ydb/core/formats/arrow/program.h b/ydb/core/formats/arrow/program.h index dfb22116158b..e3f9943e6c13 100644 --- a/ydb/core/formats/arrow/program.h +++ b/ydb/core/formats/arrow/program.h @@ -37,15 +37,47 @@ const char * GetHouseFunctionName(EAggregate op); inline const char * GetHouseGroupByName() { return "ch.group_by"; } EOperation ValidateOperation(EOperation op, ui32 argsSize); -struct TDatumBatch { - std::shared_ptr Schema; - std::vector Datums; +class TDatumBatch { +private: + std::shared_ptr SchemaBase; + THashMap NewColumnIds; + std::vector> NewColumnsPtr; int64_t Rows = 0; +public: + std::vector Datums; + + ui64 GetRecordsCount() const { + return Rows; + } + + void SetRecordsCount(const ui64 value) { + Rows = value; + } + + TDatumBatch(const std::shared_ptr& schema, std::vector&& datums, const i64 rows); + + const std::shared_ptr& GetSchema() { + if (NewColumnIds.size()) { + std::vector> fields = SchemaBase->fields(); + fields.insert(fields.end(), NewColumnsPtr.begin(), NewColumnsPtr.end()); + SchemaBase = std::make_shared(fields); + NewColumnIds.clear(); + NewColumnsPtr.clear(); + } + return SchemaBase; + } + arrow::Status AddColumn(const std::string& name, arrow::Datum&& column); arrow::Result GetColumnByName(const std::string& name) const; - std::shared_ptr ToTable() const; - std::shared_ptr ToRecordBatch() const; + bool HasColumn(const std::string& name) const { + if (NewColumnIds.contains(name)) { + return true; + } + return SchemaBase->GetFieldIndex(name) > -1; + } + std::shared_ptr ToTable(); + std::shared_ptr ToRecordBatch(); static std::shared_ptr FromRecordBatch(const std::shared_ptr& batch); static std::shared_ptr FromTable(const std::shared_ptr& batch); }; @@ -405,7 +437,6 @@ class TProgramStep { return Filters.size() && (!GroupBy.size() && !GroupByKeys.size()); } - [[nodiscard]] arrow::Result> BuildFilter(const std::shared_ptr& t) const; [[nodiscard]] arrow::Result> BuildFilter(const std::shared_ptr& t) const; }; diff --git a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp index c3b1c737d143..e7d90c111148 100644 --- a/ydb/core/tx/columnshard/columnshard__progress_tx.cpp +++ b/ydb/core/tx/columnshard/columnshard__progress_tx.cpp @@ -2,27 +2,39 @@ #include "columnshard_schema.h" #include + #include namespace NKikimr::NColumnShard { -class TColumnShard::TTxProgressTx : public TTransactionBase { +class TColumnShard::TTxProgressTx: public TTransactionBase { +private: + bool AbortedThroughRemoveExpired = false; + TTxController::ITransactionOperator::TPtr TxOperator; + const ui32 TabletTxNo; + std::optional LastCompletedTx; + std::optional PlannedQueueItem; + public: TTxProgressTx(TColumnShard* self) : TTransactionBase(self) - , TabletTxNo(++Self->TabletTxCounter) - {} + , TabletTxNo(++Self->TabletTxCounter) { + } - TTxType GetTxType() const override { return TXTYPE_PROGRESS; } + TTxType GetTxType() const override { + return TXTYPE_PROGRESS; + } bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { - NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute"); + NActors::TLogContextGuard logGuard = + NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute"); Y_ABORT_UNLESS(Self->ProgressTxInFlight); Self->TabletCounters->Simple()[COUNTER_TX_COMPLETE_LAG].Set(Self->GetTxCompleteLag().MilliSeconds()); const size_t removedCount = Self->ProgressTxController->CleanExpiredTxs(txc); if (removedCount > 0) { // We cannot continue with this transaction, start a new transaction + AbortedThroughRemoveExpired = true; Self->Execute(new TTxProgressTx(Self), ctx); return true; } @@ -48,7 +60,11 @@ class TColumnShard::TTxProgressTx : public TTransactionBase { } void Complete(const TActorContext& ctx) override { - NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete"); + if (AbortedThroughRemoveExpired) { + return; + } + NActors::TLogContextGuard logGuard = + NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete"); if (TxOperator) { TxOperator->ProgressOnComplete(*Self, ctx); Self->RescheduleWaitingReads(); @@ -65,12 +81,6 @@ class TColumnShard::TTxProgressTx : public TTransactionBase { } Self->SetupIndexation(); } - -private: - TTxController::ITransactionOperator::TPtr TxOperator; - const ui32 TabletTxNo; - std::optional LastCompletedTx; - std::optional PlannedQueueItem; }; void TColumnShard::EnqueueProgressTx(const TActorContext& ctx) { @@ -101,4 +111,4 @@ void TColumnShard::Handle(TEvColumnShard::TEvCheckPlannedTransaction::TPtr& ev, // For now do not return result for not finished tx. It would be sent in TTxProgressTx::Complete() } -} +} // namespace NKikimr::NColumnShard diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 543732900a07..a3db1b8f3eb2 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -142,6 +142,9 @@ void TGeneralCompactColumnEngineChanges::BuildAppendedPortionsByChunks( shardingActualVersion = shardingActual->GetSnapshotVersion(); } AppendedPortions = merger.Execute(stats, CheckPoints, resultFiltered, GranuleMeta->GetPathId(), shardingActualVersion); + for (auto&& p : AppendedPortions) { + p.GetPortionConstructor().MutableMeta().UpdateRecordsMeta(NPortion::EProduced::SPLIT_COMPACTED); + } } TConclusionStatus TGeneralCompactColumnEngineChanges::DoConstructBlobs(TConstructionContext& context) noexcept { diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index b2c505a10b91..bca7277947ca 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -199,6 +199,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont merger.SetOptimizationWritingPackMode(true); auto localAppended = merger.Execute(stats, itGranule->second, filteredSnapshot, pathId, shardingVersion); for (auto&& i : localAppended) { + i.GetPortionConstructor().MutableMeta().UpdateRecordsMeta(NPortion::EProduced::INSERTED); AppendedPortions.emplace_back(std::move(i)); } } diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 3c4385b62fba..5b3d988abfba 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -89,7 +89,6 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self void TChangesWithAppend::DoCompile(TFinalizationContext& context) { for (auto&& i : AppendedPortions) { i.GetPortionConstructor().SetPortionId(context.NextPortionId()); - i.GetPortionConstructor().MutableMeta().UpdateRecordsMeta(TPortionMeta::EProduced::INSERTED); } for (auto& [_, portionInfo] : PortionsToRemove) { portionInfo.SetRemoveSnapshot(context.GetSnapshot()); diff --git a/ydb/core/tx/columnshard/transactions/operators/backup.h b/ydb/core/tx/columnshard/transactions/operators/backup.h index c9457cfc0d35..9e1e657ad138 100644 --- a/ydb/core/tx/columnshard/transactions/operators/backup.h +++ b/ydb/core/tx/columnshard/transactions/operators/backup.h @@ -6,7 +6,7 @@ namespace NKikimr::NColumnShard { -class TBackupTransactionOperator: public IProposeTxOperator { +class TBackupTransactionOperator: public IProposeTxOperator, public TMonitoringObjectsCounter { private: using TBase = IProposeTxOperator; diff --git a/ydb/core/tx/columnshard/transactions/operators/ev_write.h b/ydb/core/tx/columnshard/transactions/operators/ev_write.h index 5aa543a76973..15bc4e5f3ae2 100644 --- a/ydb/core/tx/columnshard/transactions/operators/ev_write.h +++ b/ydb/core/tx/columnshard/transactions/operators/ev_write.h @@ -4,7 +4,7 @@ namespace NKikimr::NColumnShard { - class TEvWriteTransactionOperator : public TTxController::ITransactionOperator { + class TEvWriteTransactionOperator: public TTxController::ITransactionOperator, public TMonitoringObjectsCounter { using TBase = TTxController::ITransactionOperator; using TProposeResult = TTxController::TProposeResult; static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE); diff --git a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h index 511f00cbedb0..35adecab6092 100644 --- a/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h +++ b/ydb/core/tx/columnshard/transactions/operators/long_tx_write.h @@ -6,7 +6,7 @@ namespace NKikimr::NColumnShard { - class TLongTxTransactionOperator: public IProposeTxOperator { + class TLongTxTransactionOperator: public IProposeTxOperator, public TMonitoringObjectsCounter { using TBase = IProposeTxOperator; using TProposeResult = TTxController::TProposeResult; static inline auto Registrator = TFactory::TRegistrator(NKikimrTxColumnShard::TX_KIND_COMMIT); @@ -16,6 +16,10 @@ namespace NKikimr::NColumnShard { return "LONG_TX_WRITE"; } + bool TxWithDeadline() const override { + return true; + } + virtual TProposeResult DoStartProposeOnExecute(TColumnShard& owner, NTabletFlatExecutor::TTransactionContext& txc) override; virtual void DoStartProposeOnComplete(TColumnShard& /*owner*/, const TActorContext& /*ctx*/) override { diff --git a/ydb/core/tx/columnshard/transactions/operators/schema.h b/ydb/core/tx/columnshard/transactions/operators/schema.h index e891365c5243..f79e10fac2ca 100644 --- a/ydb/core/tx/columnshard/transactions/operators/schema.h +++ b/ydb/core/tx/columnshard/transactions/operators/schema.h @@ -7,7 +7,7 @@ namespace NKikimr::NColumnShard { -class TSchemaTransactionOperator: public IProposeTxOperator { +class TSchemaTransactionOperator: public IProposeTxOperator, public TMonitoringObjectsCounter { private: using TBase = IProposeTxOperator; diff --git a/ydb/core/tx/columnshard/transactions/operators/sharing.h b/ydb/core/tx/columnshard/transactions/operators/sharing.h index 342a76b8ae5e..acf02304d878 100644 --- a/ydb/core/tx/columnshard/transactions/operators/sharing.h +++ b/ydb/core/tx/columnshard/transactions/operators/sharing.h @@ -6,7 +6,7 @@ namespace NKikimr::NColumnShard { -class TSharingTransactionOperator: public IProposeTxOperator { +class TSharingTransactionOperator: public IProposeTxOperator, public TMonitoringObjectsCounter { private: using TBase = IProposeTxOperator; diff --git a/ydb/core/tx/columnshard/transactions/tx_controller.cpp b/ydb/core/tx/columnshard/transactions/tx_controller.cpp index aeebb5c78277..d9b6e1451a17 100644 --- a/ydb/core/tx/columnshard/transactions/tx_controller.cpp +++ b/ydb/core/tx/columnshard/transactions/tx_controller.cpp @@ -45,6 +45,9 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { return false; } + ui32 countWithDeadline = 0; + ui32 countOverrideDeadline = 0; + ui32 countNoDeadline = 0; while (!rowset.EndOfSet()) { const ui64 txId = rowset.GetValue(); const NKikimrTxColumnShard::ETransactionKind txKind = rowset.GetValue(); @@ -57,6 +60,13 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { txInfo.MaxStep = rowset.GetValue(); if (txInfo.MaxStep != Max()) { txInfo.MinStep = txInfo.MaxStep - MaxCommitTxDelay.MilliSeconds(); + ++countWithDeadline; + } else if (txOperator->TxWithDeadline()) { + txInfo.MinStep = GetAllowedStep(); + txInfo.MaxStep = txInfo.MinStep + MaxCommitTxDelay.MilliSeconds(); + ++countOverrideDeadline; + } else { + ++countNoDeadline; } txInfo.PlanStep = rowset.GetValueOrDefault(0); txInfo.Source = rowset.GetValue(); @@ -74,6 +84,8 @@ bool TTxController::Load(NTabletFlatExecutor::TTransactionContext& txc) { return false; } } + AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("override", countOverrideDeadline)("no_dl", countNoDeadline)("dl", countWithDeadline)( + "operators", Operators.size())("plan", PlanQueue.size())("dl_queue", DeadlineQueue.size()); return true; } diff --git a/ydb/core/tx/data_events/write_data.h b/ydb/core/tx/data_events/write_data.h index fe73059a3758..5fc92bf185b3 100644 --- a/ydb/core/tx/data_events/write_data.h +++ b/ydb/core/tx/data_events/write_data.h @@ -38,7 +38,7 @@ class TWriteMeta { YDB_ACCESSOR_DEF(TString, DedupId); YDB_READONLY(TString, Id, TGUID::CreateTimebased().AsUuidString()); - YDB_ACCESSOR(EModificationType, ModificationType, EModificationType::Upsert); + YDB_ACCESSOR(EModificationType, ModificationType, EModificationType::Replace); YDB_READONLY(TMonotonic, WriteStartInstant, TMonotonic::Now()); YDB_ACCESSOR(TMonotonic, WriteMiddle1StartInstant, TMonotonic::Now()); YDB_ACCESSOR(TMonotonic, WriteMiddle2StartInstant, TMonotonic::Now());