Skip to content

Commit d22f086

Browse files
authored
Use lock-free Bucket in CostTracker with limited underflow (#8637)
1 parent 0bfba86 commit d22f086

File tree

8 files changed

+235
-12
lines changed

8 files changed

+235
-12
lines changed

ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,11 @@ TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::E
4747
: GroupType(groupType)
4848
, CostCounters(counters->GetSubgroup("subsystem", "advancedCost"))
4949
, MonGroup(std::make_shared<NMonGroup::TCostTrackerGroup>(CostCounters))
50-
, Bucket(&DiskTimeAvailable, &BucketCapacity, nullptr, nullptr, nullptr, nullptr, true)
50+
, Bucket(BucketUpperLimit, BucketLowerLimit, DiskTimeAvailable)
5151
, BurstThresholdNs(costMetricsParameters.BurstThresholdNs)
5252
, DiskTimeAvailableScale(costMetricsParameters.DiskTimeAvailableScale)
5353
{
54-
AtomicSet(BucketCapacity, GetDiskTimeAvailableScale() * BurstThresholdNs);
54+
BucketUpperLimit.store(BurstThresholdNs * GetDiskTimeAvailableScale());
5555
BurstDetector.Initialize(CostCounters, "BurstDetector");
5656
switch (GroupType.GetErasure()) {
5757
case TBlobStorageGroupType::ErasureMirror3dc:

ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "vdisk_performance_params.h"
99

1010
#include <library/cpp/bucket_quoter/bucket_quoter.h>
11+
#include <ydb/library/lockfree_bucket/lockfree_bucket.h>
1112
#include <util/system/compiler.h>
1213
#include <ydb/core/base/blobstorage.h>
1314
#include <ydb/core/blobstorage/base/blobstorage_events.h>
@@ -315,12 +316,14 @@ class TBsCostTracker {
315316
TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters;
316317
std::shared_ptr<NMonGroup::TCostTrackerGroup> MonGroup;
317318

318-
TAtomic BucketCapacity = 1'000'000'000; // 10^9 nsec
319-
TAtomic DiskTimeAvailable = 1'000'000'000;
320-
TBucketQuoter<i64, TSpinLock, TAppDataTimerMs<TInstantTimerMs>> Bucket;
319+
const double BucketRelativeMinimum = 2;
320+
std::atomic<i64> BucketUpperLimit = 1'000'000'000; // 10^9 nsec
321+
std::atomic<i64> BucketLowerLimit = 1'000'000'000 * -BucketRelativeMinimum;
322+
std::atomic<ui64> DiskTimeAvailable = 1'000'000'000;
323+
324+
TLockFreeBucket<TAppDataTimerMs<TInstantTimerMs>> Bucket;
321325
TLight BurstDetector;
322326
std::atomic<ui64> SeqnoBurstDetector = 0;
323-
static constexpr ui32 ConcurrentHugeRequestsAllowed = 3;
324327

325328
TMemorizableControlWrapper BurstThresholdNs;
326329
TMemorizableControlWrapper DiskTimeAvailableScale;
@@ -349,15 +352,16 @@ class TBsCostTracker {
349352
}
350353

351354
void CountRequest(ui64 cost) {
352-
AtomicSet(BucketCapacity, GetDiskTimeAvailableScale() * BurstThresholdNs.Update(TAppData::TimeProvider->Now()));
353-
Bucket.UseAndFill(cost);
354-
BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
355+
i64 bucketCapacity = GetDiskTimeAvailableScale() * BurstThresholdNs.Update(TAppData::TimeProvider->Now());
356+
BucketUpperLimit.store(bucketCapacity);
357+
BucketLowerLimit.store(bucketCapacity * -BucketRelativeMinimum);
358+
Bucket.FillAndTake(cost);
359+
BurstDetector.Set(Bucket.IsEmpty(), SeqnoBurstDetector.fetch_add(1));
355360
}
356361

357362
void SetTimeAvailable(ui64 diskTimeAvailableNSec) {
358363
ui64 diskTimeAvailable = diskTimeAvailableNSec * GetDiskTimeAvailableScale();
359-
360-
AtomicSet(DiskTimeAvailable, diskTimeAvailable);
364+
DiskTimeAvailable.store(diskTimeAvailable);
361365
MonGroup->DiskTimeAvailableCtr() = diskTimeAvailable;
362366
}
363367

@@ -408,7 +412,7 @@ class TBsCostTracker {
408412
}
409413

410414
void CountPDiskResponse() {
411-
BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
415+
BurstDetector.Set(Bucket.IsEmpty(), SeqnoBurstDetector.fetch_add(1));
412416
}
413417

414418
private:
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
#include "lockfree_bucket.h"
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#pragma once
2+
3+
#include <atomic>
4+
#include <limits>
5+
6+
#include <util/datetime/base.h>
7+
8+
template<class TTimer>
9+
class TLockFreeBucket {
10+
public:
11+
TLockFreeBucket(std::atomic<i64>& maxTokens, std::atomic<i64>& minTokens, std::atomic<ui64>& inflowPerSecond)
12+
: MaxTokens(maxTokens)
13+
, MinTokens(minTokens)
14+
, InflowPerSecond(inflowPerSecond)
15+
, Tokens(maxTokens.load())
16+
{
17+
Y_DEBUG_ABORT_UNLESS(maxTokens > 0);
18+
Y_DEBUG_ABORT_UNLESS(minTokens < 0);
19+
}
20+
21+
bool IsEmpty() {
22+
FillBucket();
23+
return Tokens.load() <= 0;
24+
}
25+
26+
void FillAndTake(i64 tokens) {
27+
FillBucket();
28+
TakeTokens(tokens);
29+
}
30+
31+
private:
32+
void FillBucket() {
33+
TTime prev;
34+
TTime now;
35+
for (prev = LastUpdate.load(), now = TTimer::Now(); !LastUpdate.compare_exchange_strong(prev, now); ) {}
36+
37+
ui64 rawInflow = InflowPerSecond.load() * TTimer::Duration(prev, now);
38+
if (rawInflow >= TTimer::Resolution) {
39+
Tokens.fetch_add(rawInflow / TTimer::Resolution);
40+
for (i64 tokens = Tokens.load(), maxTokens = MaxTokens.load(); tokens > maxTokens; ) {
41+
if (Tokens.compare_exchange_strong(tokens, maxTokens)) {
42+
break;
43+
}
44+
}
45+
}
46+
}
47+
48+
void TakeTokens(i64 tokens) {
49+
Tokens.fetch_sub(tokens);
50+
for (i64 tokens = Tokens.load(), minTokens = MinTokens.load(); tokens < minTokens; ) {
51+
if (Tokens.compare_exchange_strong(tokens, minTokens)) {
52+
break;
53+
}
54+
}
55+
}
56+
57+
private:
58+
using TTime = typename TTimer::TTime;
59+
60+
std::atomic<i64>& MaxTokens;
61+
std::atomic<i64>& MinTokens;
62+
std::atomic<ui64>& InflowPerSecond;
63+
64+
std::atomic<i64> Tokens;
65+
std::atomic<TTime> LastUpdate;
66+
};
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
#include <library/cpp/testing/unittest/registar.h>
2+
#include <ydb/library/lockfree_bucket/lockfree_bucket.h>
3+
#include <util/system/guard.h>
4+
#include <util/system/spinlock.h>
5+
#include <util/system/types.h>
6+
7+
#include <thread>
8+
9+
struct TTestTimerMs {
10+
using TTime = TInstant;
11+
static constexpr ui64 Resolution = 1000ull; // milliseconds
12+
13+
static TTime Now() {
14+
return TInstant::Zero() + TDuration::MilliSeconds(Time.load());
15+
}
16+
17+
static ui64 Duration(TTime from, TTime to) {
18+
return (to - from).MilliSeconds();
19+
}
20+
21+
static std::atomic<ui64> Time;
22+
23+
static void Reset() {
24+
Time.store(0);
25+
}
26+
27+
static void Advance(TDuration delta) {
28+
Time.fetch_add(delta.MilliSeconds());
29+
}
30+
};
31+
32+
std::atomic<ui64> TTestTimerMs::Time = {};
33+
34+
Y_UNIT_TEST_SUITE(TLockFreeBucket) {
35+
struct TTestContext {
36+
TTestContext() {
37+
MaxTokens.store(1'000'000);
38+
MinTokens.store(-1'000'000);
39+
Inflow.store(1'000'000);
40+
TTestTimerMs::Reset();
41+
}
42+
43+
template<class TCallback>
44+
void Initialize(TCallback callback, ui32 threadCount) {
45+
for (ui32 i = 0; i < threadCount; ++i) {
46+
Threads.emplace_back(callback);
47+
}
48+
}
49+
50+
~TTestContext() {
51+
JoinAll();
52+
}
53+
54+
void JoinAll() {
55+
for (std::thread& t : Threads) {
56+
t.join();
57+
}
58+
Threads.clear();
59+
}
60+
61+
std::atomic<i64> MaxTokens;
62+
std::atomic<i64> MinTokens;
63+
std::atomic<ui64> Inflow;
64+
std::vector<std::thread> Threads;
65+
};
66+
67+
void TestLowerLimit(ui32 threadCount) {
68+
TTestContext ctx;
69+
TLockFreeBucket<TTestTimerMs> bucket(ctx.MaxTokens, ctx.MinTokens, ctx.Inflow);
70+
71+
auto worker = [&]() {
72+
for (ui32 i = 0; i < 100; ++i) {
73+
TTestTimerMs::Advance(TDuration::MilliSeconds(10));
74+
bucket.FillAndTake(123'456);
75+
}
76+
};
77+
78+
ctx.Initialize(worker, threadCount);
79+
ctx.JoinAll();
80+
81+
TTestTimerMs::Advance(TDuration::Seconds(1));
82+
TTestTimerMs::Advance(TDuration::MilliSeconds(1));
83+
84+
UNIT_ASSERT(!bucket.IsEmpty());
85+
}
86+
87+
void TestUpperLimit(ui32 tokensTaken, bool isEmpty, ui32 threadCount) {
88+
TTestContext ctx;
89+
TLockFreeBucket<TTestTimerMs> bucket(ctx.MaxTokens, ctx.MinTokens, ctx.Inflow);
90+
TTestTimerMs::Advance(TDuration::Seconds(100500));
91+
92+
auto worker = [&]() {
93+
for (ui32 i = 0; i < 100; ++i) {
94+
TTestTimerMs::Advance(TDuration::MilliSeconds(10));
95+
bucket.FillAndTake(tokensTaken);
96+
}
97+
};
98+
99+
ctx.Initialize(worker, threadCount);
100+
ctx.JoinAll();
101+
102+
UNIT_ASSERT_VALUES_EQUAL(bucket.IsEmpty(), isEmpty);
103+
}
104+
105+
Y_UNIT_TEST(LowerLimitSingleThreaded) {
106+
TestLowerLimit(1);
107+
}
108+
109+
Y_UNIT_TEST(LowerLimitMultiThreaded) {
110+
TestLowerLimit(20);
111+
}
112+
113+
Y_UNIT_TEST(UpperLimitSingleThreaded) {
114+
TestUpperLimit(123'456, true, 1);
115+
}
116+
117+
Y_UNIT_TEST(UpperLimitMultiThreaded) {
118+
TestUpperLimit(123'456, true, 20);
119+
}
120+
121+
Y_UNIT_TEST(LowIntakeSingleThreaded) {
122+
TestUpperLimit(1, false, 1);
123+
}
124+
125+
Y_UNIT_TEST(LowIntakeMultiThreaded) {
126+
TestUpperLimit(1, false, 20);
127+
}
128+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
UNITTEST()
2+
3+
FORK_SUBTESTS()
4+
SRCS(
5+
main.cpp
6+
)
7+
PEERDIR(
8+
ydb/library/lockfree_bucket
9+
)
10+
11+
END()

ydb/library/lockfree_bucket/ya.make

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
LIBRARY()
2+
3+
4+
SRCS(
5+
lockfree_bucket.cpp
6+
)
7+
8+
END()
9+
10+
RECURSE_FOR_TESTS(
11+
ut
12+
)

ydb/library/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ RECURSE(
1414
grpc
1515
http_proxy
1616
keys
17+
lockfree_bucket
1718
logger
1819
login
1920
mkql_proto

0 commit comments

Comments
 (0)