Skip to content

LongTxService subscription id #6250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2594,7 +2594,9 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
auto ev = std::make_unique<TEvPersQueue::TEvProposeTransaction>();

if (writeId.Defined()) {
transaction.SetWriteId(*writeId);
auto* w = transaction.MutableWriteId();
w->SetNodeId(SelfId().NodeId());
w->SetKeyId(*writeId);
}
transaction.SetImmediate(ImmediateTx);

Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/session_actor/kqp_session_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1719,7 +1719,10 @@ class TKqpSessionActor : public TActorBootstrapped<TKqpSessionActor> {

if (replyTopicOperations) {
if (HasTopicWriteId()) {
response->MutableTopicOperations()->SetWriteId(GetTopicWriteId());
auto* w = response->MutableTopicOperations();
auto* writeId = w->MutableWriteId();
writeId->SetNodeId(SelfId().NodeId());
writeId->SetKeyId(GetTopicWriteId());
}
}

Expand Down
5 changes: 3 additions & 2 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/core/persqueue/key.h>
#include <ydb/core/persqueue/sourceid_info.h>
#include <ydb/core/persqueue/metering_sink.h>
#include <ydb/core/persqueue/write_id.h>
#include <ydb/core/tablet/tablet_counters.h>
#include <ydb/library/persqueue/topic_parser/topic_parser.h>

Expand Down Expand Up @@ -1158,12 +1159,12 @@ struct TEvPQ {
};

struct TEvTransactionCompleted : TEventLocal<TEvTransactionCompleted, EvTransactionCompleted> {
explicit TEvTransactionCompleted(TMaybe<ui64> writeId) :
explicit TEvTransactionCompleted(const TMaybe<NPQ::TWriteId>& writeId) :
WriteId(writeId)
{
}

TMaybe<ui64> WriteId;
TMaybe<NPQ::TWriteId> WriteId;
};
};

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3366,9 +3366,9 @@ void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransac
Y_ABORT_UNLESS(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(tx.HasData());

TMaybe<ui64> writeId;
TMaybe<TWriteId> writeId;
if (tx.GetData().HasWriteId()) {
writeId = tx.GetData().GetWriteId();
writeId = GetWriteId(tx.GetData());
}

Replies.emplace_back(Tablet,
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/persqueue/partition_id.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#pragma once

#include <ydb/core/persqueue/write_id.h>

#include <util/generic/maybe.h>
#include <util/stream/output.h>
#include <util/system/types.h>
Expand All @@ -20,7 +22,7 @@ class TPartitionId {
{
}

TPartitionId(ui32 originalPartitionId, TMaybe<ui64> writeId, ui32 internalPartitionId) :
TPartitionId(ui32 originalPartitionId, const TMaybe<TWriteId>& writeId, ui32 internalPartitionId) :
OriginalPartitionId(originalPartitionId),
WriteId(writeId),
InternalPartitionId(internalPartitionId)
Expand Down Expand Up @@ -55,7 +57,7 @@ class TPartitionId {
}

ui32 OriginalPartitionId = 0;
TMaybe<ui64> WriteId;
TMaybe<TWriteId> WriteId;
ui32 InternalPartitionId = 0;
};

Expand Down
38 changes: 19 additions & 19 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ void TPersQueue::InitTxWrites(const NKikimrPQ::TTabletTxInfo& info,

for (size_t i = 0; i != info.TxWritesSize(); ++i) {
auto& txWrite = info.GetTxWrites(i);
ui64 writeId = txWrite.GetWriteId();
const TWriteId writeId = GetWriteId(txWrite);
ui32 partitionId = txWrite.GetOriginalPartitionId();
TPartitionId shadowPartitionId(partitionId, writeId, txWrite.GetInternalPartitionId());

Expand Down Expand Up @@ -2535,7 +2535,7 @@ const TPartitionInfo& TPersQueue::GetPartitionInfo(const NKikimrClient::TPersQue
{
Y_ABORT_UNLESS(req.HasWriteId());

ui64 writeId = req.GetWriteId();
const TWriteId writeId = GetWriteId(req);
ui32 originalPartitionId = req.GetPartition();

Y_ABORT_UNLESS(TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId));
Expand Down Expand Up @@ -2620,7 +2620,7 @@ void TPersQueue::HandleEventForSupportivePartition(const ui64 responseCookie,
return;
}

ui64 writeId = req.GetWriteId();
const TWriteId writeId = GetWriteId(req);
ui32 originalPartitionId = req.GetPartition();

if (TxWrites.contains(writeId) && TxWrites.at(writeId).Partitions.contains(originalPartitionId)) {
Expand Down Expand Up @@ -3132,7 +3132,7 @@ void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TAc
}

bool TPersQueue::CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
ui64 writeId) const
const TWriteId& writeId) const
{
TPartitionId partitionId(operation.GetPartitionId(),
writeId,
Expand All @@ -3147,7 +3147,7 @@ bool TPersQueue::CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBod
return true;
}

ui64 writeId = txBody.GetWriteId();
const TWriteId writeId = GetWriteId(txBody);
PQ_LOG_D("writeId=" << writeId);

for (auto& operation : txBody.GetOperations()) {
Expand Down Expand Up @@ -3417,18 +3417,18 @@ bool TPersQueue::CanProcessTxWrites() const
return !NewSupportivePartitions.empty();
}

void TPersQueue::SubscribeWriteId(ui64 writeId,
void TPersQueue::SubscribeWriteId(const TWriteId& writeId,
const TActorContext& ctx)
{
ctx.Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()),
new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId, ctx.SelfID.NodeId()));
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
new NLongTxService::TEvLongTxService::TEvSubscribeLock(writeId.KeyId, writeId.NodeId));
}

void TPersQueue::UnsubscribeWriteId(ui64 writeId,
void TPersQueue::UnsubscribeWriteId(const TWriteId& writeId,
const TActorContext& ctx)
{
ctx.Send(NLongTxService::MakeLongTxServiceID(ctx.SelfID.NodeId()),
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId, ctx.SelfID.NodeId()));
ctx.Send(NLongTxService::MakeLongTxServiceID(writeId.NodeId),
new NLongTxService::TEvLongTxService::TEvUnsubscribeLock(writeId.KeyId, writeId.NodeId));
}

void TPersQueue::CreateSupportivePartitionActors(const TActorContext& ctx)
Expand Down Expand Up @@ -3534,7 +3534,7 @@ void TPersQueue::ProcessProposeTransactionQueue(const TActorContext& ctx)
TabletID());

