Skip to content

Commit 8813414

Browse files
authored
Topic split/merge for write (#1386)
1 parent 7c628c7 commit 8813414

30 files changed

+277
-714
lines changed

ydb/core/persqueue/events/internal.h

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,13 +216,14 @@ struct TEvPQ {
216216
std::optional<TRowVersion> HeartbeatVersion;
217217
};
218218

219-
TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite)
219+
TEvWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, TVector<TMsg> &&msgs, bool isDirectWrite, std::optional<ui64> initialSeqNo)
220220
: Cookie(cookie)
221221
, MessageNo(messageNo)
222222
, OwnerCookie(ownerCookie)
223223
, Offset(offset)
224224
, Msgs(std::move(msgs))
225225
, IsDirectWrite(isDirectWrite)
226+
, InitialSeqNo(initialSeqNo)
226227
{}
227228

228229
ui64 Cookie;
@@ -231,6 +232,7 @@ struct TEvPQ {
231232
TMaybe<ui64> Offset;
232233
TVector<TMsg> Msgs;
233234
bool IsDirectWrite;
235+
std::optional<ui64> InitialSeqNo;
234236

235237
};
236238

@@ -939,12 +941,6 @@ struct TEvPQ {
939941
NKikimrClient::TPersQueueFetchResponse Response;
940942
};
941943

942-
struct TEvSourceIdRequest : public TEventPB<TEvSourceIdRequest, NKikimrPQ::TEvSourceIdRequest, EvSourceIdRequest> {
943-
};
944-
945-
struct TEvSourceIdResponse : public TEventPB<TEvSourceIdResponse, NKikimrPQ::TEvSourceIdResponse, EvSourceIdResponse> {
946-
};
947-
948944
struct TEvRegisterDirectReadSession : public TEventLocal<TEvRegisterDirectReadSession, EvRegisterDirectReadSession> {
949945
TEvRegisterDirectReadSession(const NPQ::TReadSessionKey& sessionKey, ui32 tabletGeneration)
950946
: Session(sessionKey)

ydb/core/persqueue/partition.cpp

Lines changed: 29 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <ydb/core/base/counters.h>
1111
#include <ydb/core/base/path.h>
1212
#include <ydb/core/quoter/public/quoter.h>
13+
#include <ydb/core/persqueue/writer/source_id_encoding.h>
1314
#include <ydb/core/protos/counters_pq.pb.h>
1415
#include <ydb/core/protos/msgbus.pb.h>
1516
#include <ydb/library/persqueue/topic_parser/topic_parser.h>
@@ -482,8 +483,6 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
482483

483484
Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());
484485

485-
SourceManager.PassAway();
486-
487486
Die(ctx);
488487
}
489488

@@ -935,50 +934,33 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
935934
}
936935

937936
void TPartition::Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx) {
938-
SourceManager.EnsureSourceIds(ev->Get()->SourceIds);
939-
MaxSeqNoRequests.emplace_back(ev);
940-
ProcessMaxSeqNoRequest(ctx);
941-
}
942-
943-
void TPartition::ProcessMaxSeqNoRequest(const TActorContext& ctx) {
944-
PQ_LOG_T("TPartition::ProcessMaxSeqNoRequest. Queue size: " << MaxSeqNoRequests.size());
945-
946-
while(!MaxSeqNoRequests.empty()) {
947-
auto& ev = MaxSeqNoRequests.front();
948-
949-
auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
950-
NKikimrClient::TResponse& resp = *response->Response;
951-
952-
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
953-
resp.SetErrorCode(NPersQueue::NErrorCode::OK);
954-
955-
auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
956-
for (const auto& sourceId : ev->Get()->SourceIds) {
957-
auto& protoInfo = *result.AddSourceIdInfo();
958-
protoInfo.SetSourceId(sourceId);
937+
auto response = MakeHolder<TEvPQ::TEvProxyResponse>(ev->Get()->Cookie);
938+
NKikimrClient::TResponse& resp = *response->Response;
959939

960-
auto info = SourceManager.Get(sourceId);
961-
if (!info) {
962-
PQ_LOG_D("Stop MaxSeqNoRequest - scheduled a research. SourceId: " << sourceId);
963-
return;
964-
}
965-
if (info.State == TSourceIdInfo::EState::Unknown) {
966-
continue;
967-
}
940+
resp.SetStatus(NMsgBusProxy::MSTATUS_OK);
941+
resp.SetErrorCode(NPersQueue::NErrorCode::OK);
968942

969-
Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset);
970-
Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo);
943+
auto& result = *resp.MutablePartitionResponse()->MutableCmdGetMaxSeqNoResult();
944+
for (const auto& sourceId : ev->Get()->SourceIds) {
945+
auto& protoInfo = *result.AddSourceIdInfo();
946+
protoInfo.SetSourceId(sourceId);
971947

972-
protoInfo.SetSeqNo(info.SeqNo);
973-
protoInfo.SetOffset(info.Offset);
974-
protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds());
975-
protoInfo.SetExplicit(info.Explicit);
976-
protoInfo.SetState(TSourceIdInfo::ConvertState(info.State));
948+
auto info = SourceManager.Get(sourceId);
949+
if (info.State == TSourceIdInfo::EState::Unknown) {
950+
continue;
977951
}
978952

979-
ctx.Send(Tablet, response.Release());
980-
MaxSeqNoRequests.pop_front();
953+
Y_ABORT_UNLESS(info.Offset <= (ui64)Max<i64>(), "Offset is too big: %" PRIu64, info.Offset);
954+
Y_ABORT_UNLESS(info.SeqNo <= (ui64)Max<i64>(), "SeqNo is too big: %" PRIu64, info.SeqNo);
955+
956+
protoInfo.SetSeqNo(info.SeqNo);
957+
protoInfo.SetOffset(info.Offset);
958+
protoInfo.SetWriteTimestampMS(info.WriteTimestamp.MilliSeconds());
959+
protoInfo.SetExplicit(info.Explicit);
960+
protoInfo.SetState(TSourceIdInfo::ConvertState(info.State));
981961
}
962+
963+
ctx.Send(Tablet, response.Release());
982964
}
983965

