Skip to content

CheckWriteUnit & FinishProposeWriteUnit #631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions ydb/core/tx/data_events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,16 @@ struct TDataEvents {
return result;
}

// Renders the write result as a one-line diagnostic string of the form
// "Status: <status> Issues: <issues>", taking both values from Record.
TString GetError() const {
    TStringBuilder description;
    description << "Status: " << Record.GetStatus();
    description << " Issues: " << Record.GetIssues();
    return description;
}

// Returns the status value stored in Record.
NKikimrDataEvents::TEvWriteResult::EStatus GetStatus() const {
    return Record.GetStatus();
}

bool IsPrepared() const { return GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED; }
bool IsComplete() const { return GetStatus() == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED; }
// True for every status other than prepared or completed.
bool IsError() const {
    return !(IsPrepared() || IsComplete());
}

// Move-assigns the supplied orbit into the Orbit member.
void SetOrbit(NLWTrace::TOrbit&& orbit) {
    Orbit = std::move(orbit);
}
// Gives mutable access to the Orbit member.
NLWTrace::TOrbit& GetOrbit() {
    return Orbit;
}
// Returns an rvalue reference to the Orbit member so the caller can move
// from it; this call itself only casts and does not move anything.
NLWTrace::TOrbit&& MoveOrbit() {
    return std::move(Orbit);
}
Expand Down
220 changes: 27 additions & 193 deletions ydb/core/tx/datashard/check_write_unit.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#include "datashard_impl.h"
#include "datashard_pipeline.h"
#include "execution_unit_ctors.h"

#include "ydb/core/tx/datashard/datashard_write_operation.h"
#include <ydb/core/tablet/tablet_exception.h>

namespace NKikimr {
Expand All @@ -24,7 +24,7 @@ class TCheckWriteUnit: public TExecutionUnit {

TCheckWriteUnit::TCheckWriteUnit(TDataShard &dataShard,
TPipeline &pipeline)
: TExecutionUnit(EExecutionUnitKind::CheckDataTx, false, dataShard, pipeline)
: TExecutionUnit(EExecutionUnitKind::CheckWrite, false, dataShard, pipeline)
{
}

Expand All @@ -45,126 +45,37 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
Y_ABORT_UNLESS(!op->IsAborted());

if (CheckRejectDataTx(op, ctx)) {
op->Abort(EExecutionUnitKind::FinishPropose);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

return EExecutionStatus::Executed;
}

//TODO: remove this return
return EExecutionStatus::Executed;

TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
auto dataTx = tx->GetDataTx();
Y_ABORT_UNLESS(dataTx);
Y_ABORT_UNLESS(dataTx->Ready() || dataTx->RequirePrepare());

if (dataTx->Ready()) {
DataShard.IncCounter(COUNTER_MINIKQL_PROGRAM_SIZE, dataTx->ProgramSize());
} else {
Y_ABORT_UNLESS(dataTx->RequirePrepare());
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Require prepare Tx " << op->GetTxId() << " at " << DataShard.TabletID()
<< ": " << dataTx->GetErrors());
}
TWriteOperation* writeOp = dynamic_cast<TWriteOperation*>(op.Get());
Y_VERIFY_S(writeOp, "cannot cast operation of kind " << op->GetKind());
auto writeTx = writeOp->WriteTx();
Y_ABORT_UNLESS(writeTx);
Y_ABORT_UNLESS(writeTx->Ready() || writeTx->RequirePrepare());

// Check if we are out of space and tx wants to update user
// or system table.
if (DataShard.IsAnyChannelYellowStop()
&& (dataTx->HasWrites() || !op->IsImmediate())) {
&& (writeTx->HasWrites() || !op->IsImmediate())) {
TString err = TStringBuilder()
<< "Cannot perform transaction: out of disk space at tablet "
<< DataShard.TabletID() << " txId " << op->GetTxId();

DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);

BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
op->Abort(EExecutionUnitKind::FinishPropose);
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}

