Skip to content

Commit 6a15cce

Browse files
committed
CheckWriteUnit & FinishProposeWriteUnit
1 parent 559d708 commit 6a15cce

16 files changed

+367
-268
lines changed

ydb/core/tx/data_events/events.h

+10
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,16 @@ struct TDataEvents {
106106
return result;
107107
}
108108

109+
// Formats the status and the issues held in Record as a single string of
// the form "Status: <status> Issues: <issues>".
TString GetError() const {
    TStringBuilder message;
    message << "Status: " << Record.GetStatus();
    message << " Issues: " << Record.GetIssues();
    return message;
}
112+
113+
// Returns the status value stored in Record.
NKikimrDataEvents::TEvWriteResult::EStatus GetStatus() const {
    return Record.GetStatus();
}
114+
115+
// True when the status equals STATUS_PREPARED.
bool IsPrepared() const {
    return NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED == GetStatus();
}
116+
// True when the status equals STATUS_COMPLETED.
bool IsComplete() const {
    return NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED == GetStatus();
}
117+
// True for every status other than prepared or completed.
bool IsError() const {
    return !(IsPrepared() || IsComplete());
}
118+
109119
// Takes ownership of the supplied orbit by move-assigning it into Orbit.
void SetOrbit(NLWTrace::TOrbit&& orbit) {
    Orbit = std::move(orbit);
}
110120
// Returns a mutable reference to the stored Orbit.
NLWTrace::TOrbit& GetOrbit() {
    return Orbit;
}
111121
// Returns the stored Orbit as an rvalue reference so the caller can move
// from it; the member itself is only cast here, not moved.
NLWTrace::TOrbit&& MoveOrbit() {
    return std::move(Orbit);
}

ydb/core/tx/datashard/check_write_unit.cpp

+27-193
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#include "datashard_impl.h"
22
#include "datashard_pipeline.h"
3-
#include "execution_unit_ctors.h"
43

4+
#include "ydb/core/tx/datashard/datashard_write_operation.h"
55
#include <ydb/core/tablet/tablet_exception.h>
66

