@@ -2139,7 +2139,8 @@ Y_UNIT_TEST_SUITE(TClientTest) {
2139
2139
TPortManager tp;
2140
2140
ui16 port = tp.GetPort (2134 );
2141
2141
2142
- const auto settings = TServerSettings (port);
2142
+ const auto settings = TServerSettings (port)
2143
+ .SetUseRealThreads (false );
2143
2144
TServer server (settings);
2144
2145
TClient client (settings);
2145
2146
SetupLogging (server);
@@ -2153,32 +2154,129 @@ Y_UNIT_TEST_SUITE(TClientTest) {
2153
2154
const TActorId edge = runtime.AllocateEdgeActor ();
2154
2155
2155
2156
{
2157
+ ui64 confirmationsCount = 0 ;
2158
+ auto observeConfirmations = [&](TAutoPtr<IEventHandle>& ev) {
2159
+ switch (ev->GetTypeRewrite ()) {
2160
+ case TEvBlobStorage::TEvPut::EventType: {
2161
+ const auto * msg = ev->Get <TEvBlobStorage::TEvPut>();
2162
+ // step 1 is snapshot
2163
+ // step 2 is schema alter
2164
+ // step 3 is expected write below
2165
+ if (msg->Id .TabletID () == tabletId &&
2166
+ msg->Id .Channel () == 0 &&
2167
+ msg->Id .Cookie () == 1 &&
2168
+ msg->Id .Step () > 2 )
2169
+ {
2170
+ ++confirmationsCount;
2171
+ }
2172
+ break ;
2173
+ }
2174
+ }
2175
+ return TTestActorRuntime::EEventAction::PROCESS;
2176
+ };
2177
+ runtime.SetObserverFunc (observeConfirmations);
2178
+
2156
2179
const TActorId leaderTablet = runtime.Register (CreateTablet (edge, tabletInfo.Get (), setupInfo.Get (), 0 , nullptr , nullptr ));
2157
2180
const TActorId leaderId = runtime.GrabEdgeEvent <TEvTablet::TEvRestored>(edge)->Get ()->UserTabletActor ;
2158
- Y_UNUSED (leaderId);
2181
+
2182
+ // we use it to kill leader only when it has sent the write to the follower and it is confirmed
2183
+ const TActorId followerTablet = runtime.Register (CreateTabletFollower (edge, tabletInfo.Get (), setupInfo.Get (), 1 , nullptr , nullptr ));
2184
+
2185
+ auto doLeaderWrite = [&](ui64 key, ui64 value) {
2186
+ const char *writeQuery = R"__( (
2187
+ (let row_ '('('key (Uint64 '%lu))))
2188
+ (let update_ '('('v_ui64 (Uint64 '%lu))))
2189
+ (let result_ (UpdateRow 't_by_ui64 row_ update_))
2190
+ (return (AsList result_))
2191
+ ))__" ;
2192
+
2193
+ THolder<TEvTablet::TEvLocalMKQL> reqWrite = MakeHolder<TEvTablet::TEvLocalMKQL>();
2194
+ reqWrite->Record .MutableProgram ()->MutableProgram ()->SetText (Sprintf (writeQuery, key, value));
2195
+ runtime.Send (new IEventHandle (leaderId, edge, reqWrite.Release ()));
2196
+
2197
+ auto reply = runtime.GrabEdgeEvent <TEvTablet::TEvLocalMKQLResponse>(edge);
2198
+ UNIT_ASSERT_VALUES_EQUAL (reply->Get ()->Record .GetStatus (), 0 );
2199
+ };
2200
+
2201
+ doLeaderWrite (42 , 51 );
2202
+
2203
+ auto waitFor = [&](const auto & condition, const TString& description) {
2204
+ if (!condition ()) {
2205
+ Cerr << " ... waiting for " << description << Endl;
2206
+ TDispatchOptions options;
2207
+ options.CustomFinalCondition = [&]() {
2208
+ return condition ();
2209
+ };
2210
+ runtime.DispatchEvents (options);
2211
+ UNIT_ASSERT_C (condition (), " ... failed to wait for " << description);
2212
+ }
2213
+ };
2214
+
2215
+ waitFor ([&](){ return confirmationsCount > 0 ; }, " Write confirmed" );
2159
2216
2160
2217
runtime.Send (new IEventHandle (leaderTablet, edge, new TEvents::TEvPoisonPill ()));
2161
2218
auto reply = runtime.GrabEdgeEvent <TEvTablet::TEvTabletDead>(edge);
2162
2219
UNIT_ASSERT_VALUES_EQUAL (reply->Get ()->TabletID , tabletId);
2220
+
2221
+ runtime.Send (new IEventHandle (followerTablet, edge, new TEvents::TEvPoisonPill ()));
2222
+ reply = runtime.GrabEdgeEvent <TEvTablet::TEvTabletDead>(edge);
2223
+ UNIT_ASSERT_VALUES_EQUAL (reply->Get ()->TabletID , tabletId);
2163
2224
}
2164
2225
2165
- const TActorId followerTablet = runtime.Register (CreateTabletFollower (edge, tabletInfo.Get (), setupInfo.Get (), 1 , nullptr , nullptr ));
2166
- Y_UNUSED (followerTablet);
2226
+ // now we start follower without its leader
2167
2227
2168
- const TActorId followerId = runtime.GrabEdgeEvent <TEvTablet::TEvRestored>(edge)->Get ()->UserTabletActor ;
2169
- Y_UNUSED (followerId);
2228
+ const TActorId followerEdge = runtime.AllocateEdgeActor ();
2229
+ const TActorId followerTablet = runtime.Register (CreateTabletFollower (followerEdge, tabletInfo.Get (), setupInfo.Get (), 1 , nullptr , nullptr ));
2230
+ Y_UNUSED (followerTablet);
2231
+ const TActorId followerId = runtime.GrabEdgeEvent <TEvTablet::TEvRestored>(followerEdge)->Get ()->UserTabletActor ;
2170
2232
2171
2233
{
2172
2234
NTabletPipe::TClientConfig pipeClientConfig;
2173
2235
pipeClientConfig.AllowFollower = true ;
2174
2236
pipeClientConfig.ForceFollower = true ;
2175
2237
pipeClientConfig.RetryPolicy = {.RetryLimitCount = 2 };
2176
- runtime.Register (NTabletPipe::CreateClient (edge , tabletId, pipeClientConfig));
2238
+ runtime.Register (NTabletPipe::CreateClient (followerEdge , tabletId, pipeClientConfig));
2177
2239
2178
- auto reply = runtime.GrabEdgeEvent <TEvTabletPipe::TEvClientConnected>(edge );
2240
+ auto reply = runtime.GrabEdgeEvent <TEvTabletPipe::TEvClientConnected>(followerEdge );
2179
2241
2180
2242
UNIT_ASSERT_VALUES_EQUAL (reply->Get ()->Status , NKikimrProto::OK);
2181
2243
}
2244
+
2245
+ auto doFollowerRead = [&](ui64 key) -> TMaybe<ui64> {
2246
+ const char *readQuery = R"__( (
2247
+ (let row_ '('('key (Uint64 '%lu))))
2248
+ (let select_ '('v_ui64))
2249
+ (let pgmReturn (AsList
2250
+ (SetResult 'res (SelectRow 't_by_ui64 row_ select_))
2251
+ ))
2252
+ (return pgmReturn)
2253
+ ))__" ;
2254
+
2255
+ THolder<TEvTablet::TEvLocalMKQL> reqRead = MakeHolder<TEvTablet::TEvLocalMKQL>();
2256
+ reqRead->Record .MutableProgram ()->MutableProgram ()->SetText (Sprintf (readQuery, key));
2257
+ runtime.Send (new IEventHandle (followerId, followerEdge, reqRead.Release ()));
2258
+
2259
+ auto reply = runtime.GrabEdgeEvent <TEvTablet::TEvLocalMKQLResponse>(followerEdge);
2260
+ UNIT_ASSERT_VALUES_EQUAL (reply->Get ()->Record .GetStatus (), 0 );
2261
+ const auto res = reply->Get ()->Record
2262
+ .GetExecutionEngineEvaluatedResponse ()
2263
+ .GetValue ()
2264
+ .GetStruct (0 )
2265
+ .GetOptional ();
2266
+ if (!res.HasOptional ()) {
2267
+ return Nothing ();
2268
+ }
2269
+
2270
+ return res
2271
+ .GetOptional ()
2272
+ .GetStruct (0 )
2273
+ .GetOptional ()
2274
+ .GetUint64 ();
2275
+ };
2276
+
2277
+ // Perform basic sanity checks
2278
+ UNIT_ASSERT_VALUES_EQUAL (doFollowerRead (41 ), Nothing ());
2279
+ UNIT_ASSERT_VALUES_EQUAL (doFollowerRead (42 ), 51u );
2182
2280
}
2183
2281
2184
2282
Y_UNIT_TEST (FollowerOfflineBoot) {
0 commit comments