Skip to content

Commit c20aff2

Browse files
authored
OltpSink: reattach shard (#14863)
1 parent f6c9ccb commit c20aff2

File tree

7 files changed

+217
-49
lines changed

7 files changed

+217
-49
lines changed

ydb/core/kqp/common/kqp_tx_manager.cpp

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,29 @@ class TKqpTransactionManager : public IKqpTransactionManager {
202202
return locks;
203203
}
204204

205+
void Reattached(ui64 shardId) override {
206+
auto& shardInfo = ShardsInfo.at(shardId);
207+
shardInfo.Reattaching = false;
208+
}
209+
210+
void SetRestarting(ui64 shardId) override {
211+
auto& shardInfo = ShardsInfo.at(shardId);
212+
shardInfo.Restarting = true;
213+
}
214+
215+
bool ShouldReattach(ui64 shardId, TInstant now) override {
216+
auto& shardInfo = ShardsInfo.at(shardId);
217+
if (!std::exchange(shardInfo.Restarting, false) && !shardInfo.Reattaching) {
218+
return false;
219+
}
220+
return ::NKikimr::NKqp::ShouldReattach(now, shardInfo.ReattachState.ReattachInfo);;
221+
}
222+
223+
TReattachState& GetReattachState(ui64 shardId) override {
224+
auto& shardInfo = ShardsInfo.at(shardId);
225+
return shardInfo.ReattachState;
226+
}
227+
205228
bool IsTxPrepared() const override {
206229
for (const auto& [_, shardInfo] : ShardsInfo) {
207230
if (shardInfo.State != EShardState::PREPARED) {
@@ -454,6 +477,10 @@ class TKqpTransactionManager : public IKqpTransactionManager {
454477

455478
bool IsOlap = false;
456479
THashSet<TStringBuf> Pathes;
480+
481+
bool Restarting = false;
482+
bool Reattaching = false;
483+
TReattachState ReattachState;
457484
};
458485

459486
void MakeLocksIssue(const TShardInfo& shardInfo) {

ydb/core/kqp/common/kqp_tx_manager.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22

33
#include <ydb/core/kqp/common/kqp_yql.h>
4+
#include <ydb/core/kqp/common/simple/reattach.h>
45
#include <ydb/core/kqp/gateway/kqp_gateway.h>
56
#include <ydb/core/kqp/provider/yql_kikimr_provider.h>
67
#include <ydb/core/util/ulid.h>
@@ -43,11 +44,21 @@ class IKqpTransactionManager {
4344
virtual bool AddLock(ui64 shardId, const NKikimrDataEvents::TLock& lock) = 0;
4445

4546
virtual void BreakLock(ui64 shardId) = 0;
47+
virtual TVector<NKikimrDataEvents::TLock> GetLocks() const = 0;
48+
virtual TVector<NKikimrDataEvents::TLock> GetLocks(ui64 shardId) const = 0;
4649

4750
virtual TTableInfo GetShardTableInfo(ui64 shardId) const = 0;
4851

49-
virtual TVector<NKikimrDataEvents::TLock> GetLocks() const = 0;
50-
virtual TVector<NKikimrDataEvents::TLock> GetLocks(ui64 shardId) const = 0;
52+
virtual bool ShouldReattach(ui64 shardId, TInstant now) = 0;
53+
virtual void Reattached(ui64 shardId) = 0;
54+
virtual void SetRestarting(ui64 shardId) = 0;
55+
56+
struct TReattachState {
57+
TReattachInfo ReattachInfo;
58+
ui64 Cookie = 0;
59+
};
60+
61+
virtual TReattachState& GetReattachState(ui64 shardId) = 0;
5162

5263
virtual EShardState GetState(ui64 shardId) const = 0;
5364
virtual void SetError(ui64 shardId) = 0;
@@ -57,11 +68,7 @@ class IKqpTransactionManager {
5768

5869
virtual void SetTopicOperations(NTopic::TTopicOperations&& topicOperations) = 0;
5970
virtual const NTopic::TTopicOperations& GetTopicOperations() const = 0;
60-
61-
virtual void SetAllowVolatile(bool allowVolatile) = 0;
62-
6371
virtual void BuildTopicTxs(NTopic::TTopicOperationTransactions& txs) = 0;
64-
6572
virtual bool HasTopics() const = 0;
6673

6774
virtual bool IsTxPrepared() const = 0;
@@ -74,6 +81,7 @@ class IKqpTransactionManager {
7481
virtual bool IsEmpty() const = 0;
7582
virtual bool HasLocks() const = 0;
7683

84+
virtual void SetAllowVolatile(bool allowVolatile) = 0;
7785
virtual bool IsVolatile() const = 0;
7886

7987
virtual bool HasSnapshot() const = 0;
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#include "reattach.h"
2+
3+
#include <ydb/core/base/appdata_fwd.h>
4+
#include <library/cpp/random_provider/random_provider.h>
5+
6+
namespace NKikimr::NKqp {
7+
8+
namespace {
9+
static constexpr TDuration MinReattachDelay = TDuration::MilliSeconds(10);
10+
static constexpr TDuration MaxReattachDelay = TDuration::MilliSeconds(100);
11+
static constexpr TDuration MaxReattachDuration = TDuration::Seconds(4);
12+
}
13+
14+
bool ShouldReattach(TInstant now, TReattachInfo& reattachInfo) {
15+
if (!reattachInfo.Reattaching) {
16+
reattachInfo.Deadline = now + MaxReattachDuration;
17+
reattachInfo.Delay = TDuration::Zero();
18+
reattachInfo.Reattaching = true;
19+
return true;
20+
}
21+
22+
TDuration left = reattachInfo.Deadline - now;
23+
if (!left) {
24+
reattachInfo.Reattaching = false;
25+
return false;
26+
}
27+
28+
reattachInfo.Delay *= 2.0;
29+
if (reattachInfo.Delay < MinReattachDelay) {
30+
reattachInfo.Delay = MinReattachDelay;
31+
} else if (reattachInfo.Delay > MaxReattachDelay) {
32+
reattachInfo.Delay = MaxReattachDelay;
33+
}
34+
35+
// Add ±10% jitter
36+
reattachInfo.Delay *= 0.9 + 0.2 * TAppData::RandomProvider->GenRandReal4();
37+
if (reattachInfo.Delay > left) {
38+
reattachInfo.Delay = left;
39+
}
40+
41+
return true;
42+
}
43+
44+
}

ydb/core/kqp/common/simple/reattach.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
#pragma once
2+
3+
#include <util/datetime/base.h>
4+
5+
namespace NKikimr::NKqp {
6+
7+
struct TReattachInfo {
8+
TDuration Delay;
9+
TInstant Deadline;
10+
bool Reattaching = false;
11+
};
12+
13+
bool ShouldReattach(TInstant now, TReattachInfo& reattachInfo);
14+
15+
}

ydb/core/kqp/common/simple/ya.make

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@ LIBRARY()
22

33
SRCS(
44
helpers.cpp
5+
kqp_event_ids.cpp
56
query_id.cpp
6-
settings.cpp
7+
reattach.cpp
78
services.cpp
8-
kqp_event_ids.cpp
9+
settings.cpp
910
temp_tables.cpp
1011
)
1112

ydb/core/kqp/executer_actor/kqp_data_executer.cpp

Lines changed: 11 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@
1111
#include <ydb/core/client/minikql_compile/db_key_resolver.h>
1212
#include <ydb/core/kqp/common/buffer/events.h>
1313
#include <ydb/core/kqp/common/kqp_data_integrity_trails.h>
14-
#include <ydb/core/kqp/common/kqp_yql.h>
1514
#include <ydb/core/kqp/common/kqp_tx_manager.h>
15+
#include <ydb/core/kqp/common/kqp_yql.h>
16+
#include <ydb/core/kqp/common/simple/reattach.h>
1617
#include <ydb/library/wilson_ids/wilson.h>
1718
#include <ydb/core/kqp/compute_actor/kqp_compute_actor.h>
1819
#include <ydb/core/kqp/common/kqp_tx.h>
@@ -41,55 +42,24 @@ using namespace NLongTxService;
4142

4243
namespace {
4344

44-
static constexpr TDuration MinReattachDelay = TDuration::MilliSeconds(10);
45-
static constexpr TDuration MaxReattachDelay = TDuration::MilliSeconds(100);
46-
static constexpr TDuration MaxReattachDuration = TDuration::Seconds(4);
4745
static constexpr ui32 ReplySizeLimit = 48 * 1024 * 1024; // 48 MB
4846

4947
class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Data> {
5048
using TBase = TKqpExecuterBase<TKqpDataExecuter, EExecType::Data>;
5149
using TKqpSnapshot = IKqpGateway::TKqpSnapshot;
5250

5351
struct TReattachState {
54-
TDuration Delay;
55-
TInstant Deadline;
52+
TReattachInfo ReattachInfo;
5653
ui64 Cookie = 0;
57-
bool Reattaching = false;
5854

5955
bool ShouldReattach(TInstant now) {
6056
++Cookie; // invalidate any previous cookie
6157

62-
if (!Reattaching) {
63-
Deadline = now + MaxReattachDuration;
64-
Delay = TDuration::Zero();
65-
Reattaching = true;
66-
return true;
67-
}
68-
69-
TDuration left = Deadline - now;
70-
if (!left) {
71-
Reattaching = false;
72-
return false;
73-
}
74-
75-
Delay *= 2.0;
76-
if (Delay < MinReattachDelay) {
77-
Delay = MinReattachDelay;
78-
} else if (Delay > MaxReattachDelay) {
79-
Delay = MaxReattachDelay;
80-
}
81-
82-
// Add ±10% jitter
83-
Delay *= 0.9 + 0.2 * TAppData::RandomProvider->GenRandReal4();
84-
if (Delay > left) {
85-
Delay = left;
86-
}
87-
88-
return true;
58+
return ::NKikimr::NKqp::ShouldReattach(now, ReattachInfo);
8959
}
9060

9161
void Reattached() {
92-
Reattaching = false;
62+
ReattachInfo.Reattaching = false;
9363
}
9464
};
9565

@@ -729,13 +699,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
729699
case TShardState::EState::Prepared: {
730700
// Disconnected while waiting for other shards to prepare
731701

732-
if ((wasRestarting || shardState->ReattachState.Reattaching) &&
702+
if ((wasRestarting || shardState->ReattachState.ReattachInfo.Reattaching) &&
733703
shardState->ReattachState.ShouldReattach(TlsActivationContext->Now()))
734704
{
735705
LOG_N("Shard " << msg->TabletId << " delivery problem (already prepared, reattaching in "
736-
<< shardState->ReattachState.Delay << ")");
706+
<< shardState->ReattachState.ReattachInfo.Delay << ")");
737707

738-
Schedule(shardState->ReattachState.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId));
708+
Schedule(shardState->ReattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId));
739709
++shardState->RestartCount;
740710
return;
741711
}
@@ -1542,13 +1512,13 @@ class TKqpDataExecuter : public TKqpExecuterBase<TKqpDataExecuter, EExecType::Da
15421512
// Proceed with query processing
15431513
[[fallthrough]];
15441514
case TShardState::EState::Executing: {
1545-
if ((wasRestarting || shardState->ReattachState.Reattaching) &&
1515+
if ((wasRestarting || shardState->ReattachState.ReattachInfo.Reattaching) &&
15461516
shardState->ReattachState.ShouldReattach(TlsActivationContext->Now()))
15471517
{
15481518
LOG_N("Shard " << msg->TabletId << " lost pipe while waiting for reply (reattaching in "
1549-
<< shardState->ReattachState.Delay << ")");
1519+
<< shardState->ReattachState.ReattachInfo.Delay << ")");
15501520

1551-
Schedule(shardState->ReattachState.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId));
1521+
Schedule(shardState->ReattachState.ReattachInfo.Delay, new TEvPrivate::TEvReattachToShard(msg->TabletId));
15521522
++shardState->RestartCount;
15531523
return;
15541524
}

0 commit comments

Comments
 (0)