if (tx->IsMvccSnapshotRead()) {
auto snapshot = tx->GetMvccSnapshot();
if (DataShard.IsFollower()) {
TString err = TStringBuilder()
<< "Operation " << *op << " cannot read from snapshot " << snapshot
<< " using data tx on a follower " << DataShard.TabletID();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
} else if (!DataShard.IsMvccEnabled()) {
TString err = TStringBuilder()
<< "Operation " << *op << " reads from snapshot " << snapshot
<< " with MVCC feature disabled at " << DataShard.TabletID();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
} else if (snapshot < DataShard.GetSnapshotManager().GetLowWatermark()) {
TString err = TStringBuilder()
<< "Operation " << *op << " reads from stale snapshot " << snapshot
<< " at " << DataShard.TabletID();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

;
}
}

TEngineBay::TSizes txReads;

if (op->IsDataTx()) {
bool hasTotalKeysSizeLimit = !!dataTx->PerShardKeysSizeLimitBytes();
txReads = dataTx->CalcReadSizes(hasTotalKeysSizeLimit);

if (txReads.ReadSize > DataShard.GetTxReadSizeLimit()) {
TString err = TStringBuilder()
<< "Transaction read size " << txReads.ReadSize << " exceeds limit "
<< DataShard.GetTxReadSizeLimit() << " at tablet " << DataShard.TabletID()
<< " txId " << op->GetTxId();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}

if (hasTotalKeysSizeLimit
&& txReads.TotalKeysSize > *dataTx->PerShardKeysSizeLimitBytes()) {
TString err = TStringBuilder()
<< "Transaction total keys size " << txReads.TotalKeysSize
<< " exceeds limit " << *dataTx->PerShardKeysSizeLimitBytes()
<< " at tablet " << DataShard.TabletID() << " txId " << op->GetTxId();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED, err);
op->Abort(EExecutionUnitKind::FinishPropose);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}

for (const auto& key : dataTx->TxInfo().Keys) {
{
for (const auto& key : writeTx->TxInfo().Keys) {
if (key.IsWrite && DataShard.IsUserTable(key.Key->TableId)) {
ui64 keySize = 0;
for (const auto& cell : key.Key->Range.From) {
Expand All @@ -176,9 +87,8 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
<< " bytes which exceeds limit " << NLimits::MaxWriteKeySize
<< " bytes at " << DataShard.TabletID();

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)
->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
op->Abort(EExecutionUnitKind::FinishPropose);
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

Expand All @@ -193,8 +103,8 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
<< "Transaction write column value of " << col.ImmediateUpdateSize
<< " bytes is larger than the allowed threshold";

BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::EXEC_ERROR)->AddError(NKikimrTxDataShard::TError::BAD_ARGUMENT, err);
op->Abort(EExecutionUnitKind::FinishPropose);
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, err);

Expand All @@ -216,10 +126,10 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,

DataShard.IncCounter(COUNTER_PREPARE_OUT_OF_SPACE);

BuildResult(op)->AddError(NKikimrTxDataShard::TError::OUT_OF_SPACE, err);
op->Abort(EExecutionUnitKind::FinishPropose);
writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckDataTxUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);
LOG_LOG_S_THROTTLE(DataShard.GetLogThrottler(TDataShard::ELogThrottlerType::CheckWriteUnit_Execute), ctx, NActors::NLog::PRI_ERROR, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}
Expand All @@ -229,96 +139,20 @@ EExecutionStatus TCheckWriteUnit::Execute(TOperation::TPtr op,
}
}

if (op->IsReadTable()) {
const auto& record = dataTx->GetReadTableTransaction();
const auto& userTables = DataShard.GetUserTables();

TMaybe<TString> schemaChangedError;
if (auto it = userTables.find(record.GetTableId().GetTableId()); it != userTables.end()) {
const auto& tableInfo = *it->second;
for (const auto& columnRecord : record.GetColumns()) {
if (auto* columnInfo = tableInfo.Columns.FindPtr(columnRecord.GetId())) {
// TODO: column types don't change when bound by id, but we may want to check anyway
} else {
schemaChangedError = TStringBuilder() << "ReadTable cannot find column "
<< columnRecord.GetName() << " (" << columnRecord.GetId() << ")";
break;
}
}
// TODO: validate key ranges?
} else {
schemaChangedError = TStringBuilder() << "ReadTable cannot find table "
<< record.GetTableId().GetOwnerId() << ":" << record.GetTableId().GetTableId();
}

if (schemaChangedError) {
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::ERROR)
->AddError(NKikimrTxDataShard::TError::SCHEME_CHANGED, *schemaChangedError);
op->Abort(EExecutionUnitKind::FinishPropose);
return EExecutionStatus::Executed;
}