if (tx.WriteId.Defined()) {
ui64 writeId = *tx.WriteId;
const TWriteId& writeId = *tx.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
writeInfo.TxId = tx.TxId;
Expand Down Expand Up @@ -3708,7 +3708,7 @@ void TPersQueue::SaveTxWrites(NKikimrPQ::TTabletTxInfo& info)
for (auto& [writeId, write] : TxWrites) {
for (auto [partitionId, shadowPartitionId] : write.Partitions) {
auto* txWrite = info.MutableTxWrites()->Add();
txWrite->SetWriteId(writeId);
SetWriteId(*txWrite, writeId);
txWrite->SetOriginalPartitionId(partitionId);
txWrite->SetInternalPartitionId(shadowPartitionId.InternalPartitionId);
}
Expand Down Expand Up @@ -3801,7 +3801,7 @@ TMaybe<TPartitionId> TPersQueue::FindPartitionId(const NKikimrPQ::TDataTransacti
ui32 partitionId = txBody.GetOperations(0).GetPartitionId();

if (txBody.HasWriteId() && hasWriteOperation(txBody)) {
ui64 writeId = txBody.GetWriteId();
const TWriteId writeId = GetWriteId(txBody);
if (!TxWrites.contains(writeId)) {
PQ_LOG_D("unknown WriteId " << writeId);
return Nothing();
Expand Down Expand Up @@ -3839,7 +3839,7 @@ void TPersQueue::SendEvTxCalcPredicateToPartitions(const TActorContext& ctx,
}

if (tx.WriteId.Defined()) {
ui64 writeId = *tx.WriteId;
const TWriteId& writeId = *tx.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
const TTxWriteInfo& writeInfo = TxWrites.at(writeId);

Expand Down Expand Up @@ -4509,7 +4509,7 @@ void TPersQueue::ProcessCheckPartitionStatusRequests(const TPartitionId& partiti
void TPersQueue::Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext&)
{
auto& record = ev->Get()->Record;
ui64 writeId = record.GetLockId();
const TWriteId writeId(record.GetLockNode(), record.GetLockId());

if (!TxWrites.contains(writeId)) {
// the transaction has already been completed
Expand Down Expand Up @@ -4547,7 +4547,7 @@ void TPersQueue::Handle(TEvPQ::TEvDeletePartitionDone::TPtr& ev, const TActorCon
{
auto* event = ev->Get();
Y_ABORT_UNLESS(event->PartitionId.WriteId.Defined());
const ui64 writeId = *event->PartitionId.WriteId;
const TWriteId& writeId = *event->PartitionId.WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
Y_ABORT_UNLESS(writeInfo.Partitions.contains(event->PartitionId.OriginalPartitionId));
Expand Down Expand Up @@ -4581,7 +4581,7 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
return;
}

const ui64 writeId = *event->WriteId;
const TWriteId& writeId = *event->WriteId;
Y_ABORT_UNLESS(TxWrites.contains(writeId));
TTxWriteInfo& writeInfo = TxWrites.at(writeId);
Y_ABORT_UNLESS(writeInfo.Partitions.size() == 1);
Expand All @@ -4592,7 +4592,7 @@ void TPersQueue::Handle(TEvPQ::TEvTransactionCompleted::TPtr& ev, const TActorCo
void TPersQueue::BeginDeleteTx(const TDistributedTransaction& tx)
{
Y_ABORT_UNLESS(tx.WriteId.Defined());
const ui64 writeId = *tx.WriteId;
const TWriteId& writeId = *tx.WriteId;
if (!TxWrites.contains(writeId)) {
// the transaction has already been completed
return;
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/persqueue/pq_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
bool Deleting = false;
};

THashMap<ui64, TTxWriteInfo> TxWrites;
THashMap<TWriteId, TTxWriteInfo> TxWrites;
bool TxWritesChanged = false;
ui32 NextSupportivePartitionId = 100'000;

Expand Down Expand Up @@ -486,8 +486,8 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void CreateSupportivePartitionActors(const TActorContext& ctx);
void CreateSupportivePartitionActor(const TPartitionId& shadowPartitionId, const TActorContext& ctx);
NKikimrPQ::TPQTabletConfig MakeSupportivePartitionConfig() const;
void SubscribeWriteId(ui64 writeId, const TActorContext& ctx);
void UnsubscribeWriteId(ui64 writeId, const TActorContext& ctx);
void SubscribeWriteId(const TWriteId& writeId, const TActorContext& ctx);
void UnsubscribeWriteId(const TWriteId& writeId, const TActorContext& ctx);

bool AllOriginalPartitionsInited() const;

Expand All @@ -499,7 +499,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
void BeginDeletePartitions(TTxWriteInfo& writeInfo);

bool CheckTxWriteOperation(const NKikimrPQ::TPartitionOperation& operation,
ui64 writeId) const;
const TWriteId& writeId) const;
bool CheckTxWriteOperations(const NKikimrPQ::TDataTransaction& txBody) const;
};

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/persqueue/transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ TDistributedTransaction::TDistributedTransaction(const NKikimrPQ::TTransaction&
SourceActor = ActorIdFromProto(tx.GetSourceActor());

if (tx.HasWriteId()) {
WriteId = tx.GetWriteId();
WriteId = GetWriteId(tx);
}
}

Expand Down Expand Up @@ -139,7 +139,7 @@ void TDistributedTransaction::OnProposeTransaction(const NKikimrPQ::TDataTransac
InitPartitions(txBody.GetOperations());

if (txBody.HasWriteId() && HasWriteOperations) {
WriteId = txBody.GetWriteId();
WriteId = GetWriteId(txBody);
} else {
WriteId = Nothing();
}
Expand Down Expand Up @@ -350,7 +350,7 @@ void TDistributedTransaction::AddCmdWriteDataTx(NKikimrPQ::TTransaction& tx)
tx.SetAggrPredicate(ParticipantsDecision == NKikimrTx::TReadSetData::DECISION_COMMIT);
}
if (WriteId.Defined()) {
tx.SetWriteId(*WriteId);
SetWriteId(tx, *WriteId);
}
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ struct TDistributedTransaction {
THashSet<ui64> Senders; // список отправителей TEvReadSet
THashSet<ui64> Receivers; // список получателей TEvReadSet
TVector<NKikimrPQ::TPartitionOperation> Operations;
TMaybe<ui64> WriteId;
TMaybe<TWriteId> WriteId;

EDecision SelfDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
EDecision ParticipantsDecision = NKikimrTx::TReadSetData::DECISION_UNKNOWN;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/ut/internals_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ Y_UNIT_TEST(StoreKeys) {
TKey keyOld(TKeyPrefix::TypeData, TPartitionId{9}, 8, 7, 6, 5, false);
UNIT_ASSERT_VALUES_EQUAL(keyOld.ToString(), "d0000000009_00000000000000000008_00007_0000000006_00005");

TKey keyNew(TKeyPrefix::TypeData, TPartitionId{5, 1, 9}, 8, 7, 6, 5, false);
TKey keyNew(TKeyPrefix::TypeData, TPartitionId{5, TWriteId{0, 1}, 9}, 8, 7, 6, 5, false);
UNIT_ASSERT_VALUES_EQUAL(keyNew.ToString(), "D0000000009_00000000000000000008_00007_0000000006_00005");

keyNew.SetType(TKeyPrefix::TypeInfo);
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ void CompareVectors(const TVector<ui64>& expected, const TIterable& actual) {
}

void TPartitionFixture::ShadowPartitionCountersTest(bool isFirstClass) {
const TPartitionId partition{0, 1111, 123};
const TPartitionId partition{0, TWriteId{0, 1111}, 123};
const ui64 begin = 0;
const ui64 end = 10;
const TString session = "session";
Expand Down Expand Up @@ -2320,7 +2320,7 @@ Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) {
Ctx->Runtime->GetAppData().PQConfig.MutableQuotingConfig()->SetEnableQuoting(false);

CreatePartition({
.Partition=TPartitionId{2, 10, 100'001},
.Partition=TPartitionId{2, TWriteId{0, 10}, 100'001},
//
// partition configuration
//
Expand Down Expand Up @@ -2389,7 +2389,7 @@ Y_UNIT_TEST_F(GetPartitionWriteInfoSuccess, TPartitionFixture) {

Y_UNIT_TEST_F(GetPartitionWriteInfoError, TPartitionFixture) {
CreatePartition({
.Partition=TPartitionId{2, 10, 100'001},
.Partition=TPartitionId{2, TWriteId{0, 10}, 100'001},
.Begin=0, .End=10,
//
// partition configuration
Expand Down Expand Up @@ -2444,7 +2444,7 @@ Y_UNIT_TEST_F(ShadowPartitionCountersFirstClass, TPartitionFixture) {
}

Y_UNIT_TEST_F(ShadowPartitionCountersRestore, TPartitionFixture) {
const TPartitionId partitionId{0, 1111, 123};
const TPartitionId partitionId{0, TWriteId{0, 1111}, 123};
const ui64 begin = 0;
const ui64 end = 10;
const TString session = "session";
Expand Down Expand Up @@ -3126,7 +3126,7 @@ Y_UNIT_TEST_F(TestBatchingWithProposeConfig, TPartitionTxTestHelper) {

Y_UNIT_TEST_F(GetUsedStorage, TPartitionFixture) {
auto* actor = CreatePartition({
.Partition=TPartitionId{2, 10, 100'001},
.Partition=TPartitionId{2, TWriteId{0, 10}, 100'001},
.Begin=0, .End=10,
//
// partition configuration
Expand Down
Loading
Loading