Skip to content

Commit 595af5f

Browse files
authored
Rework FormatRead and SysLogRead, move their execution to PDisk to avoid race (#10009)
1 parent 2353272 commit 595af5f

7 files changed

+46
-86
lines changed

ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
280280
str << " Config: " << Cfg->ToString();
281281
P_LOG(PRI_CRIT, BPD01, str.Str());
282282
} else {
283-
PDisk->InitiateReadSysLog(SelfId());
283+
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TReadFormat>(SelfId()));
284284
StateErrorReason =
285285
"PDisk is in StateInit, wait for PDisk to read sys log. Did you ckeck EvYardInit result? Marker# BSY09";
286286
Become(&TThis::StateInit);
@@ -557,8 +557,8 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
557557
// PDisk GUID is OK and format is complete
558558
*PDisk->Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialSysLogRead;
559559
*PDisk->Mon.PDiskDetailedState = TPDiskMon::TPDisk::BootingSysLogRead;
560-
PDisk->Format.InitMagic();
561-
PDisk->ReadSysLog(SelfId());
560+
561+
PDisk->InputRequest(PDisk->ReqCreator.CreateFromArgs<TReadSysLog>(SelfId()));
562562
}
563563
}
564564
}
@@ -1083,7 +1083,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
10831083
}
10841084

10851085
Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(PCtx->PDiskId, NKikimrProto::EReplyStatus::NOTREADY));
1086-
1086+
10871087
return;
10881088
}
10891089

@@ -1096,7 +1096,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
10961096
NPDisk::TMainKey newMainKey = ev->Get()->MainKey;
10971097

10981098
SecureWipeBuffer((ui8*)ev->Get()->MainKey.Keys.data(), sizeof(NPDisk::TKey) * ev->Get()->MainKey.Keys.size());
1099-
1099+
11001100
P_LOG(PRI_NOTICE, BSP01, "Going to restart PDisk since received TEvAskWardenRestartPDiskResult");
11011101

11021102
const TActorIdentity& thisActorId = SelfId();
@@ -1109,7 +1109,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
11091109
TIntrusivePtr<TPDiskConfig> actorCfg = std::move(Cfg);
11101110

