Skip to content

Commit 6bf263a

Browse files
Repair portions and async proto parser (#11636)
1 parent c65e4fe commit 6bf263a

File tree

7 files changed

+396
-96
lines changed

7 files changed

+396
-96
lines changed

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,12 +1256,72 @@ void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvFinishedFromSource::T
12561256
}
12571257
};
12581258

1259+
class TPortionConstructorV2 {
1260+
private:
1261+
NOlap::TPortionInfo::TConstPtr PortionInfo;
1262+
std::optional<NOlap::TColumnChunkLoadContextV2> Records;
1263+
std::optional<std::vector<NOlap::TIndexChunkLoadContext>> Indexes;
1264+
1265+
public:
1266+
TPortionConstructorV2(const NOlap::TPortionInfo::TConstPtr& portionInfo)
1267+
: PortionInfo(portionInfo) {
1268+
}
1269+
1270+
void SetRecords(NOlap::TColumnChunkLoadContextV2&& records) {
1271+
AFL_VERIFY(!Records);
1272+
Records = std::move(records);
1273+
}
1274+
1275+
void SetIndexes(std::vector<NOlap::TIndexChunkLoadContext>&& indexes) {
1276+
AFL_VERIFY(!Indexes);
1277+
Indexes = std::move(indexes);
1278+
}
1279+
1280+
NOlap::TPortionDataAccessor BuildAccessor() {
1281+
AFL_VERIFY(PortionInfo && Records && Indexes);
1282+
std::vector<NOlap::TColumnChunkLoadContextV1> records = Records->BuildRecordsV1();
1283+
return NOlap::TPortionAccessorConstructor::BuildForLoading(std::move(PortionInfo), std::move(records), std::move(*Indexes));
1284+
}
1285+
};
1286+
1287+
class TAccessorsParsingTask: public NConveyor::ITask {
1288+
private:
1289+
std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
1290+
std::vector<TPortionConstructorV2> Portions;
1291+
1292+
virtual TConclusionStatus DoExecute(const std::shared_ptr<ITask>& /*taskPtr*/) override {
1293+
std::vector<NOlap::TPortionDataAccessor> accessors;
1294+
accessors.reserve(Portions.size());
1295+
for (auto&& i : Portions) {
1296+
accessors.emplace_back(i.BuildAccessor());
1297+
}
1298+
FetchCallback->OnAccessorsFetched(std::move(accessors));
1299+
return TConclusionStatus::Success();
1300+
}
1301+
virtual void DoOnCannotExecute(const TString& reason) override {
1302+
AFL_VERIFY(false)("cannot parse metadata", reason);
1303+
}
1304+
1305+
public:
1306+
virtual TString GetTaskClassIdentifier() const override {
1307+
return "ASKED_METADATA_PARSER";
1308+
}
1309+
1310+
TAccessorsParsingTask(
1311+
const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& callback, std::vector<TPortionConstructorV2>&& portions)
1312+
: FetchCallback(callback)
1313+
, Portions(std::move(portions))
1314+
{
1315+
1316+
}
1317+
};
1318+
12591319
class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12601320
private:
12611321
using TBase = TTransactionBase<TColumnShard>;
12621322
std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
12631323
THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
1264-
std::vector<NOlap::TPortionDataAccessor> FetchedAccessors;
1324+
std::vector<TPortionConstructorV2> FetchedAccessors;
12651325

12661326
public:
12671327
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
@@ -1275,6 +1335,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
12751335

12761336
bool Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) override {
12771337
NIceDb::TNiceDb db(txc.DB);
1338+
12781339
TBlobGroupSelector selector(Self->Info());
12791340
bool reask = false;
12801341
for (auto&& i : PortionsByPath) {
@@ -1302,21 +1363,22 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
13021363
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "processing")("size", i.second.size())("path_id", i.first);
13031364
while (i.second.size()) {
13041365
auto p = i.second.back();
1305-
std::vector<NOlap::TColumnChunkLoadContextV1> records;
1306-
std::vector<NOlap::TIndexChunkLoadContext> indexes;
1366+
TPortionConstructorV2 constructor(p);
13071367
{
13081368
auto rowset = db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
13091369
if (!rowset.IsReady()) {
13101370
return false;
13111371
}
13121372
while (!rowset.EndOfSet()) {
1313-
NOlap::TColumnChunkLoadContextV1::BuildFromDBV2(rowset, records);
1373+
NOlap::TColumnChunkLoadContextV2 info(rowset);
1374+
constructor.SetRecords(std::move(info));
13141375
if (!rowset.Next()) {
13151376
return false;
13161377
}
13171378
}
13181379
}
13191380
{
1381+
std::vector<NOlap::TIndexChunkLoadContext> indexes;
13201382
auto rowset = db.Table<NColumnShard::Schema::IndexIndexes>().Prefix(p->GetPathId(), p->GetPortionId()).Select();
13211383
if (!rowset.IsReady()) {
13221384
return false;
@@ -1327,16 +1389,17 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
13271389
return false;
13281390
}
13291391
}
1392+
constructor.SetIndexes(std::move(indexes));
13301393
}
1331-
FetchedAccessors.emplace_back(NOlap::TPortionAccessorConstructor::BuildForLoading(p, std::move(records), std::move(indexes)));
1394+
FetchedAccessors.emplace_back(std::move(constructor));
13321395
i.second.pop_back();
13331396
}
13341397
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished")("size", i.second.size())(
13351398
"path_id", i.first);
13361399
}
13371400

