Skip to content

Commit 74ed68a

Browse files
authored
Merge f0f60eb into fac9434
2 parents fac9434 + f0f60eb commit 74ed68a

11 files changed

+563
-48
lines changed

ydb/core/tx/datashard/datashard__engine_host.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,11 +370,11 @@ class TDataShardEngineHost final
370370
TSmallVec<NTable::TUpdateOp> ops;
371371
ConvertTableValues(Scheme, tableInfo, commands, ops, nullptr);
372372

373-
UserDb.UpdateRow(tableId, key, ops);
373+
UserDb.UpsertRow(tableId, key, ops);
374374
}
375375

376-
void UpdateRow(const TTableId& tableId, const TArrayRef<const TRawTypeValue> key, const TArrayRef<const NIceDb::TUpdateOp> ops) override {
377-
UserDb.UpdateRow(tableId, key, ops);
376+
void UpsertRow(const TTableId& tableId, const TArrayRef<const TRawTypeValue> key, const TArrayRef<const NIceDb::TUpdateOp> ops) override {
377+
UserDb.UpsertRow(tableId, key, ops);
378378
}
379379

380380
void ReplaceRow(const TTableId& tableId, const TArrayRef<const TRawTypeValue> key, const TArrayRef<const NIceDb::TUpdateOp> ops) override {
@@ -385,6 +385,10 @@ class TDataShardEngineHost final
385385
UserDb.InsertRow(tableId, key, ops);
386386
}
387387

388+
void UpdateRow(const TTableId& tableId, const TArrayRef<const TRawTypeValue> key, const TArrayRef<const NIceDb::TUpdateOp> ops) override {
389+
UserDb.UpdateRow(tableId, key, ops);
390+
}
391+
388392
void EraseRow(const TTableId& tableId, const TArrayRef<const TCell>& row) override {
389393
if (TSysTables::IsSystemTable(tableId)) {
390394
DataShardSysTable(tableId).EraseRow(row);

ydb/core/tx/datashard/datashard_pipeline.cpp

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2285,11 +2285,15 @@ void TPipeline::AddCommittingOp(const TOperation::TPtr& op) {
22852285
if (!Self->IsMvccEnabled() || op->IsReadOnly())
22862286
return;
22872287

2288+
Y_VERIFY_S(!op->GetCommittingOpsVersion(),
2289+
"Trying to AddCommittingOp " << *op << " more than once");
2290+
22882291
TRowVersion version = Self->GetReadWriteVersions(op.Get()).WriteVersion;
22892292
if (op->IsImmediate())
22902293
CommittingOps.Add(op->GetTxId(), version);
22912294
else
22922295
CommittingOps.Add(version);
2296+
op->SetCommittingOpsVersion(version);
22932297
}
22942298

22952299
void TPipeline::RemoveCommittingOp(const TRowVersion& version) {
@@ -2299,13 +2303,13 @@ void TPipeline::RemoveCommittingOp(const TRowVersion& version) {
22992303
}
23002304

23012305
void TPipeline::RemoveCommittingOp(const TOperation::TPtr& op) {
2302-
if (!Self->IsMvccEnabled() || op->IsReadOnly())
2303-
return;
2304-
2305-
if (op->IsImmediate())
2306-
CommittingOps.Remove(op->GetTxId());
2307-
else
2308-
CommittingOps.Remove(TRowVersion(op->GetStep(), op->GetTxId()));
2306+
if (const auto& version = op->GetCommittingOpsVersion()) {
2307+
if (op->IsImmediate())
2308+
CommittingOps.Remove(op->GetTxId(), *version);
2309+
else
2310+
CommittingOps.Remove(*version);
2311+
op->ResetCommittingOpsVersion();
2312+
}
23092313
}
23102314

23112315
bool TPipeline::WaitCompletion(const TOperation::TPtr& op) const {

ydb/core/tx/datashard/datashard_pipeline.h

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -424,11 +424,13 @@ class TPipeline : TNonCopyable {
424424
ui64 Step;
425425
ui64 TxId;
426426
mutable ui32 Counter;
427+
mutable ui32 TxCounter;
427428

428429
TItem(const TRowVersion& from)
429430
: Step(from.Step)
430431
, TxId(from.TxId)
431432
, Counter(1u)
433+
, TxCounter(0u)
432434
{}
433435

434436
friend constexpr bool operator<(const TItem& a, const TItem& b) {
@@ -442,6 +444,7 @@ class TPipeline : TNonCopyable {
442444

443445
using TItemsSet = TSet<TItem>;
444446
using TTxIdMap = THashMap<ui64, TItemsSet::iterator>;
447+
445448
public:
446449
inline void Add(ui64 txId, TRowVersion version) {
447450
auto res = ItemsSet.emplace(version);
@@ -450,6 +453,7 @@ class TPipeline : TNonCopyable {
450453
auto res2 = TxIdMap.emplace(txId, res.first);
451454
Y_VERIFY_S(res2.second, "Unexpected duplicate immediate tx " << txId
452455
<< " committing at " << version);
456+
res.first->TxCounter += 1;
453457
}
454458

455459
inline void Add(TRowVersion version) {
@@ -458,17 +462,29 @@ class TPipeline : TNonCopyable {
458462
res.first->Counter += 1;
459463
}
460464

461-
inline void Remove(ui64 txId) {
462-
if (auto it = TxIdMap.find(txId); it != TxIdMap.end()) {
463-
if (--it->second->Counter == 0)
464-
ItemsSet.erase(it->second);
465-
TxIdMap.erase(it);
466-
}
465+
inline void Remove(ui64 txId, TRowVersion version) {
466+
auto it = TxIdMap.find(txId);
467+
Y_VERIFY_S(it != TxIdMap.end(), "Removing immediate tx " << txId << " " << version
468+
<< " does not match a previous Add");
469+
Y_VERIFY_S(TRowVersion(it->second->Step, it->second->TxId) == version, "Removing immediate tx " << txId << " " << version
470+
<< " does not match a previous Add " << TRowVersion(it->second->Step, it->second->TxId));
471+
Y_VERIFY_S(it->second->TxCounter > 0, "Removing immediate tx " << txId << " " << version
472+
<< " with a mismatching TxCounter");
473+
--it->second->TxCounter;
474+
if (--it->second->Counter == 0)
475+
ItemsSet.erase(it->second);
476+
TxIdMap.erase(it);
467477
}
468478

469479
inline void Remove(TRowVersion version) {
470-
if (auto it = ItemsSet.find(version); it != ItemsSet.end() && --it->Counter == 0)
480+
auto it = ItemsSet.find(version);
481+
Y_VERIFY_S(it != ItemsSet.end(), "Removing version " << version
482+
<< " does not match a previous Add");
483+
if (--it->Counter == 0) {
484+
Y_VERIFY_S(it->TxCounter == 0, "Removing version " << version
485+
<< " while TxCounter has active references, possible Add/Remove mismatch");
471486
ItemsSet.erase(it);
487+
}
472488
}
473489

474490
inline bool HasOpsBelow(TRowVersion upperBound) const {

ydb/core/tx/datashard/datashard_user_db.cpp

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ ui64 CalculateValueBytes(const TArrayRef<const NIceDb::TUpdateOp> ops) {
6262
return bytes;
6363
};
6464

65-
void TDataShardUserDb::UpdateRow(
65+
void TDataShardUserDb::UpsertRow(
6666
const TTableId& tableId,
6767
const TArrayRef<const TRawTypeValue> key,
6868
const TArrayRef<const NIceDb::TUpdateOp> ops)
@@ -108,11 +108,11 @@ void TDataShardUserDb::UpdateRow(
108108
if (specUpdates.ColIdUpdateNo != Max<ui32>()) {
109109
addExtendedOp(specUpdates.ColIdUpdateNo, specUpdates.UpdateNo);
110110
}
111-
UpdateRowInt(NTable::ERowOp::Upsert, tableId, localTableId, key, extendedOps);
111+
UpsertRowInt(NTable::ERowOp::Upsert, tableId, localTableId, key, extendedOps);
112112

113113
IncreaseUpdateCounters(key, extendedOps);
114114
} else {
115-
UpdateRowInt(NTable::ERowOp::Upsert, tableId, localTableId, key, ops);
115+
UpsertRowInt(NTable::ERowOp::Upsert, tableId, localTableId, key, ops);
116116

117117
IncreaseUpdateCounters(key, ops);
118118
}
@@ -126,7 +126,7 @@ void TDataShardUserDb::ReplaceRow(
126126
auto localTableId = Self.GetLocalTableId(tableId);
127127
Y_ABORT_UNLESS(localTableId != 0, "Unexpected ReplaceRow for an unknown table");
128128

129-
UpdateRowInt(NTable::ERowOp::Reset, tableId, localTableId, key, ops);
129+
UpsertRowInt(NTable::ERowOp::Reset, tableId, localTableId, key, ops);
130130

131131
IncreaseUpdateCounters(key, ops);
132132
}
@@ -139,9 +139,26 @@ void TDataShardUserDb::InsertRow(
139139
auto localTableId = Self.GetLocalTableId(tableId);
140140
Y_ABORT_UNLESS(localTableId != 0, "Unexpected InsertRow for an unknown table");
141141

142-
EnsureMissingRow(tableId, key);
142+
if (RowExists(tableId, key))
143+
throw TUniqueConstrainException();
144+
145+
UpsertRowInt(NTable::ERowOp::Upsert, tableId, localTableId, key, ops);
146+
147+
IncreaseUpdateCounters(key, ops);
148+
}
149+
150+
void TDataShardUserDb::UpdateRow(
151+
const TTableId& tableId,
152+
const TArrayRef<const TRawTypeValue> key,
153+
const TArrayRef<const NIceDb::TUpdateOp> ops)
154+
{
155+
auto localTableId = Self.GetLocalTableId(tableId);
156+
Y_ABORT_UNLESS(localTableId != 0, "Unexpected UpdateRow for an unknown table");
157+
158+
if (!RowExists(tableId, key))
159+
return;
143160

144-
UpdateRowInt(NTable::ERowOp::Upsert, tableId, localTableId, key, ops);
161+
UpsertRowInt(NTable::ERowOp::Upsert, tableId, localTableId, key, ops);
145162

146163
IncreaseUpdateCounters(key, ops);
147164
}
@@ -153,7 +170,7 @@ void TDataShardUserDb::EraseRow(
153170
auto localTableId = Self.GetLocalTableId(tableId);
154171
Y_ABORT_UNLESS(localTableId != 0, "Unexpected UpdateRow for an unknown table");
155172

156-
UpdateRowInt(NTable::ERowOp::Erase, tableId, localTableId, key, {});
173+
UpsertRowInt(NTable::ERowOp::Erase, tableId, localTableId, key, {});
157174

158175
ui64 keyBytes = CalculateKeyBytes(key);
159176

@@ -172,7 +189,7 @@ void TDataShardUserDb::IncreaseUpdateCounters(
172189
Counters.UpdateRowBytes += keyBytes + valueBytes;
173190
}
174191

175-
void TDataShardUserDb::UpdateRowInt(
192+
void TDataShardUserDb::UpsertRowInt(
176193
NTable::ERowOp rowOp,
177194
const TTableId& tableId,
178195
ui64 localTableId,
@@ -216,7 +233,7 @@ void TDataShardUserDb::UpdateRowInt(
216233
Self.GetKeyAccessSampler()->AddSample(tableId, keyCells);
217234
}
218235

219-
void TDataShardUserDb::EnsureMissingRow (
236+
bool TDataShardUserDb::RowExists (
220237
const TTableId& tableId,
221238
const TArrayRef<const TRawTypeValue> key)
222239
{
@@ -227,12 +244,10 @@ void TDataShardUserDb::EnsureMissingRow (
227244
throw TNotReadyTabletException();
228245
}
229246
case NTable::EReady::Data: {
230-
if (rowState == NTable::ERowOp::Upsert)
231-
throw TUniqueConstrainException();
232-
break;
247+
return true;
233248
}
234249
case NTable::EReady::Gone: {
235-
break;
250+
return false;
236251
}
237252
}
238253
}

ydb/core/tx/datashard/datashard_user_db.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ class IDataShardUserDb {
3737
NTable::TRowState& row,
3838
const TMaybe<TRowVersion>& readVersion = {}) = 0;
3939

40-
virtual void UpdateRow(
40+
virtual void UpsertRow(
4141
const TTableId& tableId,
4242
const TArrayRef<const TRawTypeValue> key,
4343
const TArrayRef<const NIceDb::TUpdateOp> ops) = 0;
@@ -52,6 +52,11 @@ class IDataShardUserDb {
5252
const TArrayRef<const TRawTypeValue> key,
5353
const TArrayRef<const NIceDb::TUpdateOp> ops) = 0;
5454

55+
virtual void UpdateRow(
56+
const TTableId& tableId,
57+
const TArrayRef<const TRawTypeValue> key,
58+
const TArrayRef<const NIceDb::TUpdateOp> ops) = 0;
59+
5560
virtual void EraseRow(
5661
const TTableId& tableId,
5762
const TArrayRef<const TRawTypeValue> key) = 0;
@@ -107,7 +112,7 @@ class TDataShardUserDb final
107112
NTable::TRowState& row,
108113
const TMaybe<TRowVersion>& readVersion = {}) override;
109114

110-
void UpdateRow(
115+
void UpsertRow(
111116
const TTableId& tableId,
112117
const TArrayRef<const TRawTypeValue> key,
113118
const TArrayRef<const NIceDb::TUpdateOp> ops) override;
@@ -122,6 +127,11 @@ class TDataShardUserDb final
122127
const TArrayRef<const TRawTypeValue> key,
123128
const TArrayRef<const NIceDb::TUpdateOp> ops) override;
124129

130+
void UpdateRow(
131+
const TTableId& tableId,
132+
const TArrayRef<const TRawTypeValue> key,
133+
const TArrayRef<const NIceDb::TUpdateOp> ops) override;
134+
125135
void EraseRow(
126136
const TTableId& tableId,
127137
const TArrayRef<const TRawTypeValue> key) override;
@@ -169,8 +179,8 @@ class TDataShardUserDb final
169179
private:
170180
static TSmallVec<TCell> ConvertTableKeys(const TArrayRef<const TRawTypeValue> key);
171181

172-
void UpdateRowInt(NTable::ERowOp rowOp, const TTableId& tableId, ui64 localTableId, const TArrayRef<const TRawTypeValue> key, const TArrayRef<const NIceDb::TUpdateOp> ops);
173-
void EnsureMissingRow(const TTableId& tableId, const TArrayRef<const TRawTypeValue> key);
182+
void UpsertRowInt(NTable::ERowOp rowOp, const TTableId& tableId, ui64 localTableId, const TArrayRef<const TRawTypeValue> key, const TArrayRef<const NIceDb::TUpdateOp> ops);
183+
bool RowExists(const TTableId& tableId, const TArrayRef<const TRawTypeValue> key);
174184

175185
void IncreaseUpdateCounters(const TArrayRef<const TRawTypeValue> key, const TArrayRef<const NIceDb::TUpdateOp> ops);
176186
private:

0 commit comments

Comments
 (0)