Skip to content

Completion of the transaction #4407

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ ydb/public/sdk/cpp/client/ydb_topic/ut BasicUsage.WriteRead
ydb/public/sdk/cpp/client/ydb_topic/ut TSettingsValidation.TestDifferentDedupParams
ydb/public/sdk/cpp/client/ydb_topic/ut [0/10]*
ydb/public/sdk/cpp/client/ydb_topic/ut [6/10]*
ydb/public/sdk/cpp/client/ydb_topic/ut TxUsage::WriteToTopic_Demo_*
ydb/services/datastreams/ut DataStreams.TestGetRecordsStreamWithSingleShard
ydb/services/datastreams/ut DataStreams.TestPutRecordsWithRead
ydb/services/datastreams/ut DataStreams.TestReservedConsumersMetering
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/kqp/executer_actor/kqp_data_executer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2304,7 +2304,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
!topicTxs.empty());

if (!locksMap.empty() || VolatileTx ||
Request.TopicOperations.HasReadOperations())
Request.TopicOperations.HasReadOperations() || Request.TopicOperations.HasWriteOperations())
{
YQL_ENSURE(Request.LocksOp == ELocksOp::Commit || Request.LocksOp == ELocksOp::Rollback || VolatileTx);

Expand Down Expand Up @@ -2352,6 +2352,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}

if (auto tabletIds = Request.TopicOperations.GetReceivingTabletIds()) {
sendingShardsSet.insert(tabletIds.begin(), tabletIds.end());
receivingShardsSet.insert(tabletIds.begin(), tabletIds.end());
}

Expand Down Expand Up @@ -2398,6 +2399,7 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
}
}


// Encode sending/receiving shards in tx bodies
if (needCommit) {
NProtoBuf::RepeatedField<ui64> sendingShards(sendingShardsSet.begin(), sendingShardsSet.end());
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/kqp/topics/kqp_topics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,7 @@ TSet<ui64> TTopicOperations::GetReceivingTabletIds() const
{
TSet<ui64> ids;
for (auto& [_, operations] : Operations_) {
if (operations.HasWriteOperations()) {
ids.insert(operations.GetTabletId());
}
ids.insert(operations.GetTabletId());
}
return ids;
}
Expand Down
32 changes: 30 additions & 2 deletions ydb/core/persqueue/events/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,19 @@ struct TEvPQ {
EvPartitionScaleStatusChanged,
EvPartitionScaleRequestDone,
EvBalanceConsumer,
EvDeletePartition,
EvDeletePartitionDone,
EvTransactionCompleted,
EvEnd
};

struct TEvHandleWriteResponse : TEventLocal<TEvHandleWriteResponse, EvHandleWriteResponse> {
TEvHandleWriteResponse()
{}
explicit TEvHandleWriteResponse(ui64 cookie) :
Cookie(cookie)
{
}

ui64 Cookie = 0;
};

struct TEvWrite : public TEventLocal<TEvWrite, EvWrite> {
Expand Down Expand Up @@ -1132,6 +1139,27 @@ struct TEvPQ {

TString ConsumerName;
};

struct TEvDeletePartition : TEventLocal<TEvDeletePartition, EvDeletePartition> {
};

struct TEvDeletePartitionDone : TEventLocal<TEvDeletePartitionDone, EvDeletePartitionDone> {
explicit TEvDeletePartitionDone(const NPQ::TPartitionId& partitionId) :
PartitionId(partitionId)
{
}

NPQ::TPartitionId PartitionId;
};

struct TEvTransactionCompleted : TEventLocal<TEvTransactionCompleted, EvTransactionCompleted> {
explicit TEvTransactionCompleted(TMaybe<ui64> writeId) :
WriteId(writeId)
{
}

TMaybe<ui64> WriteId;
};
};

} //NKikimr
121 changes: 112 additions & 9 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,6 @@ void TPartition::AddMetaKey(TEvKeyValue::TEvRequest* request) {
write->SetKey(ikey.Data(), ikey.Size());
write->SetValue(out.c_str(), out.size());
write->SetStorageChannel(NKikimrClient::TKeyValueRequest::INLINE);

}

bool TPartition::CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx) {
Expand Down Expand Up @@ -492,7 +491,8 @@ void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContex
}
}

void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx) {
void TPartition::DestroyActor(const TActorContext& ctx)
{
// Reply to all outstanding requests in order to destroy corresponding actors

TStringBuilder ss;
Expand Down Expand Up @@ -523,12 +523,19 @@ void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
UsersInfoStorage->Clear(ctx);
}

Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());
Send(WriteQuotaTrackerActor, new TEvents::TEvPoisonPill());
if (!IsSupportive()) {
Send(ReadQuotaTrackerActor, new TEvents::TEvPoisonPill());
Send(WriteQuotaTrackerActor, new TEvents::TEvPoisonPill());
}

Die(ctx);
}

void TPartition::Handle(TEvents::TEvPoisonPill::TPtr&, const TActorContext& ctx)
{
DestroyActor(ctx);
}

