Skip to content

Commit e91747e

Browse files
authored
Fix missing locks on read iterator empty result elision. Fixes #2765. (#2825)
1 parent 8279206 commit e91747e

File tree

2 files changed

+82
-0
lines changed

2 files changed

+82
-0
lines changed

ydb/core/tx/datashard/datashard__read_iterator.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,10 @@ class TReader {
665665
}
666666
}
667667

668+
if (record.TxLocksSize() > 0 || record.BrokenTxLocksSize() > 0) {
669+
useful = true;
670+
}
671+
668672
Self->IncCounter(COUNTER_READ_ITERATOR_ROWS_READ, RowsRead);
669673
if (!isKeysRequest) {
670674
Self->IncCounter(COUNTER_ENGINE_HOST_SELECT_RANGE_ROW_SKIPS, DeletedRowSkips);

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

+78
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
namespace NKikimr {
2424

2525
using namespace NKikimr::NDataShard;
26+
using namespace NKikimr::NDataShard::NKqpHelpers;
2627
using namespace NSchemeShard;
2728
using namespace Tests;
2829

@@ -4167,6 +4168,83 @@ Y_UNIT_TEST_SUITE(DataShardReadIteratorPageFaults) {
41674168
// We should be able to drop table
41684169
WaitTxNotification(server, AsyncDropTable(server, sender, "/Root", "table-1"));
41694170
}
4171+
4172+
Y_UNIT_TEST(LocksNotLostOnPageFault) {
4173+
TPortManager pm;
4174+
NFake::TCaches caches;
4175+
caches.Shared = 1 /* bytes */;
4176+
TServerSettings serverSettings(pm.GetPort(2134));
4177+
serverSettings.SetDomainName("Root")
4178+
.SetUseRealThreads(false)
4179+
.SetCacheParams(caches);
4180+
TServer::TPtr server = new TServer(serverSettings);
4181+
4182+
auto& runtime = *server->GetRuntime();
4183+
auto sender = runtime.AllocateEdgeActor();
4184+
4185+
runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
4186+
4187+
InitRoot(server, sender);
4188+
4189+
TDisableDataShardLogBatching disableDataShardLogBatching;
4190+
4191+
// Use a policy that forces very small page sizes, effectively making each row on its own page
4192+
NLocalDb::TCompactionPolicyPtr policy = NLocalDb::CreateDefaultTablePolicy();
4193+
policy->MinDataPageSize = 1;
4194+
4195+
auto opts = TShardedTableOptions()
4196+
.Columns({{"key", "Int32", true, false},
4197+
{"index", "Int32", true, false},
4198+
{"value", "Int32", false, false}})
4199+
.Policy(policy.Get())
4200+
.ExecutorCacheSize(1 /* byte */);
4201+
auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
4202+
4203+
ExecSQL(server, sender, "UPSERT INTO `/Root/table-1` (key, index, value) VALUES (1, 0, 10), (3, 0, 30), (5, 0, 50), (7, 0, 70), (9, 0, 90);");
4204+
runtime.SimulateSleep(TDuration::Seconds(1));
4205+
4206+
const auto shard1 = shards.at(0);
4207+
CompactTable(runtime, shard1, tableId, false);
4208+
RebootTablet(runtime, shard1, sender);
4209+
runtime.SimulateSleep(TDuration::Seconds(1));
4210+
4211+
// Start a write transaction that has uncommitted write to key (2, 0)
4212+
// This is because read iterator measures "work" in processed/skipped rows, so we have to give it something
4213+
TString writeSessionId, writeTxId;
4214+
UNIT_ASSERT_VALUES_EQUAL(
4215+
KqpSimpleBegin(runtime, writeSessionId, writeTxId, R"(
4216+
UPSERT INTO `/Root/table-1` (key, index, value) VALUES (2, 0, 20), (4, 0, 40);
4217+
4218+
SELECT key, index, value FROM `/Root/table-1`
4219+
WHERE key = 2
4220+
ORDER BY key, index;
4221+
)"),
4222+
"{ items { int32_value: 2 } items { int32_value: 0 } items { int32_value: 20 } }");
4223+
4224+
// Start a read transaction with several range read in a specific order
4225+
// The first two prefixes don't exist (nothing committed yet)
4226+
// The other two prefixes are supposed to page fault
4227+
TString sessionId, txId;
4228+
UNIT_ASSERT_VALUES_EQUAL(
4229+
KqpSimpleBegin(runtime, sessionId, txId, R"(
4230+
SELECT key, index, value FROM `/Root/table-1`
4231+
WHERE key IN (2, 4, 7, 9)
4232+
ORDER BY key, index;
4233+
)"),
4234+
"{ items { int32_value: 7 } items { int32_value: 0 } items { int32_value: 70 } }, "
4235+
"{ items { int32_value: 9 } items { int32_value: 0 } items { int32_value: 90 } }");
4236+
4237+
// Commit the first transaction, it must succeed
4238+
UNIT_ASSERT_VALUES_EQUAL(
4239+
KqpSimpleCommit(runtime, writeSessionId, writeTxId, "SELECT 1;"),
4240+
"{ items { int32_value: 1 } }");
4241+
4242+
// Commit the second transaction with a new upsert, it must not succeed
4243+
UNIT_ASSERT_VALUES_EQUAL(
4244+
KqpSimpleCommit(runtime, sessionId, txId,
4245+
"UPSERT INTO `/Root/table-1` (key, index, value) VALUES (2, 0, 22);"),
4246+
"ERROR: ABORTED");
4247+
}
41704248
}
41714249

41724250
} // namespace NKikimr

0 commit comments

Comments
 (0)