diff --git a/ydb/core/persqueue/partition.cpp b/ydb/core/persqueue/partition.cpp index 4eb2093bf279..70aafdc97a13 100644 --- a/ydb/core/persqueue/partition.cpp +++ b/ydb/core/persqueue/partition.cpp @@ -82,7 +82,7 @@ TString TPartition::LogPrefix() const { } else { state = "Unknown"; } - return TStringBuilder() << "[PQ: " << TabletID << ", Partition:" << Partition << ", State:" << state << "] "; + return TStringBuilder() << "[PQ: " << TabletID << ", Partition: " << Partition << ", State: " << state << "] "; } bool TPartition::IsActive() const { @@ -2134,6 +2134,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event) void TPartition::CommitWriteOperations(TTransaction& t) { + PQ_LOG_D("TPartition::CommitWriteOperations TxId: " << t.GetTxId()); + Y_ABORT_UNLESS(PersistRequest); Y_ABORT_UNLESS(!PartitionedBlob.IsInited()); @@ -2151,6 +2153,10 @@ void TPartition::CommitWriteOperations(TTransaction& t) HaveWriteMsg = true; } + PQ_LOG_D("t.WriteInfo->BodyKeys.size=" << t.WriteInfo->BodyKeys.size() << + ", t.WriteInfo->BlobsFromHead.size=" << t.WriteInfo->BlobsFromHead.size()); + PQ_LOG_D("Head=" << Head << ", NewHead=" << NewHead); + if (!t.WriteInfo->BodyKeys.empty()) { PartitionedBlob = TPartitionedBlob(Partition, NewHead.Offset, @@ -2165,6 +2171,7 @@ void TPartition::CommitWriteOperations(TTransaction& t) MaxBlobSize); for (auto& k : t.WriteInfo->BodyKeys) { + PQ_LOG_D("add key " << k.Key.ToString()); auto write = PartitionedBlob.Add(k.Key, k.Size); if (write && !write->Value.empty()) { AddCmdWrite(write, PersistRequest.Get(), ctx); @@ -2173,18 +2180,17 @@ void TPartition::CommitWriteOperations(TTransaction& t) } } - } - if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) { - ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get()); - RenameFormedBlobs(formedBlobs, - *Parameters, - curWrites, - PersistRequest.Get(), - ctx); - } + PQ_LOG_D("PartitionedBlob.GetFormedBlobs().size=" << PartitionedBlob.GetFormedBlobs().size()); + if (const auto& formedBlobs = PartitionedBlob.GetFormedBlobs(); !formedBlobs.empty()) { + ui32 curWrites = RenameTmpCmdWrites(PersistRequest.Get()); + RenameFormedBlobs(formedBlobs, + *Parameters, + curWrites, + PersistRequest.Get(), + ctx); + } - if (!t.WriteInfo->BodyKeys.empty()) { const auto& last = t.WriteInfo->BodyKeys.back(); NewHead.Offset += (last.Key.GetOffset() + last.Key.GetCount()); diff --git a/ydb/core/persqueue/partition_id.h b/ydb/core/persqueue/partition_id.h index 5ef5c4fa75e2..0c1dbb8d3afd 100644 --- a/ydb/core/persqueue/partition_id.h +++ b/ydb/core/persqueue/partition_id.h @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -51,6 +52,13 @@ class TPartitionId { } } + TString ToString() const + { + TStringBuilder s; + s << *this; + return s; + } + bool IsSupportivePartition() const { return WriteId.Defined(); diff --git a/ydb/core/persqueue/partition_write.cpp b/ydb/core/persqueue/partition_write.cpp index a7b7af8c51ce..a9a592a794eb 100644 --- a/ydb/core/persqueue/partition_write.cpp +++ b/ydb/core/persqueue/partition_write.cpp @@ -1064,16 +1064,16 @@ void TPartition::RenameFormedBlobs(const std::deque