6
6
#include < ydb/core/persqueue/events/global.h>
7
7
#include < ydb/core/persqueue/user_info.h>
8
8
#include < ydb/core/persqueue/write_meta.h>
9
+ #include < ydb/core/testlib/actors/block_events.h>
9
10
#include < ydb/core/tx/scheme_board/events.h>
10
11
#include < ydb/core/tx/scheme_board/events_internal.h>
11
12
#include < ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
@@ -1985,7 +1986,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1985
1986
return result;
1986
1987
}
1987
1988
1988
- void WaitForContent (TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
1989
+ TVector<NJson::TJsonValue> WaitForContent (TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
1989
1990
while (true ) {
1990
1991
const auto records = GetRecords (*server->GetRuntime (), sender, path, 0 );
1991
1992
for (ui32 i = 0 ; i < std::min (records.size (), expected.size ()); ++i) {
@@ -1995,7 +1996,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
1995
1996
if (records.size () >= expected.size ()) {
1996
1997
UNIT_ASSERT_VALUES_EQUAL_C (records.size (), expected.size (),
1997
1998
" Unexpected record: " << records.at (expected.size ()).second );
1998
- break ;
1999
+ TVector<NJson::TJsonValue> values;
2000
+ for (const auto & pr : records) {
2001
+ bool ok = NJson::ReadJsonTree (pr.second , &values.emplace_back ());
2002
+ Y_ABORT_UNLESS (ok);
2003
+ }
2004
+ return values;
1999
2005
}
2000
2006
2001
2007
SimulateSleep (server, TDuration::Seconds (1 ));
@@ -3692,6 +3698,141 @@ Y_UNIT_TEST_SUITE(Cdc) {
3692
3698
});
3693
3699
}
3694
3700
3701
+ Y_UNIT_TEST (ResolvedTimestampForDisplacedUpsert) {
3702
+ TPortManager portManager;
3703
+ TServer::TPtr server = new TServer (TServerSettings (portManager.GetPort (2134 ), {}, DefaultPQConfig ())
3704
+ .SetUseRealThreads (false )
3705
+ .SetDomainName (" Root" )
3706
+ );
3707
+
3708
+ TDisableDataShardLogBatching disableDataShardLogBatching;
3709
+
3710
+ auto & runtime = *server->GetRuntime ();
3711
+ const auto edgeActor = runtime.AllocateEdgeActor ();
3712
+
3713
+ SetupLogging (runtime);
3714
+ InitRoot (server, edgeActor);
3715
+ SetSplitMergePartCountLimit (&runtime, -1 );
3716
+ CreateShardedTable (server, edgeActor, " /Root" , " Table" , SimpleTable ());
3717
+
3718
+ WaitTxNotification (server, edgeActor, AsyncAlterAddStream (server, " /Root" , " Table" ,
3719
+ WithVirtualTimestamps (WithResolvedTimestamps (
3720
+ TDuration::Seconds (3 ), Updates (NKikimrSchemeOp::ECdcStreamFormatJson)))));
3721
+
3722
+ Cerr << " ... prepare" << Endl;
3723
+ WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3724
+ R"( {"resolved":"***"})" ,
3725
+ });
3726
+
3727
+ KqpSimpleExec (runtime, R"(
3728
+ UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
3729
+ )" );
3730
+
3731
+ auto records = WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3732
+ R"( {"resolved":"***"})" ,
3733
+ R"( {"update":{"value":10},"key":[1],"ts":"***"})" ,
3734
+ R"( {"resolved":"***"})" ,
3735
+ });
3736
+
3737
+ // Take the final step
3738
+ ui64 lastStep = records.back ()[" resolved" ][0 ].GetUInteger ();
3739
+ Cerr << " ... last heartbeat at " << lastStep << Endl;
3740
+
3741
+ const auto tableId = ResolveTableId (server, edgeActor, " /Root/Table" );
3742
+ const auto shards = GetTableShards (server, edgeActor, " /Root/Table" );
3743
+ UNIT_ASSERT_VALUES_EQUAL (shards.size (), 1u );
3744
+
3745
+ ui64 coordinator = ChangeStateStorage (Coordinator, server->GetSettings ().Domain );
3746
+ ui64 snapshotStep = lastStep + 3000 - 1 ;
3747
+ ForwardToTablet (runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps (coordinator, snapshotStep));
3748
+
3749
+ TBlockEvents<TEvMediatorTimecast::TEvGranularUpdate> blockedGranularUpdates (runtime,
3750
+ [&](auto & ev) {
3751
+ return ev->Get ()->Record .GetLatestStep () > snapshotStep;
3752
+ });
3753
+ TBlockEvents<TEvMediatorTimecast::TEvUpdate> blockedUpdates (runtime,
3754
+ [&](auto & ev) {
3755
+ return ev->Get ()->Record .GetTimeBarrier () > snapshotStep;
3756
+ });
3757
+
3758
+ Cerr << " ... performing a read from snapshot just before the next heartbeat" << Endl;
3759
+ {
3760
+ auto req = std::make_unique<TEvDataShard::TEvRead>();
3761
+ {
3762
+ auto & record = req->Record ;
3763
+ record.SetReadId (1 );
3764
+ record.MutableTableId ()->SetOwnerId (tableId.PathId .OwnerId );
3765
+ record.MutableTableId ()->SetTableId (tableId.PathId .LocalPathId );
3766
+ record.AddColumns (1 );
3767
+ record.AddColumns (2 );
3768
+ record.SetResultFormat (NKikimrDataEvents::FORMAT_CELLVEC);
3769
+ ui32 key = 1 ;
3770
+ TVector<TCell> keys;
3771
+ keys.push_back (TCell::Make (key));
3772
+ req->Keys .push_back (TSerializedCellVec (TSerializedCellVec::Serialize (keys)));
3773
+ record.MutableSnapshot ()->SetStep (snapshotStep);
3774
+ record.MutableSnapshot ()->SetTxId (Max<ui64>());
3775
+ }
3776
+ ForwardToTablet (runtime, shards.at (0 ), edgeActor, req.release ());
3777
+ auto ev = runtime.GrabEdgeEventRethrow <TEvDataShard::TEvReadResult>(edgeActor);
3778
+ auto * res = ev->Get ();
3779
+ UNIT_ASSERT_VALUES_EQUAL (res->Record .GetStatus ().GetCode (), Ydb::StatusIds::SUCCESS);
3780
+ UNIT_ASSERT_VALUES_EQUAL (res->Record .GetFinished (), true );
3781
+ Cerr << " ... read finished" << Endl;
3782
+ }
3783
+ for (int i = 0 ; i < 10 ; ++i) {
3784
+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3785
+ }
3786
+
3787
+ Cerr << " ... starting upsert 1 (expected to displace)" << Endl;
3788
+ auto upsert1 = KqpSimpleSend (runtime, R"(
3789
+ UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
3790
+ )" );
3791
+ for (int i = 0 ; i < 10 ; ++i) {
3792
+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3793
+ }
3794
+
3795
+ Cerr << " ... starting upsert 2 (expected to displace)" << Endl;
3796
+ auto upsert2 = KqpSimpleSend (runtime, R"(
3797
+ UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
3798
+ )" );
3799
+ for (int i = 0 ; i < 10 ; ++i) {
3800
+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3801
+ }
3802
+
3803
+ Cerr << " ... unblocking updates" << Endl;
3804
+ blockedGranularUpdates.Unblock ().Stop ();
3805
+ blockedUpdates.Unblock ().Stop ();
3806
+ for (int i = 0 ; i < 10 ; ++i) {
3807
+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3808
+ }
3809
+
3810
+ Cerr << " ... checking the update is logged before the new resolved timestamp" << Endl;
3811
+ records = WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3812
+ R"( {"resolved":"***"})" ,
3813
+ R"( {"update":{"value":10},"key":[1],"ts":"***"})" ,
3814
+ R"( {"resolved":"***"})" ,
3815
+ R"( {"update":{"value":20},"key":[2],"ts":"***"})" ,
3816
+ R"( {"update":{"value":30},"key":[3],"ts":"***"})" ,
3817
+ R"( {"resolved":"***"})" ,
3818
+ });
3819
+
3820
+ TRowVersion resolved (0 , 0 );
3821
+ for (auto & record : records) {
3822
+ if (record.Has (" resolved" )) {
3823
+ resolved.Step = record[" resolved" ][0 ].GetUInteger ();
3824
+ resolved.TxId = record[" resolved" ][1 ].GetUInteger ();
3825
+ }
3826
+ if (record.Has (" ts" )) {
3827
+ TRowVersion ts (
3828
+ record[" ts" ][0 ].GetUInteger (),
3829
+ record[" ts" ][1 ].GetUInteger ());
3830
+ UNIT_ASSERT_C (resolved < ts,
3831
+ " Record with ts " << ts << " after resolved " << resolved);
3832
+ }
3833
+ }
3834
+ }
3835
+
3695
3836
} // Cdc
3696
3837
3697
3838
} // NKikimr
0 commit comments