6
6
#include < ydb/core/persqueue/events/global.h>
7
7
#include < ydb/core/persqueue/user_info.h>
8
8
#include < ydb/core/persqueue/write_meta.h>
9
+ #include < ydb/core/testlib/actors/block_events.h>
9
10
#include < ydb/core/tx/scheme_board/events.h>
10
11
#include < ydb/core/tx/scheme_board/events_internal.h>
11
12
#include < ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
@@ -1985,7 +1986,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1985
1986
return result;
1986
1987
}
1987
1988
1988
- void WaitForContent (TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
1989
+ TVector<NJson::TJsonValue> WaitForContent (TServer::TPtr server, const TActorId& sender, const TString& path, const TVector<TString>& expected) {
1989
1990
while (true ) {
1990
1991
const auto records = GetRecords (*server->GetRuntime (), sender, path, 0 );
1991
1992
for (ui32 i = 0 ; i < std::min (records.size (), expected.size ()); ++i) {
@@ -1995,7 +1996,12 @@ Y_UNIT_TEST_SUITE(Cdc) {
1995
1996
if (records.size () >= expected.size ()) {
1996
1997
UNIT_ASSERT_VALUES_EQUAL_C (records.size (), expected.size (),
1997
1998
" Unexpected record: " << records.at (expected.size ()).second );
1998
- break ;
1999
+ TVector<NJson::TJsonValue> values;
2000
+ for (const auto & pr : records) {
2001
+ bool ok = NJson::ReadJsonTree (pr.second , &values.emplace_back ());
2002
+ Y_ABORT_UNLESS (ok);
2003
+ }
2004
+ return values;
1999
2005
}
2000
2006
2001
2007
SimulateSleep (server, TDuration::Seconds (1 ));
@@ -3692,6 +3698,136 @@ Y_UNIT_TEST_SUITE(Cdc) {
3692
3698
});
3693
3699
}
3694
3700
3701
+ Y_UNIT_TEST (ResolvedTimestampForDisplacedUpsert) {
3702
+ TPortManager portManager;
3703
+ TServer::TPtr server = new TServer (TServerSettings (portManager.GetPort (2134 ), {}, DefaultPQConfig ())
3704
+ .SetUseRealThreads (false )
3705
+ .SetDomainName (" Root" )
3706
+ );
3707
+
3708
+ TDisableDataShardLogBatching disableDataShardLogBatching;
3709
+
3710
+ auto & runtime = *server->GetRuntime ();
3711
+ const auto edgeActor = runtime.AllocateEdgeActor ();
3712
+
3713
+ SetupLogging (runtime);
3714
+ InitRoot (server, edgeActor);
3715
+ SetSplitMergePartCountLimit (&runtime, -1 );
3716
+ CreateShardedTable (server, edgeActor, " /Root" , " Table" , SimpleTable ());
3717
+
3718
+ WaitTxNotification (server, edgeActor, AsyncAlterAddStream (server, " /Root" , " Table" ,
3719
+ WithVirtualTimestamps (WithResolvedTimestamps (
3720
+ TDuration::Seconds (3 ), Updates (NKikimrSchemeOp::ECdcStreamFormatJson)))));
3721
+
3722
+ Cerr << " ... prepare" << Endl;
3723
+ WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3724
+ R"( {"resolved":"***"})" ,
3725
+ });
3726
+
3727
+ KqpSimpleExec (runtime, R"(
3728
+ UPSERT INTO `/Root/Table` (key, value) VALUES (1, 10);
3729
+ )" );
3730
+
3731
+ auto records = WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3732
+ R"( {"resolved":"***"})" ,
3733
+ R"( {"update":{"value":10},"key":[1],"ts":"***"})" ,
3734
+ R"( {"resolved":"***"})" ,
3735
+ });
3736
+
3737
+ // Take the final step
3738
+ ui64 lastStep = records.back ()[" resolved" ][0 ].GetUInteger ();
3739
+ Cerr << " ... last heartbeat at " << lastStep << Endl;
3740
+
3741
+ const auto tableId = ResolveTableId (server, edgeActor, " /Root/Table" );
3742
+ const auto shards = GetTableShards (server, edgeActor, " /Root/Table" );
3743
+ UNIT_ASSERT_VALUES_EQUAL (shards.size (), 1u );
3744
+
3745
+ ui64 coordinator = ChangeStateStorage (Coordinator, server->GetSettings ().Domain );
3746
+ ui64 snapshotStep = lastStep + 3000 - 1 ;
3747
+ ForwardToTablet (runtime, coordinator, edgeActor, new TEvTxProxy::TEvRequirePlanSteps (coordinator, snapshotStep));
3748
+
3749
+ TBlockEvents<TEvMediatorTimecast::TEvUpdate> blockedUpdates (runtime,
3750
+ [&](auto & ev) {
3751
+ return ev->Get ()->Record .GetTimeBarrier () > snapshotStep;
3752
+ });
3753
+
3754
+ Cerr << " ... performing a read from snapshot just before the next heartbeat" << Endl;
3755
+ {
3756
+ auto req = std::make_unique<TEvDataShard::TEvRead>();
3757
+ {
3758
+ auto & record = req->Record ;
3759
+ record.SetReadId (1 );
3760
+ record.MutableTableId ()->SetOwnerId (tableId.PathId .OwnerId );
3761
+ record.MutableTableId ()->SetTableId (tableId.PathId .LocalPathId );
3762
+ record.AddColumns (1 );
3763
+ record.AddColumns (2 );
3764
+ record.SetResultFormat (NKikimrDataEvents::FORMAT_CELLVEC);
3765
+ ui32 key = 1 ;
3766
+ TVector<TCell> keys;
3767
+ keys.push_back (TCell::Make (key));
3768
+ req->Keys .push_back (TSerializedCellVec (TSerializedCellVec::Serialize (keys)));
3769
+ record.MutableSnapshot ()->SetStep (snapshotStep);
3770
+ record.MutableSnapshot ()->SetTxId (Max<ui64>());
3771
+ }
3772
+ ForwardToTablet (runtime, shards.at (0 ), edgeActor, req.release ());
3773
+ auto ev = runtime.GrabEdgeEventRethrow <TEvDataShard::TEvReadResult>(edgeActor);
3774
+ auto * res = ev->Get ();
3775
+ UNIT_ASSERT_VALUES_EQUAL (res->Record .GetStatus ().GetCode (), Ydb::StatusIds::SUCCESS);
3776
+ UNIT_ASSERT_VALUES_EQUAL (res->Record .GetFinished (), true );
3777
+ Cerr << " ... read finished" << Endl;
3778
+ }
3779
+ for (int i = 0 ; i < 10 ; ++i) {
3780
+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3781
+ }
3782
+
3783
+ Cerr << " ... starting upsert 1 (expected to displace)" << Endl;
3784
+ auto upsert1 = KqpSimpleSend (runtime, R"(
3785
+ UPSERT INTO `/Root/Table` (key, value) VALUES (2, 20);
3786
+ )" );
3787
+ for (int i = 0 ; i < 10 ; ++i) {
3788
+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3789
+ }
3790
+
3791
+ Cerr << " ... starting upsert 2 (expected to displace)" << Endl;
3792
+ auto upsert2 = KqpSimpleSend (runtime, R"(
3793
+ UPSERT INTO `/Root/Table` (key, value) VALUES (3, 30);
3794
+ )" );
3795
+ for (int i = 0 ; i < 10 ; ++i) {
3796
+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3797
+ }
3798
+
3799
+ Cerr << " ... unblocking updates" << Endl;
3800
+ blockedUpdates.Unblock ().Stop ();
3801
+ for (int i = 0 ; i < 10 ; ++i) {
3802
+ runtime.SimulateSleep (TDuration::MilliSeconds (1 ));
3803
+ }
3804
+
3805
+ Cerr << " ... checking the update is logged before the new resolved timestamp" << Endl;
3806
+ records = WaitForContent (server, edgeActor, " /Root/Table/Stream" , {
3807
+ R"( {"resolved":"***"})" ,
3808
+ R"( {"update":{"value":10},"key":[1],"ts":"***"})" ,
3809
+ R"( {"resolved":"***"})" ,
3810
+ R"( {"update":{"value":20},"key":[2],"ts":"***"})" ,
3811
+ R"( {"update":{"value":30},"key":[3],"ts":"***"})" ,
3812
+ R"( {"resolved":"***"})" ,
3813
+ });
3814
+
3815
+ TRowVersion resolved (0 , 0 );
3816
+ for (auto & record : records) {
3817
+ if (record.Has (" resolved" )) {
3818
+ resolved.Step = record[" resolved" ][0 ].GetUInteger ();
3819
+ resolved.TxId = record[" resolved" ][1 ].GetUInteger ();
3820
+ }
3821
+ if (record.Has (" ts" )) {
3822
+ TRowVersion ts (
3823
+ record[" ts" ][0 ].GetUInteger (),
3824
+ record[" ts" ][1 ].GetUInteger ());
3825
+ UNIT_ASSERT_C (resolved < ts,
3826
+ " Record with ts " << ts << " after resolved " << resolved);
3827
+ }
3828
+ }
3829
+ }
3830
+
3695
3831
} // Cdc
3696
3832
3697
3833
} // NKikimr
0 commit comments