Skip to content

Commit 18ebd43

Browse files
authored
Volatile transaction support in EvWrite (#2196)
1 parent 4bf8208 commit 18ebd43

11 files changed

+173
-53
lines changed

ydb/core/tx/datashard/complete_write_unit.cpp

+22
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,28 @@ void TCompleteWriteUnit::Complete(TOperation::TPtr op, const TActorContext &ctx)
103103

104104
DataShard.EnqueueChangeRecords(std::move(op->ChangeRecords()));
105105
DataShard.EmitHeartbeats();
106+
107+
if (op->HasOutputData()) {
108+
const auto& outReadSets = op->OutReadSets();
109+
const auto& expectedReadSets = op->ExpectedReadSets();
110+
auto itOut = outReadSets.begin();
111+
auto itExpected = expectedReadSets.begin();
112+
while (itExpected != expectedReadSets.end()) {
113+
while (itOut != outReadSets.end() && itOut->first < itExpected->first) {
114+
++itOut;
115+
}
116+
if (itOut != outReadSets.end() && itOut->first == itExpected->first) {
117+
++itOut;
118+
++itExpected;
119+
continue;
120+
}
121+
// We have an expected readset without a corresponding out readset
122+
for (const auto& recipient : itExpected->second) {
123+
DataShard.SendReadSetNoData(ctx, recipient, op->GetStep(), op->GetTxId(), itExpected->first.first, itExpected->first.second);
124+
}
125+
++itExpected;
126+
}
127+
}
106128
}
107129

108130
THolder<TExecutionUnit> CreateCompleteWriteUnit(TDataShard &dataShard, TPipeline &pipeline)

ydb/core/tx/datashard/datashard.cpp

+52-5
Original file line numberDiff line numberDiff line change
@@ -582,11 +582,18 @@ class TDataShard::TSendVolatileResult final : public IVolatileTxCallback {
582582
{ }
583583

584584
void OnCommit(ui64) override {
585-
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
585+
TString error = Result->GetError();
586+
if (error) {
587+
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
588+
"Complete [" << Step << " : " << TxId << "] from " << Self->TabletID()
589+
<< " at tablet " << Self->TabletID() << ", error: " << error);
590+
} else {
591+
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
586592
"Complete [" << Step << " : " << TxId << "] from " << Self->TabletID()
587593
<< " at tablet " << Self->TabletID() << " send result to client "
588594
<< Target << ", exec latency: " << Result->Record.GetExecLatency()
589595
<< " ms, propose latency: " << Result->Record.GetProposeLatency() << " ms");
596+
}
590597

591598
ui64 resultSize = Result->GetTxResult().size();
592599
ui32 flags = IEventHandle::MakeFlags(TInterconnectChannels::GetTabletChannel(resultSize), 0);
@@ -609,6 +616,49 @@ class TDataShard::TSendVolatileResult final : public IVolatileTxCallback {
609616
ui64 TxId;
610617
};
611618

