Skip to content

At the start of a partition, the order of messages may change. #15160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,8 @@ struct TEvPQ {
}

ui32 Cookie; // InternalPartitionId
TActorId SupportivePartition;

NPQ::TSourceIdMap SrcIdInfo;
std::deque<NPQ::TDataKey> BodyKeys;
TVector<NPQ::TClientBlob> BlobsFromHead;
Expand All @@ -1102,6 +1104,7 @@ struct TEvPQ {
struct TEvGetWriteInfoError : public TEventLocal<TEvGetWriteInfoError, EvGetWriteInfoError> {
ui32 Cookie; // InternalPartitionId
TString Message;
TActorId SupportivePartition;

TEvGetWriteInfoError(ui32 cookie, TString message) :
Cookie(cookie),
Expand Down
208 changes: 150 additions & 58 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ void TPartition::InitComplete(const TActorContext& ctx) {
TabletCounters.Percentile()[COUNTER_LATENCY_PQ_INIT].IncrementFor(InitDuration.MilliSeconds());

FillReadFromTimestamps(ctx);
ResendPendingEvents(ctx);
ProcessPendingEvents(ctx);
ProcessTxsAndUserActs(ctx);

ctx.Send(ctx.SelfID, new TEvents::TEvWakeup());
Expand Down Expand Up @@ -969,37 +969,69 @@ void TPartition::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
ProcessTxsAndUserActs(ctx);
}

// Entry point for an event that has just been delivered to the partition.
// While older events are still queued in PendingEvents the new one is appended
// behind them and the queue is drained, so events are handled in arrival order.
// With an empty queue the payload is taken out of the handle and handled directly.
template <class T>
void TPartition::ProcessPendingEvent(TAutoPtr<TEventHandle<T>>& ev, const TActorContext& ctx)
{
    if (!PendingEvents.empty()) {
        // We need to keep the order in which the messages arrived
        AddPendingEvent(ev);
        ProcessPendingEvents(ctx);
        return;
    }

    // Optimization: if the queue is empty, you can process the message immediately
    std::unique_ptr<T> payload(ev->Release().Release());
    ProcessPendingEvent(std::move(payload), ctx);
}

// Handles a TEvProposePartitionConfig taken from the pending queue (or passed
// directly): ownership of the event goes to PushBackDistrTx, then the
// transaction / user-action processing loop is kicked.
template <>
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvProposePartitionConfig> ev, const TActorContext& ctx)
{
    auto* proposeConfig = ev.release();
    PushBackDistrTx(proposeConfig);
    ProcessTxsAndUserActs(ctx);
}

void TPartition::Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPQ::TEvProposePartitionConfig" <<
" Step " << ev->Get()->Step <<
", TxId " << ev->Get()->TxId);

PushBackDistrTx(ev->Release());
ProcessPendingEvent(ev, ctx);
}

ProcessTxsAndUserActs(ctx);
// Detaches the typed payload from the event handle and appends it to the tail
// of PendingEvents. After the call the handle no longer owns the event.
template <class T>
void TPartition::AddPendingEvent(TAutoPtr<TEventHandle<T>>& ev)
{
    PendingEvents.emplace_back(std::unique_ptr<T>(ev->Release().Release()));
}

void TPartition::HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext&)
{
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCalcPredicate");

PendingEvents.emplace_back(ev->ReleaseBase().Release());
AddPendingEvent(ev);
}

void TPartition::HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext&)
{
PendingEvents.emplace_back(ev->ReleaseBase().Release());
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxCommit");

AddPendingEvent(ev);
}

void TPartition::HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext&)
{
PendingEvents.emplace_back(ev->ReleaseBase().Release());
PQ_LOG_D("HandleOnInit TEvPQ::TEvTxRollback");

AddPendingEvent(ev);
}

void TPartition::HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext&)
{
PendingEvents.emplace_back(ev->ReleaseBase().Release());
PQ_LOG_D("HandleOnInit TEvPQ::TEvProposePartitionConfig");

AddPendingEvent(ev);
}

void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& /* ctx */)
Expand All @@ -1009,7 +1041,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TAc
Y_ABORT_UNLESS(IsSupportive());

ev->Get()->OriginalPartition = ev->Sender;
PendingEvents.emplace_back(ev->ReleaseBase().Release());
AddPendingEvent(ev);
}

void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& /* ctx */)
Expand All @@ -1018,7 +1050,7 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TA

Y_ABORT_UNLESS(!IsSupportive());

PendingEvents.emplace_back(ev->ReleaseBase().Release());
AddPendingEvent(ev);
}

void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& /* ctx */)
Expand All @@ -1027,43 +1059,46 @@ void TPartition::HandleOnInit(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActo

Y_ABORT_UNLESS(!IsSupportive());

PendingEvents.emplace_back(ev->ReleaseBase().Release());
AddPendingEvent(ev);
}

void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
template <>
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCalcPredicate> ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
" Step " << ev->Get()->Step <<
", TxId " << ev->Get()->TxId);

