Skip to content

Commit cc69d3c

Browse files
ttl correction for further usage (ydb-platform#2612)
1 parent 96d4003 commit cc69d3c

File tree

14 files changed

+296
-190
lines changed

14 files changed

+296
-190
lines changed

ydb/core/tx/columnshard/columnshard__propose_transaction.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ TTxController::TProposeResult TTxProposeTransaction::ProposeTtlDeprecated(const
143143
const TInstant now = TlsActivationContext ? AppData()->TimeProvider->Now() : TInstant::Now();
144144
for (ui64 pathId : ttlBody.GetPathIds()) {
145145
NOlap::TTiering tiering;
146-
tiering.Ttl = NOlap::TTierInfo::MakeTtl(now - unixTime, columnName);
146+
AFL_VERIFY(tiering.Add(NOlap::TTierInfo::MakeTtl(now - unixTime, columnName)));
147147
pathTtls.emplace(pathId, std::move(tiering));
148148
}
149149
}
Original file line numberDiff line numberDiff line change
@@ -1,21 +1 @@
11
#include "tier_info.h"
2-
3-
namespace NKikimr::NOlap {
4-
5-
// Converts an arrow scalar holding an eviction-column value into a TInstant.
// Returns an empty optional when the scalar type is not one of the handled ones.
std::optional<TInstant> TTierInfo::ScalarToInstant(const std::shared_ptr<arrow::Scalar>& scalar) const {
    // A zero TtlUnitsInSecond falls back to one unit per second.
    const ui64 unitsInSeconds = TtlUnitsInSecond ? TtlUnitsInSecond : 1;
    // Number of column units that make up one microsecond (used for plain integer columns).
    const double unitsPerMicrosecond = 1.0 * unitsInSeconds / 1000000;

    const auto typeId = scalar->type->id();
    if (typeId == arrow::Type::TIMESTAMP) {
        // The stored value is passed to TInstant as microseconds.
        return TInstant::MicroSeconds(std::static_pointer_cast<arrow::TimestampScalar>(scalar)->value);
    }
    if (typeId == arrow::Type::UINT16) { // YQL Date
        return TInstant::Days(std::static_pointer_cast<arrow::UInt16Scalar>(scalar)->value);
    }
    if (typeId == arrow::Type::UINT32) { // YQL Datetime or Uint32
        return TInstant::MicroSeconds(std::static_pointer_cast<arrow::UInt32Scalar>(scalar)->value / unitsPerMicrosecond);
    }
    if (typeId == arrow::Type::UINT64) {
        return TInstant::MicroSeconds(std::static_pointer_cast<arrow::UInt64Scalar>(scalar)->value / unitsPerMicrosecond);
    }
    return {};
}
20-
21-
}
Original file line numberDiff line numberDiff line change
@@ -1,167 +1,2 @@
11
#pragma once
2-
3-
#include <ydb/core/formats/arrow/arrow_helpers.h>
4-
#include <ydb/core/formats/arrow/common/validation.h>
5-
#include <ydb/core/formats/arrow/serializer/abstract.h>
6-
#include <ydb/core/tx/columnshard/common/scalars.h>
7-
#include <contrib/libs/apache/arrow/cpp/src/arrow/util/compression.h>
8-
#include <util/generic/set.h>
9-
#include <util/generic/hash_set.h>
10-
11-
namespace NKikimr::NOlap {
12-
13-
class TTierInfo {
14-
private:
15-
YDB_READONLY_DEF(TString, Name);
16-
YDB_READONLY_DEF(TString, EvictColumnName);
17-
YDB_READONLY_DEF(TDuration, EvictDuration);
18-
19-
ui32 TtlUnitsInSecond;
20-
YDB_READONLY_DEF(std::optional<NArrow::NSerialization::TSerializerContainer>, Serializer);
21-
public:
22-
TTierInfo(const TString& tierName, TDuration evictDuration, const TString& column, ui32 unitsInSecond = 0)
23-
: Name(tierName)
24-
, EvictColumnName(column)
25-
, EvictDuration(evictDuration)
26-
, TtlUnitsInSecond(unitsInSecond)
27-
{
28-
Y_ABORT_UNLESS(!!Name);
29-
Y_ABORT_UNLESS(!!EvictColumnName);
30-
}
31-
32-
TInstant GetEvictInstant(const TInstant now) const {
33-
return now - EvictDuration;
34-
}
35-
36-
TTierInfo& SetSerializer(const NArrow::NSerialization::TSerializerContainer& value) {
37-
Serializer = value;
38-
return *this;
39-
}
40-
41-
std::shared_ptr<arrow::Field> GetEvictColumn(const std::shared_ptr<arrow::Schema>& schema) const {
42-
return schema->GetFieldByName(EvictColumnName);
43-
}
44-
45-
std::optional<TInstant> ScalarToInstant(const std::shared_ptr<arrow::Scalar>& scalar) const;
46-
47-
static std::shared_ptr<TTierInfo> MakeTtl(const TDuration evictDuration, const TString& ttlColumn, ui32 unitsInSecond = 0) {
48-
return std::make_shared<TTierInfo>("TTL", evictDuration, ttlColumn, unitsInSecond);
49-
}
50-
51-
TString GetDebugString() const {
52-
TStringBuilder sb;
53-
sb << "name=" << Name << ";duration=" << EvictDuration << ";column=" << EvictColumnName << ";serializer=";
54-
if (Serializer) {
55-
sb << Serializer->DebugString();
56-
} else {
57-
sb << "NOT_SPECIFIED(Default)";
58-
}
59-
sb << ";";
60-
return sb;
61-
}
62-
};
63-
64-
class TTierRef {
65-
public:
66-
TTierRef(const std::shared_ptr<TTierInfo>& tierInfo)
67-
: Info(tierInfo)
68-
{
69-
Y_ABORT_UNLESS(tierInfo);
70-
}
71-
72-
bool operator < (const TTierRef& b) const {
73-
if (Info->GetEvictDuration() > b.Info->GetEvictDuration()) {
74-
return true;
75-
} else if (Info->GetEvictDuration() == b.Info->GetEvictDuration()) {
76-
return Info->GetName() > b.Info->GetName(); // add stability: smaller name is hotter
77-
}
78-
return false;
79-
}
80-
81-
bool operator == (const TTierRef& b) const {
82-
return Info->GetEvictDuration() == b.Info->GetEvictDuration()
83-
&& Info->GetName() == b.Info->GetName();
84-
}
85-
86-
const TTierInfo& Get() const {
87-
return *Info;
88-
}
89-
90-
std::shared_ptr<TTierInfo> GetPtr() const {
91-
return Info;
92-
}
93-
94-
private:
95-
std::shared_ptr<TTierInfo> Info;
96-
};
97-
98-
class TTiering {
99-
using TTiersMap = THashMap<TString, std::shared_ptr<TTierInfo>>;
100-
TTiersMap TierByName;
101-
TSet<TTierRef> OrderedTiers;
102-
public:
103-
104-
std::shared_ptr<TTierInfo> Ttl;
105-
106-
const TTiersMap& GetTierByName() const {
107-
return TierByName;
108-
}
109-
110-
const TSet<TTierRef>& GetOrderedTiers() const {
111-
return OrderedTiers;
112-
}
113-
114-
bool HasTiers() const {
115-
return !OrderedTiers.empty();
116-
}
117-
118-
void Add(const std::shared_ptr<TTierInfo>& tier) {
119-
if (HasTiers()) {
120-
// TODO: support different ttl columns
121-
Y_ABORT_UNLESS(tier->GetEvictColumnName() == OrderedTiers.begin()->Get().GetEvictColumnName());
122-
}
123-
124-
TierByName.emplace(tier->GetName(), tier);
125-
OrderedTiers.emplace(tier);
126-
}
127-
128-
TString GetHottestTierName() const {
129-
if (OrderedTiers.size()) {
130-
return OrderedTiers.rbegin()->Get().GetName(); // hottest one
131-
}
132-
return {};
133-
}
134-
135-
std::optional<NArrow::NSerialization::TSerializerContainer> GetSerializer(const TString& name) const {
136-
auto it = TierByName.find(name);
137-
if (it != TierByName.end()) {
138-
Y_ABORT_UNLESS(!name.empty());
139-
return it->second->GetSerializer();
140-
}
141-
return {};
142-
}
143-
144-
THashSet<TString> GetTtlColumns() const {
145-
THashSet<TString> out;
146-
if (Ttl) {
147-
out.insert(Ttl->GetEvictColumnName());
148-
}
149-
for (auto& [tierName, tier] : TierByName) {
150-
out.insert(tier->GetEvictColumnName());
151-
}
152-
return out;
153-
}
154-
155-
TString GetDebugString() const {
156-
TStringBuilder sb;
157-
if (Ttl) {
158-
sb << Ttl->GetDebugString() << "; ";
159-
}
160-
for (auto&& i : OrderedTiers) {
161-
sb << i.Get().GetDebugString() << "; ";
162-
}
163-
return sb;
164-
}
165-
};
166-
167-
}
2+
#include "tiering/tier_info.h"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#include "common.h"
2+
3+
namespace NKikimr::NOlap::NTiering::NCommon {
4+
5+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
#pragma once
2+
3+
#include <util/generic/string.h>
4+
5+
namespace NKikimr::NOlap::NTiering::NCommon {
6+
7+
// Reserved tier name "$$DELETE".
// NOTE(review): presumably marks deletion rather than a real storage tier — confirm against users.
// Declared `inline` without `static` so all translation units share one object instead of
// each getting its own internal-linkage copy.
inline const TString DeleteTierName = "$$DELETE";
8+
9+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include "tier_info.h"
2+
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
3+
4+
namespace NKikimr::NOlap {
5+
6+
// Converts an arrow scalar holding an eviction-column value into a TInstant.
// Returns an empty optional when the scalar type is not one of the handled ones.
std::optional<TInstant> TTierInfo::ScalarToInstant(const std::shared_ptr<arrow::Scalar>& scalar) const {
    // A zero TtlUnitsInSecond falls back to one unit per second.
    const ui64 unitsInSeconds = TtlUnitsInSecond ? TtlUnitsInSecond : 1;
    // Number of column units that make up one microsecond (used for plain integer columns).
    const double unitsPerMicrosecond = 1.0 * unitsInSeconds / 1000000;

    const auto typeId = scalar->type->id();
    if (typeId == arrow::Type::TIMESTAMP) {
        // The stored value is passed to TInstant as microseconds.
        return TInstant::MicroSeconds(std::static_pointer_cast<arrow::TimestampScalar>(scalar)->value);
    }
    if (typeId == arrow::Type::UINT16) { // YQL Date
        return TInstant::Days(std::static_pointer_cast<arrow::UInt16Scalar>(scalar)->value);
    }
    if (typeId == arrow::Type::UINT32) { // YQL Datetime or Uint32
        return TInstant::MicroSeconds(std::static_pointer_cast<arrow::UInt32Scalar>(scalar)->value / unitsPerMicrosecond);
    }
    if (typeId == arrow::Type::UINT64) {
        return TInstant::MicroSeconds(std::static_pointer_cast<arrow::UInt64Scalar>(scalar)->value / unitsPerMicrosecond);
    }
    return {};
}
21+
22+
// Picks the tier a portion belongs to, given the maximum eviction-column value `max`
// of the portion and the current time.
//
// Tiers are visited in OrderedTiers order. The first tier whose wait time is zero is
// returned as the current tier; the most recently skipped tier and its wait time are
// passed along as the "next" tier. If no tier has a zero wait time, the default storage
// id is returned as the current tier.
// Aborts when there are no tiers or when `max` cannot be converted to an instant.
TTiering::TTieringContext TTiering::GetTierToMove(const std::shared_ptr<arrow::Scalar>& max, const TInstant now) const {
    AFL_VERIFY(OrderedTiers.size());
    std::optional<TString> pendingTierName;
    std::optional<TDuration> pendingTierWait;
    for (const auto& ref : GetOrderedTiers()) {
        const auto& tier = ref.Get();
        auto mpiOpt = tier.ScalarToInstant(max);
        Y_ABORT_UNLESS(mpiOpt);
        const TInstant portionInstant = *mpiOpt;
        const TInstant evictInstant = tier.GetEvictInstant(now);
        // NOTE(review): relies on TInstant subtraction yielding a zero duration when the
        // portion instant is not after the evict instant — confirm saturation semantics.
        const TDuration waitForTier = portionInstant - evictInstant;
        if (!waitForTier) {
            return TTieringContext(tier.GetName(), evictInstant - portionInstant, pendingTierName, pendingTierWait);
        }
        // Not yet due for this tier: remember it as the upcoming one and keep looking.
        pendingTierName = tier.GetName();
        pendingTierWait = waitForTier;
    }
    return TTieringContext(IStoragesManager::DefaultStorageId, TDuration::Zero(), pendingTierName, pendingTierWait);
}
41+
42+
}

0 commit comments

Comments
 (0)