Skip to content

Commit f5dc352

Browse files
nsofyansofya
and
nsofya
authored
Base transaction class for all propose operations (#2208)
Co-authored-by: nsofya <[email protected]>
1 parent 0dd4395 commit f5dc352

File tree

9 files changed

+157
-115
lines changed

9 files changed

+157
-115
lines changed

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp

+12-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "tx_write.h"
22

33
namespace NKikimr::NColumnShard {
4+
45
bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId) {
56
NKikimrTxColumnShard::TLogicalMetadata meta;
67
meta.SetNumRows(batch->GetRowsCount());
@@ -28,7 +29,6 @@ bool TTxWrite::InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSeriali
2829
return false;
2930
}
3031

31-
3232
bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
3333
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
3434
ACFL_DEBUG("event", "start_execute");
@@ -76,19 +76,12 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
7676
}
7777
for (auto&& aggr : buffer.GetAggregations()) {
7878
const auto& writeMeta = aggr->GetWriteData()->GetWriteMeta();
79-
std::unique_ptr<TEvColumnShard::TEvWriteResult> result;
80-
TWriteOperation::TPtr operation;
8179
if (!writeMeta.HasLongTxId()) {
82-
operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
80+
auto operation = Self->OperationsManager->GetOperation((TWriteId)writeMeta.GetWriteId());
8381
Y_ABORT_UNLESS(operation);
8482
Y_ABORT_UNLESS(operation->GetStatus() == EOperationStatus::Started);
85-
}
86-
if (operation) {
8783
operation->OnWriteFinish(txc, aggr->GetWriteIds());
88-
auto txInfo = Self->ProgressTxController->RegisterTxWithDeadline(operation->GetTxId(), NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, "", writeMeta.GetSource(), 0, txc);
89-
Y_UNUSED(txInfo);
90-
NEvents::TDataEvents::TCoordinatorInfo tInfo = Self->ProgressTxController->GetCoordinatorInfo(operation->GetTxId());
91-
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), operation->GetTxId(), tInfo));
84+
ProposeTransaction(TTxController::TBasicTxInfo(NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetTxId()), "", writeMeta.GetSource(), 0, txc);
9285
} else {
9386
Y_ABORT_UNLESS(aggr->GetWriteIds().size() == 1);
9487
Results.emplace_back(std::make_unique<TEvColumnShard::TEvWriteResult>(Self->TabletID(), writeMeta, (ui64)aggr->GetWriteIds().front(), NKikimrTxColumnShard::EResultStatus::SUCCESS));
@@ -97,6 +90,15 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
9790
return true;
9891
}
9992

