@@ -5,6 +5,7 @@
 
 #include <ydb/core/base/blobstorage.h>
 #include <ydb/core/kqp/executer_actor/kqp_executer.h>
+#include <ydb/core/testlib/actors/block_events.h>
 
 namespace NKikimr {
 
@@ -2988,6 +2989,94 @@ Y_UNIT_TEST_SUITE(DataShardVolatile) {
             "ERROR: ABORTED");
     }
 
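+    // Tests volatile transaction dependencies across shard restarts: two
+    // distributed upserts are planned while plan steps to the arbiter shard
+    // are blocked, then both shards restart before they can communicate,
+    // so neither transaction's outcome can be determined.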
+    Y_UNIT_TEST(UpsertDependenciesShardsRestart) {
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root")
+            .SetUseRealThreads(false)
+            .SetEnableDataShardVolatileTransactions(true);
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto &runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
+        runtime.SetLogPriority(NKikimrServices::PIPE_CLIENT, NLog::PRI_TRACE);
+
+        InitRoot(server, sender);
+
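+        // Create a table partitioned into two shards at key 10, so the
+        // upserts below (keys 1-2 and 11-12) each touch both shards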
+        UNIT_ASSERT_VALUES_EQUAL(
+            KqpSchemeExec(runtime, R"(
+                CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key))
+                WITH (PARTITION_AT_KEYS = (10));
+            )"),
+            "SUCCESS");
+
+        const auto shards = GetTableShards(server, sender, "/Root/table");
+        UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
+
+        // Fill the table with some initial data
+        Cerr << "========= Upserting initial values =========" << Endl;
+        KqpSimpleExec(runtime, R"(
+            UPSERT INTO `/Root/table` (key, value)
+            VALUES (1, 1), (11, 11);
+        )");
+
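+        // Make shards.at(0) the arbiter for volatile transactions and block
+        // TEvPlanStep events addressed to its tablet actor: transactions will
+        // plan and execute at the second shard, but never start at the first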
+        TForceVolatileProposeArbiter forceArbiter(runtime, shards.at(0));
+        TBlockEvents<TEvTxProcessing::TEvPlanStep> blockedPlan(runtime,
+            [actor = ResolveTablet(runtime, shards.at(0))](const auto& ev) {
+                return ev->GetRecipientRewrite() == actor;
+            });
+
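+        // Send both upserts asynchronously; SimulateSleep gives each one time
+        // to be planned (and captured by the blocker) before the next step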
+        Cerr << "========= Starting upsert 1 =========" << Endl;
+        auto upsertFuture1 = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/table` (key, value)
+            VALUES (2, 2), (12, 12);
+        )");
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
+        Cerr << "========= Starting upsert 2 =========" << Endl;
+        auto upsertFuture2 = KqpSimpleSend(runtime, R"(
+            UPSERT INTO `/Root/table` (key, value)
+            VALUES (2, 1002), (12, 1012);
+        )");
+        runtime.SimulateSleep(TDuration::Seconds(1));
+
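+        // One blocked plan step per transaction should have been captured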
+        UNIT_ASSERT_VALUES_EQUAL(blockedPlan.size(), 2u);
+
+        // We expect both transactions to have executed at shards[1], while at
+        // shards[0] they never even started, since its plan steps are blocked.
+        // Now restart both shards without giving them a chance to communicate.
+        std::vector<TActorId> shardActors{
+            ResolveTablet(runtime, shards.at(0)),
+            ResolveTablet(runtime, shards.at(1)),
+        };
+        for (auto& shardActor : shardActors) {
+            Cerr << "... killing actor " << shardActor << Endl;
+            // Use a synchronous send so both shards handle TEvPoison before anything else
+            runtime.Send(new IEventHandle(shardActor, TActorId(), new TEvents::TEvPoison), 0, /* viaActorSystem */ false);
+        }
+
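+        // Unblock plan step delivery and drop the captured events: the
+        // restarted shards must recover without ever receiving them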
+        blockedPlan.Stop().clear();
+
+        // Both queries should abort with UNDETERMINED: the shards restarted
+        // mid-commit, so the final outcome cannot be reported to the client
+        Cerr << "... waiting for query results" << Endl;
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(runtime.WaitFuture(std::move(upsertFuture1))),
+            "ERROR: UNDETERMINED");
+        UNIT_ASSERT_VALUES_EQUAL(
+            FormatResult(runtime.WaitFuture(std::move(upsertFuture2))),
+            "ERROR: UNDETERMINED");
+
+        // Split the second shard to make sure it's not stuck
+        Cerr << "========= Splitting shard 2 =========" << Endl;
+        SetSplitMergePartCountLimit(server->GetRuntime(), -1);
+        ui64 txId = AsyncSplitTable(server, sender, "/Root/table", shards.at(1), 15);
+        Cerr << "... split txId# " << txId << " started" << Endl;
+        WaitTxNotification(server, sender, txId);
+        Cerr << "... split finished" << Endl;
+    }
+
 } // Y_UNIT_TEST_SUITE(DataShardVolatile)
 
 } // namespace NKikimr