77
namespace NKikimr {
@@ -24,7 +24,7 @@ class TCheckWriteUnit: public TExecutionUnit {
2424

2525
TCheckWriteUnit::TCheckWriteUnit(TDataShard &dataShard,
2626
TPipeline &pipeline)
27-
: TExecutionUnit(EExecutionUnitKind::CheckDataTx, false, dataShard, pipeline)
27+
: TExecutionUnit(EExecutionUnitKind::CheckWrite, false, dataShard, pipeline)
2828
{
2929
}
3030

@@ -45,126 +45,37 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
4545
Y_ABORT_UNLESS(!op->IsAborted());
4646

4747
if (CheckRejectDataTx(op, ctx)) {
48-
op->Abort(EExecutionUnitKind::FinishPropose);
48+
op->Abort(EExecutionUnitKind::FinishProposeWrite);
4949

5050
return EExecutionStatus::Executed;
5151
}
5252

53-
//TODO: remove this return
54-
return EExecutionStatus::Executed;
55-
56-
TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
57-
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
58-
auto dataTx = tx->GetDataTx();
59-
Y_ABORT_UNLESS(dataTx);
60-
Y_ABORT_UNLESS(dataTx->Ready() || dataTx->RequirePrepare());
61-
62-
if (dataTx->Ready()) {
63-
DataShard.IncCounter(COUNTER_MINIKQL_PROGRAM_SIZE, dataTx->ProgramSize());
64-
} else {
65-
Y_ABORT_UNLESS(dataTx->RequirePrepare());
66-
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
67-
"Require prepare Tx " << op->GetTxId() << " at " << DataShard.TabletID()
68-
<< ": " << dataTx->GetErrors());
69-
}
53+
TWriteOperation* writeOp = dynamic_cast<TWriteOperation*>(op.Get());
54+
Y_VERIFY_S(writeOp, "cannot cast operation of kind " << op->GetKind());
55+
auto writeTx = writeOp->WriteTx();
56+
Y_ABORT_UNLESS(writeTx);
57+
Y_ABORT_UNLESS(writeTx->Ready() || writeTx->RequirePrepare());
7058

7159
// Check if we are out of space and tx wants to update user
7260
// or system table.
7361
if (DataShard.IsAnyChannelYellowStop()
74-
&& (dataTx->HasWrites() || !op->IsImmediate())) {
62+
&& (writeTx->HasWrites() || !op->IsImmediate())) {
7563
TString err = TStringBuilder()
7664
<< "Cannot perform transaction: out of disk space at tablet "
7765
<< DataShard.TabletID() << " txId " << op->GetTxId();
7866

7967
DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
8068

81-
BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
82-
op->Abort(EExecutionUnitKind::FinishPropose);
69+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
70+
op->Abort(EExecutionUnitKind::FinishProposeWrite);
8371

84-
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
72+
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
8573

8674
return EExecutionStatus::Executed;
8775
}
8876

89-
if (tx->IsMvccSnapshotRead()) {
90-
auto snapshot = tx->GetMvccSnapshot();
91-
if (DataShard.IsFollower()) {
92-
TString err = TStringBuilder()
93-
<< "Operation " << *op << " cannot read from snapshot " << snapshot
94-
<< " using data tx on a follower " << DataShard.TabletID();
95-
96-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
97-
->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
98-
op->Abort(EExecutionUnitKind::FinishPropose);
99-
100-
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
101-
102-
return EExecutionStatus::Executed;
103-
} else if (!DataShard.IsMvccEnabled()) {
104-
TString err = TStringBuilder()
105-
<< "Operation " << *op << " reads from snapshot " << snapshot
106-
<< " with MVCC feature disabled at " << DataShard.TabletID();
107-
108-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
109-
->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
110-
op->Abort(EExecutionUnitKind::FinishPropose);
111-
112-
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
113-
114-
return EExecutionStatus::Executed;
115-
} else if (snapshot < DataShard.GetSnapshotManager().GetLowWatermark()) {
116-
TString err = TStringBuilder()
117-
<< "Operation " << *op << " reads from stale snapshot " << snapshot
118-
<< " at " << DataShard.TabletID();
119-
120-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
121-
->AddError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, err);
122-
op->Abort(EExecutionUnitKind::FinishPropose);
123-
124-
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
125-
126-
;
127-
}
128-
}
129-
130-
TEngineBay::TSizes txReads;
131-
132-
if (op->IsDataTx()) {
133-
bool hasTotalKeysSizeLimit = !!dataTx->PerShardKeysSizeLimitBytes();
134-
txReads = dataTx->CalcReadSizes(hasTotalKeysSizeLimit);
135-
136-
if (txReads.ReadSize > DataShard.GetTxReadSizeLimit()) {
137-
TString err = TStringBuilder()
138-
<< "Transaction read size " << txReads.ReadSize << " exceeds limit "
139-
<< DataShard.GetTxReadSizeLimit() << " at tablet " << DataShard.TabletID()
140-
<< " txId " << op->GetTxId();
141-
142-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
143-
->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
144-
op->Abort(EExecutionUnitKind::FinishPropose);
145-
146-
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
147-
148-
return EExecutionStatus::Executed;
149-
}
150-
151-
if (hasTotalKeysSizeLimit
152-
&& txReads.TotalKeysSize > *dataTx->PerShardKeysSizeLimitBytes()) {
153-
TString err = TStringBuilder()
154-
<< "Transaction total keys size " << txReads.TotalKeysSize
155-
<< " exceeds limit " << *dataTx->PerShardKeysSizeLimitBytes()
156-
<< " at tablet " << DataShard.TabletID() << " txId " << op->GetTxId();
157-
158-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
159-
->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
160-
op->Abort(EExecutionUnitKind::FinishPropose);
161-
162-
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
163-
164-
return EExecutionStatus::Executed;
165-
}
166-
167-
for (const auto& key : dataTx->TxInfo().Keys) {
77+
{
78+
for (const auto& key : writeTx->TxInfo().Keys) {
16879
if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) {
16980
ui64 keySize = 0;
17081
for (const auto& cell : key.Key->Range.From) {
@@ -176,9 +87,8 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
17687
<< " bytes which exceeds limit " << NLimits::MaxWriteKeySize
17788
<< " bytes at " << DataShard.TabletID();
17889

179-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
180-
->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
181-
op->Abort(EExecutionUnitKind::FinishPropose);
90+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err);
91+
op->Abort(EExecutionUnitKind::FinishProposeWrite);
18292

18393
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
18494

@@ -193,8 +103,8 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
193103
<< "Transaction write column value of " << col.ImmediateUpdateSize
194104
<< " bytes is larger than the allowed threshold";
195105

196-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR)->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
197-
op->Abort(EExecutionUnitKind::FinishPropose);
106+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err);
107+
op->Abort(EExecutionUnitKind::FinishProposeWrite);
198108

199109
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);
200110

@@ -216,10 +126,10 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
216126

217127
DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);
218128

219-
BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
220-
op->Abort(EExecutionUnitKind::FinishPropose);
129+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
130+
op->Abort(EExecutionUnitKind::FinishProposeWrite);
221131

222-
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
132+
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
223133

224134
return EExecutionStatus::Executed;
225135
}
@@ -229,96 +139,20 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
229139
}
230140
}
231141

