Skip to content

Commit 8b2f8d9

Browse files
ttl correction for further usage (ydb-platform#2612)
1 parent 6f0a4d9 commit 8b2f8d9

File tree

14 files changed

+296
-191
lines changed

14 files changed

+296
-191
lines changed

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const
102102
const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
103103
for (ui64 pathId : ttlBody.GetPathIds()) {
104104
NOlap::TTiering tiering;
105-
tiering.Add(NOlap::TTierInfo::MakeTtl(now - unixTime, columnName));
105+
AFL_VERIFY(tiering.Add(NOlap::TTierInfo::MakeTtl(now - unixTime, columnName)));
106106
pathTtls.emplace(pathId, std::move(tiering));
107107
}
108108
}
Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1 @@
11
#include "tier_info.h"
2-
3-
namespace NKikimr::NOlap {
4-
5-
// Converts a scalar taken from the eviction/TTL column into a TInstant.
// The arrow type id of the scalar selects the interpretation; any type that
// is not listed below yields an empty optional.
// NOTE(review): `scalar` is dereferenced without a null check - callers
// presumably guarantee a non-null scalar; confirm.
std::optional<TInstant> TTierInfo::ScalarToInstant(const std::shared_ptr<arrow::Scalar>& scalar) const {
6-
// A zero TtlUnitsInSecond (the constructor default) is treated as 1.
const ui64 unitsInSeconds = TtlUnitsInSecond ? TtlUnitsInSecond : 1;
7-
switch (scalar->type->id()) {
8-
case arrow::Type::TIMESTAMP:
9-
// The raw value is passed through as microseconds.
// NOTE(review): assumes the arrow timestamp unit is MICRO - confirm.
return TInstant::MicroSeconds(std::static_pointer_cast<arrow::TimestampScalar>(scalar)->value);
10-
case arrow::Type::UINT16: // YQL Date
11-
return TInstant::Days(std::static_pointer_cast<arrow::UInt16Scalar>(scalar)->value);
12-
case arrow::Type::UINT32: // YQL Datetime or Uint32
13-
// value / (units / 1e6) rescales "units per second" to microseconds;
// the division is done in double precision.
return TInstant::MicroSeconds(std::static_pointer_cast<arrow::UInt32Scalar>(scalar)->value / (1.0 * unitsInSeconds / 1000000));
14-
case arrow::Type::UINT64:
15-
// Same rescaling as UINT32; the 64-bit value goes through a double as well.
return TInstant::MicroSeconds(std::static_pointer_cast<arrow::UInt64Scalar>(scalar)->value / (1.0 * unitsInSeconds / 1000000));
16-
default:
17-
// Unsupported column type: no instant can be derived.
return {};
18-
}
19-
}
20-
21-
}
Lines changed: 1 addition & 166 deletions
Original file line numberDiff line numberDiff line change
@@ -1,167 +1,2 @@
11
#pragma once
2-
3-
#include <ydb/core/formats/arrow/arrow_helpers.h>
4-
#include <ydb/core/formats/arrow/common/validation.h>
5-
#include <ydb/core/formats/arrow/serializer/abstract.h>
6-
#include <ydb/core/tx/columnshard/common/scalars.h>
7-
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
8-
#include <util/generic/set.h>
9-
#include <util/generic/hash_set.h>
10-
11-
namespace NKikimr::NOlap {
12-
13-
// Describes a single tier: its name, the column whose value drives eviction,
// how old data must be before it is evicted, and an optional serializer.
// The fields declared through YDB_READONLY_DEF are read elsewhere in this file
// via GetName() / GetEvictColumnName() / GetEvictDuration() / GetSerializer().
class TTierInfo {
14-
private:
15-
YDB_READONLY_DEF(TString, Name);
16-
YDB_READONLY_DEF(TString, EvictColumnName);
17-
YDB_READONLY_DEF(TDuration, EvictDuration);
18-
19-
// Units of the eviction column per second; 0 is treated as 1 by ScalarToInstant.
ui32 TtlUnitsInSecond;
20-
YDB_READONLY_DEF(std::optional<NArrow::NSerialization::TSerializerContainer>, Serializer);
21-
public:
22-
// Name under which the TTL pseudo-tier is registered (see MakeTtl).
static TString GetTtlTierName() {
23-
return "__TTL";
24-
}
25-
26-
// Builds a tier description; aborts if the tier name or the column name is empty.
// unitsInSecond defaults to 0.
TTierInfo(const TString& tierName, TDuration evictDuration, const TString& column, ui32 unitsInSecond = 0)
27-
: Name(tierName)
28-
, EvictColumnName(column)
29-
, EvictDuration(evictDuration)
30-
, TtlUnitsInSecond(unitsInSecond)
31-
{
32-
Y_ABORT_UNLESS(!!Name);
33-
Y_ABORT_UNLESS(!!EvictColumnName);
34-
}
35-
36-
// Returns `now` shifted back by EvictDuration, i.e. the threshold instant
// that column values are compared against.
TInstant GetEvictInstant(const TInstant now) const {
37-
return now - EvictDuration;
38-
}
39-
40-
// Stores the serializer for this tier; returns *this so calls can be chained.
TTierInfo& SetSerializer(const NArrow::NSerialization::TSerializerContainer& value) {
41-
Serializer = value;
42-
return *this;
43-
}
44-
45-
// Looks the eviction column up by name in `schema` and returns whatever
// GetFieldByName returns for it.
std::shared_ptr<arrow::Field> GetEvictColumn(const std::shared_ptr<arrow::Schema>& schema) const {
46-
return schema->GetFieldByName(EvictColumnName);
47-
}
48-
49-
// Converts an eviction-column scalar to a TInstant; defined out of line.
std::optional<TInstant> ScalarToInstant(const std::shared_ptr<arrow::Scalar>& scalar) const;
50-
51-
// Factory for the TTL pseudo-tier: a TTierInfo named GetTtlTierName().
static std::shared_ptr<TTierInfo> MakeTtl(const TDuration evictDuration, const TString& ttlColumn, ui32 unitsInSecond = 0) {
52-
return std::make_shared<TTierInfo>(GetTtlTierName(), evictDuration, ttlColumn, unitsInSecond);
53-
}
54-
55-
// Renders name, duration, column and serializer as "key=value;" pairs.
TString GetDebugString() const {
56-
TStringBuilder sb;
57-
sb << "name=" << Name << ";duration=" << EvictDuration << ";column=" << EvictColumnName << ";serializer=";
58-
if (Serializer) {
59-
sb << Serializer->DebugString();
60-
} else {
61-
// No serializer was set for this tier.
sb << "NOT_SPECIFIED(Default)";
62-
}
63-
sb << ";";
64-
return sb;
65-
}
66-
};
67-
68-
// Wrapper around a non-null shared TTierInfo that supplies the ordering used
// by TSet<TTierRef> (see TTiering::OrderedTiers).
class TTierRef {
69-
public:
70-
// Aborts if `tierInfo` is null, so Info can be dereferenced unconditionally below.
TTierRef(const std::shared_ptr<TTierInfo>& tierInfo)
71-
: Info(tierInfo)
72-
{
73-
Y_ABORT_UNLESS(tierInfo);
74-
}
75-
76-
// Ordering: a LONGER eviction duration compares as "less", so an ordered set
// starts with the longest duration and ends with the shortest. Equal
// durations are ordered by descending name.
bool operator < (const TTierRef& b) const {
77-
if (Info->GetEvictDuration() > b.Info->GetEvictDuration()) {
78-
return true;
79-
} else if (Info->GetEvictDuration() == b.Info->GetEvictDuration()) {
80-
return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter
81-
}
82-
return false;
83-
}
84-
85-
// Two refs are equal when both eviction duration and name match; the other
// TTierInfo fields are not compared.
bool operator == (const TTierRef& b) const {
86-
return Info->GetEvictDuration() == b.Info->GetEvictDuration()
87-
&& Info->GetName() == b.Info->GetName();
88-
}
89-
90-
// Borrowing access to the referenced tier.
const TTierInfo& Get() const {
91-
return *Info;
92-
}
93-
94-
// Returns a copy of the owning pointer.
std::shared_ptr<TTierInfo> GetPtr() const {
95-
return Info;
96-
}
97-
98-
private:
99-
std::shared_ptr<TTierInfo> Info;
100-
};
101-
102-
// A set of tiers that all share the same eviction column.
// Tiers are indexed by name (TierByName) and, except for the TTL pseudo-tier,
// also kept in OrderedTiers sorted by TTierRef::operator<.
class TTiering {
103-
using TTiersMap = THashMap<TString, std::shared_ptr<TTierInfo>>;
104-
TTiersMap TierByName;
105-
TSet<TTierRef> OrderedTiers;
106-
// Column name taken from the tiers added so far; empty until the first Add().
YDB_READONLY_DEF(TString, EvictColumnName);
107-
108-
public:
109-
// Read-only access to the whole name -> tier map.
const TTiersMap& GetTierByName() const {
110-
return TierByName;
111-
}
112-
113-
// Returns the tier registered under `name`, or nullptr when there is none.
std::shared_ptr<TTierInfo> GetTierByName(const TString& name) const {
114-
auto tIt = TierByName.find(name);
115-
if (tIt == TierByName.end()) {
116-
return nullptr;
117-
}
118-
return tIt->second;
119-
}
120-
121-
const TSet<TTierRef>& GetOrderedTiers() const {
122-
return OrderedTiers;
123-
}
124-
125-
// True when at least one non-TTL tier was added (the TTL tier is never put
// into OrderedTiers, see Add()).
bool HasTiers() const {
126-
return !OrderedTiers.empty();
127-
}
128-
129-
// Registers a tier. Returns false, leaving the object unchanged, when the
// tier's eviction column differs from the one already recorded; true otherwise.
// NOTE(review): `tier` is dereferenced without a null check.
// NOTE(review): the result of TierByName.emplace is ignored. If THashMap follows
// standard map semantics, a second tier with an already-used name is dropped
// from the map while OrderedTiers may still receive it - confirm this is intended.
bool Add(const std::shared_ptr<TTierInfo>& tier) {
130-
if (EvictColumnName && tier->GetEvictColumnName() != EvictColumnName) {
131-
// AFL_VERIFY(false)("column_name", tier->GetEvictColumnName())("evict_column_name", EvictColumnName);
132-
return false;
133-
}
134-
EvictColumnName = tier->GetEvictColumnName();
135-
TierByName.emplace(tier->GetName(), tier);
136-
// The TTL pseudo-tier is reachable by name only; it does not take part in ordering.
if (tier->GetName() != TTierInfo::GetTtlTierName()) {
137-
OrderedTiers.emplace(tier);
138-
}
139-
return true;
140-
}
141-
142-
// Name of the last tier in OrderedTiers, i.e. the one with the shortest
// eviction duration under TTierRef::operator<; empty string when there are
// no ordered tiers.
TString GetHottestTierName() const {
143-
if (OrderedTiers.size()) {
144-
return OrderedTiers.rbegin()->Get().GetName(); // hottest one
145-
}
146-
return {};
147-
}
148-
149-
// Serializer of the tier registered under `name`; empty optional when the
// name is unknown. Aborts if an empty name is found in the map.
std::optional<NArrow::NSerialization::TSerializerContainer> GetSerializer(const TString& name) const {
150-
auto it = TierByName.find(name);
151-
if (it != TierByName.end()) {
152-
Y_ABORT_UNLESS(!name.empty());
153-
return it->second->GetSerializer();
154-
}
155-
return {};
156-
}
157-
158-
// Concatenates the debug strings of all tiers in TierByName, each followed
// by "; ". The order is the iteration order of the hash map.
TString GetDebugString() const {
159-
TStringBuilder sb;
160-
for (auto&& [_, i] : TierByName) {
161-
sb << i->GetDebugString() << "; ";
162-
}
163-
return sb;
164-
}
165-
};
166-
167-
}
2+
#include "tiering/tier_info.h"
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "common.h"
2+
3+
namespace NKikimr::NOlap::NTiering::NCommon {
4+
5+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <util/generic/string.h>
4+
5+
namespace NKikimr::NOlap::NTiering::NCommon {
6+
7+
// Tier name constant with the value "$$DELETE".
// NOTE(review): presumably names the pseudo-tier meaning "remove the data"
// rather than "move it" - confirm at the use sites.
// Declared as a C++17 inline variable without `static`: the header then
// provides one shared definition instead of a private copy (and a separate
// dynamic initialisation) in every translation unit that includes it.
inline const TString DeleteTierName = "$$DELETE";
8+
9+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include "tier_info.h"
2+
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
3+
4+
namespace NKikimr::NOlap {
5+
6+
// Interprets a scalar from the eviction column as a point in time.
// Supported arrow types: TIMESTAMP (raw value taken as microseconds),
// UINT16 (days), UINT32 / UINT64 (value expressed in TtlUnitsInSecond units).
// Any other type produces an empty optional.
std::optional<TInstant> TTierInfo::ScalarToInstant(const std::shared_ptr<arrow::Scalar>& scalar) const {
    // A zero TtlUnitsInSecond falls back to one unit per second.
    const ui64 unitsInSeconds = TtlUnitsInSecond ? TtlUnitsInSecond : 1;
    // How many column units make up one microsecond (kept in double, as before).
    const double unitsPerMicrosecond = 1.0 * unitsInSeconds / 1000000;

    const auto typeId = scalar->type->id();
    if (typeId == arrow::Type::TIMESTAMP) {
        const auto typed = std::static_pointer_cast<arrow::TimestampScalar>(scalar);
        return TInstant::MicroSeconds(typed->value);
    }
    if (typeId == arrow::Type::UINT16) { // YQL Date
        const auto typed = std::static_pointer_cast<arrow::UInt16Scalar>(scalar);
        return TInstant::Days(typed->value);
    }
    if (typeId == arrow::Type::UINT32) { // YQL Datetime or Uint32
        const auto typed = std::static_pointer_cast<arrow::UInt32Scalar>(scalar);
        return TInstant::MicroSeconds(typed->value / unitsPerMicrosecond);
    }
    if (typeId == arrow::Type::UINT64) {
        const auto typed = std::static_pointer_cast<arrow::UInt64Scalar>(scalar);
        return TInstant::MicroSeconds(typed->value / unitsPerMicrosecond);
    }
    // Column type that cannot be mapped to an instant.
    return {};
}
21+
22+
// Decides which tier a portion belongs to right now.
//
// max - the scalar from the eviction column that is compared against each
//       tier's threshold; must be non-null and convertible by
//       TTierInfo::ScalarToInstant.
// now - the current time used to compute each tier's eviction threshold.
//
// Tiers are visited in GetOrderedTiers() order (presumably longest eviction
// duration first - confirm against TTierRef::operator<). The first tier whose
// threshold has been reached is returned together with the threshold-minus-`max`
// difference; the previously visited tier, if any, is reported as the "next"
// tier along with the remaining wait. When no threshold has been reached, the
// default storage is returned with a zero duration.
//
// Aborts when there are no ordered tiers, when `max` is null, or when `max`
// cannot be converted to an instant.
TTiering::TTieringContext TTiering::GetTierToMove(const std::shared_ptr<arrow::Scalar>& max, const TInstant now) const {
    AFL_VERIFY(OrderedTiers.size());
    // ScalarToInstant dereferences the scalar unconditionally; fail with a
    // diagnosable verify instead of a null dereference.
    AFL_VERIFY(!!max);
    std::optional<TString> nextTierName;
    std::optional<TDuration> nextTierDuration;
    for (const auto& tierRef : GetOrderedTiers()) {
        const auto& tierInfo = tierRef.Get();
        const auto portionInstant = tierInfo.ScalarToInstant(max);
        AFL_VERIFY(!!portionInstant)("tier_name", tierInfo.GetName());
        const TInstant evictInstant = tierInfo.GetEvictInstant(now);
        // A zero difference is treated as "threshold reached".
        // NOTE(review): relies on TInstant subtraction not going negative - confirm.
        const TDuration waitDuration = *portionInstant - evictInstant;
        if (!waitDuration) {
            return TTieringContext(tierInfo.GetName(), evictInstant - *portionInstant, nextTierName, nextTierDuration);
        }
        // Not due for this tier yet: remember it as the upcoming move.
        nextTierName = tierInfo.GetName();
        nextTierDuration = waitDuration;
    }
    return TTieringContext(IStoragesManager::DefaultStorageId, TDuration::Zero(), nextTierName, nextTierDuration);
}
41+
42+
}

0 commit comments

Comments
 (0)