bool CheckDiskStatus(const TStorageStatusFlags status) {
return !status.Check(NKikimrBlobStorage::StatusDiskSpaceYellowStop);
}
Expand Down Expand Up @@ -1020,6 +1027,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoResponse::TPtr& ev, const TActorCo
--ImmediateTxCount;

ProcessImmediateTx(tx, predicate, ctx);
ScheduleTransactionCompleted(tx);
TxInProgress = false;
ContinueProcessTxsAndUserActs(ctx);

Expand Down Expand Up @@ -1056,6 +1064,7 @@ void TPartition::Handle(TEvPQ::TEvGetWriteInfoError::TPtr& ev, const TActorConte
NKikimrPQ::TEvProposeTransactionResult::ABORTED,
NKikimrPQ::TError::BAD_REQUEST,
ev->Get()->Message);
ScheduleTransactionCompleted(t->Record);

UserActionAndTransactionEvents.pop_front();
--ImmediateTxCount;
Expand Down Expand Up @@ -1574,13 +1583,11 @@ void TPartition::Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext&

const auto writeDuration = ctx.Now() - WriteStartTime;
const auto minWriteLatency = TDuration::MilliSeconds(AppData(ctx)->PQConfig.GetMinWriteLatencyMs());

if (writeDuration > minWriteLatency) {
KVWriteInProgress = false;
OnProcessTxsAndUserActsWriteComplete(ctx);
HandleWriteResponse(ctx);
ProcessTxsAndUserActs(ctx);
OnHandleWriteResponse(ctx);
} else {
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse());
ctx.Schedule(minWriteLatency - writeDuration, new TEvPQ::TEvHandleWriteResponse(response.GetCookie()));
}
}

Expand Down Expand Up @@ -1658,6 +1665,20 @@ void TPartition::ContinueProcessTxsAndUserActs(const TActorContext& ctx)

THolder<TEvKeyValue::TEvRequest> request(new TEvKeyValue::TEvRequest);

if (DeletePartitionState == DELETION_INITED) {
ScheduleNegativeReplies();
ScheduleDeletePartitionDone();

AddCmdDeleteRangeForAllKeys(*request);

ctx.Send(Tablet, request.Release());

DeletePartitionState = DELETION_IN_PROCESS;
KVWriteInProgress = true;

return;
}

HaveWriteMsg = false;

if (UserActionAndTransactionEvents.empty()) {
Expand Down Expand Up @@ -2629,6 +2650,12 @@ void TPartition::SchedulePartitionConfigChanged()
MakeHolder<TEvPQ::TEvPartitionConfigChanged>(Partition).Release());
}

void TPartition::ScheduleDeletePartitionDone()
{
Replies.emplace_back(Tablet,
MakeHolder<TEvPQ::TEvDeletePartitionDone>(Partition).Release());
}

void TPartition::AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated)
{
Expand Down Expand Up @@ -3013,6 +3040,82 @@ void TPartition::Handle(TEvPQ::TEvCheckPartitionStatusRequest::TPtr& ev, const T
Send(ev->Sender, response.Release());
}

void TPartition::HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext&)
{
PendingEvents.emplace_back(ev->ReleaseBase().Release());
}

void TPartition::Handle(TEvPQ::TEvDeletePartition::TPtr&, const TActorContext& ctx)
{
Y_ABORT_UNLESS(IsSupportive());
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);

DeletePartitionState = DELETION_INITED;

ProcessTxsAndUserActs(ctx);
}

void TPartition::ScheduleNegativeReplies()
{
for (auto& event : UserActionAndTransactionEvents) {
auto visitor = [this](auto& event) {
using T = std::decay_t<decltype(event)>;
if constexpr (TIsSimpleSharedPtr<T>::value) {
return this->ScheduleNegativeReply(*event);
} else {
return this->ScheduleNegativeReply(event);
}
};

std::visit(visitor, event);
}

UserActionAndTransactionEvents.clear();
}

void TPartition::AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request)
{
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeInfo, Partition);
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeData, Partition);
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTmpData, Partition);
NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeMeta, Partition);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove TypeTxMeta

NPQ::AddCmdDeleteRange(request, TKeyPrefix::TypeTxMeta, Partition);
}

void TPartition::ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo&)
{
Y_ABORT("The supportive partition does not accept read operations");
}

void TPartition::ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction&)
{
Y_ABORT("The supportive partition does not accept immediate transactions");
}

void TPartition::ScheduleNegativeReply(const TTransaction&)
{
Y_ABORT("The supportive partition does not accept distribute transactions");
}

void TPartition::ScheduleNegativeReply(const TMessage& msg)
{
ScheduleReplyError(msg.GetCookie(), NPersQueue::NErrorCode::ERROR, "The transaction is completed");
}

void TPartition::ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransaction& tx)
{
Y_ABORT_UNLESS(tx.GetTxBodyCase() == NKikimrPQ::TEvProposeTransaction::kData);
Y_ABORT_UNLESS(tx.HasData());

TMaybe<ui64> writeId;
if (tx.GetData().HasWriteId()) {
writeId = tx.GetData().GetWriteId();
}

Replies.emplace_back(Tablet,
MakeHolder<TEvPQ::TEvTransactionCompleted>(writeId).Release());
}