619+
class TDataShard::TSendVolatileWriteResult final: public IVolatileTxCallback {
620+
public:
621+
TSendVolatileWriteResult(
622+
TDataShard* self, std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> writeResult,
623+
const TActorId& target,
624+
ui64 step, ui64 txId
625+
)
626+
: Self(self)
627+
, WriteResult(std::move(writeResult))
628+
, Target(target)
629+
, Step(step)
630+
, TxId(txId)
631+
{
632+
}
633+
634+
void OnCommit(ui64) override {
635+
if (WriteResult->IsError()) {
636+
LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
637+
"Complete volatile write [" << Step << " : " << TxId << "] from " << Self->TabletID()
638+
<< " at tablet " << Self->TabletID() << ", error: " << WriteResult->GetError());
639+
} else {
640+
LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
641+
"Complete volatile write [" << Step << " : " << TxId << "] from " << Self->TabletID()
642+
<< " at tablet " << Self->TabletID() << " send result to client " << Target);
643+
}
644+
645+
LWTRACK(ProposeTransactionSendResult, WriteResult->GetOrbit());
646+
Self->Send(Target, WriteResult.release(), 0);
647+
}
648+
649+
void OnAbort(ui64 txId) override {
650+
WriteResult = NEvents::TDataEvents::TEvWriteResult::BuildError(Self->TabletID(), txId, NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED, "Distributed transaction aborted due to commit failure");
651+
OnCommit(txId);
652+
}
653+
654+
private:
655+
TDataShard* Self;
656+
std::unique_ptr<NEvents::TDataEvents::TEvWriteResult> WriteResult;
657+
TActorId Target;
658+
ui64 Step;
659+
ui64 TxId;
660+
};
661+
612662
void TDataShard::SendResult(const TActorContext &ctx,
613663
TOutputOpData::TResultPtr &res,
614664
const TActorId &target,
@@ -640,15 +690,12 @@ void TDataShard::SendResult(const TActorContext &ctx,
640690
void TDataShard::SendWriteResult(const TActorContext& ctx, std::unique_ptr<NEvents::TDataEvents::TEvWriteResult>& result, const TActorId& target, ui64 step, ui64 txId) {
641691
Y_ABORT_UNLESS(txId == result->Record.GetTxId(), "%" PRIu64 " vs %" PRIu64, txId, result->Record.GetTxId());
642692

643-
// TODO: Volatile
644-
/*
645693
if (VolatileTxManager.FindByTxId(txId)) {
646694
// This is a volatile transaction, and we need to wait until it is resolved
647-
bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, new TSendVolatileResult(this, std::move(result), target, step, txId));
695+
bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, new TSendVolatileWriteResult(this, std::move(result), target, step, txId));
648696
Y_ABORT_UNLESS(ok);
649697
return;
650698
}
651-
*/
652699

653700
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Complete write [" << step << " : " << txId << "] from " << TabletID() << " at tablet " << TabletID() << " send result to client " << target);
654701

ydb/core/tx/datashard/datashard_impl.h

+1
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ class TDataShard
328328

329329
class TWaitVolatileDependencies;
330330
class TSendVolatileResult;
331+
class TSendVolatileWriteResult;
331332

332333
struct TEvPrivate {
333334
enum EEv {

ydb/core/tx/datashard/datashard_pipeline.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -600,9 +600,9 @@ bool TPipeline::LoadWriteDetails(TTransactionContext& txc, const TActorContext&
600600
if (it != DataTxCache.end()) {
601601
auto baseTx = it->second;
602602
Y_ABORT_UNLESS(baseTx->GetType() == TValidatedTx::EType::WriteTx, "Wrong writeOp type in cache");
603-
TValidatedWriteTx::TPtr dataTx = std::static_pointer_cast<TValidatedWriteTx>(baseTx);
603+
TValidatedWriteTx::TPtr writeTx = std::static_pointer_cast<TValidatedWriteTx>(baseTx);
604604

605-
writeOp->FillTxData(dataTx);
605+
writeOp->FillTxData(writeTx);
606606
// Remove writeOp from cache.
607607
ForgetTx(writeOp->GetTxId());
608608

ydb/core/tx/datashard/datashard_ut_read_iterator.cpp

+6-3
Original file line numberDiff line numberDiff line change
@@ -3218,10 +3218,11 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
32183218
helper.TestReadOneKey(tableName, {1, 1, 1}, 101);
32193219
}
32203220

3221-
Y_UNIT_TEST_TWIN(TryCommitLocksPrepared, BreakLocks) {
3221+
Y_UNIT_TEST_QUAD(TryCommitLocksPrepared, Volatile, BreakLocks) {
32223222
TTestHelper helper;
32233223

32243224
auto runtime = helper.Server->GetRuntime();
3225+
runtime->SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_TRACE);
32253226

32263227
const ui64 lockTxId = 1011121314;
32273228
const TString tableName1 = "table-1";
@@ -3266,7 +3267,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
32663267

32673268
Cerr << "===== Commit locks on table 1" << Endl;
32683269
{
3269-
auto writeRequest = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(++helper.TxId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
3270+
auto writeRequest = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(++helper.TxId,
3271+
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
32703272

32713273
NKikimrDataEvents::TKqpLocks& kqpLocks = *writeRequest->Record.MutableLocks();
32723274
kqpLocks.MutableLocks()->CopyFrom(readLocks);
@@ -3284,7 +3286,8 @@ Y_UNIT_TEST_SUITE(DataShardReadIterator) {
32843286

32853287
Cerr << "===== Write and commit locks on table 2" << Endl;
32863288
{
3287-
auto writeRequest = helper.MakeWriteRequest(tableName2, helper.TxId, {1, 1, 1, 1001}, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
3289+
auto writeRequest = helper.MakeWriteRequest(tableName2, helper.TxId, {1, 1, 1, 1001},
3290+
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
32883291

32893292
NKikimrDataEvents::TKqpLocks& kqpLocks = *writeRequest->Record.MutableLocks();
32903293
kqpLocks.AddSendingShards(tabletId1);

ydb/core/tx/datashard/datashard_ut_write.cpp

+16-15
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,10 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
5353
}
5454
}
5555

56-
Y_UNIT_TEST_TWIN(UpsertPrepared, EvWrite) {
56+
Y_UNIT_TEST_QUAD(UpsertPrepared, EvWrite, Volatile) {
5757
auto [runtime, server, sender] = TestCreateServer();
5858

59-
// Disable volatile transactions, since EvWrite has not yet supported them.
60-
runtime.GetAppData().FeatureFlags.SetEnableDataShardVolatileTransactions(false);
59+
runtime.GetAppData().FeatureFlags.SetEnableDataShardVolatileTransactions(Volatile);
6160

6261
auto opts = TShardedTableOptions();
6362
auto [shards1, tableId1] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
@@ -160,11 +159,12 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
160159
}
161160
}
162161

163-
Y_UNIT_TEST(WritePrepared) {
162+
Y_UNIT_TEST_TWIN(WritePrepared, Volatile) {
164163
auto [runtime, server, sender] = TestCreateServer();
165164

166165
TShardedTableOptions opts;
167-
const auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", "table-1", opts);
166+
const TString tableName = "table-1";
167+
const auto [shards, tableId] = CreateShardedTable(server, sender, "/Root", tableName, opts);
168168
const ui64 shard = shards[0];
169169
const ui64 coordinator = ChangeStateStorage(Coordinator, server->GetSettings().Domain);
170170
const ui32 rowCount = 3;
@@ -174,9 +174,9 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
174174

175175
Cout << "========= Send prepare =========\n";
176176
{
177-
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
177+
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId,
178+
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
178179

179-
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED);
180180
UNIT_ASSERT_GT(writeResult.GetMinStep(), 0);
181181
UNIT_ASSERT_GT(writeResult.GetMaxStep(), writeResult.GetMinStep());
182182
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
@@ -198,27 +198,25 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
198198
{
199199
auto writeResult = WaitForWriteCompleted(runtime, sender);
200200

201-
UNIT_ASSERT_VALUES_EQUAL_C(writeResult.GetStatus(), NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED, "Status: " << writeResult.GetStatus() << " Issues: " << writeResult.GetIssues());
202201
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrigin(), shard);
203202
UNIT_ASSERT_GE(writeResult.GetStep(), minStep);
204203
UNIT_ASSERT_LE(writeResult.GetStep(), maxStep);
205204
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetOrderId(), txId);
206205
UNIT_ASSERT_VALUES_EQUAL(writeResult.GetTxId(), txId);
207206

208207
const auto& tableAccessStats = writeResult.GetTxStats().GetTableAccessStats(0);
209-
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/table-1");
208+
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetTableInfo().GetName(), "/Root/" + tableName);
210209
UNIT_ASSERT_VALUES_EQUAL(tableAccessStats.GetUpdateRow().GetCount(), rowCount);
211210
}
212211

213212
Cout << "========= Read table =========\n";
214213
{
215-
auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/table-1")).All();
214+
auto tableState = TReadTableState(server, MakeReadTableSettings("/Root/" + tableName)).All();
216215
UNIT_ASSERT_VALUES_EQUAL(tableState, expectedTableState);
217216
}
218-
219217
}
220218

221-
Y_UNIT_TEST(WritePreparedManyTables) {
219+
Y_UNIT_TEST_TWIN(WritePreparedManyTables, Volatile) {
222220
auto [runtime, server, sender] = TestCreateServer();
223221

224222
TShardedTableOptions opts;
@@ -236,7 +234,8 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
236234

237235
Cerr << "===== Write prepared to table 1" << Endl;
238236
{
239-
const auto writeResult = Write(runtime, sender, tabletId1, tableId1, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
237+
const auto writeResult = Write(runtime, sender, tabletId1, tableId1, opts.Columns_, rowCount, txId,
238+
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
240239

241240
minStep1 = writeResult.GetMinStep();
242241
maxStep1 = writeResult.GetMaxStep();
@@ -282,7 +281,7 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
282281
}
283282

284283

285-
Y_UNIT_TEST(WritePreparedNoTxCache) {
284+
Y_UNIT_TEST_TWIN(WritePreparedNoTxCache, Volatile) {
286285
auto [runtime, server, sender] = TestCreateServer();
287286

288287
TShardedTableOptions opts;
@@ -296,7 +295,9 @@ Y_UNIT_TEST_SUITE(DataShardWrite) {
296295

297296
Cout << "========= Send prepare =========\n";
298297
{
299-
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId, NKikimrDataEvents::TEvWrite::MODE_PREPARE);
298+
const auto writeResult = Write(runtime, sender, shard, tableId, opts.Columns_, rowCount, txId,
299+
Volatile ? NKikimrDataEvents::TEvWrite::MODE_VOLATILE_PREPARE : NKikimrDataEvents::TEvWrite::MODE_PREPARE);
300+
300301
minStep = writeResult.GetMinStep();
301302
maxStep = writeResult.GetMaxStep();
302303
}

ydb/core/tx/datashard/datashard_write_operation.cpp

+13-12
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@
3535
namespace NKikimr {
3636
namespace NDataShard {
3737

38-
TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const TRowVersion& readVersion, const TRowVersion& writeVersion, const NEvents::TDataEvents::TEvWrite& ev)
39-
: UserDb(*self, txc.DB, globalTxId, readVersion, writeVersion, EngineHostCounters, TAppData::TimeProvider->Now())
38+
TValidatedWriteTx::TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev)
39+
: UserDb(*self, txc.DB, globalTxId, TRowVersion::Min(), TRowVersion::Max(), EngineHostCounters, TAppData::TimeProvider->Now())
4040
, KeyValidator(*self, txc.DB)
4141
, TabletId(self->TabletID())
4242
, ReceivedAt(receivedAt)
@@ -410,8 +410,7 @@ TValidatedWriteTx::TPtr TWriteOperation::BuildWriteTx(TDataShard* self, TTransac
410410
{
411411
if (!WriteTx) {
412412
Y_ABORT_UNLESS(WriteRequest);
413-
auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
414-
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetGlobalTxId(), GetReceivedAt(), readVersion, writeVersion, *WriteRequest);
413+
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetGlobalTxId(), GetReceivedAt(), *WriteRequest);
415414
}
416415
return WriteTx;
417416
}
@@ -515,9 +514,8 @@ ERestoreDataStatus TWriteOperation::RestoreTxData(TDataShard* self, TTransaction
515514
LocksCache().Locks[lock.LockId] = lock;
516515

517516
bool extractKeys = WriteTx->IsTxInfoLoaded();
518-
auto [readVersion, writeVersion] = self->GetReadWriteVersions(this);
519517

520-
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetTxId(), GetReceivedAt(), readVersion, writeVersion, *WriteRequest);
518+
WriteTx = std::make_shared<TValidatedWriteTx>(self, txc, GetTxId(), GetReceivedAt(), *WriteRequest);
521519
if (WriteTx->Ready() && extractKeys) {
522520
WriteTx->ExtractKeys(true);
523521
}
@@ -547,22 +545,25 @@ void TWriteOperation::BuildExecutionPlan(bool loaded)
547545
plan.push_back(EExecutionUnitKind::ExecuteWrite);
548546
plan.push_back(EExecutionUnitKind::FinishProposeWrite);
549547
plan.push_back(EExecutionUnitKind::CompletedOperations);
550-
}
551-
/*
552-
else if (HasVolatilePrepareFlag()) {
548+
} else if (HasVolatilePrepareFlag()) {
553549
Y_ABORT_UNLESS(!loaded);
550+
plan.push_back(EExecutionUnitKind::CheckWrite);
554551
plan.push_back(EExecutionUnitKind::StoreWrite); // note: stores in memory
555552
plan.push_back(EExecutionUnitKind::FinishProposeWrite);
556553
Y_ABORT_UNLESS(!GetStep());
557554
plan.push_back(EExecutionUnitKind::WaitForPlan);
558555
plan.push_back(EExecutionUnitKind::PlanQueue);
559-
plan.push_back(EExecutionUnitKind::LoadTxDetails); // note: reloads from memory
556+
plan.push_back(EExecutionUnitKind::LoadWriteDetails); // note: reloads from memory
560557
plan.push_back(EExecutionUnitKind::BuildAndWaitDependencies);
558+
// Note: execute will also prepare and send readsets
561559
plan.push_back(EExecutionUnitKind::ExecuteWrite);
560+
// Note: it is important that plan here is the same as regular
561+
// distributed tx, since normal tx may decide to commit in a
562+
// volatile manner with dependencies, to avoid waiting for
563+
// locked keys to resolve.
562564
plan.push_back(EExecutionUnitKind::CompleteWrite);
563565
plan.push_back(EExecutionUnitKind::CompletedOperations);
564-
*/
565-
else {
566+
} else {
566567
if (!loaded) {
567568
plan.push_back(EExecutionUnitKind::CheckWrite);
568569
plan.push_back(EExecutionUnitKind::StoreWrite);

ydb/core/tx/datashard/datashard_write_operation.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ class TValidatedWriteTx: TNonCopyable, public TValidatedTx {
1919
public:
2020
using TPtr = std::shared_ptr<TValidatedWriteTx>;
2121

22-
TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const TRowVersion& readVersion, const TRowVersion& writeVersion, const NEvents::TDataEvents::TEvWrite& ev);
22+
TValidatedWriteTx(TDataShard* self, TTransactionContext& txc, ui64 globalTxId, TInstant receivedAt, const NEvents::TDataEvents::TEvWrite& ev);
2323
~TValidatedWriteTx();
2424

2525
EType GetType() const override {

0 commit comments

Comments
 (0)