11111111
auto& newCfg = ev->Get()->Config;
1112-
1112+
11131113
if (newCfg) {
11141114
Y_VERIFY_S(newCfg->PDiskId == pdiskId,
11151115
"New config's PDiskId# " << newCfg->PDiskId << " is not equal to real PDiskId# " << pdiskId);
@@ -1124,7 +1124,7 @@ class TPDiskActor : public TActorBootstrapped<TPDiskActor> {
11241124
TGenericExecutorThread& executorThread = actorCtx.ExecutorThread;
11251125

11261126
PassAway();
1127-
1127+
11281128
CreatePDiskActor(executorThread, counters, actorCfg, newMainKey, pdiskId, poolId, nodeId);
11291129

11301130
Send(ev->Sender, new TEvBlobStorage::TEvNotifyWardenPDiskRestarted(pdiskId));

ydb/core/blobstorage/pdisk/blobstorage_pdisk_blockdevice_ut.cpp

Lines changed: 0 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -173,69 +173,6 @@ void WaitForValue(TAtomic *counter, TDuration maxDuration, TAtomicBase expectedV
173173
}
174174
}
175175

176-
void RunTestMultipleRequestsFromCompletionAction() {
177-
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
178-
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));
179-
const ui32 dataSize = 4 << 10;
180-
const ui64 generations = 8;
181-
TAtomic counter = 0;
182-
183-
184-
TTempDir tempDir;
185-
TString path = CreateFile(tempDir().c_str(), dataSize);
186-
187-
{
188-
TActorSystemCreator creator;
189-
THolder<NPDisk::TBufferPool> bufferPool(NPDisk::CreateBufferPool(dataSize, 1, false, {}));
190-
NPDisk::TBuffer::TPtr alignedBuffer(bufferPool->Pop());
191-
memset(alignedBuffer->Data(), 0, dataSize);
192-
THolder<NPDisk::IBlockDevice> device(NPDisk::CreateRealBlockDevice(path, *mon, 0, 0, 4,
193-
NPDisk::TDeviceMode::LockFile, 2 << generations, nullptr));
194-
device->Initialize(std::make_shared<NPDisk::TPDiskCtx>(creator.GetActorSystem()));
195-
196-
(new TWriter(*device, alignedBuffer.Get(), (i32)generations, &counter))->Exec(nullptr);
197-
198-
TAtomicBase expectedCounter = 0;
199-
for (ui64 i = 0; i <= generations; ++i) {
200-
expectedCounter += 1ull << i;
201-
}
202-
WaitForValue(&counter, TIMEOUT, expectedCounter);
203-
204-
TAtomicBase resultingCounter = AtomicGet(counter);
205-
206-
UNIT_ASSERT_VALUES_EQUAL(
207-
resultingCounter,
208-
expectedCounter
209-
);
210-
}
211-
Ctest << "Done" << Endl;
212-
}
213-
214-
void RunTestDestructionWithMultipleFlushesFromCompletionAction() {
215-
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
216-
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));
217-
const ui32 dataSize = 4 << 10;
218-
const i32 generations = 8;
219-
TAtomic counter = 0;
220-
221-
TTempDir tempDir;
222-
TString path = CreateFile(tempDir().c_str(), dataSize);
223-
224-
TActorSystemCreator creator;
225-
THolder<NPDisk::IBlockDevice> device(NPDisk::CreateRealBlockDevice(path, *mon, 0, 0, 4,
226-
NPDisk::TDeviceMode::LockFile, 2 << generations, nullptr));
227-
device->Initialize(std::make_shared<NPDisk::TPDiskCtx>(creator.GetActorSystem()));
228-
229-
(new TFlusher(*device, generations, &counter))->Exec(nullptr);
230-
device->Stop();
231-
for (int i = 0; i < 10000; ++i) {
232-
(new TFlusher(*device, generations, &counter))->Exec(nullptr);
233-
}
234-
device.Destroy();
235-
236-
Ctest << "Done" << Endl;
237-
}
238-
239176
void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 diskSize, ui32 bufferSize, bool sequential = true) {
240177
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
241178
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));
@@ -264,14 +201,6 @@ void RunWriteTestWithSectorMap(NPDisk::NSectorMap::EDiskMode diskMode, ui32 disk
264201

265202
Y_UNIT_TEST_SUITE(TBlockDeviceTest) {
266203

267-
Y_UNIT_TEST(TestMultipleRequestsFromCompletionAction) {
268-
RunTestMultipleRequestsFromCompletionAction();
269-
}
270-
271-
Y_UNIT_TEST(TestDestructionWithMultipleFlushesFromCompletionAction) {
272-
RunTestDestructionWithMultipleFlushesFromCompletionAction();
273-
}
274-
275204
Y_UNIT_TEST(TestDeviceWithSubmitGetThread) {
276205
const TIntrusivePtr<::NMonitoring::TDynamicCounters> counters = new ::NMonitoring::TDynamicCounters;
277206
THolder<TPDiskMon> mon(new TPDiskMon(counters, 0, nullptr));

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2487,6 +2487,11 @@ void TPDisk::ProcessFastOperationsQueue() {
24872487
break;
24882488
case ERequestType::RequestContinueReadMetadata:
24892489
static_cast<TContinueReadMetadata&>(*req).Execute(PCtx->ActorSystem);
2490+
case ERequestType::RequestReadFormat:
2491+
InitiateReadFormat();
2492+
break;
2493+
case ERequestType::RequestReadSysLog:
2494+
InitiateReadSysLog();
24902495
break;
24912496
default:
24922497
Y_FAIL_S("Unexpected request type# " << (ui64)req->GetType());
@@ -3082,6 +3087,8 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) {
30823087
case ERequestType::RequestReadMetadata:
30833088
case ERequestType::RequestWriteMetadata:
30843089
case ERequestType::RequestContinueReadMetadata:
3090+
case ERequestType::RequestReadFormat:
3091+
case ERequestType::RequestReadSysLog:
30853092
break;
30863093
case ERequestType::RequestStopDevice:
30873094
BlockDevice->Stop();

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ class TPDisk : public IPDisk {
211211
bool IsFormatMagicValid(ui8 *magicData, ui32 magicDataSize, const TMainKey& mainKey); // Called by actor
212212
bool CheckGuid(TString *outReason); // Called by actor
213213
bool CheckFormatComplete(); // Called by actor
214-
void ReadSysLog(const TActorId &pDiskActor); // Called by actor
214+
void InitiateReadSysLog();
215215
bool ProcessChunk0(const TEvReadLogResult &readLogResult, TString& errorReason);
216216
void PrintChunksDebugInfo();
217217
TRcBuf ProcessReadSysLogResult(ui64 &outWritePosition, ui64 &outLsn, const TEvReadLogResult &readLogResult);
@@ -394,9 +394,8 @@ class TPDisk : public IPDisk {
394394
// Internal interface
395395

396396
// Schedules EvReadLogResult event for the system log
397-
void ResetInit();
398397
bool Initialize(); // Called by actor
399-
void InitiateReadSysLog(const TActorId &pDiskActor); // Called by actor
398+
void InitiateReadFormat();
400399
void ProcessReadLogResult(const TEvReadLogResult &evReadLogResult, const TActorId &pDiskActor);
401400

402401
NKikimrProto::EReplyStatus ValidateRequest(TLogWrite *logWrite, TStringStream& outErrorReason);
@@ -432,4 +431,3 @@ bool ParseSectorOffset(const TDiskFormat& format, TActorSystem *actorSystem, ui3
432431

433432
} // NPDisk
434433
} // NKikimr
435-

ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -205,8 +205,10 @@ void TPDisk::GetStartingPoints(NPDisk::TOwner owner, TMap<TLogSignature, NPDisk:
205205
}
206206
}
207207

208-
void TPDisk::ReadSysLog(const TActorId &pDiskActor) {
209-
TIntrusivePtr<TSysLogReader> sysLogReader(new TSysLogReader(this, PCtx->ActorSystem, pDiskActor,
208+
void TPDisk::InitiateReadSysLog() {
209+
Format.InitMagic();
210+
211+
TIntrusivePtr<TSysLogReader> sysLogReader(new TSysLogReader(this, PCtx->ActorSystem, PCtx->PDiskActor,
210212
TReqId(TReqId::ReadSysLog, 0)));
211213
sysLogReader->Start();
212214
return;
@@ -1310,15 +1312,15 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) {
13101312
}
13111313

13121314
// Schedules EvReadLogResult event for the system log
1313-
void TPDisk::InitiateReadSysLog(const TActorId &pDiskActor) {
1315+
void TPDisk::InitiateReadFormat() {
13141316
Y_VERIFY_S(PDiskThread.Running(), "expect PDiskThread to be running");
13151317
Y_VERIFY_S(InitPhase == EInitPhase::Uninitialized, "expect InitPhase to be Uninitialized, but InitPhase# "
13161318
<< InitPhase);
13171319
ui32 formatSectorsSize = FormatSectorSize * ReplicationFactor;
13181320
THolder<TEvReadFormatResult> evReadFormatResult(new TEvReadFormatResult(formatSectorsSize, UseHugePages));
13191321
ui8 *formatSectors = evReadFormatResult->FormatSectors.Get();
13201322
BlockDevice->PreadAsync(formatSectors, formatSectorsSize, 0,
1321-
new TCompletionEventSender(this, pDiskActor, evReadFormatResult.Release()), TReqId(TReqId::InitialFormatRead, 0), {});
1323+
new TCompletionEventSender(this, PCtx->PDiskActor, evReadFormatResult.Release()), TReqId(TReqId::InitialFormatRead, 0), {});
13221324
*Mon.PDiskState = NKikimrBlobStorage::TPDiskState::InitialFormatRead;
13231325
*Mon.PDiskDetailedState = TPDiskMon::TPDisk::BootingFormatRead;
13241326
InitPhase = EInitPhase::ReadingSysLog;

ydb/core/blobstorage/pdisk/blobstorage_pdisk_request_id.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ enum class ERequestType {
153153
RequestWriteMetadataResult,
154154
RequestPushUnformattedMetadataSector,
155155
RequestContinueReadMetadata,
156+
RequestReadFormat,
157+
RequestReadSysLog,
156158
};
157159

158160
inline IOutputStream& operator <<(IOutputStream& out, const TReqId& reqId) {

ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,28 @@ class TRequestBase : public TThrRefBase {
119119
static void AbortDelete(TRequestBase* request, TActorSystem* actorSystem);
120120
};
121121

122+
class TReadFormat : public TRequestBase {
123+
public:
124+
TReadFormat(TActorId pdiskActor, TAtomicBase reqIdx)
125+
: TRequestBase(pdiskActor, TReqId(TReqId::ReadFormatInfo, reqIdx), 0, 0, NPriInternal::Other)
126+
{}
127+
128+
ERequestType GetType() const override {
129+
return ERequestType::RequestReadFormat;
130+
}
131+
};
132+
133+
class TReadSysLog : public TRequestBase {
134+
public:
135+
TReadSysLog(TActorId pdiskActor, TAtomicBase reqIdx)
136+
: TRequestBase(pdiskActor, TReqId(TReqId::ReadSysLog, reqIdx), 0, 0, NPriInternal::Other)
137+
{}
138+
139+
ERequestType GetType() const override {
140+
return ERequestType::RequestReadSysLog;
141+
}
142+
};
143+
122144
//
123145
// TYardInit
124146
//

0 commit comments

Comments
 (0)