984966
void TPartition::Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx) {
@@ -2612,42 +2594,6 @@ void TPartition::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
26122594
}
26132595
}
26142596

2615-
void TPartition::Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx) {
2616-
auto& record = ev->Get()->Record;
2617-
2618-
if (Partition != record.GetPartition()) {
2619-
LOG_INFO_S(
2620-
ctx, NKikimrServices::PERSQUEUE,
2621-
"TEvSourceIdRequest for wrong partition " << record.GetPartition() << "." <<
2622-
" Topic: \"" << TopicName() << "\"." <<
2623-
" Partition: " << Partition << "."
2624-
);
2625-
return;
2626-
}
2627-
2628-
auto& memoryStorage = SourceIdStorage.GetInMemorySourceIds();
2629-
2630-
auto response = MakeHolder<TEvPQ::TEvSourceIdResponse>();
2631-
for(auto& sourceId : record.GetSourceId()) {
2632-
auto* s = response->Record.AddSource();
2633-
s->SetId(sourceId);
2634-
2635-
auto it = memoryStorage.find(sourceId);
2636-
if (it != memoryStorage.end()) {
2637-
auto& info = it->second;
2638-
s->SetState(Convert(info.State));
2639-
s->SetSeqNo(info.SeqNo);
2640-
s->SetOffset(info.Offset);
2641-
s->SetExplicit(info.Explicit);
2642-
s->SetWriteTimestamp(info.WriteTimestamp.GetValue());
2643-
} else {
2644-
s->SetState(NKikimrPQ::TEvSourceIdResponse::EState::TEvSourceIdResponse_EState_Unknown);
2645-
}
2646-
}
2647-
2648-
Send(ev->Sender, response.Release());
2649-
}
2650-
26512597
void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx) {
26522598
auto& record = ev->Get()->Record;
26532599

@@ -2664,6 +2610,13 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
26642610
auto response = MakeHolder<TEvPQ::TEvCheckPartitionStatusResponse>();
26652611
response->Record.SetStatus(PartitionConfig ? PartitionConfig->GetStatus() : NKikimrPQ::ETopicPartitionStatus::Active);
26662612

2613+
if (record.HasSourceId()) {
2614+
auto sit = SourceIdStorage.GetInMemorySourceIds().find(NSourceIdEncoding::EncodeSimple(record.GetSourceId()));
2615+
if (sit != SourceIdStorage.GetInMemorySourceIds().end()) {
2616+
response->Record.SetSeqNo(sit->second.SeqNo);
2617+
}
2618+
}
2619+
26672620
Send(ev->Sender, response.Release());
26682621
}
26692622

ydb/core/persqueue/partition.h

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
344344
void CheckIfSessionExists(TUserInfoBase& userInfo, const TActorId& newPipe);
345345
// void DestroyReadSession(const TReadSessionKey& key);
346346

347-
void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
348347
void Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const TActorContext& ctx);
349348

350349
TString LogPrefix() const;
@@ -482,9 +481,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
482481
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
483482
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
484483
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
485-
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
486484
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
487-
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
488485
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
489486
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
490487
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
@@ -540,9 +537,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
540537
HFuncTraced(TEvPQ::TEvTxCommit, Handle);
541538
HFuncTraced(TEvPQ::TEvTxRollback, Handle);
542539
HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
543-
HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
544540
HFuncTraced(TEvPQ::TEvCheckPartitionStatusRequest, Handle);
545-
HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
546541
HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
547542
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
548543
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
@@ -613,7 +608,6 @@ class TPartition : public TActorBootstrapped<TPartition> {
613608

614609
std::deque<TMessage> Requests;
615610
std::deque<TMessage> Responses;
616-
std::deque<TEvPQ::TEvGetMaxSeqNoRequest::TPtr> MaxSeqNoRequests;
617611

618612
THead Head;
619613
THead NewHead;

0 commit comments

Comments
 (0)