93+
void TTxWrite::OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) {
94+
Y_UNUSED(proposeResult);
95+
Results.emplace_back(NEvents::TDataEvents::TEvWriteResult::BuildPrepared(Self->TabletID(), txInfo.TxId, Self->GetProgressTxController().BuildCoordinatorInfo(txInfo)));
96+
}
97+
98+
void TTxWrite::OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) {
99+
AFL_VERIFY("Unexpected behaviour")("tx_id", txInfo.TxId)("details", proposeResult.DebugString());
100+
}
101+
100102
void TTxWrite::Complete(const TActorContext& ctx) {
101103
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
102104
const auto now = TMonotonic::Now();

ydb/core/tx/columnshard/blobs_action/transaction/tx_write.h

+8-3
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
#pragma once
22
#include <ydb/core/tx/columnshard/columnshard_impl.h>
3+
#include <ydb/core/tx/columnshard/transactions/propose_transaction_base.h>
34
#include <ydb/core/tx/columnshard/engines/writer/indexed_blob_constructor.h>
45

56
namespace NKikimr::NColumnShard {
67

7-
class TTxWrite : public TTransactionBase<TColumnShard> {
8+
class TTxWrite : public TProposeTransactionBase {
89
public:
910
TTxWrite(TColumnShard* self, const TEvPrivate::TEvWriteBlobsResult::TPtr& putBlobResult)
10-
: TBase(self)
11+
: TProposeTransactionBase(self)
1112
, PutBlobResult(putBlobResult)
1213
, TabletTxNo(++Self->TabletTxCounter)
1314
{}
@@ -16,13 +17,17 @@ class TTxWrite : public TTransactionBase<TColumnShard> {
1617
void Complete(const TActorContext& ctx) override;
1718
TTxType GetTxType() const override { return TXTYPE_WRITE; }
1819

19-
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);
2020

2121
private:
2222
TEvPrivate::TEvWriteBlobsResult::TPtr PutBlobResult;
2323
const ui32 TabletTxNo;
2424
std::vector<std::unique_ptr<NActors::IEventBase>> Results;
2525

26+
27+
bool InsertOneBlob(TTransactionContext& txc, const NOlap::TWideSerializedBatch& batch, const TWriteId writeId);
28+
void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) override;
29+
void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) override;
30+
2631
TStringBuilder TxPrefix() const {
2732
return TStringBuilder() << "TxWrite[" << ToString(TabletTxNo) << "] ";
2833
}

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

+22-69
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,17 @@
33
#include "columnshard_schema.h"
44
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
55
#include <ydb/library/yql/dq/actors/dq.h>
6+
#include <ydb/core/tx/columnshard/transactions/propose_transaction_base.h>
67

78
namespace NKikimr::NColumnShard {
89

910
using namespace NTabletFlatExecutor;
1011

11-
class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
12+
class TTxProposeTransaction : public TProposeTransactionBase {
1213
public:
1314
TTxProposeTransaction(TColumnShard* self, TEvColumnShard::TEvProposeTransaction::TPtr& ev)
14-
: TBase(self)
15+
: TProposeTransactionBase(self)
1516
, Ev(ev)
16-
, TabletTxNo(++Self->TabletTxCounter)
1717
{}
1818

1919
bool Execute(TTransactionContext& txc, const TActorContext& ctx) override;
@@ -22,35 +22,26 @@ class TTxProposeTransaction : public NTabletFlatExecutor::TTransactionBase<TColu
2222

2323
private:
2424
TEvColumnShard::TEvProposeTransaction::TPtr Ev;
25-
const ui32 TabletTxNo;
2625
std::unique_ptr<TEvColumnShard::TEvProposeTransactionResult> Result;
2726

28-
TStringBuilder TxPrefix() const {
29-
return TStringBuilder() << "TxProposeTransaction[" << ToString(TabletTxNo) << "] ";
30-
}
31-
32-
TString TxSuffix() const {
33-
return TStringBuilder() << " at tablet " << Self->TabletID();
34-
}
35-
36-
void ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo);
27+
void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) override;
28+
void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) override;
3729
TTxController::TProposeResult ProposeTtlDeprecated(const TString& txBody);
3830
};
3931

4032