if (PlanStep.Defined() && TxId.Defined()) {
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
Send(Tablet,
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Get()->Step,
ev->Get()->TxId,
MakeHolder<TEvPQ::TEvTxCalcPredicateResult>(ev->Step,
ev->TxId,
Partition,
Nothing()).Release());
return;
}
}

PushBackDistrTx(ev->Release());
PushBackDistrTx(ev.release());

ProcessTxsAndUserActs(ctx);
}

void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
void TPartition::Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
PQ_LOG_D("Handle TEvPQ::TEvTxCalcPredicate" <<
" Step " << ev->Get()->Step <<
", TxId " << ev->Get()->TxId);

ProcessPendingEvent(ev, ctx);
}

template <>
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxCommit> ev, const TActorContext& ctx)
{
if (PlanStep.Defined() && TxId.Defined()) {
if (GetStepAndTxId(*ev->Get()) < GetStepAndTxId(*PlanStep, *TxId)) {
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
PQ_LOG_D("Send TEvTxCommitDone" <<
" Step " << ev->Get()->Step <<
", TxId " << ev->Get()->TxId);
ctx.Send(Tablet, MakeCommitDone(ev->Get()->Step, ev->Get()->TxId).Release());
" Step " << ev->Step <<
", TxId " << ev->TxId);
ctx.Send(Tablet, MakeCommitDone(ev->Step, ev->TxId).Release());
return;
}
}
Expand All @@ -1073,33 +1108,42 @@ void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
Y_ABORT_UNLESS(TransactionsInflight.size() == 1,
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
TabletID, Partition.OriginalPartitionId,
ev->Get()->Step, ev->Get()->TxId);
PendingExplicitMessageGroups = ev->Get()->ExplicitMessageGroups;
ev->Step, ev->TxId);
PendingExplicitMessageGroups = ev->ExplicitMessageGroups;
} else {
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
TabletID, Partition.OriginalPartitionId,
ev->Get()->Step, ev->Get()->TxId);
txIter = TransactionsInflight.find(ev->Get()->TxId);
ev->Step, ev->TxId);
txIter = TransactionsInflight.find(ev->TxId);
Y_ABORT_UNLESS(!txIter.IsEnd(),
"PQ: %" PRIu64 ", Partition: %" PRIu32 ", Step: %" PRIu64 ", TxId: %" PRIu64,
TabletID, Partition.OriginalPartitionId,
ev->Get()->Step, ev->Get()->TxId);
ev->Step, ev->TxId);
}
Y_ABORT_UNLESS(txIter->second->State == ECommitState::Pending);

txIter->second->State = ECommitState::Committed;
ProcessTxsAndUserActs(ctx);
}

void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
// Logs the incoming TEvTxCommit and forwards it to the ordered
// pending-event pipeline.
void TPartition::Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx)
{
    const auto* commit = ev->Get();
    PQ_LOG_D("Handle TEvPQ::TEvTxCommit" <<
             " Step " << commit->Step <<
             ", TxId " << commit->TxId);

    ProcessPendingEvent(ev, ctx);
}

template <>
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvTxRollback> ev, const TActorContext& ctx)
{
auto* event = ev->Get();
if (PlanStep.Defined() && TxId.Defined()) {
if (GetStepAndTxId(*event) < GetStepAndTxId(*PlanStep, *TxId)) {
if (GetStepAndTxId(*ev) < GetStepAndTxId(*PlanStep, *TxId)) {
PQ_LOG_D("Rollback for" <<
" Step " << ev->Get()->Step <<
", TxId " << ev->Get()->TxId);
" Step " << ev->Step <<
", TxId " << ev->TxId);
return;
}
}
Expand All @@ -1113,7 +1157,7 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
Y_ABORT_UNLESS(!TransactionsInflight.empty(),
"PQ: %" PRIu64 ", Partition: %" PRIu32,
TabletID, Partition.OriginalPartitionId);
txIter = TransactionsInflight.find(ev->Get()->TxId);
txIter = TransactionsInflight.find(ev->TxId);
Y_ABORT_UNLESS(!txIter.IsEnd(),
"PQ: %" PRIu64 ", Partition: %" PRIu32,
TabletID, Partition.OriginalPartitionId);
Expand All @@ -1124,13 +1168,17 @@ void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx
ProcessTxsAndUserActs(ctx);
}

void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");
TActorId originalPartition = ev->Get()->OriginalPartition;
if (!originalPartition) {
// original message
originalPartition = ev->Sender;
}
// Logs the incoming TEvTxRollback and forwards it to the ordered
// pending-event pipeline. The trace mirrors the ones emitted by the
// TEvTxCommit and TEvTxCalcPredicate handlers.
void TPartition::Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx)
{
    PQ_LOG_D("Handle TEvPQ::TEvTxRollback" <<
             " Step " << ev->Get()->Step <<
             ", TxId " << ev->Get()->TxId);

    ProcessPendingEvent(ev, ctx);
}