const NKikimrPQ::TPQTabletConfig::TPartition* TPartition::GetPartitionConfig(const NKikimrPQ::TPQTabletConfig& config)
{
return NPQ::GetPartitionConfig(config, Partition.OriginalPartitionId);
Expand Down
28 changes: 28 additions & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ class TPartition : public TActorBootstrapped<TPartition> {
static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
static const ui32 SCALE_REQUEST_REPEAT_MIN_SECONDS = 60;

enum EDeletePartitionState {
DELETION_NOT_INITED = 0,
DELETION_INITED = 1,
DELETION_IN_PROCESS = 2,
};

private:
struct THasDataReq;
struct THasDataDeadline;
Expand Down Expand Up @@ -374,6 +380,9 @@ class TPartition : public TActorBootstrapped<TPartition> {

void CommitWriteOperations(const TActorContext& ctx);

void HandleOnInit(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);
void Handle(TEvPQ::TEvDeletePartition::TPtr& ev, const TActorContext& ctx);

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
Expand Down Expand Up @@ -457,6 +466,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
HFuncTraced(TEvPQ::TEvGetWriteInfoResponse, Handle);
HFuncTraced(TEvPQ::TEvGetWriteInfoError, Handle);
HFuncTraced(TEvPQ::TEvDeletePartition, HandleOnInit);
default:
if (!Initializer.Handle(ev)) {
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
Expand Down Expand Up @@ -519,6 +529,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
HFuncTraced(TEvPQ::TEvProcessChangeOwnerRequests, Handle);
HFuncTraced(TEvPQ::TEvDeletePartition, Handle);
default:
ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
break;
Expand Down Expand Up @@ -795,6 +806,23 @@ class TPartition : public TActorBootstrapped<TPartition> {
bool ClosedInternalPartition = false;

bool IsSupportive() const;

EDeletePartitionState DeletePartitionState = DELETION_NOT_INITED;

void ScheduleDeletePartitionDone();
void ScheduleNegativeReplies();
void AddCmdDeleteRangeForAllKeys(TEvKeyValue::TEvRequest& request);

void ScheduleNegativeReply(const TEvPQ::TEvSetClientInfo& event);
void ScheduleNegativeReply(const TEvPersQueue::TEvProposeTransaction& event);
void ScheduleNegativeReply(const TTransaction& tx);
void ScheduleNegativeReply(const TMessage& msg);

void OnHandleWriteResponse(const TActorContext& ctx);

void ScheduleTransactionCompleted(const NKikimrPQ::TEvProposeTransaction& tx);

void DestroyActor(const TActorContext& ctx);
};

} // namespace NKikimr::NPQ
2 changes: 2 additions & 0 deletions ydb/core/persqueue/partition_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1017,7 +1017,9 @@ void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, co
auto range = del->MutableRange();

range->SetFrom(from.Data(), from.Size());
range->SetIncludeFrom(true);
range->SetTo(to.Data(), to.Size());
range->SetIncludeTo(false);
}

static void RequestRange(const TActorContext& ctx, const TActorId& dst, const TPartitionId& partition,
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/persqueue/partition_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,5 +132,8 @@ void AddCheckDiskRequest(TEvKeyValue::TEvRequest *request, ui32 numChannels);
NKikimrClient::TKeyValueRequest::EStorageChannel GetChannel(ui32 i);
bool IsQuotingEnabled(const NKikimrPQ::TPQConfig& pqConfig,
bool isLocalDC);
void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request,
TKeyPrefix::EType c,
const TPartitionId& partitionId);

} // namespace NKikimr::NPQ
16 changes: 12 additions & 4 deletions ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ static const ui32 MAX_INLINE_SIZE = 1000;

static constexpr NPersQueue::NErrorCode::EErrorCode InactivePartitionErrorCode = NPersQueue::NErrorCode::WRITE_ERROR_PARTITION_INACTIVE;

void AddCmdDeleteRange(TEvKeyValue::TEvRequest& request, TKeyPrefix::EType c, const TPartitionId& partitionId);

void TPartition::ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& cookie, ui64 seqNo) {
LOG_DEBUG_S(ctx, NKikimrServices::PERSQUEUE, "TPartition::ReplyOwnerOk. Partition: " << Partition);

Expand Down Expand Up @@ -439,12 +437,22 @@ void TPartition::SyncMemoryStateWithKVState(const TActorContext& ctx) {
UpdateUserInfoEndOffset(ctx.Now());
}

void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx) {
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
void TPartition::OnHandleWriteResponse(const TActorContext& ctx)
{
KVWriteInProgress = false;
OnProcessTxsAndUserActsWriteComplete(ctx);
HandleWriteResponse(ctx);
ProcessTxsAndUserActs(ctx);

if (DeletePartitionState == DELETION_IN_PROCESS) {
DestroyActor(ctx);
}
}

void TPartition::Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx)
{
PQ_LOG_T("TPartition::HandleOnWrite TEvHandleWriteResponse.");
OnHandleWriteResponse(ctx);
}

void TPartition::UpdateAfterWriteCounters(bool writeComplete) {
Expand Down
Loading
Loading