diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h index 9dd0609ce028..2777f5cff7c7 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk.h @@ -327,12 +327,9 @@ struct TEvLog : public TEventLocal { }; struct TEvMultiLog : public TEventLocal { - TBatchedVec> Logs; - TLsnSeg LsnSeg; - - void AddLog(THolder &&ev) { - Logs.emplace_back(std::move(ev)); - auto &log = *Logs.back(); + void AddLog(THolder &&ev, NWilson::TTraceId traceId = {}) { + Logs.emplace_back(std::move(ev), std::move(traceId)); + auto &log = *Logs.back().Event; if (Logs.size() == 1) { LsnSeg = TLsnSeg(log.LsnSegmentStart, log.Lsn); } else { @@ -350,11 +347,19 @@ struct TEvMultiLog : public TEventLocal TStringBuilder str; str << '{'; for (ui64 idx = 0; idx < record.Logs.size(); ++idx) { - str << (idx ? ", " : "") << idx << "# " << record.Logs[idx]->ToString(); + str << (idx ? ", " : "") << idx << "# " << record.Logs[idx].Event->ToString(); } str << '}'; return str; } + + struct TItem { + THolder Event; + NWilson::TTraceId TraceId; + }; + + TBatchedVec Logs; + TLsnSeg LsnSeg; }; struct TEvLogResult : public TEventLocal { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp index 195d48e43756..fe650bcd1deb 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_actor.cpp @@ -648,7 +648,7 @@ class TPDiskActor : public TActorBootstrapped { } str << " StateErrorReason# " << StateErrorReason; THolder result(new NPDisk::TEvLogResult(NKikimrProto::CORRUPTED, 0, str.Str())); - for (auto &log : evMultiLog.Logs) { + for (auto &[log, _] : evMultiLog.Logs) { result->Results.push_back(NPDisk::TEvLogResult::TRecord(log->Lsn, log->Cookie)); } PDisk->Mon.WriteLog.CountRequest(0); @@ -783,9 +783,9 @@ class TPDiskActor : public TActorBootstrapped { } void Handle(NPDisk::TEvMultiLog::TPtr &ev) { - for (auto &log : ev->Get()->Logs) { + for (auto &[log, traceId] : ev->Get()->Logs) { double burstMs; - TLogWrite* request = PDisk->ReqCreator.CreateLogWrite(*log, ev->Sender, burstMs, std::move(ev->TraceId)); + TLogWrite* request = PDisk->ReqCreator.CreateLogWrite(*log, ev->Sender, burstMs, std::move(traceId)); CheckBurst(request->IsSensitive, burstMs); request->Orbit = std::move(log->Orbit); PDisk->InputRequest(request); @@ -796,7 +796,7 @@ class TPDiskActor : public TActorBootstrapped { LOG_DEBUG(*TlsActivationContext, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " %s Marker# BSY01", (ui32)PDisk->PDiskId, ev->Get()->ToString().c_str()); double burstMs; - auto* request = PDisk->ReqCreator.CreateFromEv(*ev->Get(), ev->Sender, &burstMs); + auto* request = PDisk->ReqCreator.CreateFromEvPtr(ev, &burstMs); CheckBurst(request->IsSensitive, burstMs); PDisk->InputRequest(request); } @@ -874,7 +874,7 @@ class TPDiskActor : public TActorBootstrapped { } void Handle(NPDisk::TEvAskForCutLog::TPtr &ev) { - auto* request = PDisk->ReqCreator.CreateFromEv(*ev->Get(), ev->Sender); + auto* request = PDisk->ReqCreator.CreateFromEvPtr(ev); PDisk->InputRequest(request); } @@ -1135,7 +1135,7 @@ class TPDiskActor : public TActorBootstrapped { } void Handle(NPDisk::TEvReadLogContinue::TPtr &ev) { - auto *request = PDisk->ReqCreator.CreateFromEv(*ev->Get(), SelfId()); + auto *request = PDisk->ReqCreator.CreateFromEvPtr(ev); PDisk->InputRequest(request); } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp index f5d29e2312f7..9755aeff4b0a 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_completion_impl.cpp @@ -15,7 +15,8 @@ namespace NPDisk { void TCompletionLogWrite::Exec(TActorSystem *actorSystem) { // bool isNewChunksCommited = false; if (CommitedLogChunks) { - auto* req = PDisk->ReqCreator.CreateFromArgs(std::move(CommitedLogChunks)); + NWilson::TSpan span(TWilson::PDisk, TraceId.Clone(), "PDisk.CommitLogChunks"); + auto* req = PDisk->ReqCreator.CreateFromArgs(std::move(CommitedLogChunks), std::move(span)); PDisk->InputRequest(req); //isNewChunksCommited = true; } @@ -384,7 +385,9 @@ void TChunkTrimCompletion::Exec(TActorSystem *actorSystem) { << ui64(responseTimeMs) << " sizeBytes# " << SizeBytes); LWPROBE(PDiskTrimResponseTime, PDisk->PDiskId, ReqId.Id, responseTimeMs, SizeBytes); PDisk->Mon.Trim.CountResponse(); - TTryTrimChunk *tryTrim = PDisk->ReqCreator.CreateFromArgs(SizeBytes); + NWilson::TSpan span(TWilson::PDisk, std::move(TraceId), "PDisk.TryTrimChunk", NWilson::EFlags::AUTO_END, actorSystem); + span.Attribute("size", static_cast(SizeBytes)); + TTryTrimChunk *tryTrim = PDisk->ReqCreator.CreateFromArgs(SizeBytes, std::move(span)); PDisk->InputRequest(tryTrim); delete this; } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp index 80386b1bca86..463af890cf2a 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.cpp @@ -527,7 +527,7 @@ bool TPDisk::ReleaseUnusedLogChunks(TCompletionEventSender *completion) { // Case 1: Chunks to be deleted located at the start of LogChunks list } else if (!gapStart && gapEnd) { IsLogChunksReleaseInflight = true; - completion->Req = THolder(ReqCreator.CreateFromArgs(std::move(chunksToRelease))); + completion->Req = THolder(ReqCreator.CreateFromArgs(std::move(chunksToRelease), NWilson::TSpan{})); SysLogRecord.LogHeadChunkIdx = gapEnd->ChunkIdx; SysLogRecord.LogHeadChunkPreviousNonce = ChunkState[gapEnd->ChunkIdx].PreviousNonce; PrintLogChunksInfo("cut tail log"); @@ -537,7 +537,7 @@ bool TPDisk::ReleaseUnusedLogChunks(TCompletionEventSender *completion) { Y_ABORT_UNLESS(KIKIMR_PDISK_ENABLE_CUT_LOG_FROM_THE_MIDDLE); IsLogChunksReleaseInflight = true; Mon.SplicedLogChunks->Add(chunksToRelease.size()); - completion->Req = THolder(ReqCreator.CreateFromArgs(*gapStart, *gapEnd, std::move(chunksToRelease))); + completion->Req = THolder(ReqCreator.CreateFromArgs(*gapStart, *gapEnd, std::move(chunksToRelease), NWilson::TSpan{})); PrintLogChunksInfo("log splice"); return true; } else { @@ -892,6 +892,7 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi const ui32 count = evChunkWrite->PartsPtr->Size(); for (ui32 partIdx = evChunkWrite->CurrentPart; partIdx < count; ++partIdx) { ui32 remainingPartSize = (*evChunkWrite->PartsPtr)[partIdx].second - evChunkWrite->CurrentPartOffset; + auto traceId = evChunkWrite->Span.GetTraceId(); if (bytesAvailable < remainingPartSize) { ui32 sizeToWrite = bytesAvailable; if (sizeToWrite > 0) { @@ -899,10 +900,10 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi if (data) { ui8 *source = data + evChunkWrite->CurrentPartOffset; NSan::CheckMemIsInitialized(source, sizeToWrite); - writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &evChunkWrite->TraceId); + writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &traceId); *Mon.BandwidthPChunkPayload += sizeToWrite; } else { - writer.WritePadding(sizeToWrite, evChunkWrite->ReqId, &evChunkWrite->TraceId); + writer.WritePadding(sizeToWrite, evChunkWrite->ReqId, &traceId); *Mon.BandwidthPChunkPadding += sizeToWrite; } evChunkWrite->RemainingSize -= sizeToWrite; @@ -918,10 +919,10 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi ui8 *data = (ui8*)(*evChunkWrite->PartsPtr)[partIdx].first; if (data) { ui8 *source = data + evChunkWrite->CurrentPartOffset; - writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &evChunkWrite->TraceId); + writer.WriteData(source, sizeToWrite, evChunkWrite->ReqId, &traceId); *Mon.BandwidthPChunkPayload += sizeToWrite; } else { - writer.WritePadding(sizeToWrite, evChunkWrite->ReqId, &evChunkWrite->TraceId); + writer.WritePadding(sizeToWrite, evChunkWrite->ReqId, &traceId); *Mon.BandwidthPChunkPadding += sizeToWrite; } evChunkWrite->CurrentPartOffset = 0; @@ -938,13 +939,15 @@ bool TPDisk::ChunkWritePiece(TChunkWrite *evChunkWrite, ui32 pieceShift, ui32 pi << " Marker# BPD79"); if (!writer.IsEmptySector()) { + auto traceId = evChunkWrite->Span.GetTraceId(); *Mon.BandwidthPChunkPadding += writer.SectorBytesFree; - writer.WriteZeroes(writer.SectorBytesFree, evChunkWrite->ReqId, &evChunkWrite->TraceId); + writer.WriteZeroes(writer.SectorBytesFree, evChunkWrite->ReqId, &traceId); LOG_INFO(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " chunkIdx# %" PRIu32 " was zero-padded after writing", (ui32)PDiskId, (ui32)chunkIdx); } + auto traceId = evChunkWrite->Span.GetTraceId(); evChunkWrite->Completion->Orbit = std::move(evChunkWrite->Orbit); - writer.Flush(evChunkWrite->ReqId, &evChunkWrite->TraceId, evChunkWrite->Completion.Release()); + writer.Flush(evChunkWrite->ReqId, &traceId, evChunkWrite->Completion.Release()); evChunkWrite->IsReplied = true; @@ -1053,8 +1056,9 @@ TPDisk::EChunkReadPieceResult TPDisk::ChunkReadPiece(TIntrusivePtr & completion->CostNs = DriveModel.TimeForSizeNs(bytesToRead, read->ChunkIdx, TDriveModel::OP_TYPE_READ); Y_ABORT_UNLESS(bytesToRead <= completion->GetBuffer()->Size()); ui8 *data = completion->GetBuffer()->Data(); + auto traceId = read->Span.GetTraceId(); BlockDevice->PreadAsync(data, bytesToRead, readOffset, completion.Release(), - read->ReqId, &read->TraceId); + read->ReqId, &traceId); // TODO: align the data on SectorSize, not PAGE_SIZE // TODO: use the BLKSSZGET ioctl to obtain a backing store's sector size return isTheLastPart ? ReadPieceResultOk : ReadPieceResultInProgress; @@ -1346,7 +1350,8 @@ TVector TPDisk::AllocateChunkForOwner(const TRequestBase *req, const for (TChunkIdx chunkIdx : chunks) { ui64 chunkNonce = SysLogRecord.Nonces.Value[NonceData]; SysLogRecord.Nonces.Value[NonceData] += dataChunkSizeSectors; - OnNonceChange(NonceData, req->ReqId, &req->TraceId); + auto traceId = req->Span.GetTraceId(); + OnNonceChange(NonceData, req->ReqId, &traceId); // Remember who owns the sector, save chunk Nonce in order to be able to continue writing the chunk TChunkState &state = ChunkState[chunkIdx]; Y_VERIFY_S(state.OwnerId == OwnerUnallocated @@ -2140,7 +2145,7 @@ void TPDisk::KillOwner(TOwner owner, TOwnerRound killOwnerRound, TCompletionEven << " removed ownerId# " << owner << " from chunks Keeper"); } - TryTrimChunk(false, 0); + TryTrimChunk(false, 0, NWilson::TSpan{}); ui64 lastSeenLsn = 0; auto it = LogChunks.begin(); while (it != LogChunks.end()) { @@ -2401,7 +2406,7 @@ void TPDisk::ClearQuarantineChunks() { } // Should be called to initiate TRIM (on chunk delete or prev trim done) -void TPDisk::TryTrimChunk(bool prevDone, ui64 trimmedSize) { +void TPDisk::TryTrimChunk(bool prevDone, ui64 trimmedSize, const NWilson::TSpan& parentSpan) { TGuard g(StateMutex); TrimOffset += trimmedSize; if (!DriveModel.IsTrimSupported()) { @@ -2440,7 +2445,7 @@ void TPDisk::TryTrimChunk(bool prevDone, ui64 trimmedSize) { if (ChunkBeingTrimmed) { // Initiate trim of next part of chunk const ui64 trimStep = (Keeper.GetTrimmedFreeChunkCount() > 100 ? 2 << 20 : 32 << 20); ui64 trimSize = Min(Format.ChunkSize - TrimOffset, trimStep); - TChunkTrim* trim = ReqCreator.CreateChunkTrim(ChunkBeingTrimmed, TrimOffset, trimSize); + TChunkTrim* trim = ReqCreator.CreateChunkTrim(ChunkBeingTrimmed, TrimOffset, trimSize, parentSpan); InputRequest(trim); TrimInFly = true; } @@ -2496,7 +2501,7 @@ void TPDisk::ProcessFastOperationsQueue() { OnLogCommitDone(static_cast(*req)); break; case ERequestType::RequestTryTrimChunk: - TryTrimChunk(true, static_cast(*req).TrimSize); + TryTrimChunk(true, static_cast(*req).TrimSize, req->Span); break; case ERequestType::RequestReleaseChunks: MarkChunksAsReleased(static_cast(*req)); @@ -2746,6 +2751,7 @@ void TPDisk::PrepareLogError(TLogWrite *logWrite, TStringStream& err, NKikimrPro << " lsn# " << logWrite->Lsn; LOG_ERROR_S(*ActorSystem, NKikimrServices::BS_PDISK, err.Str()); + logWrite->Span.EndError(err.Str()); logWrite->Result.Reset(new NPDisk::TEvLogResult(status, GetStatusFlags(logWrite->Owner, logWrite->OwnerGroupType), err.Str())); logWrite->Result->Results.push_back(NPDisk::TEvLogResult::TRecord(logWrite->Lsn, logWrite->Cookie)); @@ -3115,6 +3121,7 @@ bool TPDisk::PreprocessRequest(TRequestBase *request) { } void TPDisk::PushRequestToForseti(TRequestBase *request) { + request->Span.Event("PushToForseti", {}); if (request->GateId != GateFastOperation) { bool isAdded = false; @@ -3153,6 +3160,9 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) { && static_cast(job->Payload)->GetType() == ERequestType::RequestLogWrite) { TLogWrite &batch = *static_cast(job->Payload); + request->Span.Event("AddToBatch", NWilson::TKeyValueList{{ + { "Batch.ReqId", static_cast(batch.ReqId.Id) }, + }}); batch.AddToBatch(static_cast(request)); ui64 prevCost = job->Cost; job->Cost += request->Cost; @@ -3176,12 +3186,17 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) { SplitChunkJobSize(whole->TotalSize, &smallJobSize, &largeJobSize, &smallJobCount); for (ui32 idx = 0; idx < smallJobCount; ++idx) { // Schedule small job. - TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * smallJobSize, smallJobSize); + auto span = request->Span.CreateChild(TWilson::PDisk, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END); + span.Attribute("small_job_idx", idx) + .Attribute("is_last_piece", false); + TChunkWritePiece *piece = new TChunkWritePiece(whole, idx * smallJobSize, smallJobSize, std::move(span)); piece->EstimateCost(DriveModel); AddJobToForseti(cbs, piece, request->JobKind); } // Schedule large job (there always is one) - TChunkWritePiece *piece = new TChunkWritePiece(whole, smallJobCount * smallJobSize, largeJobSize); + auto span = request->Span.CreateChild(TWilson::PDisk, "PDisk.ChunkWritePiece", NWilson::EFlags::AUTO_END); + span.Attribute("is_last_piece", true); + TChunkWritePiece *piece = new TChunkWritePiece(whole, smallJobCount * smallJobSize, largeJobSize, std::move(span)); piece->EstimateCost(DriveModel); AddJobToForseti(cbs, piece, request->JobKind); LWTRACK(PDiskAddWritePieceToScheduler, request->Orbit, PDiskId, request->ReqId.Id, @@ -3199,9 +3214,12 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) { ui32 largeJobSize = totalSectors - smallJobSize * smallJobCount; for (ui32 idx = 0; idx < smallJobCount; ++idx) { + auto span = request->Span.CreateChild(TWilson::PDisk, "PDisk.ChunkReadPiece", NWilson::EFlags::AUTO_END); + span.Attribute("small_job_idx", idx) + .Attribute("is_last_piece", false); // Schedule small job. auto piece = new TChunkReadPiece(read, idx * smallJobSize, - smallJobSize * Format.SectorSize, false); + smallJobSize * Format.SectorSize, false, std::move(span)); LWTRACK(PDiskChunkReadPieceAddToScheduler, read->Orbit, PDiskId, idx, idx * smallJobSize, smallJobSize * Format.SectorSize); piece->EstimateCost(DriveModel); @@ -3209,18 +3227,23 @@ void TPDisk::PushRequestToForseti(TRequestBase *request) { AddJobToForseti(cbs, piece, request->JobKind); } // Schedule large job (there always is one) + NWilson::TSpan span(TWilson::PDisk, request->Span.GetTraceId(), + "PDisk.ChunkReadPiece", NWilson::EFlags::NONE, request->Span.GetActorSystem()); + span.Attribute("is_last_piece", true); auto piece = new TChunkReadPiece(read, smallJobCount * smallJobSize, - largeJobSize * Format.SectorSize, true); + largeJobSize * Format.SectorSize, true, std::move(span)); LWTRACK(PDiskChunkReadPieceAddToScheduler, read->Orbit, PDiskId, smallJobCount, smallJobCount * smallJobSize, largeJobSize * Format.SectorSize); piece->EstimateCost(DriveModel); piece->SelfPointer = piece; AddJobToForseti(cbs, piece, request->JobKind); + } else { AddJobToForseti(cbs, request, request->JobKind); } } } else { + request->Span.Event("PushToFastOperationsQueue", {}); FastOperationsQueue.push_back(std::unique_ptr(request)); LOG_DEBUG(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " ReqId# %" PRIu64 " PushRequestToForseti Push to FastOperationsQueue.size# %" PRIu64, @@ -3268,21 +3291,25 @@ void TPDisk::RouteRequest(TRequestBase *request) { case ERequestType::RequestLogReadResultProcess: [[fallthrough]]; case ERequestType::RequestLogSectorRestore: + request->Span.Event("PushToJointLogReads", {}); JointLogReads.push_back(request); break; case ERequestType::RequestChunkReadPiece: { TChunkReadPiece *piece = static_cast(request); + request->Span.Event("PushToJointChunkReads", {}); JointChunkReads.emplace_back(piece->SelfPointer.Get()); piece->SelfPointer.Reset(); // FIXME(cthulhu): Unreserve() for TChunkReadPiece is called while processing to avoid requeueing issues break; } case ERequestType::RequestChunkWritePiece: + request->Span.Event("PushToJointChunkWrites", {}); JointChunkWrites.push_back(request); break; case ERequestType::RequestChunkTrim: { + request->Span.Event("PushToJointChunkTrims", {}); TChunkTrim *trim = static_cast(request); JointChunkTrims.push_back(trim); break; @@ -3293,8 +3320,10 @@ void TPDisk::RouteRequest(TRequestBase *request) { while (log) { TLogWrite *batch = log->PopFromBatch(); + log->Span.Event("PushToJointLogWrites", {}); JointLogWrites.push_back(log); if (log->Signature.HasCommitRecord()) { + log->Span.Event("PushToJointCommits", {}); JointCommits.push_back(log); } log = batch; @@ -3303,6 +3332,7 @@ void TPDisk::RouteRequest(TRequestBase *request) { } case ERequestType::RequestChunkForget: { + request->Span.Event("PushToJointChunkForgets", {}); TChunkForget *forget = static_cast(request); JointChunkForgets.push_back(std::unique_ptr(forget)); break; @@ -3393,9 +3423,11 @@ void TPDisk::EnqueueAll() { TYardControl &evControl = *static_cast(request); switch (evControl.Action) { case NPDisk::TEvYardControl::ActionPause: + request->Span.Event("PushToPausedQueue", {}); PausedQueue.push_back(request); break; case NPDisk::TEvYardControl::ActionStep: + request->Span.Event("PushToPausedQueue", {}); PausedQueue.push_back(request); ActorSystem->Send(evControl.Sender, new NPDisk::TEvYardControlResult( NKikimrProto::OK, evControl.Cookie, TString())); @@ -3416,6 +3448,7 @@ void TPDisk::EnqueueAll() { break; } } else { + request->Span.Event("PushToPausedQueue", {}); PausedQueue.push_back(request); } } else { diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h index cc6cac3a2f5f..f4b83ffb6a59 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl.h @@ -352,7 +352,7 @@ class TPDisk : public IPDisk { void ProcessChunkTrimQueue(); void ClearQuarantineChunks(); // Should be called to initiate TRIM (on chunk delete or prev trim done) - void TryTrimChunk(bool prevDone, ui64 trimmedSize); + void TryTrimChunk(bool prevDone, ui64 trimmedSize, const NWilson::TSpan& parentSpan); void ProcessFastOperationsQueue(); //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Drive info and write cache diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp index a417835a8d75..31d4e3684d7b 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_impl_log.cpp @@ -592,8 +592,9 @@ void TPDisk::ProcessLogReadQueue() { TLogReadContinue *read = static_cast(req); read->CompletionAction->CostNs = DriveModel.TimeForSizeNs(read->Size, read->Offset / Format.ChunkSize, TDriveModel::OP_TYPE_READ); + auto traceId = read->Span.GetTraceId(); BlockDevice->PreadAsync(read->Data, read->Size, read->Offset, read->CompletionAction, - read->ReqId, &read->TraceId); // ??? TraceId + read->ReqId, &traceId); // ??? TraceId break; } case ERequestType::RequestLogSectorRestore: @@ -722,7 +723,7 @@ void TPDisk::ProcessLogWriteQueueAndCommits() { PrepareLogError(logCommit, errorReason, status); } } - NWilson::TTraceId *traceId = nullptr; + NWilson::TTraceId traceId; size_t logOperationSizeBytes = 0; TVector logChunksToCommit; for (TLogWrite *logWrite : JointLogWrites) { @@ -733,8 +734,8 @@ void TPDisk::ProcessLogWriteQueueAndCommits() { if (status == NKikimrProto::OK) { LogWrite(*logWrite, logChunksToCommit); logWrite->ScheduleTime = HPNow(); - if (logWrite->TraceId) { - traceId = &logWrite->TraceId; + if (logWrite->Span) { + traceId = logWrite->Span.GetTraceId(); } } else { PrepareLogError(logWrite, errorReason, status); @@ -748,14 +749,14 @@ void TPDisk::ProcessLogWriteQueueAndCommits() { TReqId reqId = JointLogWrites.back()->ReqId; auto write = MakeHolder( this, std::move(JointLogWrites), std::move(JointCommits), std::move(logChunksToCommit)); - LogFlush(write.Get(), write->GetCommitedLogChunksPtr(), reqId, traceId); + LogFlush(write.Get(), write->GetCommitedLogChunksPtr(), reqId, &traceId); Y_UNUSED(write.Release()); JointCommits.clear(); JointLogWrites.clear(); // Check if we can TRIM some chunks that were deleted - TryTrimChunk(false, 0); + TryTrimChunk(false, 0, NWilson::TSpan{}); Mon.LogOperationSizeBytes.Increment(logOperationSizeBytes); } @@ -890,31 +891,32 @@ void TPDisk::LogWrite(TLogWrite &evLog, TVector &logChunksToCommit) { } // Write to log - CommonLogger->LogHeader(evLog.Owner, evLog.Signature, evLog.Lsn, payloadSize, evLog.ReqId, &evLog.TraceId); - OnNonceChange(NonceLog, evLog.ReqId, &evLog.TraceId); + auto evLogTraceId = evLog.Span.GetTraceId(); + CommonLogger->LogHeader(evLog.Owner, evLog.Signature, evLog.Lsn, payloadSize, evLog.ReqId, &evLogTraceId); + OnNonceChange(NonceLog, evLog.ReqId, &evLogTraceId); if (evLog.Data.size()) { - CommonLogger->LogDataPart(evLog.Data.data(), evLog.Data.size(), evLog.ReqId, &evLog.TraceId); + CommonLogger->LogDataPart(evLog.Data.data(), evLog.Data.size(), evLog.ReqId, &evLogTraceId); } if (isCommitRecord) { ui32 commitChunksCount = evLog.CommitRecord.CommitChunks.size(); if (commitChunksCount) { CommonLogger->LogDataPart(evLog.CommitRecord.CommitChunks.data(), commitChunksCount * sizeof(ui32), - evLog.ReqId, &evLog.TraceId); + evLog.ReqId, &evLogTraceId); TVector commitChunkNonces(commitChunksCount); for (ui32 idx = 0; idx < commitChunksCount; ++idx) { commitChunkNonces[idx] = ChunkState[evLog.CommitRecord.CommitChunks[idx]].Nonce; } - CommonLogger->LogDataPart(&commitChunkNonces[0], sizeof(ui64) * commitChunksCount, evLog.ReqId, &evLog.TraceId); + CommonLogger->LogDataPart(&commitChunkNonces[0], sizeof(ui64) * commitChunksCount, evLog.ReqId, &evLogTraceId); } ui32 deleteChunksCount = evLog.CommitRecord.DeleteChunks.size(); if (deleteChunksCount) { CommonLogger->LogDataPart(evLog.CommitRecord.DeleteChunks.data(), deleteChunksCount * sizeof(ui32), - evLog.ReqId, &evLog.TraceId); + evLog.ReqId, &evLogTraceId); } NPDisk::TCommitRecordFooter footer(evLog.Data.size(), evLog.CommitRecord.FirstLsnToKeep, evLog.CommitRecord.CommitChunks.size(), evLog.CommitRecord.DeleteChunks.size(), evLog.CommitRecord.IsStartingPoint); - CommonLogger->LogDataPart(&footer, sizeof(footer), evLog.ReqId, &evLog.TraceId); + CommonLogger->LogDataPart(&footer, sizeof(footer), evLog.ReqId, &evLogTraceId); { TGuard guard(StateMutex); @@ -1262,7 +1264,7 @@ void TPDisk::OnLogCommitDone(TLogCommitDone &req) { WriteSysLogRestorePoint(completion.Release(), req.ReqId, {}); // FIXME: wilson } } - TryTrimChunk(false, 0); + TryTrimChunk(false, 0, req.Span); } void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) { @@ -1275,7 +1277,7 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) { } if (req.IsChunksFromLogSplice) { - auto *releaseReq = ReqCreator.CreateFromArgs(std::move(req.ChunksToRelease)); + auto *releaseReq = ReqCreator.CreateFromArgs(std::move(req.ChunksToRelease), req.Span.CreateChild(TWilson::PDisk, "PDisk.ReleaseChunks")); auto flushAction = MakeHolder(this, THolder(releaseReq)); @@ -1302,7 +1304,7 @@ void TPDisk::MarkChunksAsReleased(TReleaseChunks& req) { } IsLogChunksReleaseInflight = false; - TryTrimChunk(false, 0); + TryTrimChunk(false, 0, req.Span); } } diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h index 2453cbbf5022..96fb97d2a1fd 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_req_creator.h @@ -10,6 +10,8 @@ #include +#include + namespace NKikimr::NPDisk { LWTRACE_USING(BLOBSTORAGE_PROVIDER); @@ -185,6 +187,16 @@ class TReqCreator { ActorSystem = actorSystem; } + template + [[nodiscard]] TReq* CreateFromEvPtr(TEvPtr &ev, double *burstMs = nullptr) { + auto& sender = ev->Sender; + LOG_DEBUG_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId << " ev# " + << ToString(ev) << " Sender# " << sender.LocalId() << " ReqId# " << AtomicGet(LastReqId)); + auto req = MakeHolder(ev, AtomicIncrement(LastReqId)); + NewRequest(req.Get(), burstMs); + return req.Release(); + } + template [[nodiscard]] TReq* CreateFromEv(TEv &&ev, const TActorId &sender, double *burstMs = nullptr) { LOG_DEBUG_S(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# " << PDiskId << " ev# " @@ -205,12 +217,18 @@ class TReqCreator { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // TODO: Make all functions in style - [[nodiscard]] TChunkTrim* CreateChunkTrim(ui32 chunkIdx, ui32 offset, ui64 size) { + [[nodiscard]] TChunkTrim* CreateChunkTrim(ui32 chunkIdx, ui32 offset, ui64 size, const NWilson::TSpan& parent) { + NWilson::TSpan span = parent.CreateChild(TWilson::PDisk, "PDisk.TChunkTrim"); + span.Attribute("chunk_idx", chunkIdx) + .Attribute("offset", offset) + .Attribute("size", static_cast(size)); Mon->Trim.CountRequest(size); - return CreateFromArgs(chunkIdx, offset, size); + return CreateFromArgs(chunkIdx, offset, size, std::move(span)); } [[nodiscard]] TLogWrite* CreateLogWrite(NPDisk::TEvLog &ev, const TActorId &sender, double& burstMs, NWilson::TTraceId traceId) { + NWilson::TSpan span(TWilson::PDisk, std::move(traceId), "PDisk.LogWrite", NWilson::EFlags::AUTO_END, ActorSystem); + TReqId reqId(TReqId::LogWrite, AtomicIncrement(LastReqId)); LOG_DEBUG(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " %s Sender# %" PRIu64 " ReqId# %" PRIu64, (ui32)PDiskId, ev.ToString().c_str(), (ui64)sender.LocalId(), (ui64)reqId.Id); @@ -220,24 +238,28 @@ class TReqCreator { if (ev.Data.size() > (1 << 20)) { Mon->WriteHugeLog.CountRequest(); } - return NewRequest(new TLogWrite(ev, sender, AtomicGet(*EstimatedLogChunkIdx), reqId, std::move(traceId)), &burstMs); + return NewRequest(new TLogWrite(ev, sender, AtomicGet(*EstimatedLogChunkIdx), reqId, std::move(span)), &burstMs); } [[nodiscard]] TChunkRead* CreateChunkRead(const NPDisk::TEvChunkRead &ev, const TActorId &sender, double& burstMs, NWilson::TTraceId traceId) { + NWilson::TSpan span(TWilson::PDisk, std::move(traceId), "PDisk.ChunkRead", NWilson::EFlags::AUTO_END, ActorSystem); + TReqId reqId(TReqId::ChunkRead, AtomicIncrement(LastReqId)); LOG_DEBUG(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " %s Sender# %" PRIu64 " ReqId# %" PRIu64, (ui32)PDiskId, ev.ToString().c_str(), (ui64)sender.LocalId(), (ui64)reqId.Id); Mon->QueueRequests->Inc(); *Mon->QueueBytes += ev.Size; Mon->GetReadCounter(ev.PriorityClass)->CountRequest(ev.Size); - auto read = new TChunkRead(ev, sender, reqId, std::move(traceId)); + auto read = new TChunkRead(ev, sender, reqId, std::move(span)); read->SelfPointer = read; return NewRequest(read, &burstMs); } [[nodiscard]] TChunkWrite* CreateChunkWrite(const NPDisk::TEvChunkWrite &ev, const TActorId &sender, double& burstMs, NWilson::TTraceId traceId) { + NWilson::TSpan span(TWilson::PDisk, std::move(traceId), "PDisk.ChunkWrite", NWilson::EFlags::AUTO_END, ActorSystem); + TReqId reqId(TReqId::ChunkWrite, AtomicIncrement(LastReqId)); LOG_DEBUG(*ActorSystem, NKikimrServices::BS_PDISK, "PDiskId# %" PRIu32 " %s Sender# %" PRIu64 " ReqId# %" PRIu64, (ui32)PDiskId, ev.ToString().c_str(), (ui64)sender.LocalId(), (ui64)reqId.Id); @@ -246,7 +268,7 @@ class TReqCreator { ev.Validate(); *Mon->QueueBytes += size; Mon->GetWriteCounter(ev.PriorityClass)->CountRequest(size); - return NewRequest(new TChunkWrite(ev, sender, reqId, std::move(traceId)), &burstMs); + return NewRequest(new TChunkWrite(ev, sender, reqId, std::move(span)), &burstMs); } }; diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.cpp b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.cpp index 221065acc05e..b8ab31040a5e 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.cpp +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.cpp @@ -9,6 +9,7 @@ namespace NPDisk { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void TRequestBase::AbortDelete(TRequestBase* request, TActorSystem* actorSystem) { + request->Span.EndError("Abort"); switch(request->GetType()) { case ERequestType::RequestChunkRead: { @@ -64,8 +65,8 @@ void TChunkRead::Abort(TActorSystem* actorSystem) { //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TChunkReadPiece::TChunkReadPiece(TIntrusivePtr &read, ui64 pieceCurrentSector, ui64 pieceSizeLimit, - bool isTheLastPiece) - : TRequestBase(read->Sender, read->ReqId, read->Owner, read->OwnerRound, read->PriorityClass) + bool isTheLastPiece, NWilson::TSpan span) + : TRequestBase(read->Sender, read->ReqId, read->Owner, read->OwnerRound, read->PriorityClass, std::move(span)) , ChunkRead(read) , PieceCurrentSector(pieceCurrentSector) , PieceSizeLimit(pieceSizeLimit) diff --git a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h index 1029057ecd83..c8d3b18ca798 100644 --- a/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h +++ b/ydb/core/blobstorage/pdisk/blobstorage_pdisk_requestimpl.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -54,11 +55,11 @@ class TRequestBase : public TThrRefBase { NHPTimer::STime ScheduleTime = 0; // Tracing - mutable NWilson::TTraceId TraceId; + mutable NWilson::TSpan Span; mutable NLWTrace::TOrbit Orbit; public: TRequestBase(const TActorId &sender, TReqId reqId, TOwner owner, TOwnerRound ownerRound, ui8 priorityClass, - NWilson::TTraceId traceId = {}) + NWilson::TSpan span = {}) : Sender(sender) , ReqId(reqId) , Owner(owner) @@ -66,8 +67,10 @@ class TRequestBase : public TThrRefBase { , PriorityClass(priorityClass) , OwnerGroupType(EOwnerGroupType::Dynamic) , CreationTime(HPNow()) - , TraceId(std::move(traceId)) - {} + , Span(std::move(span)) + { + Span.EnableAutoEnd(); + } void SetOwnerGroupType(bool isStaticGroupOwner) { OwnerGroupType = (isStaticGroupOwner ? EOwnerGroupType::Static : EOwnerGroupType::Dynamic); @@ -177,11 +180,14 @@ class TLogRead : public TRequestBase { TLogPosition Position; ui64 SizeLimit; - TLogRead(const NPDisk::TEvReadLog &ev, const TActorId &sender, TAtomicBase reqIdx) - : TRequestBase(sender, TReqId(TReqId::LogRead, reqIdx), ev.Owner, ev.OwnerRound, NPriInternal::LogRead) - , Position(ev.Position) - , SizeLimit(ev.SizeLimit) - {} + TLogRead(const NPDisk::TEvReadLog::TPtr &ev, TAtomicBase reqIdx) + : TRequestBase(ev->Sender, TReqId(TReqId::LogRead, reqIdx), ev->Get()->Owner, ev->Get()->OwnerRound, NPriInternal::LogRead, + NWilson::TSpan(TWilson::PDisk, std::move(ev->TraceId), "PDisk.LogRead")) + , Position(ev->Get()->Position) + , SizeLimit(ev->Get()->SizeLimit) + { + Span.Attribute("size_limit", static_cast(ev->Get()->SizeLimit)); + } ERequestType GetType() const override { return ERequestType::RequestLogRead; @@ -199,14 +205,18 @@ class TLogReadContinue : public TRequestBase { TCompletionAction *CompletionAction; TReqId ReqId; - TLogReadContinue(const NPDisk::TEvReadLogContinue &ev, const TActorId &sender, TAtomicBase /*reqIdx*/) - : TRequestBase(sender, ev.ReqId, 0, 0, NPriInternal::LogRead) - , Data(ev.Data) - , Size(ev.Size) - , Offset(ev.Offset) - , CompletionAction(ev.CompletionAction) - , ReqId(ev.ReqId) - {} + TLogReadContinue(const NPDisk::TEvReadLogContinue::TPtr &ev, TAtomicBase /*reqIdx*/) + : TRequestBase(ev->Sender, ev->Get()->ReqId, 0, 0, NPriInternal::LogRead, + NWilson::TSpan(TWilson::PDisk, std::move(ev->TraceId), "PDisk.LogReadContinue")) + , Data(ev->Get()->Data) + , Size(ev->Get()->Size) + , Offset(ev->Get()->Offset) + , CompletionAction(ev->Get()->CompletionAction) + , ReqId(ev->Get()->ReqId) + { + Span.Attribute("size", ev->Get()->Size) + .Attribute("offset", static_cast(ev->Get()->Offset)); + } ERequestType GetType() const override { return ERequestType::RequestLogReadContinue; @@ -222,7 +232,6 @@ class TLogReadResultProcess : public TRequestBase { TLogReadResultProcess(NPDisk::TEvReadLogResult::TPtr &ev, const TActorId &sender, TAtomicBase reqIdx) : TRequestBase(sender, TReqId(TReqId::LogReadResultProcess, reqIdx), 0, 0, NPriInternal::LogRead) - , ReadLogResult(std::move(ev)) {} @@ -274,8 +283,8 @@ class TLogWrite : public TRequestBase { THolder Result; std::function OnDestroy; - TLogWrite(NPDisk::TEvLog &ev, const TActorId &sender, ui32 estimatedChunkIdx, TReqId reqId, NWilson::TTraceId traceId) - : TRequestBase(sender, reqId, ev.Owner, ev.OwnerRound, NPriInternal::LogWrite, std::move(traceId)) + TLogWrite(NPDisk::TEvLog &ev, const TActorId &sender, ui32 estimatedChunkIdx, TReqId reqId, NWilson::TSpan span) + : TRequestBase(sender, reqId, ev.Owner, ev.OwnerRound, NPriInternal::LogWrite, std::move(span)) , Signature(ev.Signature) , EstimatedChunkIdx(estimatedChunkIdx) , Data(ev.Data) @@ -368,8 +377,8 @@ class TChunkRead : public TRequestBase { std::function DebugInfoGenerator; - TChunkRead(const NPDisk::TEvChunkRead &ev, const TActorId &sender, TReqId reqId, NWilson::TTraceId traceId) - : TRequestBase(sender, reqId, ev.Owner, ev.OwnerRound, ev.PriorityClass, std::move(traceId)) + TChunkRead(const NPDisk::TEvChunkRead &ev, const TActorId &sender, TReqId reqId, NWilson::TSpan span) + : TRequestBase(sender, reqId, ev.Owner, ev.OwnerRound, ev.PriorityClass, std::move(span)) , ChunkIdx(ev.ChunkIdx) , Offset(ev.Offset) , Size(ev.Size) @@ -433,7 +442,7 @@ class TChunkReadPiece : public TRequestBase { // scheduler and drop owning when poped from scheduler TIntrusivePtr SelfPointer; - TChunkReadPiece(TIntrusivePtr &read, ui64 pieceCurrentSector, ui64 pieceSizeLimit, bool isTheLastPiece); + TChunkReadPiece(TIntrusivePtr &read, ui64 pieceCurrentSector, ui64 pieceSizeLimit, bool isTheLastPiece, NWilson::TSpan span); virtual ~TChunkReadPiece() { Y_ABORT_UNLESS(!SelfPointer); @@ -481,8 +490,8 @@ class TChunkWrite : public TRequestBase { THolder Completion; - TChunkWrite(const NPDisk::TEvChunkWrite &ev, const TActorId &sender, TReqId reqId, NWilson::TTraceId traceId) - : TRequestBase(sender, reqId, ev.Owner, ev.OwnerRound, ev.PriorityClass, std::move(traceId)) + TChunkWrite(const NPDisk::TEvChunkWrite &ev, const TActorId &sender, TReqId reqId, NWilson::TSpan span) + : TRequestBase(sender, reqId, ev.Owner, ev.OwnerRound, ev.PriorityClass, std::move(span)) , ChunkIdx(ev.ChunkIdx) , Offset(ev.Offset) , PartsPtr(ev.PartsPtr) @@ -545,8 +554,8 @@ class TChunkWritePiece : public TRequestBase { ui32 PieceShift; ui32 PieceSize; - TChunkWritePiece(TIntrusivePtr &write, ui32 pieceShift, ui32 pieceSize) - : TRequestBase(write->Sender, write->ReqId, write->Owner, write->OwnerRound, write->PriorityClass) + TChunkWritePiece(TIntrusivePtr &write, ui32 pieceShift, ui32 pieceSize, NWilson::TSpan span) + : TRequestBase(write->Sender, write->ReqId, write->Owner, write->OwnerRound, write->PriorityClass, std::move(span)) , ChunkWrite(write) , PieceShift(pieceShift) , PieceSize(pieceSize) @@ -571,8 +580,9 @@ class TChunkTrim : public TRequestBase { ui32 Offset; ui64 Size; - TChunkTrim(ui32 chunkIdx, ui32 offset, ui64 size, TAtomicBase reqIdx) - : TRequestBase(TActorId(), TReqId(TReqId::ChunkTrim, reqIdx), OwnerUnallocated, TOwnerRound(0), NPriInternal::Trim) + TChunkTrim(ui32 chunkIdx, ui32 offset, ui64 size, NWilson::TSpan span, TAtomicBase reqIdx) + : TRequestBase(TActorId(), TReqId(TReqId::ChunkTrim, reqIdx), OwnerUnallocated, + TOwnerRound(0), NPriInternal::Trim, std::move(span)) , ChunkIdx(chunkIdx) , Offset(offset) , Size(size) @@ -827,8 +837,10 @@ class TYardControl : public TRequestBase { // class TAskForCutLog : public TRequestBase { public: - TAskForCutLog(const NPDisk::TEvAskForCutLog &ev, const TActorId &sender, TAtomicBase reqIdx) - : TRequestBase(sender, TReqId(TReqId::AskForCutLog, reqIdx), ev.Owner, ev.OwnerRound, NPriInternal::Other) + TAskForCutLog(const NPDisk::TEvAskForCutLog::TPtr &ev, TAtomicBase reqIdx) + : TRequestBase(ev->Sender, TReqId(TReqId::AskForCutLog, reqIdx), ev->Get()->Owner, ev->Get()->OwnerRound, NPriInternal::Other, + NWilson::TSpan(TWilson::PDisk, std::move(ev->TraceId), "PDisk.AskForCutLog") + ) {} ERequestType GetType() const override { @@ -872,8 +884,8 @@ class TCommitLogChunks : public TRequestBase { public: TVector CommitedLogChunks; - TCommitLogChunks(TVector&& commitedLogChunks, TAtomicBase reqIdx) - : TRequestBase(TActorId(), TReqId(TReqId::CommitLogChunks, reqIdx), OwnerSystem, 0, NPriInternal::Other) + TCommitLogChunks(TVector&& commitedLogChunks, NWilson::TSpan span, TAtomicBase reqIdx) + : TRequestBase(TActorId(), TReqId(TReqId::CommitLogChunks, reqIdx), OwnerSystem, 0, NPriInternal::Other, std::move(span)) , CommitedLogChunks(std::move(commitedLogChunks)) {} @@ -893,16 +905,16 @@ class TReleaseChunks : public TRequestBase { bool IsChunksFromLogSplice; TReleaseChunks(const TLogChunkInfo& gapStart, const TLogChunkInfo& gapEnd, TVector chunksToRelease, - TAtomicBase reqIdx) - : TRequestBase(TActorId(), TReqId(TReqId::ReleaseChunks, reqIdx), OwnerSystem, 0, NPriInternal::Other) + NWilson::TSpan span, TAtomicBase reqIdx) + : TRequestBase(TActorId(), TReqId(TReqId::ReleaseChunks, reqIdx), OwnerSystem, 0, NPriInternal::Other, std::move(span)) , GapStart(gapStart) , GapEnd(gapEnd) , ChunksToRelease(std::move(chunksToRelease)) , IsChunksFromLogSplice(true) {} - TReleaseChunks(TVector chunksToRelease, TAtomicBase reqIdx) - : TRequestBase(TActorId(), TReqId(TReqId::ReleaseChunks, reqIdx), OwnerSystem, 0, NPriInternal::Other) + TReleaseChunks(TVector chunksToRelease, NWilson::TSpan span, TAtomicBase reqIdx) + : TRequestBase(TActorId(), TReqId(TReqId::ReleaseChunks, reqIdx), OwnerSystem, 0, NPriInternal::Other, std::move(span)) , ChunksToRelease(std::move(chunksToRelease)) , IsChunksFromLogSplice(false) {} @@ -956,8 +968,8 @@ class TTryTrimChunk : public TRequestBase { public: ui64 TrimSize; - TTryTrimChunk(ui64 trimSize, TAtomicBase reqIdx) - : TRequestBase(TActorId(), TReqId(TReqId::TryTrimChunk, reqIdx), OwnerSystem, 0, NPriInternal::Other) + TTryTrimChunk(ui64 trimSize, NWilson::TSpan span, TAtomicBase reqIdx) + : TRequestBase(TActorId(), TReqId(TReqId::TryTrimChunk, reqIdx), OwnerSystem, 0, NPriInternal::Other, std::move(span)) , TrimSize(trimSize) {} diff --git a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp index 48ddca94f1dc..f057bb46d5df 100644 --- a/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp +++ b/ydb/core/blobstorage/pdisk/mock/pdisk_mock.cpp @@ -482,7 +482,7 @@ class TPDiskMockActor : public TActorBootstrapped { if (LogQ.empty()) { TActivationContext::Send(new IEventHandle(EvResume, 0, SelfId(), TActorId(), nullptr, 0)); } - for (auto& msg : ev->Get()->Logs) { + for (auto& [msg, _] : ev->Get()->Logs) { Y_ABORT_UNLESS(!Impl.CheckIsReadOnlyOwner(msg.Get())); LogQ.emplace_back(ev->Sender, std::move(msg)); } @@ -853,7 +853,7 @@ class TPDiskMockActor : public TActorBootstrapped { void ErrorHandle(NPDisk::TEvMultiLog::TPtr &ev) { const NPDisk::TEvMultiLog &evMultiLog = *ev->Get(); THolder result(new NPDisk::TEvLogResult(NKikimrProto::CORRUPTED, 0, State->GetStateErrorReason())); - for (auto &log : evMultiLog.Logs) { + for (auto &[log, _] : evMultiLog.Logs) { result->Results.push_back(NPDisk::TEvLogResult::TRecord(log->Lsn, log->Cookie)); } Send(ev->Sender, result.Release()); diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_recoverylogwriter.cpp b/ydb/core/blobstorage/vdisk/common/vdisk_recoverylogwriter.cpp index e0fbdf84d802..c0b04d93c7e4 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_recoverylogwriter.cpp +++ b/ydb/core/blobstorage/vdisk/common/vdisk_recoverylogwriter.cpp @@ -114,7 +114,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); case TEvBlobStorage::EvMultiLog: { NPDisk::TEvMultiLog *evLogs = ev->Get(); - for (auto &log : evLogs->Logs) { + for (auto &[log, _] : evLogs->Logs) { LWTRACK(VDiskRecoveryLogWriterVPutIsSent, log->Orbit, Owner, log->Lsn); } break; @@ -154,12 +154,12 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); NPDisk::TEvMultiLog *logs = ev->Get(); ui64 lsnSegmentStart = logs->LsnSeg.First; ui64 lsn = logs->LsnSeg.Last; - Y_VERIFY_DEBUG_S(lsnSegmentStart == logs->Logs.front()->LsnSegmentStart && lsn == logs->Logs.back()->Lsn, + Y_VERIFY_DEBUG_S(lsnSegmentStart == logs->Logs.front().Event->LsnSegmentStart && lsn == logs->Logs.back().Event->Lsn, "LsnSeg not match with inner logs" << "LsnSeg# " << logs->LsnSeg.ToString() - << "Logs.front().LsnSegmentStart# " << logs->Logs.front()->LsnSegmentStart - << "Logs.back().Lsn# " << logs->Logs.back()->Lsn); - for (auto &log : logs->Logs) { + << "Logs.front().LsnSegmentStart# " << logs->Logs.front().Event->LsnSegmentStart + << "Logs.back().Lsn# " << logs->Logs.back().Event->Lsn); + for (auto &[log, _] : logs->Logs) { LWTRACK(VDiskRecoveryLogWriterVPutIsRecieved, log->Orbit, Owner, log->Lsn); TLogSignature signature = log->Signature.GetUnmasked(); Y_ABORT_UNLESS(TLogSignature::First < signature && signature < TLogSignature::Max); diff --git a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp index f7ccf53042ce..5d7983281655 100644 --- a/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp +++ b/ydb/core/blobstorage/vdisk/query/query_readbatch.cpp @@ -219,9 +219,9 @@ namespace NKikimr { IActor *TReadBatcher::CreateAsyncDataReader(const TActorId ¬ifyID, ui8 priority, NWilson::TTraceId traceId, bool isRepl) { - if (Result->DiskDataItemPtrs.empty()) + if (Result->DiskDataItemPtrs.empty()) { return nullptr; - else { + } else { // prepare read plan PrepareReadPlan(); Y_DEBUG_ABORT_UNLESS(!Result->GlueReads.empty()); diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp index 79ce890c9991..da904f927202 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeleton.cpp @@ -360,9 +360,8 @@ namespace NKikimr { template<> struct TLoggedRecType { using T = TLoggedRecVMultiPutItem; }; template - std::unique_ptr CreatePutLogEvent(const TActorContext &ctx, TString evPrefix, NActors::TActorId sender, - ui64 cookie, NLWTrace::TOrbit &&orbit, TVPutInfo &info, - std::unique_ptr result) + std::pair, NWilson::TTraceId> CreatePutLogEvent(const TActorContext &ctx, TString evPrefix, + NActors::TActorId sender, ui64 cookie, NLWTrace::TOrbit &&orbit, TVPutInfo &info, std::unique_ptr result) { Y_DEBUG_ABORT_UNLESS(info.HullStatus.Status == NKikimrProto::OK); const TLogoBlobID &id = info.BlobId; @@ -393,15 +392,16 @@ namespace NKikimr { UpdatePDiskWriteBytes(dataToWrite.size()); bool confirmSyncLogAlso = static_cast(syncLogMsg); - intptr_t loggedRecId = LoggedRecsVault.Put(new typename TLoggedRecType::T(seg, confirmSyncLogAlso, - id, ingress, std::move(buffer), std::move(result), sender, cookie, std::move(info.TraceId))); + auto loggedRec = new typename TLoggedRecType::T(seg, confirmSyncLogAlso, + id, ingress, std::move(buffer), std::move(result), sender, cookie, std::move(info.TraceId)); + intptr_t loggedRecId = LoggedRecsVault.Put(loggedRec); void *loggedRecCookie = reinterpret_cast(loggedRecId); // create log msg auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureLogoBlobOpt, dataToWrite, seg, loggedRecCookie, std::move(syncLogMsg), nullptr); // send prepared message to recovery log logMsg->Orbit = std::move(orbit); - return logMsg; + return {std::move(logMsg), loggedRec->GetTraceId()}; } std::unique_ptr CreateHullWriteHugeBlob(NActors::TActorId sender, @@ -592,9 +592,9 @@ namespace NKikimr { info.Lsn = TLsnSeg(lsnBatch.First, lsnBatch.First); lsnBatch.First++; - auto logMsg = CreatePutLogEvent(ctx, "TEvVMultiPut", vMultiPutActorId, cookie, std::move(orbit), + auto [logMsg, traceId] = CreatePutLogEvent(ctx, "TEvVMultiPut", vMultiPutActorId, cookie, std::move(orbit), info, std::move(result)); - evLogs->AddLog(THolder(logMsg.release())); + evLogs->AddLog(THolder(logMsg.release()), std::move(traceId)); } // Manage PDisk scheduler weights @@ -713,16 +713,17 @@ namespace NKikimr { OverloadHandler->ActualizeWeights(ctx, Mask(EHullDbType::LogoBlobs)); if (!info.IsHugeBlob) { - auto logMsg = CreatePutLogEvent(ctx, "TEvVPut", ev->Sender, ev->Cookie, + auto [logMsg, traceId] = CreatePutLogEvent(ctx, "TEvVPut", ev->Sender, ev->Cookie, std::move(ev->Get()->Orbit), info, std::move(result)); - ctx.Send(Db->LoggerID, logMsg.release(), 0, 0, std::move(info.TraceId)); + ctx.Send(Db->LoggerID, logMsg.release(), 0, 0, std::move(traceId)); } else if (info.Buffer) { + auto traceId = std::move(info.TraceId); // pass the work to huge blob writer NKikimrBlobStorage::EPutHandleClass handleClass = record.GetHandleClass(); auto hugeWrite = CreateHullWriteHugeBlob(ev->Sender, ev->Cookie, ignoreBlock, handleClass, info, std::move(result)); hugeWrite->Orbit = std::move(ev->Get()->Orbit); - ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(info.TraceId)); + ctx.Send(Db->HugeKeeperID, hugeWrite.release(), 0, 0, std::move(traceId)); } else { ctx.Send(SelfId(), new TEvHullLogHugeBlob(0, info.BlobId, info.Ingress, TDiskPart(), ignoreBlock, ev->Sender, ev->Cookie, std::move(result), &info.ExtraBlockChecks), 0, 0, std::move(info.TraceId)); @@ -778,6 +779,7 @@ namespace NKikimr { auto dataToWrite = logRec.Serialize(); UpdatePDiskWriteBytes(dataToWrite.size()); // prepare TLoggedRecVPutHuge + auto traceId = ev->TraceId.Clone(); bool confirmSyncLogAlso = static_cast(syncLogMsg); intptr_t loggedRecId = LoggedRecsVault.Put(new TLoggedRecVPutHuge(seg, confirmSyncLogAlso, Db->HugeKeeperID, ev)); void *loggedRecCookie = reinterpret_cast(loggedRecId); @@ -785,7 +787,7 @@ namespace NKikimr { auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureHugeLogoBlob, dataToWrite, seg, loggedRecCookie, std::move(syncLogMsg), nullptr); // send prepared message to recovery log - ctx.Send(Db->LoggerID, logMsg.release()); + ctx.Send(Db->LoggerID, logMsg.release(), 0, 0, std::move(traceId)); } //////////////////////////////////////////////////////////////////////// @@ -831,10 +833,11 @@ namespace NKikimr { const TLsnSeg seg = Db->LsnMngr->AllocLsnForHull(ev->Get()->Essence.GetLsnRange()); NPDisk::TCommitRecord commitRecord; TString data = ev->Get()->Serialize(commitRecord); + auto traceId = ev->TraceId.Clone(); intptr_t loggedRecId = LoggedRecsVault.Put(new TLoggedRecAddBulkSst(seg, false, ev)); auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureAddBulkSst, commitRecord, data, seg, reinterpret_cast(loggedRecId), nullptr); - ctx.Send(Db->LoggerID, logMsg.release()); + ctx.Send(Db->LoggerID, logMsg.release(), 0, 0, std::move(traceId)); } @@ -1021,7 +1024,7 @@ namespace NKikimr { auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureBlock, ev->GetChainBuffer()->GetString(), seg, loggedRecCookie, std::move(syncLogMsg), nullptr); // send prepared message to recovery log - ctx.Send(Db->LoggerID, logMsg.release()); + ctx.Send(Db->LoggerID, logMsg.release(), 0, 0, std::move(ev->TraceId)); } //////////////////////////////////////////////////////////////////////// @@ -1118,6 +1121,7 @@ namespace NKikimr { std::unique_ptr syncLogMsg( new NSyncLog::TEvSyncLogPut(Db->GType, seg.Last, record, ingress)); + auto traceId = ev->TraceId.Clone(); TString data = ev->GetChainBuffer()->GetString(); intptr_t loggedRecId = LoggedRecsVault.Put(new TLoggedRecVCollectGarbage(seg, true, ingress, std::move(result), ev)); void *loggedRecCookie = reinterpret_cast(loggedRecId); @@ -1125,7 +1129,7 @@ namespace NKikimr { auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureGC, data, seg, loggedRecCookie, std::move(syncLogMsg), nullptr); // send prepared message to recovery log - ctx.Send(Db->LoggerID, logMsg.release()); + ctx.Send(Db->LoggerID, logMsg.release(), 0, 0, std::move(traceId)); } @@ -1473,6 +1477,7 @@ namespace NKikimr { OverloadHandler->ActualizeWeights(ctx, AllEHullDbTypes); + auto traceId = ev->TraceId.Clone(); TString data = ev->Get()->Serialize(); intptr_t loggedRecId = LoggedRecsVault.Put(new TLoggedRecLocalSyncData(seg, false, std::move(result), ev)); void *loggedRecCookie = reinterpret_cast(loggedRecId); @@ -1480,7 +1485,7 @@ namespace NKikimr { auto logMsg = CreateHullUpdate(HullLogCtx, TLogSignature::SignatureLocalSyncData, data, seg, loggedRecCookie, nullptr, nullptr); // send prepared message to recovery log - ctx.Send(Db->LoggerID, logMsg.release()); + ctx.Send(Db->LoggerID, logMsg.release(), 0, 0, std::move(traceId)); } //////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp index 15167c052c83..d370a8cdcd5b 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.cpp @@ -51,6 +51,10 @@ namespace NKikimr { SendVDiskResponse(ctx, Recipient, Result.release(), RecipientCookie); } + NWilson::TTraceId TLoggedRecVPut::GetTraceId() const { + return Span.GetTraceId(); + } + /////////////////////////////////////////////////////////////////////////////////////////////////////// // TLoggedRecVPut -- incapsulates TEvVPut replay action (for small blobs) /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -88,6 +92,10 @@ namespace NKikimr { ctx.Send(Recipient, Result.release(), RecipientCookie); } + NWilson::TTraceId TLoggedRecVMultiPutItem::GetTraceId() const { + return Span.GetTraceId(); + } + /////////////////////////////////////////////////////////////////////////////////////////////////////// // TLoggedRecVPut -- incapsulates TEvVPut replay action (for huge blobs) /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -166,6 +174,7 @@ namespace NKikimr { , Ingress(ingress) , Result(std::move(result)) , OrigEv(origEv) + , Span(TWilson::VDiskInternals, std::move(OrigEv->TraceId), "VDisk.LoggedRecVCollectGarbage") {} void TLoggedRecVCollectGarbage::Replay(THull &hull, const TActorContext &ctx) { @@ -175,6 +184,7 @@ namespace NKikimr { LOG_DEBUG_S(ctx, NKikimrServices::BS_VDISK_GC, hull.GetHullCtx()->VCtx->VDiskLogPrefix << "TEvVCollectGarbage: result# " << Result->ToString() << " Marker# BSVSLR05"); + Span.EndOk(); SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), OrigEv->Cookie); } @@ -189,6 +199,7 @@ namespace NKikimr { : ILoggedRec(seg, confirmSyncLogAlso) , Result(std::move(result)) , OrigEv(origEv) + , Span(TWilson::VDiskInternals, std::move(OrigEv->TraceId), "VDisk.LoggedRecLocalSyncData") {} void TLoggedRecLocalSyncData::Replay(THull &hull, const TActorContext &ctx) { @@ -201,6 +212,7 @@ namespace NKikimr { #else hull.AddSyncDataCmd(ctx, OrigEv->Get()->Data, Seg, replySender); #endif + Span.EndOk(); SendVDiskResponse(ctx, OrigEv->Sender, Result.release(), OrigEv->Cookie); } diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h index 32f3306d30cd..bb994a757b37 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_loggedrec.h @@ -53,6 +53,8 @@ namespace NKikimr { ui64 recipientCookie, NWilson::TTraceId traceId); void Replay(THull &hull, const TActorContext &ctx) override; + NWilson::TTraceId GetTraceId() const; + private: TLogoBlobID Id; TIngress Ingress; @@ -73,6 +75,8 @@ namespace NKikimr { ui64 recipientCookie, NWilson::TTraceId traceId); void Replay(THull &hull, const TActorContext &ctx) override; + NWilson::TTraceId GetTraceId() const; + private: TLogoBlobID Id; TIngress Ingress; @@ -130,6 +134,7 @@ namespace NKikimr { TBarrierIngress Ingress; std::unique_ptr Result; TEvBlobStorage::TEvVCollectGarbage::TPtr OrigEv; + NWilson::TSpan Span; }; /////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -144,6 +149,7 @@ namespace NKikimr { private: std::unique_ptr Result; TEvLocalSyncData::TPtr OrigEv; + NWilson::TSpan Span; }; /////////////////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/library/wilson_ids/wilson.h b/ydb/library/wilson_ids/wilson.h index 110d26a0cf1a..4d6d96d4fe88 100644 --- a/ydb/library/wilson_ids/wilson.h +++ b/ydb/library/wilson_ids/wilson.h @@ -8,6 +8,7 @@ namespace NKikimr { DsProxyInternals = 9, VDiskTopLevel = 12, VDiskInternals = 13, + PDisk = 14, }; };