template <>
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoRequest> ev, const TActorContext& ctx)
{
TActorId originalPartition = ev->OriginalPartition;
Y_ABORT_UNLESS(originalPartition != TActorId());

if (ClosedInternalPartition || WaitingForPreviousBlobQuota() || (CurrentStateFunc() != &TThis::StateIdle)) {
PQ_LOG_D("Send TEvPQ::TEvGetWriteInfoError");
auto* response = new TEvPQ::TEvGetWriteInfoError(Partition.InternalPartitionId,
Expand Down Expand Up @@ -1162,6 +1210,14 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorCon
ctx.Send(originalPartition, response);
}

// Records the sender inside the request before handing it to the pending-event
// pipeline: the unique_ptr-based ProcessPendingEvent receives only the payload
// (no handle) and reads the reply address from OriginalPartition.
void TPartition::Handle(TEvPQ::TEvGetWriteInfoRequest::TPtr& ev, const TActorContext& ctx) {
    PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoRequest");

    auto* request = ev->Get();
    request->OriginalPartition = ev->Sender;

    ProcessPendingEvent(ev, ctx);
}

void TPartition::WriteInfoResponseHandler(
const TActorId& sender,
TGetWriteInfoResp&& ev,
Expand Down Expand Up @@ -1250,17 +1306,36 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
return ret;
}

// Passes a TEvGetWriteInfoResponse to WriteInfoResponseHandler, using the
// SupportivePartition stored in the event as the sender.
template <>
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoResponse> ev, const TActorContext& ctx)
{
    // Copy the id first: ownership of the event is given away in the same call.
    const TActorId supportivePartition = ev->SupportivePartition;
    WriteInfoResponseHandler(supportivePartition, ev.release(), ctx);
}

void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx) {
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse");
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);

ev->Get()->SupportivePartition = ev->Sender;

ProcessPendingEvent(ev, ctx);
}

// Passes a TEvGetWriteInfoError to WriteInfoResponseHandler, using the
// SupportivePartition stored in the event as the sender.
template <>
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvGetWriteInfoError> ev, const TActorContext& ctx)
{
    // Copy the id first: ownership of the event is given away in the same call.
    const TActorId supportivePartition = ev->SupportivePartition;
    WriteInfoResponseHandler(supportivePartition, ev.release(), ctx);
}

void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError " <<
"Cookie " << ev->Get()->Cookie <<
", Message " << ev->Get()->Message);
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);

ev->Get()->SupportivePartition = ev->Sender;

ProcessPendingEvent(ev, ctx);
}

void TPartition::ReplyToProposeOrPredicate(TSimpleSharedPtr<TTransaction>& tx, bool isPredicate) {
Expand Down Expand Up @@ -2698,16 +2773,6 @@ void TPartition::ChangePlanStepAndTxId(ui64 step, ui64 txId)
TxIdHasChanged = true;
}

// Drains PendingEvents front to back, scheduling every queued event with a
// zero delay. Ownership of each event is released to ctx.Schedule.
void TPartition::ResendPendingEvents(const TActorContext& ctx)
{
    PQ_LOG_D("Resend pending events. Count " << PendingEvents.size());

    for (; !PendingEvents.empty(); PendingEvents.pop_front()) {
        ctx.Schedule(TDuration::Zero(), PendingEvents.front().release());
    }
}

TPartition::EProcessResult TPartition::PreProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx)
{
if (AffectedUsers.size() >= MAX_USERS) {
Expand Down Expand Up @@ -3560,14 +3625,17 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T

void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
{
PQ_LOG_D("HandleOnInit TEvPQ::TEvDeletePartition");

Y_ABORT_UNLESS(IsSupportive());

PendingEvents.emplace_back(ev->ReleaseBase().Release());
AddPendingEvent(ev);
}

void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
template <>
void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition> ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
Y_UNUSED(ev);

Y_ABORT_UNLESS(IsSupportive());
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
Expand All @@ -3577,6 +3645,13 @@ void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& c
ProcessTxsAndUserActs(ctx);
}

// Logs the incoming TEvDeletePartition and forwards it to the ordered
// pending-event pipeline.
void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx)
{
    PQ_LOG_D("Handle TEvPQ::TEvDeletePartition");
    ProcessPendingEvent(ev, ctx);
}

void TPartition::ScheduleNegativeReplies()
{
auto processQueue = [&](std::deque<TUserActionAndTransactionEvent>& queue) {
Expand Down Expand Up @@ -3647,6 +3722,23 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac
MakeHolder<TEvPQ::TEvTransactionCompleted>(writeId).Release());
}

// Drains PendingEvents in FIFO order. Each entry is removed from the queue
// first and then dispatched, by its concrete event type, to the matching
// ProcessPendingEvent overload.
void TPartition::ProcessPendingEvents(const TActorContext& ctx)
{
    PQ_LOG_D("Process pending events. Count " << PendingEvents.size());

    // The dispatcher does not depend on the entry, so it is built once.
    const auto dispatch = [this, &ctx](auto&& event) {
        ProcessPendingEvent(std::forward<decltype(event)>(event), ctx);
    };

    while (!PendingEvents.empty()) {
        auto head = std::move(PendingEvents.front());
        PendingEvents.pop_front();

        std::visit(dispatch, std::move(head));
    }
}

const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
{
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);
Expand Down
Loading
Loading