Skip to content

Commit 4589784

Browse files
authored
Fill change exchange split & activation lists just before commit; unify checks (#6281)
1 parent d8b29c9 commit 4589784

File tree

2 files changed

+32
-28
lines changed

2 files changed

+32
-28
lines changed

ydb/core/tx/datashard/datashard_change_sending.cpp

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
286286
ChangeExchangeSplit = true;
287287
} else {
288288
for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
289-
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
289+
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
290290
ActivationList.insert(dstTabletId);
291291
}
292292
}
@@ -340,13 +340,13 @@ class TDataShard::TTxRemoveChangeRecords: public TTransactionBase<TDataShard> {
340340
Self->RemoveChangeRecordsInFly = false;
341341
}
342342

343-
if (ChangeExchangeSplit) {
344-
Self->KillChangeSender(ctx);
345-
Self->ChangeExchangeSplitter.DoSplit(ctx);
346-
}
343+
if (!Self->ChangesQueue) { // double check queue
344+
if (ChangeExchangeSplit) {
345+
Self->KillChangeSender(ctx);
346+
Self->ChangeExchangeSplitter.DoSplit(ctx);
347+
}
347348

348-
for (const auto dstTabletId : ActivationList) {
349-
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
349+
for (const auto dstTabletId : ActivationList) {
350350
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
351351
}
352352
}
@@ -383,7 +383,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
383383
Y_ABORT_UNLESS(Self->ChangeExchangeSplitter.Done());
384384

385385
for (const auto dstTabletId : Self->ChangeSenderActivator.GetDstSet()) {
386-
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId)) {
386+
if (Self->SplitSrcSnapshotSender.Acked(dstTabletId) && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
387387
ActivationList.insert(dstTabletId);
388388
}
389389
}
@@ -396,9 +396,7 @@ class TDataShard::TTxChangeExchangeSplitAck: public TTransactionBase<TDataShard>
396396
<< ", at tablet# " << Self->TabletID());
397397

398398
for (const auto dstTabletId : ActivationList) {
399-
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
400-
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
401-
}
399+
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
402400
}
403401
}
404402

ydb/core/tx/datashard/datashard_split_src.cpp

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,8 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa
244244
private:
245245
TIntrusivePtr<TSplitSnapshotContext> SnapContext;
246246
bool ChangeExchangeSplit;
247+
THashSet<ui64> ActivationList;
248+
THashSet<ui64> SplitList;
247249

248250
public:
249251
TTxSplitSnapshotComplete(TDataShard* ds, TIntrusivePtr<TSplitSnapshotContext> snapContext)
@@ -378,13 +380,11 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa
378380
proto->SetTimeoutMs(kv.second.Timeout.MilliSeconds());
379381
}
380382

381-
if (Self->ChangesQueue || tableInfo.HasCdcStreams()) {
383+
if (tableInfo.HasAsyncIndexes() || tableInfo.HasCdcStreams()) {
382384
snapshot->SetWaitForActivation(true);
383-
Self->ChangeSenderActivator.AddDst(dstTablet);
384-
db.Table<Schema::SrcChangeSenderActivations>().Key(dstTablet).Update();
385-
385+
ActivationList.insert(dstTablet);
386386
if (tableInfo.HasCdcStreams()) {
387-
Self->ChangeExchangeSplitter.AddDst(dstTablet);
387+
SplitList.insert(dstTablet);
388388
}
389389
}
390390

@@ -403,14 +403,23 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa
403403
}
404404
}
405405

406-
ChangeExchangeSplit = !Self->ChangesQueue && !Self->ChangeExchangeSplitter.Done();
407-
408406
if (needToReadPages) {
409407
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " BorrowSnapshot is restarting for split OpId " << opId);
410408
return false;
411409
} else {
412410
txc.Env.DropSnapshot(SnapContext);
413411

412+
for (ui64 dstTabletId : ActivationList) {
413+
Self->ChangeSenderActivator.AddDst(dstTabletId);
414+
db.Table<Schema::SrcChangeSenderActivations>().Key(dstTabletId).Update();
415+
}
416+
417+
for (ui64 dstTabletId : SplitList) {
418+
Self->ChangeExchangeSplitter.AddDst(dstTabletId);
419+
}
420+
421+
ChangeExchangeSplit = !Self->ChangesQueue && !Self->ChangeExchangeSplitter.Done();
422+
414423
Self->State = TShardState::SplitSrcSendingSnapshot;
415424
Self->PersistSys(db, Schema::Sys_State, Self->State);
416425

@@ -421,7 +430,7 @@ class TDataShard::TTxSplitSnapshotComplete : public NTabletFlatExecutor::TTransa
421430
void Complete(const TActorContext &ctx) override {
422431
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, Self->TabletID() << " Sending snapshots from src for split OpId " << Self->SrcSplitOpId);
423432
Self->SplitSrcSnapshotSender.DoSend(ctx);
424-
if (ChangeExchangeSplit) {
433+
if (ChangeExchangeSplit && !Self->ChangesQueue) { // double check queue
425434
Self->KillChangeSender(ctx);
426435
Self->ChangeExchangeSplitter.DoSplit(ctx);
427436
}
@@ -438,14 +447,14 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra
438447
private:
439448
TEvDataShard::TEvSplitTransferSnapshotAck::TPtr Ev;
440449
bool AllDstAcksReceived;
441-
bool Activate;
450+
ui64 ActivateTabletId;
442451

443452
public:
444453
TTxSplitTransferSnapshotAck(TDataShard* ds, TEvDataShard::TEvSplitTransferSnapshotAck::TPtr& ev)
445454
: NTabletFlatExecutor::TTransactionBase<TDataShard>(ds)
446455
, Ev(ev)
447456
, AllDstAcksReceived(false)
448-
, Activate(false)
457+
, ActivateTabletId(0)
449458
{}
450459

451460
TTxType GetTxType() const override { return TXTYPE_SPLIT_TRANSFER_SNAPSHOT_ACK; }
@@ -469,8 +478,8 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra
469478
// Remove the row for acked snapshot
470479
db.Table<Schema::SplitSrcSnapshots>().Key(dstTabletId).Delete();
471480

472-
if (!Self->ChangesQueue && Self->ChangeExchangeSplitter.Done()) {
473-
Activate = !Self->ChangeSenderActivator.Acked(dstTabletId);
481+
if (!Self->ChangesQueue && Self->ChangeExchangeSplitter.Done() && !Self->ChangeSenderActivator.Acked(dstTabletId)) {
482+
ActivateTabletId = dstTabletId;
474483
}
475484

476485
return true;
@@ -485,11 +494,8 @@ class TDataShard::TTxSplitTransferSnapshotAck : public NTabletFlatExecutor::TTra
485494
}
486495
}
487496

488-
if (Activate) {
489-
const ui64 dstTabletId = Ev->Get()->Record.GetTabletId();
490-
if (!Self->ChangeSenderActivator.Acked(dstTabletId)) {
491-
Self->ChangeSenderActivator.DoSend(dstTabletId, ctx);
492-
}
497+
if (ActivateTabletId && !Self->ChangesQueue) { // double check queue
498+
Self->ChangeSenderActivator.DoSend(ActivateTabletId, ctx);
493499
}
494500
}
495501
};

0 commit comments

Comments
 (0)