|
1 | 1 | #include "flat_dbase_sz_env.h"
|
2 | 2 | #include "flat_executor_ut_common.h"
|
3 | 3 | #include <ydb/core/base/counters.h>
|
| 4 | +#include <ydb/core/testlib/actors/block_events.h> |
4 | 5 |
|
5 | 6 | namespace NKikimr {
|
6 | 7 | namespace NTabletFlatExecutor {
|
@@ -451,6 +452,10 @@ class TTestFlatTablet : public TActor<TTestFlatTablet>, public TTabletExecutedFl
|
451 | 452 |
|
452 | 453 | void ScanComplete(NTable::EAbort, TAutoPtr<IDestructable>, ui64 cookie, const TActorContext&) override
|
453 | 454 | {
|
| 455 | + if (cookie == 12345) { |
| 456 | + // TEmptyScan in follower tests |
| 457 | + return; |
| 458 | + } |
454 | 459 | UNIT_ASSERT_VALUES_EQUAL(cookie, ScanCookie);
|
455 | 460 | Send(Sender, new TEvTestFlatTablet::TEvScanFinished);
|
456 | 461 | }
|
@@ -3535,6 +3540,247 @@ Y_UNIT_TEST_SUITE(TFlatTableExecutor_Follower) {
|
3535 | 3540 | observer.Uninstall(env.Env);
|
3536 | 3541 | }
|
3537 | 3542 |
|
| 3543 | + struct TTxMakeSnapshot : public ITransaction { |
| 3544 | + bool Execute(TTransactionContext& txc, const TActorContext&) override { |
| 3545 | + TIntrusivePtr<TTableSnapshotContext> snapContext = |
| 3546 | + new TTestTableSnapshotContext({TRowsModel::TableId}); |
| 3547 | + txc.Env.MakeSnapshot(snapContext); |
| 3548 | + return true; |
| 3549 | + } |
| 3550 | + |
| 3551 | + void Complete(const TActorContext& ctx) override { |
| 3552 | + ctx.Send(ctx.SelfID, new NFake::TEvReturn); |
| 3553 | + } |
| 3554 | + }; |
| 3555 | + |
| 3556 | + struct TTxBorrowSnapshot : public ITransactionWithExecutor { |
| 3557 | + TTxBorrowSnapshot(TString& snapBody, TIntrusivePtr<TTableSnapshotContext> snapContext, ui64 targetTabletId) |
| 3558 | + : SnapBody(snapBody) |
| 3559 | + , SnapContext(std::move(snapContext)) |
| 3560 | + , TargetTabletId(targetTabletId) |
| 3561 | + { } |
| 3562 | + |
| 3563 | + bool Execute(TTransactionContext& txc, const TActorContext&) override { |
| 3564 | + SnapBody = Executor->BorrowSnapshot(TRowsModel::TableId, *SnapContext, { }, { }, TargetTabletId); |
| 3565 | + txc.Env.DropSnapshot(SnapContext); |
| 3566 | + return true; |
| 3567 | + } |
| 3568 | + |
| 3569 | + void Complete(const TActorContext& ctx) override { |
| 3570 | + ctx.Send(ctx.SelfID, new NFake::TEvReturn); |
| 3571 | + } |
| 3572 | + |
| 3573 | + private: |
| 3574 | + TString& SnapBody; |
| 3575 | + TIntrusivePtr<TTableSnapshotContext> SnapContext; |
| 3576 | + const ui64 TargetTabletId; |
| 3577 | + }; |
| 3578 | + |
| 3579 | + struct TTxLoanSnapshot : public ITransaction { |
| 3580 | + TTxLoanSnapshot(TString snapBody) |
| 3581 | + : SnapBody(snapBody) |
| 3582 | + { } |
| 3583 | + |
| 3584 | + bool Execute(TTransactionContext& txc, const TActorContext& ctx) override { |
| 3585 | + txc.Env.LoanTable(TRowsModel::TableId, SnapBody); |
| 3586 | + ctx.Send(ctx.SelfID, new NFake::TEvReturn); |
| 3587 | + return true; |
| 3588 | + } |
| 3589 | + |
| 3590 | + void Complete(const TActorContext&) override { |
| 3591 | + } |
| 3592 | + |
| 3593 | + private: |
| 3594 | + TString SnapBody; |
| 3595 | + }; |
| 3596 | + |
| 3597 | + struct TEmptyScan : public NTable::IScan { |
| 3598 | + TEmptyScan() = default; |
| 3599 | + |
| 3600 | + void Describe(IOutputStream& out) const noexcept override { |
| 3601 | + out << "TEmptyScan{}"; |
| 3602 | + } |
| 3603 | + |
| 3604 | + TInitialState Prepare(NTable::IDriver*, TIntrusiveConstPtr<TScheme>) noexcept override { |
| 3605 | + return {EScan::Final, {}}; |
| 3606 | + } |
| 3607 | + |
| 3608 | + EScan Seek(TLead&, ui64) noexcept override { |
| 3609 | + Y_ABORT("unreachable"); |
| 3610 | + } |
| 3611 | + |
| 3612 | + EScan Feed(TArrayRef<const TCell>, const TRow&) noexcept override { |
| 3613 | + Y_ABORT("unreachable"); |
| 3614 | + } |
| 3615 | + |
| 3616 | + TAutoPtr<IDestructable> Finish(EAbort) noexcept override { |
| 3617 | + delete this; |
| 3618 | + return nullptr; |
| 3619 | + } |
| 3620 | + }; |
| 3621 | + |
| 3622 | + struct TTxQueueScan : public ITransactionWithExecutor { |
| 3623 | + TTxQueueScan() = default; |
| 3624 | + |
| 3625 | + bool Execute(TTransactionContext&, const TActorContext& ctx) override { |
| 3626 | + Executor->QueueScan(TRowsModel::TableId, |
| 3627 | + new TEmptyScan(), |
| 3628 | + /* cookie */ 12345, |
| 3629 | + TScanOptions().DisableResourceBroker()); |
| 3630 | + ctx.Send(ctx.SelfID, new NFake::TEvReturn); |
| 3631 | + return true; |
| 3632 | + } |
| 3633 | + |
| 3634 | + void Complete(const TActorContext&) override { |
| 3635 | + // nothing |
| 3636 | + } |
| 3637 | + }; |
| 3638 | + |
| 3639 | + // Regression test for KIKIMR-18605 |
| 3640 | + Y_UNIT_TEST(FollowerAttachOnTxQueueScanSnapshot) { |
| 3641 | + TMyEnvBase env; |
| 3642 | + TRowsModel rows; |
| 3643 | + |
| 3644 | + env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); |
| 3645 | + |
| 3646 | + // Start the tablet |
| 3647 | + env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) { |
| 3648 | + return new TTestFlatTablet(env.Edge, tablet, info); |
| 3649 | + }); |
| 3650 | + env.WaitForWakeUp(); |
| 3651 | + |
| 3652 | + Cerr << "... initializing schema" << Endl; |
| 3653 | + env.SendSync(rows.MakeScheme(new TCompactionPolicy())); |
| 3654 | + Cerr << "... inserting rows" << Endl; |
| 3655 | + env.SendSync(rows.MakeRows(10, 0, 10)); |
| 3656 | + |
| 3657 | + Cerr << "... starting follower" << Endl; |
| 3658 | + TBlockEvents<TEvTablet::TEvNewFollowerAttached> blockedAttach(env.Env); |
| 3659 | + env.FireFollower(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) { |
| 3660 | + return new TTestFlatTablet(env.Edge, tablet, info); |
| 3661 | + }, /* followerId */ 1); |
| 3662 | + env->WaitFor("follower attach", [&]{ return blockedAttach.size() >= 1; }); |
| 3663 | + |
| 3664 | + TBlockEvents<TEvBlobStorage::TEvPut> blockedSnapshot(env.Env, |
| 3665 | + [tablet = env.Tablet](const auto& ev) { |
| 3666 | + auto* msg = ev->Get(); |
| 3667 | + if (msg->Id.TabletID() == tablet && msg->Id.Cookie() == /* SnapLz4 */ 0x7000) { |
| 3668 | + return true; |
| 3669 | + } |
| 3670 | + return false; |
| 3671 | + }); |
| 3672 | + |
| 3673 | + Cerr << "... spamming QueueScan transactions" << Endl; |
| 3674 | + for (ui32 i = 0; i < 400; ++i) { |
| 3675 | + env.SendSync(new NFake::TEvExecute{ new TTxQueueScan }); |
| 3676 | + if (!blockedSnapshot.empty()) { |
| 3677 | + break; |
| 3678 | + } |
| 3679 | + } |
| 3680 | + UNIT_ASSERT_C(!blockedSnapshot.empty(), "expected tablet to make a log snapshot after 400 scans"); |
| 3681 | + |
| 3682 | + Cerr << "... unblocking attach" << Endl; |
| 3683 | + blockedAttach.Stop().Unblock(); |
| 3684 | + |
| 3685 | + Cerr << "... unblocking snapshot, expecting follower to activate" << Endl; |
| 3686 | + blockedSnapshot.Stop().Unblock(); |
| 3687 | + env.WaitForWakeUp(); |
| 3688 | + |
| 3689 | + // Bug 1: snapshot created in TTxQueueScan had an incremented serial, conflicting with the next change |
| 3690 | + Cerr << "... inserting rows" << Endl; |
| 3691 | + env.SendSync(rows.MakeRows(10, 0, 10)); |
| 3692 | + env->SimulateSleep(TDuration::MilliSeconds(1)); |
| 3693 | + |
| 3694 | + Cerr << "... rebooting leader" << Endl; |
| 3695 | + env.SendSync(new TEvents::TEvPoison, false, true); |
| 3696 | + env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) { |
| 3697 | + return new TTestFlatTablet(env.Edge, tablet, info); |
| 3698 | + }); |
| 3699 | + env.WaitForWakeUp(); |
| 3700 | + } |
| 3701 | + |
| 3702 | + Y_UNIT_TEST(FollowerAttachAfterLoan) { |
| 3703 | + TMyEnvBase env; |
| 3704 | + TRowsModel rows; |
| 3705 | + |
| 3706 | + env->SetLogPriority(NKikimrServices::TABLET_EXECUTOR, NActors::NLog::PRI_DEBUG); |
| 3707 | + |
| 3708 | + // Start the source tablet |
| 3709 | + env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) { |
| 3710 | + return new TTestFlatTablet(env.Edge, tablet, info); |
| 3711 | + }); |
| 3712 | + env.WaitForWakeUp(); |
| 3713 | + |
| 3714 | + Cerr << "... initializing schema" << Endl; |
| 3715 | + env.SendSync(rows.MakeScheme(new TCompactionPolicy())); |
| 3716 | + Cerr << "... inserting rows" << Endl; |
| 3717 | + env.SendSync(rows.MakeRows(10, 0, 10)); |
| 3718 | + Cerr << "... making snapshot" << Endl; |
| 3719 | + env.SendSync(new NFake::TEvExecute{ new TTxMakeSnapshot }); |
| 3720 | + Cerr << "... waiting for snapshot to complete" << Endl; |
| 3721 | + auto evSnapshot = env.GrabEdgeEvent<TEvTestFlatTablet::TEvSnapshotComplete>(); |
| 3722 | + Cerr << "...borrowing snapshot" << Endl; |
| 3723 | + TString snapBody; |
| 3724 | + env.SendSync(new NFake::TEvExecute{ new TTxBorrowSnapshot(snapBody, evSnapshot->Get()->SnapContext, env.Tablet + 1) }); |
| 3725 | + |
| 3726 | + // Stop the source tablet |
| 3727 | + Cerr << "... stopping the source tablet" << Endl; |
| 3728 | + env.SendSync(new TEvents::TEvPoison, false, true); |
| 3729 | + env.WaitForGone(); |
| 3730 | + |
| 3731 | + // Starting the destination tablet |
| 3732 | + Cerr << "... starting the destination tablet" << Endl; |
| 3733 | + ++env.Tablet; |
| 3734 | + env.FireTablet(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) { |
| 3735 | + return new TTestFlatTablet(env.Edge, tablet, info); |
| 3736 | + }); |
| 3737 | + env.WaitForWakeUp(); |
| 3738 | + |
| 3739 | + Cerr << "... initializing schema" << Endl; |
| 3740 | + env.SendSync(rows.MakeScheme(new TCompactionPolicy())); |
| 3741 | + |
| 3742 | + // Loan snapshot while metadata loading is blocked |
| 3743 | + Cerr << "... loaning snapshot" << Endl; |
| 3744 | + TBlockEvents<TEvBlobStorage::TEvGet> blockedMetadata(env.Env, |
| 3745 | + [tablet = env.Tablet - 1](const auto& ev) { |
| 3746 | + auto* msg = ev->Get(); |
| 3747 | + for (ui32 i = 0; i < msg->QuerySize; ++i) { |
| 3748 | + const auto& id = msg->Queries[i].Id; |
| 3749 | + Cerr << "get blob " << id << Endl; |
| 3750 | + if (id.TabletID() == tablet && (id.Cookie() >> 12) == /* Pack */ 3) { |
| 3751 | + return true; |
| 3752 | + } |
| 3753 | + } |
| 3754 | + return false; |
| 3755 | + }); |
| 3756 | + env.SendSync(new NFake::TEvExecute{ new TTxLoanSnapshot(snapBody) }); |
| 3757 | + env->WaitFor("blocked metadata", [&]{ return blockedMetadata.size() >= 1; }); |
| 3758 | + |
| 3759 | + Cerr << "... starting follower" << Endl; |
| 3760 | + TBlockEvents<TEvTablet::TEvNewFollowerAttached> blockedAttach(env.Env); |
| 3761 | + env.FireFollower(env.Edge, env.Tablet, [&env](const TActorId &tablet, TTabletStorageInfo *info) { |
| 3762 | + return new TTestFlatTablet(env.Edge, tablet, info); |
| 3763 | + }, /* followerId */ 1); |
| 3764 | + env->WaitFor("follower attach", [&]{ return blockedAttach.size() >= 1; }); |
| 3765 | + |
| 3766 | + TBlockEvents<TEvBlobStorage::TEvPut> blockedSnapshot(env.Env, |
| 3767 | + [tablet = env.Tablet](const auto& ev) { |
| 3768 | + auto* msg = ev->Get(); |
| 3769 | + if (msg->Id.TabletID() == tablet && msg->Id.Cookie() == /* SnapLz4 */ 0x7000) { |
| 3770 | + return true; |
| 3771 | + } |
| 3772 | + return false; |
| 3773 | + }); |
| 3774 | + blockedAttach.Stop().Unblock(); |
| 3775 | + |
| 3776 | + // Bug 2: part switch completion was missing follower snapshot condition |
| 3777 | + Cerr << "... unblocking metadata" << Endl; |
| 3778 | + blockedMetadata.Stop().Unblock(); |
| 3779 | + |
| 3780 | + env->SimulateSleep(TDuration::MilliSeconds(1)); |
| 3781 | + UNIT_ASSERT_C(!blockedSnapshot.empty(), "expected tablet to make a log snapshot after part switch"); |
| 3782 | + } |
| 3783 | + |
3538 | 3784 | }
|
3539 | 3785 |
|
3540 | 3786 | Y_UNIT_TEST_SUITE(TFlatTableExecutor_RejectProbability) {
|
|
0 commit comments