4133
bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) {
4234
Y_ABORT_UNLESS(Ev);
43-
LOG_S_DEBUG(TxPrefix() << "execute" << TxSuffix());
4435

4536
txc.DB.NoMoreReadsForTx();
4637
NIceDb::TNiceDb db(txc.DB);
4738

4839
Self->IncCounter(COUNTER_PREPARE_REQUEST);
4940

5041
auto& record = Proto(Ev->Get());
51-
auto txKind = record.GetTxKind();
52-
ui64 txId = record.GetTxId();
53-
auto& txBody = record.GetTxBody();
42+
const auto txKind = record.GetTxKind();
43+
const ui64 txId = record.GetTxId();
44+
const auto& txBody = record.GetTxBody();
5445

5546
if (txKind == NKikimrTxColumnShard::TX_KIND_TTL) {
5647
auto proposeResult = ProposeTtlDeprecated(txBody);
@@ -71,39 +62,7 @@ bool TTxProposeTransaction::Execute(TTransactionContext& txc, const TActorContex
7162
Y_ABORT_UNLESS(Self->CurrentSchemeShardId == record.GetSchemeShardId());
7263
}
7364
}
74-
75-
TTxController::TBasicTxInfo fakeTxInfo;
76-
fakeTxInfo.TxId = txId;
77-
fakeTxInfo.TxKind = txKind;
78-
79-
auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txKind, fakeTxInfo);
80-
if (!txOperator || !txOperator->Parse(txBody)) {
81-
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txId
82-
<< (txOperator ? ". Parsing error " : ". Unknown operator for txKind"));
83-
ConstructResult(proposeResult, fakeTxInfo);
84-
return true;
85-
}
86-
87-
auto txInfoPtr = Self->ProgressTxController->GetTxInfo(txId);
88-
if (!!txInfoPtr) {
89-
if (txInfoPtr->Source != Ev->Get()->GetSource() || txInfoPtr->Cookie != Ev->Cookie) {
90-
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txId << " has already been proposed");
91-
ConstructResult(proposeResult, fakeTxInfo);
92-
}
93-
TTxController::TProposeResult proposeResult;
94-
ConstructResult(proposeResult, *txInfoPtr);
95-
} else {
96-
auto proposeResult = txOperator->Propose(*Self, txc, false);
97-
if (!!proposeResult) {
98-
const auto& txInfo = txOperator->TxWithDeadline() ? Self->ProgressTxController->RegisterTxWithDeadline(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc)
99-
: Self->ProgressTxController->RegisterTx(txId, txKind, txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
100-
101-
ConstructResult(proposeResult, txInfo);
102-
} else {
103-
ConstructResult(proposeResult, fakeTxInfo);
104-
}
105-
}
106-
AFL_VERIFY(!!Result);
65+
ProposeTransaction(TTxController::TBasicTxInfo(txKind, txId), txBody, Ev->Get()->GetSource(), Ev->Cookie, txc);
10766
return true;
10867
}
10968

@@ -154,21 +113,21 @@ TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const
154113
return TTxController::TProposeResult();
155114
}
156115

157-
void TTxProposeTransaction::ConstructResult(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) {
116+
void TTxProposeTransaction::OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) {
158117
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
159-
if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED) {
160-
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
161-
Result->Record.SetMinStep(txInfo.MinStep);
162-
Result->Record.SetMaxStep(txInfo.MaxStep);
163-
if (Self->ProcessingParams) {
164-
Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators());
165-
}
166-
} else if (proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::SUCCESS) {
167-
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
168-
} else {
169-
Self->IncCounter(COUNTER_PREPARE_ERROR);
170-
LOG_S_INFO(TxPrefix() << "error txId " << txInfo.TxId << " " << proposeResult.GetStatusMessage() << TxSuffix());
118+
Self->IncCounter(COUNTER_PREPARE_ERROR);
119+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("message", proposeResult.GetStatusMessage())("tablet_id", Self->TabletID())("tx_id", txInfo.TxId);
120+
}
121+
122+
void TTxProposeTransaction::OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) {
123+
AFL_VERIFY(proposeResult.GetStatus() == NKikimrTxColumnShard::EResultStatus::PREPARED)("tx_id", txInfo.TxId)("details", proposeResult.DebugString());
124+
Result = std::make_unique<TEvColumnShard::TEvProposeTransactionResult>(Self->TabletID(), txInfo.TxKind, txInfo.TxId, proposeResult.GetStatus(), proposeResult.GetStatusMessage());
125+
Result->Record.SetMinStep(txInfo.MinStep);
126+
Result->Record.SetMaxStep(txInfo.MaxStep);
127+
if (Self->ProcessingParams) {
128+
Result->Record.MutableDomainCoordinators()->CopyFrom(Self->ProcessingParams->GetCoordinators());
171129
}
130+
Self->IncCounter(COUNTER_PREPARE_SUCCESS);
172131
}
173132

