|
| 1 | +#include <ydb/core/blobstorage/ut_blobstorage/lib/env.h> |
| 2 | + |
| 3 | +#include <library/cpp/iterator/enumerate.h> |
| 4 | + |
| 5 | +#include <util/random/entropy.h> |
| 6 | + |
| 7 | + |
| 8 | +using TPartsLocations = TVector<TVector<ui8>>; |
| 9 | + |
| 10 | +TString ToString(const TPartsLocations& partsLocations) { |
| 11 | + TStringBuilder b; |
| 12 | + b << "["; |
| 13 | + for (const auto& parts: partsLocations) { |
| 14 | + if (parts.size() == 0) { |
| 15 | + b << ". "; |
| 16 | + } else { |
| 17 | + for (auto x: parts) { |
| 18 | + b << ToString(x); |
| 19 | + } |
| 20 | + b << " "; |
| 21 | + } |
| 22 | + } |
| 23 | + b << "]"; |
| 24 | + return b; |
| 25 | +} |
| 26 | + |
| 27 | + |
| 28 | +struct TTestEnv { |
| 29 | + TTestEnv(ui32 nodeCount, TBlobStorageGroupType erasure) |
| 30 | + : Env({ |
| 31 | + .NodeCount = nodeCount, |
| 32 | + .VDiskReplPausedAtStart = false, |
| 33 | + .Erasure = erasure, |
| 34 | + .FeatureFlags = MakeFeatureFlags(), |
| 35 | + }) |
| 36 | + { |
| 37 | + Env.CreateBoxAndPool(1, 1); |
| 38 | + Env.Sim(TDuration::Minutes(1)); |
| 39 | + |
| 40 | + auto groups = Env.GetGroups(); |
| 41 | + UNIT_ASSERT_VALUES_EQUAL(groups.size(), 1); |
| 42 | + GroupInfo = Env.GetGroupInfo(groups.front()); |
| 43 | + |
| 44 | + for (ui32 i = 0; i < Env.Settings.NodeCount; ++i) { |
| 45 | + RunningNodes.insert(i); |
| 46 | + } |
| 47 | + } |
| 48 | + |
| 49 | + static TFeatureFlags MakeFeatureFlags() { |
| 50 | + TFeatureFlags res; |
| 51 | + res.SetUseVDisksBalancing(true); |
| 52 | + return res; |
| 53 | + } |
| 54 | + |
| 55 | + static TString PrepareData(const ui32 dataLen, const ui32 start) { |
| 56 | + TString data(Reserve(dataLen)); |
| 57 | + for (ui32 i = 0; i < dataLen; ++i) { |
| 58 | + data.push_back('a' + (start + i) % 26); |
| 59 | + } |
| 60 | + return data; |
| 61 | + }; |
| 62 | + |
| 63 | + void SendPut(ui32 step, const TString& data, NKikimrProto::EReplyStatus expectedStatus) { |
| 64 | + const TLogoBlobID id(1, 1, step, 0, data.size(), 0); |
| 65 | + Cerr << "SEND TEvPut with key " << id.ToString() << Endl; |
| 66 | + const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__); |
| 67 | + auto ev = std::make_unique<TEvBlobStorage::TEvPut>(id, data, TInstant::Max()); |
| 68 | + Env.Runtime->WrapInActorContext(sender, [&] { |
| 69 | + SendToBSProxy(sender, GroupInfo->GroupID, ev.release()); |
| 70 | + }); |
| 71 | + auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvPutResult>(sender, false); |
| 72 | + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, expectedStatus); |
| 73 | + Cerr << "TEvPutResult: " << res->Get()->ToString() << Endl; |
| 74 | + }; |
| 75 | + |
| 76 | + auto SendGet(ui32 step, ui32 dataSize, bool mustRestoreFirst=false) { |
| 77 | + const TLogoBlobID blobId(1, 1, step, 0, dataSize, 0); |
| 78 | + Cerr << "SEND TEvGet with key " << blobId.ToString() << Endl; |
| 79 | + const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__); |
| 80 | + auto ev = std::make_unique<TEvBlobStorage::TEvGet>( |
| 81 | + blobId, |
| 82 | + /* shift */ 0, |
| 83 | + /* size */ dataSize, |
| 84 | + TInstant::Max(), |
| 85 | + NKikimrBlobStorage::EGetHandleClass::FastRead, |
| 86 | + mustRestoreFirst |
| 87 | + ); |
| 88 | + Env.Runtime->WrapInActorContext(sender, [&] () { |
| 89 | + SendToBSProxy(sender, GroupInfo->GroupID, ev.release()); |
| 90 | + }); |
| 91 | + TInstant getDeadline = Env.Now() + TDuration::Seconds(30); |
| 92 | + auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvGetResult>(sender, /* termOnCapture */ false, getDeadline); |
| 93 | + Cerr << "TEvGetResult: " << res->Get()->ToString() << Endl; |
| 94 | + return res; |
| 95 | + }; |
| 96 | + |
| 97 | + TActorId GetQueue(const TVDiskID& vDiskId) { |
| 98 | + if (!Queues.contains(vDiskId)) { |
| 99 | + Queues[vDiskId] = Env.CreateQueueActor(vDiskId, NKikimrBlobStorage::EVDiskQueueId::GetFastRead, 1000); |
| 100 | + } |
| 101 | + return Queues[vDiskId]; |
| 102 | + } |
| 103 | + |
| 104 | + TVector<ui32> GetParts(ui32 position, const TLogoBlobID& blobId) { |
| 105 | + if (!RunningNodes.contains(position)) { |
| 106 | + return {}; |
| 107 | + } |
| 108 | + auto vDiskId = GroupInfo->GetVDiskId(position); |
| 109 | + auto ev = TEvBlobStorage::TEvVGet::CreateExtremeIndexQuery( |
| 110 | + vDiskId, TInstant::Max(), NKikimrBlobStorage::EGetHandleClass::AsyncRead, |
| 111 | + TEvBlobStorage::TEvVGet::EFlags::None, 0, |
| 112 | + {{blobId, 0, 0}} |
| 113 | + ); |
| 114 | + const TActorId sender = Env.Runtime->AllocateEdgeActor(GroupInfo->GetActorId(*RunningNodes.begin()).NodeId(), __FILE__, __LINE__); |
| 115 | + TVector<ui32> partsRes; |
| 116 | + |
| 117 | + Cerr << "Get request for vdisk " << position << Endl; |
| 118 | + auto queueId = GetQueue(vDiskId); |
| 119 | + Env.Runtime->WrapInActorContext(sender, [&] { |
| 120 | + Env.Runtime->Send(new IEventHandle(queueId, sender, ev.release())); |
| 121 | + }); |
| 122 | + auto res = Env.WaitForEdgeActorEvent<TEvBlobStorage::TEvVGetResult>(sender, false); |
| 123 | + auto parts = res->Get()->Record.GetResult().at(0).GetParts(); |
| 124 | + partsRes = TVector<ui32>(parts.begin(), parts.end()); |
| 125 | + return partsRes; |
| 126 | + } |
| 127 | + |
| 128 | + TPartsLocations GetExpectedPartsLocations(const TLogoBlobID& blobId) { |
| 129 | + TPartsLocations result(GroupInfo->GetTopology().GType.BlobSubgroupSize()); |
| 130 | + TBlobStorageGroupInfo::TOrderNums orderNums; |
| 131 | + GroupInfo->GetTopology().PickSubgroup(blobId.Hash(), orderNums); |
| 132 | + for (ui32 i = 0; i < GroupInfo->GetTopology().GType.TotalPartCount(); ++i) { |
| 133 | + result[orderNums[i]].push_back(i + 1); |
| 134 | + } |
| 135 | + return result; |
| 136 | + } |
| 137 | + |
| 138 | + TPartsLocations GetActualPartsLocations(const TLogoBlobID& blobId) { |
| 139 | + TPartsLocations result(GroupInfo->GetTopology().GType.BlobSubgroupSize()); |
| 140 | + for (ui32 i = 0; i < result.size(); ++i) { |
| 141 | + for (ui32 part: GetParts(i, blobId)) { |
| 142 | + result[i].push_back(part); |
| 143 | + } |
| 144 | + Sort(result[i].begin(), result[i].end()); |
| 145 | + } |
| 146 | + return result; |
| 147 | + } |
| 148 | + |
| 149 | + bool CheckPartsLocations(const TLogoBlobID& blobId) { |
| 150 | + auto expectedParts = GetExpectedPartsLocations(blobId); |
| 151 | + auto actualParts = GetActualPartsLocations(blobId); |
| 152 | + TString errMsg = ToString(expectedParts) + " != " + ToString(actualParts); |
| 153 | + UNIT_ASSERT_VALUES_EQUAL_C(expectedParts.size(), actualParts.size(), errMsg); |
| 154 | + |
| 155 | + for (ui32 i = 0; i < expectedParts.size(); ++i) { |
| 156 | + UNIT_ASSERT_VALUES_EQUAL_C(expectedParts[i].size(), actualParts[i].size(), errMsg); |
| 157 | + for (ui32 j = 0; j < expectedParts[i].size(); ++j) { |
| 158 | + UNIT_ASSERT_VALUES_EQUAL_C(expectedParts[i][j], actualParts[i][j], errMsg); |
| 159 | + } |
| 160 | + } |
| 161 | + |
| 162 | + return true; |
| 163 | + } |
| 164 | + |
| 165 | + void StopNode(ui32 position) { |
| 166 | + if (!RunningNodes.contains(position)) { |
| 167 | + return; |
| 168 | + } |
| 169 | + Env.StopNode(GroupInfo->GetActorId(position).NodeId()); |
| 170 | + RunningNodes.erase(position); |
| 171 | + } |
| 172 | + |
| 173 | + void StartNode(ui32 position) { |
| 174 | + if (RunningNodes.contains(position)) { |
| 175 | + return; |
| 176 | + } |
| 177 | + Env.StartNode(GroupInfo->GetActorId(position).NodeId()); |
| 178 | + RunningNodes.insert(position); |
| 179 | + for (auto [_, queueId]: Queues) { |
| 180 | + Env.Runtime->Send(new IEventHandle(TEvents::TSystem::Poison, 0, queueId, {}, nullptr, 0), queueId.NodeId()); |
| 181 | + } |
| 182 | + Queues.clear(); |
| 183 | + } |
| 184 | + |
| 185 | + TEnvironmentSetup* operator->() { |
| 186 | + return &Env; |
| 187 | + } |
| 188 | + |
| 189 | + TEnvironmentSetup Env; |
| 190 | + TIntrusivePtr<TBlobStorageGroupInfo> GroupInfo; |
| 191 | + THashSet<ui32> RunningNodes; |
| 192 | + THashMap<TVDiskID, TActorId> Queues; |
| 193 | +}; |
| 194 | + |
| 195 | +TLogoBlobID MakeLogoBlobId(ui32 step, ui32 dataSize) { |
| 196 | + return TLogoBlobID(1, 1, step, 0, dataSize, 0); |
| 197 | +} |
| 198 | + |
| 199 | + |
| 200 | +TString GenData(ui32 len) { |
| 201 | + TString res = TString::Uninitialized(len); |
| 202 | + EntropyPool().Read(res.Detach(), res.size()); |
| 203 | + return res; |
| 204 | +} |
| 205 | + |
| 206 | + |
| 207 | +struct TStopOneNodeTest { |
| 208 | + TTestEnv Env; |
| 209 | + TString data; |
| 210 | + |
| 211 | + void RunTest() { |
| 212 | + ui32 step = 0; |
| 213 | + |
| 214 | + { // Check just a normal put works |
| 215 | + Env.SendPut(++step, data, NKikimrProto::OK); |
| 216 | + UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data); |
| 217 | + Env.CheckPartsLocations(MakeLogoBlobId(step, data.size())); |
| 218 | + } |
| 219 | + |
| 220 | + |
| 221 | + { // Stop one node that should have a part, make put, start it and check that blob would be moved from handoff on main |
| 222 | + auto blobId = MakeLogoBlobId(++step, data.size()); |
| 223 | + auto locations = Env.GetExpectedPartsLocations(blobId); |
| 224 | + ui32 nodeIdWithBlob = 0; |
| 225 | + while (locations[nodeIdWithBlob].size() == 0) ++nodeIdWithBlob; |
| 226 | + |
| 227 | + Env.StopNode(nodeIdWithBlob); |
| 228 | + Env.SendPut(step, data, NKikimrProto::OK); |
| 229 | + Env->Sim(TDuration::Seconds(10)); |
| 230 | + Env.StartNode(nodeIdWithBlob); |
| 231 | + Env->Sim(TDuration::Seconds(10)); |
| 232 | + |
| 233 | + Cerr << "Start compaction 1" << Endl; |
| 234 | + for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { |
| 235 | + Env->CompactVDisk(Env.GroupInfo->GetActorId(pos)); |
| 236 | + } |
| 237 | + Env->Sim(TDuration::Seconds(10)); |
| 238 | + Cerr << "Finish compaction 1" << Endl; |
| 239 | + |
| 240 | + Cerr << "Start compaction 2" << Endl; |
| 241 | + for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { |
| 242 | + Env->CompactVDisk(Env.GroupInfo->GetActorId(pos)); |
| 243 | + } |
| 244 | + Env->Sim(TDuration::Seconds(10)); |
| 245 | + Cerr << "Finish compaction 2" << Endl; |
| 246 | + |
| 247 | + Cerr << "Start compaction 3" << Endl; |
| 248 | + for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { |
| 249 | + Env->CompactVDisk(Env.GroupInfo->GetActorId(pos)); |
| 250 | + } |
| 251 | + Env->Sim(TDuration::Seconds(10)); |
| 252 | + Cerr << "Finish compaction 3" << Endl; |
| 253 | + |
| 254 | + Env.CheckPartsLocations(MakeLogoBlobId(step, data.size())); |
| 255 | + UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data.size())->Get()->Responses[0].Buffer.ConvertToString(), data); |
| 256 | + } |
| 257 | + } |
| 258 | +}; |
| 259 | + |
| 260 | +struct TRandomTest { |
| 261 | + TTestEnv Env; |
| 262 | + ui32 NumIters; |
| 263 | + |
| 264 | + void RunTest() { |
| 265 | + TVector<TString> data(Reserve(NumIters)); |
| 266 | + |
| 267 | + for (ui32 step = 0; step < NumIters; ++step) { |
| 268 | + Cerr << step << Endl; |
| 269 | + data.push_back(GenData(16 + random() % 4096)); |
| 270 | + auto blobId = MakeLogoBlobId(step, data.back().size()); |
| 271 | + auto locations = Env.GetExpectedPartsLocations(blobId); |
| 272 | + |
| 273 | + if (random() % 10 == 1 && Env.RunningNodes.size() + 2 > Env->Settings.NodeCount) { |
| 274 | + ui32 nodeId = random() % Env->Settings.NodeCount; |
| 275 | + Cerr << "Stop node " << nodeId << Endl; |
| 276 | + Env.StopNode(nodeId); |
| 277 | + Env->Sim(TDuration::Seconds(10)); |
| 278 | + } |
| 279 | + |
| 280 | + Env.SendPut(step, data.back(), NKikimrProto::OK); |
| 281 | + |
| 282 | + if (random() % 10 == 1) { |
| 283 | + for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { |
| 284 | + if (!Env.RunningNodes.contains(pos)) { |
| 285 | + Cerr << "Start node " << pos << Endl; |
| 286 | + Env.StartNode(pos); |
| 287 | + Env->Sim(TDuration::Seconds(10)); |
| 288 | + break; |
| 289 | + } |
| 290 | + } |
| 291 | + } |
| 292 | + |
| 293 | + if (random() % 50 == 1) { |
| 294 | + ui32 pos = random() % Env->Settings.NodeCount; |
| 295 | + if (Env.RunningNodes.contains(pos)) { |
| 296 | + Env->CompactVDisk(Env.GroupInfo->GetActorId(pos)); |
| 297 | + Env->Sim(TDuration::Seconds(10)); |
| 298 | + } |
| 299 | + } |
| 300 | + |
| 301 | + // Wipe random node |
| 302 | + if (random() % 100 == 1) { |
| 303 | + ui32 pos = random() % Env->Settings.NodeCount; |
| 304 | + if (Env.RunningNodes.contains(pos)) { |
| 305 | + auto baseConfig = Env->FetchBaseConfig(); |
| 306 | + const auto& somePDisk = baseConfig.GetPDisk(pos); |
| 307 | + const auto& someVSlot = baseConfig.GetVSlot(pos); |
| 308 | + Env->Wipe(somePDisk.GetNodeId(), somePDisk.GetPDiskId(), someVSlot.GetVSlotId().GetVSlotId()); |
| 309 | + Env->Sim(TDuration::Seconds(10)); |
| 310 | + } |
| 311 | + } |
| 312 | + } |
| 313 | + |
| 314 | + for (ui32 pos = 0; pos < Env->Settings.NodeCount; ++pos) { |
| 315 | + Env.StartNode(pos); |
| 316 | + } |
| 317 | + |
| 318 | + Env->Sim(TDuration::Seconds(300)); |
| 319 | + Cerr << "Start checking" << Endl; |
| 320 | + for (ui32 step = 0; step < NumIters; ++step) { |
| 321 | + Cerr << step << Endl; |
| 322 | + Env.CheckPartsLocations(MakeLogoBlobId(step, data[step].size())); |
| 323 | + UNIT_ASSERT_VALUES_EQUAL(Env.SendGet(step, data[step].size())->Get()->Responses[0].Buffer.ConvertToString(), data[step]); |
| 324 | + } |
| 325 | + } |
| 326 | +}; |
| 327 | + |
| 328 | + |
| 329 | + |
| 330 | +Y_UNIT_TEST_SUITE(VDiskBalancing) { |
| 331 | + |
| 332 | + Y_UNIT_TEST(TestStopOneNode_Block42) { |
| 333 | + TStopOneNodeTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(100)}.RunTest(); |
| 334 | + } |
| 335 | + Y_UNIT_TEST(TestStopOneNode_Mirror3dc) { |
| 336 | + TStopOneNodeTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(100)}.RunTest(); |
| 337 | + } |
| 338 | + Y_UNIT_TEST(TestStopOneNode_Block42_HugeBlob) { |
| 339 | + TStopOneNodeTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), GenData(521_KB)}.RunTest(); |
| 340 | + } |
| 341 | + Y_UNIT_TEST(TestStopOneNode_Mirror3dc_HugeBlob) { |
| 342 | + TStopOneNodeTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), GenData(521_KB)}.RunTest(); |
| 343 | + } |
| 344 | + |
| 345 | + Y_UNIT_TEST(TestRandom_Block42) { |
| 346 | + TRandomTest{TTestEnv(8, TBlobStorageGroupType::Erasure4Plus2Block), 1000}.RunTest(); |
| 347 | + } |
| 348 | + Y_UNIT_TEST(TestRandom_Mirror3dc) { |
| 349 | + TRandomTest{TTestEnv(9, TBlobStorageGroupType::ErasureMirror3dc), 1000}.RunTest(); |
| 350 | + } |
| 351 | + |
| 352 | +} |
0 commit comments