Skip to content

Commit fab0ff2

Browse files
authored
Merge 7d95c06 into 8b8059d
2 parents 8b8059d + 7d95c06 commit fab0ff2

File tree

1 file changed

+41
-25
lines changed

1 file changed

+41
-25
lines changed

ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp

+41-25
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class TGateway : public IYtGateway {
4848
{}
4949

5050
void OpenSession(TOpenSessionOptions&& options) final {
51+
SessionGenerations_[options.SessionId()] = 0;
5152
return Inner_->OpenSession(std::move(options));
5253
}
5354

@@ -56,6 +57,7 @@ class TGateway : public IYtGateway {
5657
}
5758

5859
NThreading::TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final {
60+
++SessionGenerations_[options.SessionId()];
5961
return Inner_->CleanupSession(std::move(options));
6062
}
6163

@@ -166,8 +168,9 @@ class TGateway : public IYtGateway {
166168
});
167169
}
168170

169-
static TString MakeGetTableInfoKey(const TTableReq& req, ui32 epoch) {
171+
static TString MakeGetTableInfoKey(const TTableReq& req, ui32 epoch, ui64 generation) {
170172
auto tableNode = NYT::TNode()
173+
("Generation", generation)
171174
("Cluster", req.Cluster())
172175
("Table", req.Table());
173176

@@ -199,12 +202,13 @@ class TGateway : public IYtGateway {
199202
}
200203

201204
NThreading::TFuture<TTableInfoResult> GetTableInfo(TGetTableInfoOptions&& options) final {
205+
ui64 generation = SessionGenerations_[options.SessionId()];
202206
if (QContext_.CanRead()) {
203207
TTableInfoResult res;
204208
res.SetSuccess();
205209
for (const auto& req : options.Tables()) {
206210
TTableInfoResult::TTableData data;
207-
auto key = MakeGetTableInfoKey(req, options.Epoch());
211+
auto key = MakeGetTableInfoKey(req, options.Epoch(), generation);
208212
auto item = QContext_.GetReader()->Get({YtGateway_GetTableInfo, key}).GetValueSync();
209213
if (!item) {
210214
throw yexception() << "Missing replay data";
@@ -246,7 +250,7 @@ class TGateway : public IYtGateway {
246250

247251
auto optionsDup = options;
248252
return Inner_->GetTableInfo(std::move(options))
249-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TTableInfoResult>& future) {
253+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TTableInfoResult>& future) {
250254
if (!qContext.CanWrite() || future.HasException()) {
251255
return;
252256
}
@@ -260,7 +264,7 @@ class TGateway : public IYtGateway {
260264
for (size_t i = 0; i < res.Data.size(); ++i) {
261265
const auto& req = optionsDup.Tables()[i];
262266
const auto& data = res.Data[i];
263-
auto key = MakeGetTableInfoKey(req, optionsDup.Epoch());
267+
auto key = MakeGetTableInfoKey(req, optionsDup.Epoch(), generation);
264268

265269
auto attrsNode = NYT::TNode::CreateMap();
266270
if (data.Meta) {
@@ -303,8 +307,9 @@ class TGateway : public IYtGateway {
303307
});
304308
}
305309

306-
static TString MakeGetTableRangeKey(const TTableRangeOptions& options) {
310+
static TString MakeGetTableRangeKey(const TTableRangeOptions& options, ui64 generation) {
307311
auto keyNode = NYT::TNode()
312+
("Generation", generation)
308313
("Cluster", options.Cluster())
309314
("Prefix", options.Prefix())
310315
("Suffix", options.Suffix());
@@ -317,9 +322,10 @@ class TGateway : public IYtGateway {
317322
}
318323

319324
NThreading::TFuture<TTableRangeResult> GetTableRange(TTableRangeOptions&& options) final {
325+
ui64 generation = SessionGenerations_[options.SessionId()];
320326
TString key;
321327
if (QContext_) {
322-
key = MakeGetTableRangeKey(options);
328+
key = MakeGetTableRangeKey(options, generation);
323329
}
324330

325331
if (QContext_.CanRead()) {
@@ -407,21 +413,22 @@ class TGateway : public IYtGateway {
407413
});
408414
}
409415

410-
static TString MakeGetFolderKey(const TFolderOptions& options) {
416+
static TString MakeGetFolderKey(const TFolderOptions& options, ui64 generation) {
411417
auto attrNode = NYT::TNode::CreateList();
412418
for (const auto& attr : options.Attributes()) {
413419
attrNode.Add(NYT::TNode(attr));
414420
}
415421

416422
auto keyNode = NYT::TNode()
423+
("Generation", generation)
417424
("Cluster", options.Cluster())
418425
("Prefix", options.Prefix())
419426
("Attributes", attrNode);
420427

421428
return MakeHash(NYT::NodeToCanonicalYsonString(keyNode, NYT::NYson::EYsonFormat::Binary));
422429
}
423430

424-
static TString MakeResolveLinksKey(const TResolveOptions& options) {
431+
static TString MakeResolveLinksKey(const TResolveOptions& options, ui64 generation) {
425432
auto itemsNode = NYT::TNode::CreateList();
426433
for (const auto& item : options.Items()) {
427434
auto attrNode = NYT::TNode::CreateList();
@@ -438,13 +445,14 @@ class TGateway : public IYtGateway {
438445
}
439446

440447
auto keyNode = NYT::TNode()
448+
("Generation", generation)
441449
("Cluster", options.Cluster())
442450
("Items", itemsNode);
443451

444452
return MakeHash(NYT::NodeToCanonicalYsonString(keyNode, NYT::NYson::EYsonFormat::Binary));
445453
}
446454

447-
static TString MakeGetFoldersKey(const TBatchFolderOptions& options) {
455+
static TString MakeGetFoldersKey(const TBatchFolderOptions& options, ui64 generation) {
448456
auto itemsNode = NYT::TNode();
449457
TMap<TString, size_t> order;
450458
for (size_t i = 0; i < options.Folders().size(); ++i) {
@@ -464,6 +472,7 @@ class TGateway : public IYtGateway {
464472
}
465473

466474
auto keyNode = NYT::TNode()
475+
("Generation", generation)
467476
("Cluster", options.Cluster())
468477
("Items", itemsNode);
469478

@@ -490,8 +499,9 @@ class TGateway : public IYtGateway {
490499
}
491500

492501
NThreading::TFuture<TFolderResult> GetFolder(TFolderOptions&& options) final {
502+
ui64 generation = SessionGenerations_[options.SessionId()];
493503
if (QContext_.CanRead()) {
494-
const auto& key = MakeGetFolderKey(options);
504+
const auto& key = MakeGetFolderKey(options, generation);
495505
auto item = QContext_.GetReader()->Get({YtGateway_GetFolder, key}).GetValueSync();
496506
if (!item) {
497507
throw yexception() << "Missing replay data";
@@ -520,7 +530,7 @@ class TGateway : public IYtGateway {
520530

521531
auto optionsDup = options;
522532
return Inner_->GetFolder(std::move(options))
523-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TFolderResult>& future) {
533+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TFolderResult>& future) {
524534
if (!qContext.CanWrite() || future.HasException()) {
525535
return;
526536
}
@@ -530,7 +540,7 @@ class TGateway : public IYtGateway {
530540
return;
531541
}
532542

533-
const auto& key = MakeGetFolderKey(optionsDup);
543+
const auto& key = MakeGetFolderKey(optionsDup, generation);
534544
auto valueNode = NYT::TNode();
535545

536546
if (std::holds_alternative<TFileLinkPtr>(res.ItemsOrFileLink)) {
@@ -550,10 +560,11 @@ class TGateway : public IYtGateway {
550560
}
551561

552562
NThreading::TFuture<TBatchFolderResult> ResolveLinks(TResolveOptions&& options) final {
563+
ui64 generation = SessionGenerations_[options.SessionId()];
553564
if (QContext_.CanRead()) {
554565
TBatchFolderResult res;
555566
res.SetSuccess();
556-
const auto& key = MakeResolveLinksKey(options);
567+
const auto& key = MakeResolveLinksKey(options, generation);
557568
auto item = QContext_.GetReader()->Get({YtGateway_ResolveLinks, key}).GetValueSync();
558569
if (!item) {
559570
throw yexception() << "Missing replay data";
@@ -571,7 +582,7 @@ class TGateway : public IYtGateway {
571582

572583
auto optionsDup = options;
573584
return Inner_->ResolveLinks(std::move(options))
574-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TBatchFolderResult>& future) {
585+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TBatchFolderResult>& future) {
575586
if (!qContext.CanWrite() || future.HasException()) {
576587
return;
577588
}
@@ -581,7 +592,7 @@ class TGateway : public IYtGateway {
581592
return;
582593
}
583594

584-
const auto& key = MakeResolveLinksKey(optionsDup);
595+
const auto& key = MakeResolveLinksKey(optionsDup, generation);
585596
NYT::TNode valueNode = NYT::TNode::CreateList();
586597
for (const auto& item : res.Items) {
587598
valueNode.Add(SerializeFolderItem(item));
@@ -593,10 +604,11 @@ class TGateway : public IYtGateway {
593604
}
594605

595606
NThreading::TFuture<TBatchFolderResult> GetFolders(TBatchFolderOptions&& options) final {
607+
ui64 generation = SessionGenerations_[options.SessionId()];
596608
if (QContext_.CanRead()) {
597609
TBatchFolderResult res;
598610
res.SetSuccess();
599-
const auto& key = MakeGetFoldersKey(options);
611+
const auto& key = MakeGetFoldersKey(options, generation);
600612
auto item = QContext_.GetReader()->Get({YtGateway_GetFolders, key}).GetValueSync();
601613
if (!item) {
602614
throw yexception() << "Missing replay data";
@@ -614,7 +626,7 @@ class TGateway : public IYtGateway {
614626

615627
auto optionsDup = options;
616628
return Inner_->GetFolders(std::move(options))
617-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TBatchFolderResult>& future) {
629+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TBatchFolderResult>& future) {
618630
if (!qContext.CanWrite() || future.HasException()) {
619631
return;
620632
}
@@ -624,7 +636,7 @@ class TGateway : public IYtGateway {
624636
return;
625637
}
626638

627-
const auto& key = MakeGetFoldersKey(optionsDup);
639+
const auto& key = MakeGetFoldersKey(optionsDup, generation);
628640
NYT::TNode valueNode = NYT::TNode::CreateList();
629641
for (const auto& item : res.Items) {
630642
valueNode.Add(SerializeFolderItem(item));
@@ -699,8 +711,9 @@ class TGateway : public IYtGateway {
699711
return Inner_->DropTrackables(std::move(options));
700712
}
701713

702-
static TString MakePathStatKey(const TString& cluster, bool extended, const TPathStatReq& req) {
714+
static TString MakePathStatKey(const TString& cluster, bool extended, const TPathStatReq& req, ui64 generation) {
703715
auto node = NYT::TNode()
716+
("Generation", generation)
704717
("Cluster", cluster)
705718
("Extended", extended);
706719

@@ -766,13 +779,14 @@ class TGateway : public IYtGateway {
766779
}
767780

768781
NThreading::TFuture<TPathStatResult> PathStat(TPathStatOptions&& options) final {
782+
ui64 generation = SessionGenerations_[options.SessionId()];
769783
if (QContext_.CanRead()) {
770784
TPathStatResult res;
771785
res.DataSize.resize(options.Paths().size(), 0);
772786
res.Extended.resize(options.Paths().size());
773787

774788
for (ui32 index = 0; index < options.Paths().size(); ++index) {
775-
const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]);
789+
const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index], generation);
776790
auto item = QContext_.GetReader()->Get({YtGateway_PathStat, key}).GetValueSync();
777791
if (!item) {
778792
throw yexception() << "Missing replay data";
@@ -789,7 +803,7 @@ class TGateway : public IYtGateway {
789803

790804
auto optionsDup = options;
791805
return Inner_->PathStat(std::move(options))
792-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TPathStatResult>& future) {
806+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TPathStatResult>& future) {
793807
if (!qContext.CanWrite() || future.HasException()) {
794808
return;
795809
}
@@ -800,21 +814,22 @@ class TGateway : public IYtGateway {
800814
}
801815

802816
for (ui32 index = 0; index < optionsDup.Paths().size(); ++index) {
803-
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index]);
817+
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index], generation);
804818
auto value = SerializePathStat(res, index);
805819
qContext.GetWriter()->Put({YtGateway_PathStat, key}, value).GetValueSync();
806820
}
807821
});
808822
}
809823

810824
TPathStatResult TryPathStat(TPathStatOptions&& options) final {
825+
ui64 generation = SessionGenerations_[options.SessionId()];
811826
if (QContext_.CanRead()) {
812827
TPathStatResult res;
813828
res.DataSize.resize(options.Paths().size(), 0);
814829
res.Extended.resize(options.Paths().size());
815830

816831
for (ui32 index = 0; index < options.Paths().size(); ++index) {
817-
const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]);
832+
const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index], generation);
818833
bool allow = false;
819834
if (PathStatKeys_.contains(key)) {
820835
allow = true;
@@ -851,15 +866,15 @@ class TGateway : public IYtGateway {
851866

852867
if (!res.Success()) {
853868
for (ui32 index = 0; index < optionsDup.Paths().size(); ++index) {
854-
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index]);
869+
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index], generation);
855870
QContext_.GetWriter()->Put({YtGateway_PathStatMissing, key}, "1").GetValueSync();
856871
}
857872

858873
return res;
859874
}
860875

861876
for (ui32 index = 0; index < optionsDup.Paths().size(); ++index) {
862-
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index]);
877+
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index], generation);
863878
auto value = SerializePathStat(res, index);
864879
QContext_.GetWriter()->Put({YtGateway_PathStat, key}, value).GetValueSync();
865880
}
@@ -937,6 +952,7 @@ class TGateway : public IYtGateway {
937952
const TIntrusivePtr<IRandomProvider> RandomProvider_;
938953
const TFileStoragePtr FileStorage_;
939954
THashSet<TString> PathStatKeys_;
955+
THashMap<TString, ui64> SessionGenerations_;
940956
};
941957

942958
}

0 commit comments

Comments
 (0)