diff --git a/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp index 7ae4c4a2a34c..8f8f42d14b63 100644 --- a/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_data_tx_out_rs_unit.cpp @@ -68,7 +68,7 @@ EExecutionStatus TBuildDataTxOutRSUnit::Execute(TOperation::TPtr op, try { auto &outReadSets = op->OutReadSets(); - if (tx->GetDataTx()->CheckCancelled()) + if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID())) engine->Cancel(); else engine->SetMemoryLimit(txc.GetMemoryLimit() - tx->GetDataTx()->GetTxSize()); diff --git a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp index 562f18ab51eb..04660d8c7051 100644 --- a/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/build_kqp_data_tx_out_rs_unit.cpp @@ -66,7 +66,7 @@ EExecutionStatus TBuildKqpDataTxOutRSUnit::Execute(TOperation::TPtr op, TTransac const auto& dataTx = tx->GetDataTx(); ui64 tabletId = DataShard.TabletID(); - if (tx->GetDataTx()->CheckCancelled()) { + if (tx->GetDataTx()->CheckCancelled(tabletId)) { tx->ReleaseTxData(txc, ctx); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED) ->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled"); diff --git a/ydb/core/tx/datashard/datashard_active_transaction.cpp b/ydb/core/tx/datashard/datashard_active_transaction.cpp index 4c4ce56bd3ed..26e30170525e 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.cpp +++ b/ydb/core/tx/datashard/datashard_active_transaction.cpp @@ -20,7 +20,6 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, const TString &txBody, bool usesMvccSnapshot) : StepTxId_(stepTxId) - , TabletId_(self->TabletID()) , TxBody(txBody) , EngineBay(self, txc, ctx, stepTxId.ToPair()) , ErrCode(NKikimrTxDataShard::TError::OK) @@ -33,6 +32,8 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, , Cancelled(false) , ReceivedAt_(receivedAt) { + const ui64 tabletId = self->TabletID(); + bool success = Tx.ParseFromArray(TxBody.data(), TxBody.size()); if (!success) { ErrCode = NKikimrTxDataShard::TError::BAD_ARGUMENT; @@ -70,7 +71,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, } } else if (IsKqpTx()) { if (Y_UNLIKELY(!IsKqpDataTx())) { - LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Unexpected KQP transaction type, shard: " << TabletId() + LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Unexpected KQP transaction type, shard: " << tabletId << ", txid: " << StepTxId_.TxId << ", tx: " << Tx.DebugString()); ErrCode = NKikimrTxDataShard::TError::BAD_TX_KIND; ErrStr = TStringBuilder() << "Unexpected KQP transaction type: " @@ -85,7 +86,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, bool hasPersistentChannels = false; if (!KqpValidateTransaction(GetTasks(), Immediate(), StepTxId_.TxId, ctx, hasPersistentChannels)) { LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed, datashard: " - << TabletId() << ", txid: " << StepTxId_.TxId); + << tabletId << ", txid: " << StepTxId_.TxId); ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR; ErrStr = "Transaction validation failed."; return; @@ -96,7 +97,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, NKikimrTxDataShard::TKqpTransaction::TDataTaskMeta meta; if (!task.GetMeta().UnpackTo(&meta)) { LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "KQP transaction validation failed" - << ", datashard: " << TabletId() + << ", datashard: " << tabletId << ", txid: " << StepTxId_.TxId << ", failed to load task meta: " << task.GetMeta().value()); ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR; @@ -104,7 +105,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, return; } - LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TxId: " << StepTxId_.TxId << ", shard " << TabletId() + LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "TxId: " << StepTxId_.TxId << ", shard " << tabletId << ", task: " << task.GetId() << ", meta: " << meta.ShortDebugString()); auto& tableMeta = meta.GetTable(); @@ -139,7 +140,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, } } - KqpSetTxKeys(TabletId(), task.GetId(), tableInfo, meta, typeRegistry, ctx, EngineBay); + KqpSetTxKeys(tabletId, task.GetId(), tableInfo, meta, typeRegistry, ctx, EngineBay); for (auto& output : task.GetOutputs()) { for (auto& channel : output.GetChannels()) { @@ -166,13 +167,13 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, tasksRunner.Prepare(DefaultKqpDataReqMemoryLimits(), *execCtx); } catch (const TMemoryLimitExceededException&) { LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Not enough memory to create tasks runner, datashard: " - << TabletId() << ", txid: " << StepTxId_.TxId); + << tabletId << ", txid: " << StepTxId_.TxId); ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR; ErrStr = TStringBuilder() << "Transaction validation failed: not enough memory."; return; } catch (const yexception& e) { LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "Exception while validating KQP transaction, datashard: " - << TabletId() << ", txid: " << StepTxId_.TxId << ", error: " << e.what()); + << tabletId << ", txid: " << StepTxId_.TxId << ", error: " << e.what()); ErrCode = NKikimrTxDataShard::TError::PROGRAM_ERROR; ErrStr = TStringBuilder() << "Transaction validation failed: " << e.what() << "."; return; @@ -191,7 +192,7 @@ TValidatedDataTx::TValidatedDataTx(TDataShard *self, IsReadOnly = IsReadOnly && Tx.GetReadOnly(); auto engine = EngineBay.GetEngine(); - auto result = engine->AddProgram(TabletId_, Tx.GetMiniKQL(), Tx.GetReadOnly()); + auto result = engine->AddProgram(tabletId, Tx.GetMiniKQL(), Tx.GetReadOnly()); ErrStr = engine->GetErrors(); ErrCode = ConvertErrCode(result); @@ -258,7 +259,7 @@ bool TValidatedDataTx::CanCancel() { return true; } -bool TValidatedDataTx::CheckCancelled() { +bool TValidatedDataTx::CheckCancelled(ui64 tabletId) { if (Cancelled) { return true; } @@ -270,11 +271,11 @@ bool TValidatedDataTx::CheckCancelled() { TInstant now = AppData()->TimeProvider->Now(); Cancelled = (now >= Deadline()); - Cancelled = Cancelled || gCancelTxFailPoint.Check(TabletId(), TxId()); + Cancelled = Cancelled || gCancelTxFailPoint.Check(tabletId, TxId()); if (Cancelled) { LOG_NOTICE_S(*TlsActivationContext->ExecutorThread.ActorSystem, NKikimrServices::TX_DATASHARD, - "CANCELLED TxId " << TxId() << " at " << TabletId()); + "CANCELLED TxId " << TxId() << " at " << tabletId); } return Cancelled; } @@ -547,7 +548,7 @@ void TActiveTransaction::ReleaseTxData(NTabletFlatExecutor::TTxMemoryProviderBas LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "tx " << GetTxId() << " released its data"); } -void TActiveTransaction::DbStoreLocksAccessLog(TDataShard * self, +void TActiveTransaction::DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext &txc, const TActorContext &ctx) { @@ -570,10 +571,10 @@ void TActiveTransaction::DbStoreLocksAccessLog(TDataShard * self, LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing " << vec.size() << " locks for txid=" << GetTxId() - << " in " << self->TabletID()); + << " in " << tabletId); } -void TActiveTransaction::DbStoreArtifactFlags(TDataShard * self, +void TActiveTransaction::DbStoreArtifactFlags(ui64 tabletId, TTransactionContext &txc, const TActorContext &ctx) { @@ -585,7 +586,7 @@ void TActiveTransaction::DbStoreArtifactFlags(TDataShard * self, LOG_TRACE_S(ctx, NKikimrServices::TX_DATASHARD, "Storing artifactflags=" << ArtifactFlags << " for txid=" << GetTxId() - << " in " << self->TabletID()); + << " in " << tabletId); } ui64 TActiveTransaction::GetMemoryConsumption() const { diff --git a/ydb/core/tx/datashard/datashard_active_transaction.h b/ydb/core/tx/datashard/datashard_active_transaction.h index 347ab511d87e..cca5b12b876f 100644 --- a/ydb/core/tx/datashard/datashard_active_transaction.h +++ b/ydb/core/tx/datashard/datashard_active_transaction.h @@ -134,7 +134,6 @@ class TValidatedDataTx : TNonCopyable { TStepOrder StepTxId() const { return StepTxId_; } ui64 TxId() const { return StepTxId_.TxId; } - ui64 TabletId() const { return TabletId_; } const TString& Body() const { return TxBody; } ui64 LockTxId() const { return Tx.GetLockTxId(); } @@ -175,7 +174,7 @@ class TValidatedDataTx : TNonCopyable { void ResetCounters() { EngineBay.ResetCounters(); } bool CanCancel(); - bool CheckCancelled(); + bool CheckCancelled(ui64 tabletId); void SetWriteVersion(TRowVersion writeVersion) { EngineBay.SetWriteVersion(writeVersion); } void SetReadVersion(TRowVersion readVersion) { EngineBay.SetReadVersion(readVersion); } @@ -291,7 +290,6 @@ class TValidatedDataTx : TNonCopyable { private: TStepOrder StepTxId_; - ui64 TabletId_; TString TxBody; TActorId Source_; TEngineBay EngineBay; @@ -516,10 +514,10 @@ class TActiveTransaction : public TOperation { return ArtifactFlags & LOCKS_STORED; } - void DbStoreLocksAccessLog(TDataShard * self, + void DbStoreLocksAccessLog(ui64 tabletId, TTransactionContext &txc, const TActorContext &ctx); - void DbStoreArtifactFlags(TDataShard * self, + void DbStoreArtifactFlags(ui64 tabletId, TTransactionContext &txc, const TActorContext &ctx); diff --git a/ydb/core/tx/datashard/execute_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_data_tx_unit.cpp index db3874e72d45..a2976e05bd82 100644 --- a/ydb/core/tx/datashard/execute_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_data_tx_unit.cpp @@ -115,7 +115,7 @@ EExecutionStatus TExecuteDataTxUnit::Execute(TOperation::TPtr op, } // TODO: cancel tx in special execution unit. - if (tx->GetDataTx()->CheckCancelled()) + if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID())) engine->Cancel(); else { ui64 consumed = tx->GetDataTx()->GetTxSize() + engine->GetMemoryAllocated(); diff --git a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp index 0775744bfb5a..cd331d8f7f36 100644 --- a/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp +++ b/ydb/core/tx/datashard/execute_kqp_data_tx_unit.cpp @@ -115,7 +115,7 @@ EExecutionStatus TExecuteKqpDataTxUnit::Execute(TOperation::TPtr op, TTransactio return EExecutionStatus::Executed; } - if (dataTx->CheckCancelled()) { + if (dataTx->CheckCancelled(DataShard.TabletID())) { tx->ReleaseTxData(txc, ctx); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED) ->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled"); diff --git a/ydb/core/tx/datashard/prepare_data_tx_in_rs_unit.cpp b/ydb/core/tx/datashard/prepare_data_tx_in_rs_unit.cpp index e9b8a354b4fb..8e3d177441e2 100644 --- a/ydb/core/tx/datashard/prepare_data_tx_in_rs_unit.cpp +++ b/ydb/core/tx/datashard/prepare_data_tx_in_rs_unit.cpp @@ -60,7 +60,7 @@ EExecutionStatus TPrepareDataTxInRSUnit::Execute(TOperation::TPtr op, Y_VERIFY_S(engine, "missing engine for " << *op << " at " << DataShard.TabletID()); // TODO: cancel tx in special execution unit. - if (tx->GetDataTx()->CheckCancelled()) + if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID())) engine->Cancel(); try { diff --git a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp index bc801f534e10..d8042e00fee6 100644 --- a/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp +++ b/ydb/core/tx/datashard/prepare_kqp_data_tx_in_rs_unit.cpp @@ -45,7 +45,7 @@ EExecutionStatus TPrepareKqpDataTxInRSUnit::Execute(TOperation::TPtr op, TTransa } } - if (tx->GetDataTx()->CheckCancelled()) { + if (tx->GetDataTx()->CheckCancelled(DataShard.TabletID())) { tx->ReleaseTxData(txc, ctx); BuildResult(op, NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED) ->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Tx was cancelled"); diff --git a/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp b/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp index fbe1f8f1a832..71b429094464 100644 --- a/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp +++ b/ydb/core/tx/datashard/store_and_send_out_rs_unit.cpp @@ -56,7 +56,7 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op, if (!tx->IsLocksStored() && !tx->LocksAccessLog().Locks.empty()) { // N.B. we copy access log to locks cache, so that future lock access is repeatable tx->LocksCache().Locks = tx->LocksAccessLog().Locks; - tx->DbStoreLocksAccessLog(&DataShard, txc, ctx); + tx->DbStoreLocksAccessLog(DataShard.TabletID(), txc, ctx); // Freeze persistent locks that we have cached for (auto& pr : tx->LocksCache().Locks) { ui64 lockId = pr.first; @@ -69,7 +69,7 @@ EExecutionStatus TStoreAndSendOutRSUnit::Execute(TOperation::TPtr op, newArtifact = true; } if (newArtifact) - tx->DbStoreArtifactFlags(&DataShard, txc, ctx); + tx->DbStoreArtifactFlags(DataShard.TabletID(), txc, ctx); bool hadWrites = false; if (tx->IsOutRSStored() || tx->IsLocksStored()) {