174133
void TTxProposeTransaction::Complete(const TActorContext& ctx) {
@@ -180,12 +139,6 @@ void TTxProposeTransaction::Complete(const TActorContext& ctx) {
180139

181140

182141
void TColumnShard::Handle(TEvColumnShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx) {
183-
auto& record = Proto(ev->Get());
184-
auto txKind = record.GetTxKind();
185-
ui64 txId = record.GetTxId();
186-
LOG_S_DEBUG("ProposeTransaction " << NKikimrTxColumnShard::ETransactionKind_Name(txKind)
187-
<< " txId " << txId << " at tablet " << TabletID());
188-
189142
Execute(new TTxProposeTransaction(this, ev), ctx);
190143
}
191144

ydb/core/tx/columnshard/columnshard_impl.h

+5
Original file line numberDiff line numberDiff line change
@@ -570,6 +570,11 @@ class TColumnShard
570570
return TablesManager.MutablePrimaryIndexAsVerified<T>();
571571
}
572572

573+
TTxController& GetProgressTxController() const {
574+
AFL_VERIFY(ProgressTxController);
575+
return *ProgressTxController;
576+
}
577+
573578
bool HasIndex() const {
574579
return !!TablesManager.GetPrimaryIndex();
575580
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#include "propose_transaction_base.h"
2+
3+
#include <ydb/core/tx/columnshard/columnshard_impl.h>
4+
5+
6+
namespace NKikimr::NColumnShard {
7+
8+
void TProposeTransactionBase::ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc) {
9+
auto txOperator = TTxController::ITransactionOperatior::TFactory::MakeHolder(txInfo.TxKind, TTxController::TTxInfo(txInfo.TxKind, txInfo.TxId));
10+
if (!txOperator || !txOperator->Parse(txBody)) {
11+
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Error processing commit TxId# " << txInfo.TxId
12+
<< (txOperator ? ". Parsing error " : ". Unknown operator for txKind"));
13+
OnProposeError(proposeResult, txInfo);
14+
return;
15+
}
16+
17+
auto txInfoPtr = Self->GetProgressTxController().GetTxInfo(txInfo.TxId);
18+
if (!!txInfoPtr) {
19+
if (txInfoPtr->Source != source || txInfoPtr->Cookie != cookie) {
20+
TTxController::TProposeResult proposeResult(NKikimrTxColumnShard::EResultStatus::ERROR, TStringBuilder() << "Another commit TxId# " << txInfo.TxId << " has already been proposed");
21+
OnProposeError(proposeResult, txInfo);
22+
}
23+
TTxController::TProposeResult proposeResult;
24+
OnProposeResult(proposeResult, *txInfoPtr);
25+
} else {
26+
auto proposeResult = txOperator->Propose(*Self, txc, false);
27+
if (!!proposeResult) {
28+
const auto fullTxInfo = txOperator->TxWithDeadline() ? Self->GetProgressTxController().RegisterTxWithDeadline(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc)
29+
: Self->GetProgressTxController().RegisterTx(txInfo.TxId, txInfo.TxKind, txBody, source, cookie, txc);
30+
31+
OnProposeResult(proposeResult, fullTxInfo);
32+
} else {
33+
OnProposeError(proposeResult, txInfo);
34+
}
35+
}
36+
}
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#pragma once
2+
#include "tx_controller.h"
3+
4+
namespace NKikimr::NColumnShard {
5+
6+
class TColumnShard;
7+
8+
class TProposeTransactionBase : public NTabletFlatExecutor::TTransactionBase<TColumnShard> {
9+
public:
10+
TProposeTransactionBase(TColumnShard* self)
11+
: TBase(self)
12+
{}
13+
14+
protected:
15+
void ProposeTransaction(const TTxController::TBasicTxInfo& txInfo, const TString& txBody, const TActorId source, const ui64 cookie, TTransactionContext& txc);
16+
17+
virtual void OnProposeResult(TTxController::TProposeResult& proposeResult, const TTxController::TTxInfo& txInfo) = 0;
18+
virtual void OnProposeError(TTxController::TProposeResult& proposeResult, const TTxController::TBasicTxInfo& txInfo) = 0;
19+
};
20+
21+
22+
}

0 commit comments

Comments
 (0)