Skip to content

Remove TabletId from TValidatedDataTx #683

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ EExecutionStatus TBuildDataTxOutRSUnit::Execute(TOperation::TPtr op,
try {
auto &outReadSets = op->OutReadSets();

if (tx->GetDataTx()->CheckCancelled())
if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID()))
engine->Cancel();
else
engine->SetMemoryLimit(txc.GetMemoryLimit() - tx->GetDataTx()->GetTxSize());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac
const auto& dataTx = tx->GetDataTx();
ui64 tabletId = DataShard.TabletID();

if (tx->GetDataTx()->CheckCancelled()) {
if (tx->GetDataTx()->CheckCancelled(tabletId)) {
tx->ReleaseTxData(txc, ctx);
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED)
->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled");
Expand Down
33 changes: 17 additions & 16 deletions ydb/core/tx/datashard/datashard_active_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
const TString &txBody,
bool usesMvccSnapshot)
: StepTxId_(stepTxId)
, TabletId_(self->TabletID())
, TxBody(txBody)
, EngineBay(self, txc, ctx, stepTxId.ToPair())
, ErrCode(NKikimrTxDataShard::TError::OK)
Expand All @@ -33,6 +32,8 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
, Cancelled(false)
, ReceivedAt_(receivedAt)
{
const ui64 tabletId = self->TabletID();

bool success = Tx.ParseFromArray(TxBody.data(), TxBody.size());
if (!success) {
ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT;
Expand Down Expand Up @@ -70,7 +71,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
}
} else if (IsKqpTx()) {
if (Y_UNLIKELY(!IsKqpDataTx())) {
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Unexpected KQP transaction type, shard: " << TabletId()
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Unexpected KQP transaction type, shard: " << tabletId
<< ", txid: " << StepTxId_.TxId << ", tx: " << Tx.DebugString());
ErrCode = NKikimrTxDataShard::TError::BAD_TX_KIND;
ErrStr = TStringBuilder() << "Unexpected KQP transaction type: "
Expand All @@ -85,7 +86,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
bool hasPersistentChannels = false;
if (!KqpValidateTransaction(GetTasks(), Immediate(), StepTxId_.TxId, ctx, hasPersistentChannels)) {
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed, datashard: "
<< TabletId() << ", txid: " << StepTxId_.TxId);
<< tabletId << ", txid: " << StepTxId_.TxId);
ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR;
ErrStr = "Transaction validation failed.";
return;
Expand All @@ -96,15 +97,15 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta;
if (!task.GetMeta().UnpackTo(&meta)) {
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed"
<< ", datashard: " << TabletId()
<< ", datashard: " << tabletId
<< ", txid: " << StepTxId_.TxId
<< ", failed to load task meta: " << task.GetMeta().value());
ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR;
ErrStr = "Transaction validation failed: invalid task metadata.";
return;
}

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TxId: " << StepTxId_.TxId << ", shard " << TabletId()
LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TxId: " << StepTxId_.TxId << ", shard " << tabletId
<< ", task: " << task.GetId() << ", meta: " << meta.ShortDebugString());

auto& tableMeta = meta.GetTable();
Expand Down Expand Up @@ -139,7 +140,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
}
}

KqpSetTxKeys(TabletId(), task.GetId(), tableInfo, meta, typeRegistry, ctx, EngineBay);
KqpSetTxKeys(tabletId, task.GetId(), tableInfo, meta, typeRegistry, ctx, EngineBay);

for (auto& output : task.GetOutputs()) {
for (auto& channel : output.GetChannels()) {
Expand All @@ -166,13 +167,13 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
tasksRunner.Prepare(DefaultKqpDataReqMemoryLimits(), *execCtx);
} catch (const TMemoryLimitExceededException&) {
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Not enough memory to create tasks runner, datashard: "
<< TabletId() << ", txid: " << StepTxId_.TxId);
<< tabletId << ", txid: " << StepTxId_.TxId);
ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR;
ErrStr = TStringBuilder() << "Transaction validation failed: not enough memory.";
return;
} catch (const yexception& e) {
LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Exception while validating KQP transaction, datashard: "
<< TabletId() << ", txid: " << StepTxId_.TxId << ", error: " << e.what());
<< tabletId << ", txid: " << StepTxId_.TxId << ", error: " << e.what());
ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR;
ErrStr = TStringBuilder() << "Transaction validation failed: " << e.what() << ".";
return;
Expand All @@ -191,7 +192,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self,
IsReadOnly = IsReadOnly && Tx.GetReadOnly();

auto engine = EngineBay.GetEngine();
auto result = engine->AddProgram(TabletId_, Tx.GetMiniKQL(), Tx.GetReadOnly());
auto result = engine->AddProgram(tabletId, Tx.GetMiniKQL(), Tx.GetReadOnly());

ErrStr = engine->GetErrors();
ErrCode = ConvertErrCode(result);
Expand Down Expand Up @@ -258,7 +259,7 @@ bool TValidatedDataTx::CanCancel() {
return true;
}

