Skip to content

Commit b15d3d7

Browse files
reading from a topic and writing to a topic in a transaction (#5841)
1 parent 0bcb6b0 commit b15d3d7

File tree

7 files changed

+387
-48
lines changed

7 files changed

+387
-48
lines changed

ydb/core/persqueue/partition.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include "offload_actor.h"
44
#include "partition_util.h"
55
#include "partition.h"
6+
#include "partition_log.h"
67

78
#include <ydb/core/base/appdata.h>
89
#include <ydb/core/base/blobstorage.h>
@@ -81,7 +82,7 @@ TString TPartition::LogPrefix() const {
8182
} else {
8283
state = "Unknown";
8384
}
84-
return TStringBuilder() << "" << SelfId() << " " << state << " Partition: " << Partition << " ";
85+
return TStringBuilder() << "[Partition:" << Partition << ", State:" << state << "] ";
8586
}
8687

8788
bool TPartition::IsActive() const {
@@ -1091,11 +1092,13 @@ TPartition::EProcessResult TPartition::ApplyWriteInfoResponse(TTransaction& tx)
10911092
}
10921093

10931094
void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorContext& ctx) {
1095+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoResponse");
10941096
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
10951097
}
10961098

10971099

10981100
void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorContext& ctx) {
1101+
PQ_LOG_D("Handle TEvPQ::TEvGetWriteInfoError");
10991102
WriteInfoResponseHandler(ev->Sender, ev->Release(), ctx);
11001103
}
11011104

ydb/core/persqueue/pq_impl.cpp

+52-3
Original file line numberDiff line numberDiff line change
@@ -2675,6 +2675,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
26752675
//
26762676
TTxWriteInfo& writeInfo = TxWrites[writeId];
26772677
TPartitionId partitionId(originalPartitionId, writeId, NextSupportivePartitionId++);
2678+
PQ_LOG_D("partition " << partitionId << " for WriteId " << writeId);
26782679

26792680
writeInfo.Partitions.emplace(originalPartitionId, partitionId);
26802681
TxWritesChanged = true;
@@ -3109,7 +3110,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co
31093110

31103111
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
31113112
{
3112-
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "Tablet " << TabletID() << " Handle TEvPersQueue::TEvProposeTransaction");
3113+
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << ev->Get()->Record.DebugString());
31133114

31143115
NKikimrPQ::TEvProposeTransaction& event = ev->Get()->Record;
31153116
switch (event.GetTxBodyCase()) {
@@ -3122,6 +3123,8 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
31223123
case NKikimrPQ::TEvProposeTransaction::TXBODY_NOT_SET:
31233124
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
31243125
event.GetTxId(),
3126+
NKikimrPQ::TError::ERROR,
3127+
"missing TxBody",
31253128
ctx);
31263129
break;
31273130
}
@@ -3134,6 +3137,7 @@ bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& ope
31343137
TPartitionId partitionId(operation.GetPartitionId(),
31353138
writeId,
31363139
operation.GetSupportivePartition());
3140+
PQ_LOG_D("partitionId=" << partitionId);
31373141
return Partitions.contains(partitionId);
31383142
}
31393143

@@ -3144,6 +3148,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
31443148
}
31453149

31463150
ui64 writeId = txBody.GetWriteId();
3151+
PQ_LOG_D("writeId=" << writeId);
31473152

31483153
for (auto& operation : txBody.GetOperations()) {
31493154
auto isWrite = [](const NKikimrPQ::TPartitionOperation& o) {
@@ -3169,8 +3174,11 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
31693174
const NKikimrPQ::TDataTransaction& txBody = event.GetData();
31703175

31713176
if (TabletState != NKikimrPQ::ENormal) {
3177+
PQ_LOG_D("invalid PQ tablet state (" << NKikimrPQ::ETabletState_Name(TabletState) << ")");
31723178
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
31733179
event.GetTxId(),
3180+
NKikimrPQ::TError::ERROR,
3181+
"invalid PQ tablet state",
31743182
ctx);
31753183
return;
31763184
}
@@ -3180,36 +3188,49 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
31803188
//
31813189

31823190
if (txBody.OperationsSize() <= 0) {
3191+
PQ_LOG_D("empty list of operations");
31833192
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
31843193
event.GetTxId(),
3194+
NKikimrPQ::TError::BAD_REQUEST,
3195+
"empty list of operations",
31853196
ctx);
31863197
return;
31873198
}
31883199

31893200
if (!CheckTxWriteOperations(txBody)) {
3201+
PQ_LOG_D("invalid WriteId " << txBody.GetWriteId());
31903202
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
31913203
event.GetTxId(),
3204+
NKikimrPQ::TError::BAD_REQUEST,
3205+
"invalid WriteId",
31923206
ctx);
31933207
return;
31943208
}
31953209