if (record.HasSnapshotStep() && record.HasSnapshotTxId()) {
if (!op->IsImmediate()) {
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
NKikimrTxDataShard::TError::BAD_ARGUMENT,
"ReadTable from snapshot must be an immediate transaction");
op->Abort(EExecutionUnitKind::FinishPropose);
return EExecutionStatus::Executed;
}

const TSnapshotKey key(
record.GetTableId().GetOwnerId(),
record.GetTableId().GetTableId(),
record.GetSnapshotStep(),
record.GetSnapshotTxId());

if (!DataShard.GetSnapshotManager().AcquireReference(key)) {
// TODO: try upgrading to mvcc snapshot when available
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::BAD_REQUEST)->AddError(
NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST,
TStringBuilder()
<< "Shard " << DataShard.TabletID()
<< " has no snapshot " << key);
op->Abort(EExecutionUnitKind::FinishPropose);
return EExecutionStatus::Executed;
}

op->SetAcquiredSnapshotKey(key);
op->SetUsingSnapshotFlag();
}
}

if (!op->IsImmediate()) {
if (!Pipeline.AssignPlanInterval(op)) {
TString err = TStringBuilder()
<< "Can't propose tx " << op->GetTxId() << " at blocked shard "
<< DataShard.TabletID();
BuildResult(op)->AddError(NKikimrTxDataShard::TError::SHARD_IS_BLOCKED, err);
op->Abort(EExecutionUnitKind::FinishPropose);
TString err = TStringBuilder() << "Can't propose tx " << op->GetTxId() << " at blocked shard " << DataShard.TabletID();

writeOp->SetError(NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR, err);
op->Abort(EExecutionUnitKind::FinishProposeWrite);

LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, err);

return EExecutionStatus::Executed;
}

auto &res = BuildResult(op);
res->SetPrepared(op->GetMinStep(), op->GetMaxStep(), op->GetReceivedAt());

if (op->IsDataTx()) {
res->Record.SetReadSize(txReads.ReadSize);
res->Record.SetReplySize(txReads.ReplySize);

for (const auto& rs : txReads.OutReadSetSize) {
auto entry = res->Record.AddOutgoingReadSetInfo();
entry->SetShardId(rs.first);
entry->SetSize(rs.second);
}
}

LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
"Prepared " << op->GetKind() << " transaction txId " << op->GetTxId()
<< " at tablet " << DataShard.TabletID());
writeOp->SetWriteResult(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(writeOp->WriteTx()->TabletId(), op->GetTxId(), {op->GetMinStep(), op->GetMaxStep(), {}}));
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Prepared " << *op << " at " << DataShard.TabletID());
}

return EExecutionStatus::Executed;
Expand Down
10 changes: 4 additions & 6 deletions ydb/core/tx/datashard/datashard.h
Original file line number Diff line number Diff line change
Expand Up @@ -641,12 +641,11 @@ struct TEvDataShard {
TString GetError() const {
if (Record.ErrorSize() > 0) {
TString result;
TStringOutput out(result);
for (ui32 i = 0; i < Record.ErrorSize(); ++i) {
if (Record.GetError(i).HasReason()) {
result += Record.GetError(i).GetReason() + "|";
} else {
result += "no reason|";
}
out << Record.GetError(i).GetKind() << " ("
<< (Record.GetError(i).HasReason() ? Record.GetError(i).GetReason() : "no reason")
<< ") |";
}
return result;
} else {
Expand All @@ -665,7 +664,6 @@ struct TEvDataShard {
error->SetKey(keyBuffer.data(), keyBuffer.size());
}
}

private:
bool ForceOnline = false;
bool ForceDirty = false;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/datashard/datashard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2018,6 +2018,7 @@ class TDataShard

enum ELogThrottlerType {
CheckDataTxUnit_Execute = 0,
CheckWriteUnit_Execute = 0,
TxProposeTransactionBase_Execute,
FinishProposeUnit_CompleteRequest,
FinishProposeUnit_UpdateCounters,
Expand Down
Loading