13381401
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished");
1339-
FetchCallback->OnAccessorsFetched(std::move(FetchedAccessors));
1402+
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)));
13401403
return true;
13411404
}
13421405
void Complete(const TActorContext& /*ctx*/) override {

ydb/core/tx/columnshard/columnshard_schema.h

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -957,6 +957,10 @@ class TColumnChunkLoadContext {
957957
YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero());
958958

959959
public:
960+
TPortionAddress GetPortionAddress() const {
961+
return TPortionAddress(PathId, PortionId);
962+
}
963+
960964
const TChunkAddress& GetAddress() const {
961965
return Address;
962966
}
@@ -1012,20 +1016,6 @@ class TColumnChunkLoadContextV1 {
10121016
return TPortionAddress(PathId, PortionId);
10131017
}
10141018

1015-
template <class TSource>
1016-
static void BuildFromDBV2(const TSource& rowset, std::vector<TColumnChunkLoadContextV1>& records) {
1017-
const ui64 pathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
1018-
const ui64 portionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
1019-
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
1020-
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
1021-
AFL_VERIFY(metaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
1022-
for (auto&& i : metaProto.GetChunks()) {
1023-
TColumnChunkLoadContextV1 result(pathId, portionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
1024-
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetChunkMetadata());
1025-
records.emplace_back(std::move(result));
1026-
}
1027-
}
1028-
10291019
NKikimrTxColumnShard::TColumnChunkInfo SerializeToDBProto() const {
10301020
NKikimrTxColumnShard::TColumnChunkInfo proto;
10311021
proto.SetSSColumnId(Address.GetColumnId());
@@ -1068,6 +1058,33 @@ class TColumnChunkLoadContextV1 {
10681058
}
10691059
};
10701060

1061+
class TColumnChunkLoadContextV2 {
1062+
private:
1063+
YDB_READONLY(ui64, PathId, 0);
1064+
YDB_READONLY(ui64, PortionId, 0);
1065+
YDB_READONLY_DEF(TString, MetadataProto);
1066+
1067+
public:
1068+
template <class TSource>
1069+
TColumnChunkLoadContextV2(const TSource& rowset) {
1070+
PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PathId>();
1071+
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::PortionId>();
1072+
MetadataProto = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
1073+
}
1074+
1075+
std::vector<TColumnChunkLoadContextV1> BuildRecordsV1() const {
1076+
std::vector<TColumnChunkLoadContextV1> records;
1077+
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
1078+
AFL_VERIFY(metaProto.ParseFromArray(MetadataProto.data(), MetadataProto.size()))("event", "cannot parse metadata as protobuf");
1079+
for (auto&& i : metaProto.GetChunks()) {
1080+
TColumnChunkLoadContextV1 result(PathId, PortionId, TChunkAddress(i.GetSSColumnId(), i.GetChunkIdx()),
1081+
TBlobRangeLink16::BuildFromProto(i.GetBlobRangeLink()).DetachResult(), i.GetChunkMetadata());
1082+
records.emplace_back(std::move(result));
1083+
}
1084+
return records;
1085+
}
1086+
};
1087+
10711088
class TIndexChunkLoadContext {
10721089
private:
10731090
YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange);

ydb/core/tx/columnshard/engines/db_wrapper.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,8 @@ void TDbWrapper::WritePortion(const NOlap::TPortionInfo& portion) {
9999

100100
void TDbWrapper::ErasePortion(const NOlap::TPortionInfo& portion) {
101101
NIceDb::TNiceDb db(Database);
102-
using IndexPortions = NColumnShard::Schema::IndexPortions;
103-
db.Table<IndexPortions>().Key(portion.GetPathId(), portion.GetPortionId()).Delete();
102+
db.Table<NColumnShard::Schema::IndexPortions>().Key(portion.GetPathId(), portion.GetPortionId()).Delete();
103+
db.Table<NColumnShard::Schema::IndexColumnsV2>().Key(portion.GetPathId(), portion.GetPortionId()).Delete();
104104
}
105105

106106
void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {

ydb/core/tx/columnshard/normalizer/abstract/abstract.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ enum class ENormalizerSequentialId: ui32 {
5656
TablesCleaner,
5757
DeprecatedPortionsMetadata,
5858
CleanGranuleId,
59-
EmptyPortionsCleaner,
59+
DeprecatedEmptyPortionsCleaner,
6060
CleanInsertionDedup,
6161
GCCountersNormalizer,
6262
RestorePortionFromChunks,

0 commit comments

Comments
 (0)