@@ -553,7 +553,7 @@ class TChangesTask: public NConveyor::ITask {
553
553
virtual bool DoExecute () override {
554
554
NActors::TLogContextGuard g (NActors::TLogContextBuilder::Build (NKikimrServices::TX_COLUMNSHARD)(" tablet_id" , TabletId)(" parent_id" , ParentActorId));
555
555
{
556
- NOlap::TConstructionContext context (TxEvent->IndexInfo , Counters);
556
+ NOlap::TConstructionContext context (* TxEvent->IndexInfo , Counters);
557
557
Y_ABORT_UNLESS (TxEvent->IndexChanges ->ConstructBlobs (context).Ok ());
558
558
if (!TxEvent->IndexChanges ->GetWritePortionsCount ()) {
559
559
TxEvent->SetPutStatus (NKikimrProto::OK);
@@ -600,7 +600,8 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {
600
600
}
601
601
virtual bool DoOnError (const TString& storageId, const NOlap::TBlobRange& range, const NOlap::IBlobsReadingAction::TErrorStatus& status) override {
602
602
AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " DoOnError" )(" storage_id" , storageId)(" blob_id" , range)(" status" , status.GetErrorMessage ())(" status_code" , status.GetStatus ());
603
- AFL_VERIFY (false )(" blob_id" , range)(" status" , status.GetStatus ());
603
+ AFL_VERIFY (false )(" blob_id" , range)(" status" , status.GetStatus ())(" error" , status.GetErrorMessage ())(" type" , TxEvent->IndexChanges ->TypeString ())(" task_id" , TxEvent->IndexChanges ->GetTaskIdentifier ())
604
+ (" debug" , TxEvent->IndexChanges ->DebugString ());
604
605
TxEvent->SetPutStatus (NKikimrProto::ERROR);
605
606
TActorContext::AsActorContext ().Send (ParentActorId, std::move (TxEvent));
606
607
return false ;
@@ -630,9 +631,9 @@ void TColumnShard::StartIndexTask(std::vector<const NOlap::TInsertedData*>&& dat
630
631
auto indexChanges = TablesManager.MutablePrimaryIndex ().StartInsert (std::move (data));
631
632
Y_ABORT_UNLESS (indexChanges);
632
633
633
- auto actualIndexInfo = TablesManager.GetPrimaryIndex ()->GetVersionedIndex ();
634
+ auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>( TablesManager.GetPrimaryIndex ()->GetVersionedIndex () );
634
635
indexChanges->Start (*this );
635
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move ( actualIndexInfo) , indexChanges, Settings.CacheDataAfterIndexing );
636
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, indexChanges, Settings.CacheDataAfterIndexing );
636
637
637
638
const TString externalTaskId = indexChanges->GetTaskIdentifier ();
638
639
AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " indexation" )(" bytes" , bytesToIndex)(" blobs_count" , dataToIndex.size ())(" max_limit" , (i64)Limits.MaxInsertBytes )
@@ -711,8 +712,8 @@ void TColumnShard::SetupCompaction() {
711
712
712
713
indexChanges->Start (*this );
713
714
714
- auto actualIndexInfo = TablesManager.GetPrimaryIndex ()->GetVersionedIndex ();
715
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move ( actualIndexInfo) , indexChanges, Settings.CacheDataAfterCompaction );
715
+ auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>( TablesManager.GetPrimaryIndex ()->GetVersionedIndex () );
716
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, indexChanges, Settings.CacheDataAfterCompaction );
716
717
const TString externalTaskId = indexChanges->GetTaskIdentifier ();
717
718
AFL_DEBUG (NKikimrServices::TX_COLUMNSHARD)(" event" , " compaction" )(" external_task_id" , externalTaskId);
718
719
@@ -730,40 +731,34 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls, con
730
731
return false ;
731
732
}
732
733
CSCounters.OnSetupTtl ();
733
- if (BackgroundController.IsTtlActive ()) {
734
- ACFL_DEBUG (" background" , " ttl" )(" skip_reason" , " in_progress" );
735
- return false ;
736
- }
737
- if (force) {
738
- TablesManager.MutablePrimaryIndex ().OnTieringModified (Tiers, TablesManager.GetTtl ());
739
- }
740
734
THashMap<ui64, NOlap::TTiering> eviction = pathTtls;
741
735
for (auto && i : eviction) {
742
736
ACFL_DEBUG (" background" , " ttl" )(" path" , i.first )(" info" , i.second .GetDebugString ());
743
737
}
744
738
745
- auto actualIndexInfo = TablesManager.GetPrimaryIndex ()->GetVersionedIndex ();
739
+ auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>( TablesManager.GetPrimaryIndex ()->GetVersionedIndex () );
746
740
const ui64 memoryUsageLimit = HasAppData () ? AppDataVerified ().ColumnShardConfig .GetTieringsMemoryLimit () : ((ui64)512 * 1024 * 1024 );
747
- std::shared_ptr<NOlap::TTTLColumnEngineChanges> indexChanges = TablesManager.MutablePrimaryIndex ().StartTtl (eviction, DataLocksManager, memoryUsageLimit);
741
+ std::vector<std:: shared_ptr<NOlap::TTTLColumnEngineChanges> > indexChanges = TablesManager.MutablePrimaryIndex ().StartTtl (eviction, DataLocksManager, memoryUsageLimit);
748
742
749
- if (! indexChanges) {
743
+ if (indexChanges. empty () ) {
750
744
ACFL_DEBUG (" background" , " ttl" )(" skip_reason" , " no_changes" );
751
745
return false ;
752
746
}
753
- const TString externalTaskId = indexChanges->GetTaskIdentifier ();
754
- const bool needWrites = indexChanges->NeedConstruction ();
755
- ACFL_DEBUG (" background" , " ttl" )(" need_writes" , needWrites);
756
-
757
- indexChanges->Start (*this );
758
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move (actualIndexInfo), indexChanges, false );
759
- NYDBTest::TControllers::GetColumnShardController ()->OnWriteIndexStart (TabletID (), indexChanges->TypeString ());
760
- if (needWrites) {
761
- NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (
762
- ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
763
- std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, indexChanges->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
764
- } else {
765
- ev->SetPutStatus (NKikimrProto::OK);
766
- ActorContext ().Send (SelfId (), std::move (ev));
747
+ for (auto && i : indexChanges) {
748
+ const TString externalTaskId = i->GetTaskIdentifier ();
749
+ const bool needWrites = i->NeedConstruction ();
750
+ ACFL_DEBUG (" background" , " ttl" )(" need_writes" , needWrites);
751
+ i->Start (*this );
752
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, i, false );
753
+ NYDBTest::TControllers::GetColumnShardController ()->OnWriteIndexStart (TabletID (), i->TypeString ());
754
+ if (needWrites) {
755
+ NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription (
756
+ ResourceSubscribeActor, std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
757
+ std::make_shared<TChangesReadTask>(std::move(ev), SelfId(), TabletID(), CompactionCounters), 0, i->CalcMemoryForUsage(), externalTaskId, TTLTaskSubscription));
758
+ } else {
759
+ ev->SetPutStatus (NKikimrProto::OK);
760
+ ActorContext ().Send (SelfId (), std::move (ev));
761
+ }
767
762
}
768
763
return true ;
769
764
}
@@ -784,8 +779,8 @@ void TColumnShard::SetupCleanup() {
784
779
}
785
780
786
781
ACFL_DEBUG (" background" , " cleanup" )(" changes_info" , changes->DebugString ());
787
- auto actualIndexInfo = TablesManager.GetPrimaryIndex ()->GetVersionedIndex ();
788
- auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(std::move ( actualIndexInfo) , changes, false );
782
+ auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>( TablesManager.GetPrimaryIndex ()->GetVersionedIndex () );
783
+ auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(actualIndexInfo, changes, false );
789
784
ev->SetPutStatus (NKikimrProto::OK); // No new blobs to write
790
785
791
786
changes->Start (*this );
0 commit comments