@@ -4551,6 +4551,130 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
4551
4551
}
4552
4552
}
4553
4553
4554
+ void CompactBorrowed (TTestActorRuntime& runtime, ui64 shardId, const TTableId& tableId) {
4555
+ auto msg = MakeHolder<TEvDataShard::TEvCompactBorrowed>(tableId.PathId );
4556
+ auto sender = runtime.AllocateEdgeActor ();
4557
+ runtime.SendToPipe (shardId, sender, msg.Release (), 0 , GetPipeConfigWithRetries ());
4558
+ runtime.GrabEdgeEventRethrow <TEvDataShard::TEvCompactBorrowedResult>(sender);
4559
+ }
4560
+
4561
+ Y_UNIT_TEST (PostMergeNotCompactedTooEarly) {
4562
+ TPortManager pm;
4563
+ TServerSettings serverSettings (pm.GetPort (2134 ));
4564
+ serverSettings.SetDomainName (" Root" )
4565
+ .SetUseRealThreads (false )
4566
+ .SetDomainPlanResolution (100 );
4567
+
4568
+ Tests::TServer::TPtr server = new TServer (serverSettings);
4569
+ auto &runtime = *server->GetRuntime ();
4570
+ auto sender = runtime.AllocateEdgeActor ();
4571
+
4572
+ runtime.SetLogPriority (NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
4573
+
4574
+ InitRoot (server, sender);
4575
+
4576
+ TDisableDataShardLogBatching disableDataShardLogBatching;
4577
+
4578
+ KqpSchemeExec (runtime, R"(
4579
+ CREATE TABLE `/Root/table` (key int, value bytes, PRIMARY KEY (key))
4580
+ WITH (AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = 1,
4581
+ PARTITION_AT_KEYS = (5));
4582
+ )" );
4583
+
4584
+ const auto shards = GetTableShards (server, sender, " /Root/table" );
4585
+ UNIT_ASSERT_VALUES_EQUAL (shards.size (), 2u );
4586
+ const auto tableId = ResolveTableId (server, sender, " /Root/table" );
4587
+
4588
+ for (int i = 0 ; i < 20 ; ++i) {
4589
+ Cerr << " ... upserting key " << i << Endl;
4590
+ auto query = Sprintf (R"(
4591
+ UPSERT INTO `/Root/table` (key, value) VALUES (%d, '%s');
4592
+ )" , i, TString (128 * 1024 , ' x' ).c_str ());
4593
+ ExecSQL (server, sender, query);
4594
+ if (i >= 5 ) {
4595
+ Cerr << " ... compacting shard " << shards.at (1 ) << Endl;
4596
+ CompactTable (runtime, shards.at (1 ), tableId, false );
4597
+ } else if (i == 4 ) {
4598
+ Cerr << " ... compacting shard " << shards.at (0 ) << Endl;
4599
+ CompactTable (runtime, shards.at (0 ), tableId, false );
4600
+ }
4601
+ }
4602
+
4603
+ // Read (and snapshot) current data, so it doesn't go away on compaction
4604
+ UNIT_ASSERT_VALUES_EQUAL (
4605
+ KqpSimpleExec (runtime, " SELECT COUNT(*) FROM `/Root/table`;" ),
4606
+ " { items { uint64_value: 20 } }" );
4607
+
4608
+ // Delete all the data in shard 0, this is small and will stay in memtable
4609
+ // But when borrowed dst compaction will have pressure to compact it all
4610
+ ExecSQL (server, sender, " DELETE FROM `/Root/table` WHERE key < 5" );
4611
+
4612
+ std::vector<TEvDataShard::TEvSplitTransferSnapshot::TPtr> snapshots;
4613
+ auto captureSnapshots = runtime.AddObserver <TEvDataShard::TEvSplitTransferSnapshot>(
4614
+ [&](TEvDataShard::TEvSplitTransferSnapshot::TPtr& ev) {
4615
+ auto * msg = ev->Get ();
4616
+ Cerr << " ... captured snapshot from " << msg->Record .GetSrcTabletId () << Endl;
4617
+ snapshots.emplace_back (ev.Release ());
4618
+ });
4619
+
4620
+ Cerr << " ... merging table" << Endl;
4621
+ SetSplitMergePartCountLimit (server->GetRuntime (), -1 );
4622
+ ui64 txId = AsyncMergeTable (server, sender, " /Root/table" , shards);
4623
+ Cerr << " ... started merge " << txId << Endl;
4624
+ WaitFor (runtime, [&]{ return snapshots.size () >= 2 ; }, " both src tablet snapshots" );
4625
+
4626
+ std::vector<TEvBlobStorage::TEvGet::TPtr> gets ;
4627
+ auto captureGets = runtime.AddObserver <TEvBlobStorage::TEvGet>(
4628
+ [&](TEvBlobStorage::TEvGet::TPtr& ev) {
4629
+ auto * msg = ev->Get ();
4630
+ if (msg->Queries [0 ].Id .TabletID () == shards.at (1 )) {
4631
+ Cerr << " ... blocking blob get of " << msg->Queries [0 ].Id << Endl;
4632
+ gets .emplace_back (ev.Release ());
4633
+ }
4634
+ });
4635
+
4636
+ // Release snapshot for shard 0 then shard 1
4637
+ captureSnapshots.Remove ();
4638
+ Cerr << " ... unlocking snapshots from tablet " << shards.at (0 ) << Endl;
4639
+ for (auto & ev : snapshots) {
4640
+ if (ev && ev->Get ()->Record .GetSrcTabletId () == shards.at (0 )) {
4641
+ runtime.Send (ev.Release (), 0 , true );
4642
+ }
4643
+ }
4644
+ Cerr << " ... unblocking snapshots from tablet " << shards.at (1 ) << Endl;
4645
+ for (auto & ev : snapshots) {
4646
+ if (ev && ev->Get ()->Record .GetSrcTabletId () == shards.at (1 )) {
4647
+ runtime.Send (ev.Release (), 0 , true );
4648
+ }
4649
+ }
4650
+
4651
+ // Let it commit above snapshots and incorrectly compact after the first one is loaded and merged
4652
+ runtime.SimulateSleep (TDuration::Seconds (1 ));
4653
+ UNIT_ASSERT (gets .size () > 0 );
4654
+
4655
+ Cerr << " ... unblocking blob gets" << Endl;
4656
+ captureGets.Remove ();
4657
+ for (auto & ev : gets ) {
4658
+ runtime.Send (ev.Release (), 0 , true );
4659
+ }
4660
+
4661
+ // Let it finish loading the second snapshot
4662
+ runtime.SimulateSleep (TDuration::Seconds (1 ));
4663
+
4664
+ // Wait for merge to complete and start a borrowed compaction
4665
+ // When bug is present it will cause newly compacted to part to have epoch larger than previously compacted
4666
+ WaitTxNotification (server, sender, txId);
4667
+ const auto merged = GetTableShards (server, sender, " /Root/table" );
4668
+ UNIT_ASSERT_VALUES_EQUAL (merged.size (), 1u );
4669
+ Cerr << " ... compacting borrowed parts in shard " << merged.at (0 ) << Endl;
4670
+ CompactBorrowed (runtime, merged.at (0 ), tableId);
4671
+
4672
+ // Validate we have an expected number of rows
4673
+ UNIT_ASSERT_VALUES_EQUAL (
4674
+ KqpSimpleExec (runtime, " SELECT COUNT(*) FROM `/Root/table`;" ),
4675
+ " { items { uint64_value: 15 } }" );
4676
+ }
4677
+
4554
4678
}
4555
4679
4556
4680
} // namespace NKikimr
0 commit comments