31963210
TMaybe<TPartitionId> partitionId = FindPartitionId(txBody);
31973211
if (!partitionId.Defined()) {
3212+
PQ_LOG_D("unknown partition for WriteId " << txBody.GetWriteId());
31983213
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
31993214
event.GetTxId(),
3215+
NKikimrPQ::TError::INTERNAL,
3216+
"unknown supportive partition",
32003217
ctx);
32013218
return;
32023219
}
32033220

32043221
if (!Partitions.contains(*partitionId)) {
3222+
PQ_LOG_D("unknown partition " << *partitionId);
32053223
SendProposeTransactionAbort(ActorIdFromProto(event.GetSourceActor()),
32063224
event.GetTxId(),
3225+
NKikimrPQ::TError::INTERNAL,
3226+
"unknown supportive partition",
32073227
ctx);
32083228
return;
32093229
}
32103230

32113231

32123232
if (txBody.GetImmediate()) {
3233+
PQ_LOG_D("immediate transaction");
32133234
TPartitionId originalPartitionId(txBody.GetOperations(0).GetPartitionId());
32143235
const TPartitionInfo& partition = Partitions.at(originalPartitionId);
32153236

@@ -3221,6 +3242,7 @@ void TPersQueue::HandleDataTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransact
32213242

32223243
ctx.Send(partition.Actor, ev.Release());
32233244
} else {
3245+
PQ_LOG_D("distributed transaction");
32243246
EvProposeTransactionQueue.emplace_back(ev.Release());
32253247

32263248
TryWriteTxs(ctx);
@@ -3697,6 +3719,7 @@ void TPersQueue::SaveTxWrites(NKikimrPQ::TTabletTxInfo& info)
36973719

36983720
void TPersQueue::ScheduleProposeTransactionResult(const TDistributedTransaction& tx)
36993721
{
3722+
PQ_LOG_D("schedule TEvProposeTransactionResult(PREPARED)");
37003723
auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
37013724

37023725
event->Record.SetOrigin(TabletID());
@@ -3766,16 +3789,27 @@ void TPersQueue::SendEvReadSetAckToSenders(const TActorContext& ctx,
37663789

37673790
TMaybe<TPartitionId> TPersQueue::FindPartitionId(const NKikimrPQ::TDataTransaction& txBody) const
37683791
{
3792+
auto hasWriteOperation = [](const auto& txBody) {
3793+
for (const auto& o : txBody.GetOperations()) {
3794+
if (!o.HasBegin()) {
3795+
return true;
3796+
}
3797+
}
3798+
return false;
3799+
};
3800+
37693801
ui32 partitionId = txBody.GetOperations(0).GetPartitionId();
37703802

3771-
if (txBody.HasWriteId()) {
3803+
if (txBody.HasWriteId() && hasWriteOperation(txBody)) {
37723804
ui64 writeId = txBody.GetWriteId();
37733805
if (!TxWrites.contains(writeId)) {
3806+
PQ_LOG_D("unknown WriteId " << writeId);
37743807
return Nothing();
37753808
}
37763809

37773810
const TTxWriteInfo& writeInfo = TxWrites.at(writeId);
37783811
if (!writeInfo.Partitions.contains(partitionId)) {
3812+
PQ_LOG_D("unknown partition " << partitionId << " for WriteId " << writeId);
37793813
return Nothing();
37803814
}
37813815

@@ -3882,6 +3916,9 @@ void TPersQueue::SendEvProposeTransactionResult(const TActorContext& ctx,
38823916
result->Record.SetTxId(tx.TxId);
38833917
result->Record.SetStep(tx.Step);
38843918

3919+
PQ_LOG_D("send TEvPersQueue::TEvProposeTransactionResult(" <<
3920+
NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(result->Record.GetStatus()) <<
3921+
")");
38853922
ctx.Send(tx.SourceActor, std::move(result));
38863923
}
38873924

@@ -4204,6 +4241,8 @@ bool TPersQueue::AllTransactionsHaveBeenProcessed() const
42044241

42054242
void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
42064243
ui64 txId,
4244+
NKikimrPQ::TError::EKind kind,
4245+
const TString& reason,
42074246
const TActorContext& ctx)
42084247
{
42094248
auto event = std::make_unique<TEvPersQueue::TEvProposeTransactionResult>();
@@ -4212,6 +4251,15 @@ void TPersQueue::SendProposeTransactionAbort(const TActorId& target,
42124251
event->Record.SetStatus(NKikimrPQ::TEvProposeTransactionResult::ABORTED);
42134252
event->Record.SetTxId(txId);
42144253

4254+
if (kind != NKikimrPQ::TError::OK) {
4255+
auto* error = event->Record.MutableErrors()->Add();
4256+
error->SetKind(kind);
4257+
error->SetReason(reason);
4258+
}
4259+
4260+
PQ_LOG_D("send TEvPersQueue::TEvProposeTransactionResult(" <<
4261+
NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event->Record.GetStatus()) <<
4262+
")");
42154263
ctx.Send(target, std::move(event));
42164264
}
42174265

@@ -4509,6 +4557,7 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
45094557
DeleteTx(*tx);
45104558
}
45114559
}
4560+
PQ_LOG_D("delete WriteId " << writeId);
45124561
TxWrites.erase(writeId);
45134562
}
45144563
TxWritesChanged = true;
@@ -4562,7 +4611,7 @@ void TPersQueue::BeginDeletePartitions(TTxWriteInfo& writeInfo)
45624611
}
45634612

45644613
TString TPersQueue::LogPrefix() const {
4565-
return TStringBuilder() << SelfId() << " ";
4614+
return TStringBuilder() << "[PQ: " << TabletID() << "] ";
45664615
}
45674616

45684617
ui64 TPersQueue::GetGeneration() {

ydb/core/persqueue/pq_impl.h

+2
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
344344

345345
void SendProposeTransactionAbort(const TActorId& target,
346346
ui64 txId,
347+
NKikimrPQ::TError::EKind kind,
348+
const TString& reason,
347349
const TActorContext& ctx);
348350

349351
void Handle(TEvPQ::TEvProposePartitionConfigResult::TPtr& ev, const TActorContext& ctx);

ydb/core/persqueue/transaction.cpp

+5-1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ void TDistributedTransaction::InitPartitions(const google::protobuf::RepeatedPtr
6060
Partitions.clear();
6161

6262
for (auto& o : operations) {
63+
if (!o.HasBegin()) {
64+
HasWriteOperations = true;
65+
}
66+
6367
Operations.push_back(o);
6468
Partitions.insert(o.GetPartitionId());
6569
}
@@ -133,7 +137,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac
133137

134138
InitPartitions(txBody.GetOperations());
135139

136-
if (txBody.HasWriteId()) {
140+
if (txBody.HasWriteId() && HasWriteOperations) {
137141
WriteId = txBody.GetWriteId();
138142
} else {
139143
WriteId = Nothing();

ydb/core/persqueue/transaction.h

+2
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,8 @@ struct TDistributedTransaction {
109109
void BindMsgToPipe(ui64 tabletId, const IEventBase& event);
110110
void UnbindMsgsFromPipe(ui64 tabletId);
111111
const TVector<TSerializedMessage>& GetBindedMsgs(ui64 tabletId);
112+
113+
bool HasWriteOperations = false;
112114
};
113115

114116
}

ydb/core/persqueue/ut/pqtablet_ut.cpp

+16-8
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ namespace NHelpers {
2929

3030
struct TTxOperation {
3131
ui32 Partition;
32-
TString Consumer;
33-
ui64 Begin = 0;
34-
ui64 End = 0;
32+
TMaybe<TString> Consumer;
33+
TMaybe<ui64> Begin;
34+
TMaybe<ui64> End;
3535
TString Path;
36+
TMaybe<ui32> SupportivePartition;
3637
};
3738

3839
struct TConfigParams {
@@ -307,10 +308,15 @@ void TPQTabletFixture::SendProposeTransactionRequest(const TProposeTransactionPa
307308
for (auto& txOp : params.TxOps) {
308309
auto* operation = body->MutableOperations()->Add();
309310
operation->SetPartitionId(txOp.Partition);
310-
operation->SetBegin(txOp.Begin);
311-
operation->SetEnd(txOp.End);
312-
operation->SetConsumer(txOp.Consumer);
311+
if (txOp.Begin.Defined()) {
312+
operation->SetBegin(*txOp.Begin);
313+
operation->SetEnd(*txOp.End);
314+
operation->SetConsumer(*txOp.Consumer);
315+
}
313316
operation->SetPath(txOp.Path);
317+
if (txOp.SupportivePartition.Defined()) {
318+
operation->SetSupportivePartition(*txOp.SupportivePartition);
319+
}
314320

315321
partitions.insert(txOp.Partition);
316322
}
@@ -342,7 +348,9 @@ void TPQTabletFixture::WaitProposeTransactionResponse(const TProposeTransactionR
342348

343349
if (matcher.Status) {
344350
UNIT_ASSERT(event->Record.HasStatus());
345-
UNIT_ASSERT_EQUAL(*matcher.Status, event->Record.GetStatus());
351+
UNIT_ASSERT_EQUAL_C(*matcher.Status, event->Record.GetStatus(),
352+
"expected: " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(*matcher.Status) <<
353+
", received " << NKikimrPQ::TEvProposeTransactionResult_EStatus_Name(event->Record.GetStatus()));
346354
}
347355
}
348356

@@ -1292,7 +1300,7 @@ Y_UNIT_TEST_F(ProposeTx_Command_After_Propose, TPQTabletFixture)
12921300
.Status=NMsgBusProxy::MSTATUS_OK});
12931301

12941302
SendProposeTransactionRequest({.TxId=txId,
1295-
.TxOps={{.Partition=partitionId, .Path="/topic"}},
1303+
.TxOps={{.Partition=partitionId, .Path="/topic", .SupportivePartition=100'000}},
12961304
.WriteId=writeId});
12971305
WaitProposeTransactionResponse({.TxId=txId,
12981306
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});

0 commit comments

Comments
 (0)