232-
if (op->IsReadTable()) {
233-
const auto& record = dataTx->GetReadTableTransaction();
234-
const auto& userTables = DataShard.GetUserTables();
235-
236-
TMaybe<TString> schemaChangedError;
237-
if (auto it = userTables.find(record.GetTableId().GetTableId()); it != userTables.end()) {
238-
const auto& tableInfo = *it->second;
239-
for (const auto& columnRecord : record.GetColumns()) {
240-
if (auto* columnInfo = tableInfo.Columns.FindPtr(columnRecord.GetId())) {
241-
// TODO: column types don't change when bound by id, but we may want to check anyway
242-
} else {
243-
schemaChangedError = TStringBuilder() << "ReadTable cannot find column "
244-
<< columnRecord.GetName() << " (" << columnRecord.GetId() << ")";
245-
break;
246-
}
247-
}
248-
// TODO: validate key ranges?
249-
} else {
250-
schemaChangedError = TStringBuilder() << "ReadTable cannot find table "
251-
<< record.GetTableId().GetOwnerId() << ":" << record.GetTableId().GetTableId();
252-
}
253-
254-
if (schemaChangedError) {
255-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR)
256-
->AddError(NKikimrTxDataShard::TError::SCHEME_CHANGED, *schemaChangedError);
257-
op->Abort(EExecutionUnitKind::FinishPropose);
258-
return EExecutionStatus::Executed;
259-
}
260-
261-
if (record.HasSnapshotStep() && record.HasSnapshotTxId()) {
262-
if (!op->IsImmediate()) {
263-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
264-
NKikimrTxDataShard::TError::BAD_ARGUMENT,
265-
"ReadTable from snapshot must be an immediate transaction");
266-
op->Abort(EExecutionUnitKind::FinishPropose);
267-
return EExecutionStatus::Executed;
268-
}
269-
270-
const TSnapshotKey key(
271-
record.GetTableId().GetOwnerId(),
272-
record.GetTableId().GetTableId(),
273-
record.GetSnapshotStep(),
274-
record.GetSnapshotTxId());
275-
276-
if (!DataShard.GetSnapshotManager().AcquireReference(key)) {
277-
// TODO: try upgrading to mvcc snapshot when available
278-
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
279-
NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
280-
TStringBuilder()
281-
<< "Shard " << DataShard.TabletID()
282-
<< " has no snapshot " << key);
283-
op->Abort(EExecutionUnitKind::FinishPropose);
284-
return EExecutionStatus::Executed;
285-
}
286-
287-
op->SetAcquiredSnapshotKey(key);
288-
op->SetUsingSnapshotFlag();
289-
}
290-
}
291-
292142
if (!op->IsImmediate()) {
293143
if (!Pipeline.AssignPlanInterval(op)) {
294-
TString err = TStringBuilder()
295-
<< "Can't propose tx " << op->GetTxId() << " at blocked shard "
296-
<< DataShard.TabletID();
297-
BuildResult(op)->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err);
298-
op->Abort(EExecutionUnitKind::FinishPropose);
144+
TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID();
145+
146+
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
147+
op->Abort(EExecutionUnitKind::FinishProposeWrite);
299148

300149
LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);
301150

302151
return EExecutionStatus::Executed;
303152
}
304153

305-
auto &res = BuildResult(op);
306-
res->SetPrepared(op->GetMinStep(), op->GetMaxStep(), op->GetReceivedAt());
307-
308-
if (op->IsDataTx()) {
309-
res->Record.SetReadSize(txReads.ReadSize);
310-
res->Record.SetReplySize(txReads.ReplySize);
311-
312-
for (const auto& rs : txReads.OutReadSetSize) {
313-
auto entry = res->Record.AddOutgoingReadSetInfo();
314-
entry->SetShardId(rs.first);
315-
entry->SetSize(rs.second);
316-
}
317-
}
318-
319-
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
320-
"Prepared " << op->GetKind() << " transaction txId " << op->GetTxId()
321-
<< " at tablet " << DataShard.TabletID());
154+
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(writeOp->WriteTx()->TabletId(), op->GetTxId(), {op->GetMinStep(), op->GetMaxStep(), {}}));
155+
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared " << *op << " at " << DataShard.TabletID());
322156
}
323157

324158
return EExecutionStatus::Executed;

ydb/core/tx/datashard/datashard.h

+4-6
Original file line numberDiff line numberDiff line change
@@ -641,12 +641,11 @@ struct TEvDataShard {
641641
TString GetError() const {
642642
if (Record.ErrorSize() > 0) {
643643
TString result;
644+
TStringOutput out(result);
644645
for (ui32 i = 0; i < Record.ErrorSize(); ++i) {
645-
if (Record.GetError(i).HasReason()) {
646-
result += Record.GetError(i).GetReason() + "|";
647-
} else {
648-
result += "no reason|";
649-
}
646+
out << Record.GetError(i).GetKind() << " ("
647+
<< (Record.GetError(i).HasReason() ? Record.GetError(i).GetReason() : "no reason")
648+
<< ") |";
650649
}
651650
return result;
652651
} else {
@@ -665,7 +664,6 @@ struct TEvDataShard {
665664
error->SetKey(keyBuffer.data(), keyBuffer.size());
666665
}
667666
}
668-
669667
private:
670668
bool ForceOnline = false;
671669
bool ForceDirty = false;

ydb/core/tx/datashard/datashard_impl.h

+1
Original file line numberDiff line numberDiff line change
@@ -2018,6 +2018,7 @@ class TDataShard
20182018

20192019
enum ELogThrottlerType {
20202020
CheckDataTxUnit_Execute = 0,
2021+
CheckWriteUnit_Execute = 0,
20212022
TxProposeTransactionBase_Execute,
20222023
FinishProposeUnit_CompleteRequest,
20232024
FinishProposeUnit_UpdateCounters,

0 commit comments

Comments
 (0)