Skip to content

Commit 83b1860

Browse files
authored
24-3-13-hotfix: Fix bulk operations breaking frozen locks (#12019)
1 parent 07b2a44 commit 83b1860

File tree

3 files changed

+195
-1
lines changed

3 files changed

+195
-1
lines changed

ydb/core/tx/datashard/datashard__op_rows.cpp

Lines changed: 10 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -220,6 +220,11 @@ void TDataShard::Handle(TEvDataShard::TEvUploadRowsRequest::TPtr& ev, const TAct
220220
UpdateProposeQueueSize();
221221
return;
222222
}
223+
if (Pipeline.HasProposeDelayers()) {
224+
DelayedProposeQueue.emplace_back().Reset(ev.Release());
225+
UpdateProposeQueueSize();
226+
return;
227+
}
223228
if (IsReplicated()) {
224229
return Reject<TEvDataShard::TEvUploadRowsResponse>(this, ev, "bulk upsert",
225230
ERejectReasons::WrongState, "Can't execute bulk upsert at replicated table", &ReadOnly, ctx, TDataShard::ELogThrottlerType::UploadRows_Reject);
@@ -237,6 +242,11 @@ void TDataShard::Handle(TEvDataShard::TEvEraseRowsRequest::TPtr& ev, const TActo
237242
UpdateProposeQueueSize();
238243
return;
239244
}
245+
if (Pipeline.HasProposeDelayers()) {
246+
DelayedProposeQueue.emplace_back().Reset(ev.Release());
247+
UpdateProposeQueueSize();
248+
return;
249+
}
240250
if (IsReplicated()) {
241251
return Reject<TEvDataShard::TEvEraseRowsResponse>(this, ev, "erase",
242252
ERejectReasons::WrongState, "Can't execute erase at replicated table", &ExecError, ctx, TDataShard::ELogThrottlerType::EraseRows_Reject);

ydb/core/tx/datashard/datashard_ut_snapshot.cpp

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3977,6 +3977,190 @@ Y_UNIT_TEST_SUITE(DataShardSnapshots) {
39773977
"{ items { uint32_value: 4 } items { uint32_value: 40 } }");
39783978
}
39793979

3980+
Y_UNIT_TEST(UncommittedWriteRestartDuringCommitThenBulkErase) {
3981+
NKikimrConfig::TAppConfig app;
3982+
3983+
TPortManager pm;
3984+
TServerSettings serverSettings(pm.GetPort(2134));
3985+
serverSettings.SetDomainName("Root")
3986+
.SetUseRealThreads(false)
3987+
.SetDomainPlanResolution(100)
3988+
.SetAppConfig(app)
3989+
// Bug was with non-volatile transactions
3990+
.SetEnableDataShardVolatileTransactions(false);
3991+
3992+
Tests::TServer::TPtr server = new TServer(serverSettings);
3993+
auto &runtime = *server->GetRuntime();
3994+
auto sender = runtime.AllocateEdgeActor();
3995+
3996+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
3997+
runtime.SetLogPriority(NKikimrServices::TX_PROXY, NLog::PRI_DEBUG);
3998+
runtime.SetLogPriority(NKikimrServices::KQP_EXECUTER, NLog::PRI_TRACE);
3999+
runtime.SetLogPriority(NKikimrServices::KQP_SESSION, NLog::PRI_TRACE);
4000+
4001+
InitRoot(server, sender);
4002+
4003+
TDisableDataShardLogBatching disableDataShardLogBatching;
4004+
UNIT_ASSERT_VALUES_EQUAL(
4005+
KqpSchemeExec(runtime, R"(
4006+
CREATE TABLE `/Root/table` (key Uint32, value Uint32, PRIMARY KEY (key))
4007+
WITH (PARTITION_AT_KEYS = (5));
4008+
)"),
4009+
"SUCCESS");
4010+
4011+
// Insert some initial data
4012+
ExecSQL(server, sender, "UPSERT INTO `/Root/table` (key, value) VALUES (1, 10), (5, 50);");
4013+
4014+
const auto shards = GetTableShards(server, sender, "/Root/table");
4015+
const auto tableId = ResolveTableId(server, sender, "/Root/table");
4016+
UNIT_ASSERT_VALUES_EQUAL(shards.size(), 2u);
4017+
4018+
TString sessionId, txId;
4019+
4020+
// Start inserting a couple of rows into the table
4021+
Cerr << "... sending initial upsert" << Endl;
4022+
UNIT_ASSERT_VALUES_EQUAL(
4023+
KqpSimpleBegin(runtime, sessionId, txId, R"(
4024+
SELECT key, value FROM `/Root/table` WHERE key = 1;
4025+
UPSERT INTO `/Root/table` (key, value) VALUES (2, 20), (6, 60);
4026+
)"),
4027+
"{ items { uint32_value: 1 } items { uint32_value: 10 } }");
4028+
4029+
// We want to block readsets next
4030+
std::vector<std::unique_ptr<IEventHandle>> readSets;
4031+
auto blockReadSets = runtime.AddObserver<TEvTxProcessing::TEvReadSet>([&](TEvTxProcessing::TEvReadSet::TPtr& ev) {
4032+
readSets.emplace_back(ev.Release());
4033+
});
4034+
4035+
// Start committing an additional read/write
4036+
// Note: select on the table flushes accumulated changes first
4037+
Cerr << "... sending commit request" << Endl;
4038+
auto commitFuture = SendRequest(runtime, MakeSimpleRequestRPC(R"(
4039+
SELECT key, value FROM `/Root/table` ORDER BY key;
4040+
)", sessionId, txId, /* commitTx */ true));
4041+
4042+
WaitFor(runtime, [&]{ return readSets.size() >= 2; }, "readset exchange");
4043+
UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u);
4044+
4045+
// We want to make sure we block the first progress message when shards reboot
4046+
std::vector<TActorId> shardActors(shards.size());
4047+
UNIT_ASSERT_VALUES_EQUAL(shardActors.size(), 2u);
4048+
std::vector<std::unique_ptr<IEventHandle>> blockedProgress;
4049+
auto blockProgressQueue = runtime.AddObserver([&](TAutoPtr<IEventHandle>& ev) noexcept {
4050+
switch (ev->GetTypeRewrite()) {
4051+
case TEvTablet::TEvBoot::EventType: {
4052+
auto* msg = ev->Get<TEvTablet::TEvBoot>();
4053+
Cerr << "... observed TEvBoot for " << msg->TabletID << " at " << ev->GetRecipientRewrite() << Endl;
4054+
auto it = std::find(shards.begin(), shards.end(), msg->TabletID);
4055+
if (it != shards.end()) {
4056+
shardActors.at(it - shards.begin()) = ev->GetRecipientRewrite();
4057+
}
4058+
break;
4059+
}
4060+
case EventSpaceBegin(TKikimrEvents::ES_PRIVATE) + 0 /* EvProgressTransaction */: {
4061+
auto it = std::find(shardActors.begin(), shardActors.end(), ev->GetRecipientRewrite());
4062+
if (it != shardActors.end()) {
4063+
ui64 shardId = shards.at(it - shardActors.begin());
4064+
Cerr << "... blocking TEvProgressTranasction at " << ev->GetRecipientRewrite() << " shard " << shardId << Endl;
4065+
blockedProgress.emplace_back(ev.Release());
4066+
return;
4067+
}
4068+
break;
4069+
}
4070+
}
4071+
});
4072+
4073+
// Clear old readsets and reboot both shards with TEvPoison
4074+
// This way shards don't have a chance to reply, which causes an UNDETERMINED error
4075+
readSets.clear();
4076+
for (ui64 shardId : shards) {
4077+
Cerr << "... sending TEvPoison to " << shardId << Endl;
4078+
ForwardToTablet(runtime, shardId, sender, new TEvents::TEvPoison);
4079+
}
4080+
4081+
// Note: we cannot wait for the commit result, since KQP is blocked trying to abort
4082+
4083+
// Sleep a little to make sure everything settles
4084+
Cerr << "... sleeping for 1 second" << Endl;
4085+
runtime.SimulateSleep(TDuration::Seconds(1));
4086+
4087+
UNIT_ASSERT_VALUES_EQUAL(readSets.size(), 2u);
4088+
UNIT_ASSERT_VALUES_EQUAL(blockedProgress.size(), 2u);
4089+
4090+
// Send an erase rows request before the progress queue resumes
4091+
{
4092+
Cerr << "... sending TEvEraseRowsRequest to shard 1 for key 1" << Endl;
4093+
auto req = std::make_unique<TEvDataShard::TEvEraseRowsRequest>();
4094+
req->Record.SetTableId(tableId.PathId.LocalPathId);
4095+
req->Record.SetSchemaVersion(tableId.SchemaVersion);
4096+
req->Record.AddKeyColumnIds(1);
4097+
ui32 key = 1;
4098+
TCell keyCell = TCell::Make(key);
4099+
req->Record.AddKeyColumns(TSerializedCellVec::Serialize(TArrayRef<const TCell>(&keyCell, 1)));
4100+
runtime.Send(new IEventHandle(shardActors.at(0), sender, req.release()), 0, true);
4101+
// Give shard 1 a chance to process this request incorrectly
4102+
Cerr << "... sleeping for 1 second" << Endl;
4103+
runtime.SimulateSleep(TDuration::Seconds(1));
4104+
}
4105+
4106+
// Unblock progress queue and resend blocked messages
4107+
Cerr << "... resending progress queue" << Endl;
4108+
blockProgressQueue.Remove();
4109+
for (auto& ev : blockedProgress) {
4110+
runtime.Send(ev.release(), 0, true);
4111+
}
4112+
blockedProgress.clear();
4113+
4114+
// This insert must run after the currently committing transaction, so it must fail: either read happens before
4115+
// the commit and is broken later by the commit, or the read finds a duplicate row and insert fails. Due to a
4116+
// bug the commit lock might already be broken, causing conflicts not to work properly, and allowing the insert
4117+
// to overwrite key = 2.
4118+
Cerr << "... sending an insert" << Endl;
4119+
auto insertFuture = KqpSimpleSend(runtime, R"(
4120+
INSERT INTO `/Root/table` (key, value) VALUES (2, 22);
4121+
)");
4122+
4123+
// Sleep a little to make sure everything settles
4124+
Cerr << "... sleeping for 1 second" << Endl;
4125+
runtime.SimulateSleep(TDuration::Seconds(1));
4126+
4127+
// Unblock readsets, letting the transaction complete
4128+
Cerr << "... resending readsets" << Endl;
4129+
blockReadSets.Remove();
4130+
for (auto& ev : readSets) {
4131+
runtime.Send(ev.release(), 0, true);
4132+
}
4133+
readSets.clear();
4134+
4135+
// Sleep a little to make sure everything settles
4136+
Cerr << "... sleeping for 1 second" << Endl;
4137+
runtime.SimulateSleep(TDuration::Seconds(1));
4138+
4139+
// We expect erase to succeed by this point
4140+
Cerr << "... checking the erase result" << Endl;
4141+
{
4142+
auto ev = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvEraseRowsResponse>(sender);
4143+
UNIT_ASSERT_VALUES_EQUAL(ev->Get()->Record.GetStatus(), NKikimrTxDataShard::TEvEraseRowsResponse::OK);
4144+
}
4145+
4146+
// We expect commit to fail with an UNDETERMINED error
4147+
Cerr << "... checking the commit result" << Endl;
4148+
UNIT_ASSERT_VALUES_EQUAL(
4149+
FormatResult(AwaitResponse(runtime, std::move(commitFuture))),
4150+
"ERROR: UNDETERMINED");
4151+
4152+
// Now make a read query, we must not observe any partial commits
4153+
Cerr << "... checking final table state" << Endl;
4154+
UNIT_ASSERT_VALUES_EQUAL(
4155+
KqpSimpleExec(runtime, R"(
4156+
SELECT key, value FROM `/Root/table`
4157+
ORDER BY key;
4158+
)"),
4159+
"{ items { uint32_value: 2 } items { uint32_value: 20 } }, "
4160+
"{ items { uint32_value: 5 } items { uint32_value: 50 } }, "
4161+
"{ items { uint32_value: 6 } items { uint32_value: 60 } }");
4162+
}
4163+
39804164
/**
39814165
* This observer forces newly created nodes to start on particular nodes
39824166
*/

ydb/core/tx/locks/locks.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ enum class ELockRangeFlags : ui8 {
239239
using ELockRangeFlagsRaw = std::underlying_type<ELockRangeFlags>::type;
240240

241241
inline ELockRangeFlags operator|(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); }
242-
inline ELockRangeFlags operator&(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) | ELockRangeFlagsRaw(b)); }
242+
inline ELockRangeFlags operator&(ELockRangeFlags a, ELockRangeFlags b) { return ELockRangeFlags(ELockRangeFlagsRaw(a) & ELockRangeFlagsRaw(b)); }
243243
inline ELockRangeFlags& operator|=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a | b; }
244244
inline ELockRangeFlags& operator&=(ELockRangeFlags& a, ELockRangeFlags b) { return a = a & b; }
245245
inline bool operator!(ELockRangeFlags c) { return ELockRangeFlagsRaw(c) == 0; }

0 commit comments

Comments
 (0)