bool TValidatedDataTx::CheckCancelled() {
bool TValidatedDataTx::CheckCancelled(ui64 tabletId) {
if (Cancelled) {
return true;
}
Expand All @@ -270,11 +271,11 @@ bool TValidatedDataTx::CheckCancelled() {
TInstant now = AppData()->TimeProvider->Now();
Cancelled = (now >= Deadline());

Cancelled = Cancelled || gCancelTxFailPoint.Check(TabletId(), TxId());
Cancelled = Cancelled || gCancelTxFailPoint.Check(tabletId, TxId());

if (Cancelled) {
LOG_NOTICE_S(*TlsActivationContext->ExecutorThread.ActorSystem, NKikimrServices::TX_DATASHARD,
"CANCELLED TxId " << TxId() << " at " << TabletId());
"CANCELLED TxId " << TxId() << " at " << tabletId);
}
return Cancelled;
}
Expand Down Expand Up @@ -547,7 +548,7 @@ void TActiveTransaction::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBas
LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data");
}

void TActiveTransaction::DbStoreLocksAccessLog(TDataShard * self,
void TActiveTransaction::DbStoreLocksAccessLog(ui64 tabletId,
TTransactionContext &txc,
const TActorContext &ctx)
{
Expand All @@ -570,10 +571,10 @@ void TActiveTransaction::DbStoreLocksAccessLog(TDataShard * self,

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
"Storing " << vec.size() << " locks for txid=" << GetTxId()
<< " in " << self->TabletID());
<< " in " << tabletId);
}

void TActiveTransaction::DbStoreArtifactFlags(TDataShard * self,
void TActiveTransaction::DbStoreArtifactFlags(ui64 tabletId,
TTransactionContext &txc,
const TActorContext &ctx)
{
Expand All @@ -585,7 +586,7 @@ void TActiveTransaction::DbStoreArtifactFlags(TDataShard * self,

LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD,
"Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId()
<< " in " << self->TabletID());
<< " in " << tabletId);
}

ui64 TActiveTransaction::GetMemoryConsumption() const {
Expand Down
8 changes: 3 additions & 5 deletions ydb/core/tx/datashard/datashard_active_transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ class TValidatedDataTx : TNonCopyable {

TStepOrder StepTxId() const { return StepTxId_; }
ui64 TxId() const { return StepTxId_.TxId; }
ui64 TabletId() const { return TabletId_; }
const TString& Body() const { return TxBody; }

ui64 LockTxId() const { return Tx.GetLockTxId(); }
Expand Down Expand Up @@ -175,7 +174,7 @@ class TValidatedDataTx : TNonCopyable {
void ResetCounters() { EngineBay.ResetCounters(); }

bool CanCancel();
bool CheckCancelled();
bool CheckCancelled(ui64 tabletId);

void SetWriteVersion(TRowVersion writeVersion) { EngineBay.SetWriteVersion(writeVersion); }
void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); }
Expand Down Expand Up @@ -291,7 +290,6 @@ class TValidatedDataTx : TNonCopyable {

private:
TStepOrder StepTxId_;
ui64 TabletId_;
TString TxBody;
TActorId Source_;
TEngineBay EngineBay;
Expand Down Expand Up @@ -516,10 +514,10 @@ class TActiveTransaction : public TOperation {
return ArtifactFlags & LOCKS_STORED;
}

void DbStoreLocksAccessLog(TDataShard * self,
void DbStoreLocksAccessLog(ui64 tabletId,
TTransactionContext &txc,
const TActorContext &ctx);
void DbStoreArtifactFlags(TDataShard * self,
void DbStoreArtifactFlags(ui64 tabletId,
TTransactionContext &txc,
const TActorContext &ctx);

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/execute_data_tx_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op,
}

// TODO: cancel tx in special execution unit.
if (tx->GetDataTx()->CheckCancelled())
if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID()))
engine->Cancel();
else {
ui64 consumed = tx->GetDataTx()->GetTxSize() + engine->GetMemoryAllocated();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio
return EExecutionStatus::Executed;
}

if (dataTx->CheckCancelled()) {
if (dataTx->CheckCancelled(DataShard.TabletID())) {
tx->ReleaseTxData(txc, ctx);
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED)
->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled");
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/prepare_data_tx_in_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ EExecutionStatus TPrepareDataTxInRSUnit::Execute(TOperation::TPtr op,
Y_VERIFY_S(engine, "missing engine for " << *op << " at " << DataShard.TabletID());

// TODO: cancel tx in special execution unit.
if (tx->GetDataTx()->CheckCancelled())
if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID()))
engine->Cancel();

try {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ EExecutionStatus TPrepareKqpDataTxInRSUnit::Execute(TOperation::TPtr op, TTransa
}
}

if (tx->GetDataTx()->CheckCancelled()) {
if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID())) {
tx->ReleaseTxData(txc, ctx);
BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED)
->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled");
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op,
if (!tx->IsLocksStored() && !tx->LocksAccessLog().Locks.empty()) {
// N.B. we copy access log to locks cache, so that future lock access is repeatable
tx->LocksCache().Locks = tx->LocksAccessLog().Locks;
tx->DbStoreLocksAccessLog(&DataShard, txc, ctx);
tx->DbStoreLocksAccessLog(DataShard.TabletID(), txc, ctx);
// Freeze persistent locks that we have cached
for (auto& pr : tx->LocksCache().Locks) {
ui64 lockId = pr.first;
Expand All @@ -69,7 +69,7 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op,
newArtifact = true;
}
if (newArtifact)
tx->DbStoreArtifactFlags(&DataShard, txc, ctx);
tx->DbStoreArtifactFlags(DataShard.TabletID(), txc, ctx);

bool hadWrites = false;
if (tx->IsOutRSStored() || tx->IsLocksStored()) {
Expand Down