Skip to content

Commit 0db162e

Browse files
authored
Merge 87dfbc1 into 8b8059d
2 parents 8b8059d + 87dfbc1 commit 0db162e

File tree

1 file changed

+80
-25
lines changed

1 file changed

+80
-25
lines changed

ydb/library/yql/providers/yt/gateway/qplayer/yql_yt_qplayer_gateway.cpp

+80-25
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,26 @@ class TGateway : public IYtGateway {
4848
{}
4949

5050
void OpenSession(TOpenSessionOptions&& options) final {
51+
with_lock(SessionGenerationsLock_) {
52+
SessionGenerations_[options.SessionId()] = 0;
53+
}
54+
5155
return Inner_->OpenSession(std::move(options));
5256
}
5357

5458
NThreading::TFuture<void> CloseSession(TCloseSessionOptions&& options) final {
59+
with_lock(SessionGenerationsLock_) {
60+
SessionGenerations_.erase(options.SessionId());
61+
}
62+
5563
return Inner_->CloseSession(std::move(options));
5664
}
5765

5866
NThreading::TFuture<void> CleanupSession(TCleanupSessionOptions&& options) final {
67+
with_lock(SessionGenerationsLock_) {
68+
++SessionGenerations_[options.SessionId()];
69+
}
70+
5971
return Inner_->CleanupSession(std::move(options));
6072
}
6173

@@ -166,8 +178,9 @@ class TGateway : public IYtGateway {
166178
});
167179
}
168180

169-
static TString MakeGetTableInfoKey(const TTableReq& req, ui32 epoch) {
181+
static TString MakeGetTableInfoKey(const TTableReq& req, ui32 epoch, ui64 generation) {
170182
auto tableNode = NYT::TNode()
183+
("Generation", generation)
171184
("Cluster", req.Cluster())
172185
("Table", req.Table());
173186

@@ -199,12 +212,17 @@ class TGateway : public IYtGateway {
199212
}
200213

201214
NThreading::TFuture<TTableInfoResult> GetTableInfo(TGetTableInfoOptions&& options) final {
215+
ui64 generation;
216+
with_lock(SessionGenerationsLock_) {
217+
generation = SessionGenerations_[options.SessionId()];
218+
}
219+
202220
if (QContext_.CanRead()) {
203221
TTableInfoResult res;
204222
res.SetSuccess();
205223
for (const auto& req : options.Tables()) {
206224
TTableInfoResult::TTableData data;
207-
auto key = MakeGetTableInfoKey(req, options.Epoch());
225+
auto key = MakeGetTableInfoKey(req, options.Epoch(), generation);
208226
auto item = QContext_.GetReader()->Get({YtGateway_GetTableInfo, key}).GetValueSync();
209227
if (!item) {
210228
throw yexception() << "Missing replay data";
@@ -246,7 +264,7 @@ class TGateway : public IYtGateway {
246264

247265
auto optionsDup = options;
248266
return Inner_->GetTableInfo(std::move(options))
249-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TTableInfoResult>& future) {
267+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TTableInfoResult>& future) {
250268
if (!qContext.CanWrite() || future.HasException()) {
251269
return;
252270
}
@@ -260,7 +278,7 @@ class TGateway : public IYtGateway {
260278
for (size_t i = 0; i < res.Data.size(); ++i) {
261279
const auto& req = optionsDup.Tables()[i];
262280
const auto& data = res.Data[i];
263-
auto key = MakeGetTableInfoKey(req, optionsDup.Epoch());
281+
auto key = MakeGetTableInfoKey(req, optionsDup.Epoch(), generation);
264282

265283
auto attrsNode = NYT::TNode::CreateMap();
266284
if (data.Meta) {
@@ -303,8 +321,9 @@ class TGateway : public IYtGateway {
303321
});
304322
}
305323

306-
static TString MakeGetTableRangeKey(const TTableRangeOptions& options) {
324+
static TString MakeGetTableRangeKey(const TTableRangeOptions& options, ui64 generation) {
307325
auto keyNode = NYT::TNode()
326+
("Generation", generation)
308327
("Cluster", options.Cluster())
309328
("Prefix", options.Prefix())
310329
("Suffix", options.Suffix());
@@ -317,9 +336,14 @@ class TGateway : public IYtGateway {
317336
}
318337

319338
NThreading::TFuture<TTableRangeResult> GetTableRange(TTableRangeOptions&& options) final {
339+
ui64 generation;
340+
with_lock(SessionGenerationsLock_) {
341+
generation = SessionGenerations_[options.SessionId()];
342+
}
343+
320344
TString key;
321345
if (QContext_) {
322-
key = MakeGetTableRangeKey(options);
346+
key = MakeGetTableRangeKey(options, generation);
323347
}
324348

325349
if (QContext_.CanRead()) {
@@ -407,21 +431,22 @@ class TGateway : public IYtGateway {
407431
});
408432
}
409433

410-
static TString MakeGetFolderKey(const TFolderOptions& options) {
434+
static TString MakeGetFolderKey(const TFolderOptions& options, ui64 generation) {
411435
auto attrNode = NYT::TNode::CreateList();
412436
for (const auto& attr : options.Attributes()) {
413437
attrNode.Add(NYT::TNode(attr));
414438
}
415439

416440
auto keyNode = NYT::TNode()
441+
("Generation", generation)
417442
("Cluster", options.Cluster())
418443
("Prefix", options.Prefix())
419444
("Attributes", attrNode);
420445

421446
return MakeHash(NYT::NodeToCanonicalYsonString(keyNode, NYT::NYson::EYsonFormat::Binary));
422447
}
423448

424-
static TString MakeResolveLinksKey(const TResolveOptions& options) {
449+
static TString MakeResolveLinksKey(const TResolveOptions& options, ui64 generation) {
425450
auto itemsNode = NYT::TNode::CreateList();
426451
for (const auto& item : options.Items()) {
427452
auto attrNode = NYT::TNode::CreateList();
@@ -438,13 +463,14 @@ class TGateway : public IYtGateway {
438463
}
439464

440465
auto keyNode = NYT::TNode()
466+
("Generation", generation)
441467
("Cluster", options.Cluster())
442468
("Items", itemsNode);
443469

444470
return MakeHash(NYT::NodeToCanonicalYsonString(keyNode, NYT::NYson::EYsonFormat::Binary));
445471
}
446472

447-
static TString MakeGetFoldersKey(const TBatchFolderOptions& options) {
473+
static TString MakeGetFoldersKey(const TBatchFolderOptions& options, ui64 generation) {
448474
auto itemsNode = NYT::TNode();
449475
TMap<TString, size_t> order;
450476
for (size_t i = 0; i < options.Folders().size(); ++i) {
@@ -464,6 +490,7 @@ class TGateway : public IYtGateway {
464490
}
465491

466492
auto keyNode = NYT::TNode()
493+
("Generation", generation)
467494
("Cluster", options.Cluster())
468495
("Items", itemsNode);
469496

@@ -490,8 +517,13 @@ class TGateway : public IYtGateway {
490517
}
491518

492519
NThreading::TFuture<TFolderResult> GetFolder(TFolderOptions&& options) final {
520+
ui64 generation;
521+
with_lock(SessionGenerationsLock_) {
522+
generation = SessionGenerations_[options.SessionId()];
523+
}
524+
493525
if (QContext_.CanRead()) {
494-
const auto& key = MakeGetFolderKey(options);
526+
const auto& key = MakeGetFolderKey(options, generation);
495527
auto item = QContext_.GetReader()->Get({YtGateway_GetFolder, key}).GetValueSync();
496528
if (!item) {
497529
throw yexception() << "Missing replay data";
@@ -520,7 +552,7 @@ class TGateway : public IYtGateway {
520552

521553
auto optionsDup = options;
522554
return Inner_->GetFolder(std::move(options))
523-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TFolderResult>& future) {
555+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TFolderResult>& future) {
524556
if (!qContext.CanWrite() || future.HasException()) {
525557
return;
526558
}
@@ -530,7 +562,7 @@ class TGateway : public IYtGateway {
530562
return;
531563
}
532564

533-
const auto& key = MakeGetFolderKey(optionsDup);
565+
const auto& key = MakeGetFolderKey(optionsDup, generation);
534566
auto valueNode = NYT::TNode();
535567

536568
if (std::holds_alternative<TFileLinkPtr>(res.ItemsOrFileLink)) {
@@ -550,10 +582,15 @@ class TGateway : public IYtGateway {
550582
}
551583

552584
NThreading::TFuture<TBatchFolderResult> ResolveLinks(TResolveOptions&& options) final {
585+
ui64 generation;
586+
with_lock(SessionGenerationsLock_) {
587+
generation = SessionGenerations_[options.SessionId()];
588+
}
589+
553590
if (QContext_.CanRead()) {
554591
TBatchFolderResult res;
555592
res.SetSuccess();
556-
const auto& key = MakeResolveLinksKey(options);
593+
const auto& key = MakeResolveLinksKey(options, generation);
557594
auto item = QContext_.GetReader()->Get({YtGateway_ResolveLinks, key}).GetValueSync();
558595
if (!item) {
559596
throw yexception() << "Missing replay data";
@@ -571,7 +608,7 @@ class TGateway : public IYtGateway {
571608

572609
auto optionsDup = options;
573610
return Inner_->ResolveLinks(std::move(options))
574-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TBatchFolderResult>& future) {
611+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TBatchFolderResult>& future) {
575612
if (!qContext.CanWrite() || future.HasException()) {
576613
return;
577614
}
@@ -581,7 +618,7 @@ class TGateway : public IYtGateway {
581618
return;
582619
}
583620

584-
const auto& key = MakeResolveLinksKey(optionsDup);
621+
const auto& key = MakeResolveLinksKey(optionsDup, generation);
585622
NYT::TNode valueNode = NYT::TNode::CreateList();
586623
for (const auto& item : res.Items) {
587624
valueNode.Add(SerializeFolderItem(item));
@@ -593,10 +630,15 @@ class TGateway : public IYtGateway {
593630
}
594631

595632
NThreading::TFuture<TBatchFolderResult> GetFolders(TBatchFolderOptions&& options) final {
633+
ui64 generation;
634+
with_lock(SessionGenerationsLock_) {
635+
generation = SessionGenerations_[options.SessionId()];
636+
}
637+
596638
if (QContext_.CanRead()) {
597639
TBatchFolderResult res;
598640
res.SetSuccess();
599-
const auto& key = MakeGetFoldersKey(options);
641+
const auto& key = MakeGetFoldersKey(options, generation);
600642
auto item = QContext_.GetReader()->Get({YtGateway_GetFolders, key}).GetValueSync();
601643
if (!item) {
602644
throw yexception() << "Missing replay data";
@@ -614,7 +656,7 @@ class TGateway : public IYtGateway {
614656

615657
auto optionsDup = options;
616658
return Inner_->GetFolders(std::move(options))
617-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TBatchFolderResult>& future) {
659+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TBatchFolderResult>& future) {
618660
if (!qContext.CanWrite() || future.HasException()) {
619661
return;
620662
}
@@ -624,7 +666,7 @@ class TGateway : public IYtGateway {
624666
return;
625667
}
626668

627-
const auto& key = MakeGetFoldersKey(optionsDup);
669+
const auto& key = MakeGetFoldersKey(optionsDup, generation);
628670
NYT::TNode valueNode = NYT::TNode::CreateList();
629671
for (const auto& item : res.Items) {
630672
valueNode.Add(SerializeFolderItem(item));
@@ -699,8 +741,9 @@ class TGateway : public IYtGateway {
699741
return Inner_->DropTrackables(std::move(options));
700742
}
701743

702-
static TString MakePathStatKey(const TString& cluster, bool extended, const TPathStatReq& req) {
744+
static TString MakePathStatKey(const TString& cluster, bool extended, const TPathStatReq& req, ui64 generation) {
703745
auto node = NYT::TNode()
746+
("Generation", generation)
704747
("Cluster", cluster)
705748
("Extended", extended);
706749

@@ -766,13 +809,18 @@ class TGateway : public IYtGateway {
766809
}
767810

768811
NThreading::TFuture<TPathStatResult> PathStat(TPathStatOptions&& options) final {
812+
ui64 generation;
813+
with_lock(SessionGenerationsLock_) {
814+
generation = SessionGenerations_[options.SessionId()];
815+
}
816+
769817
if (QContext_.CanRead()) {
770818
TPathStatResult res;
771819
res.DataSize.resize(options.Paths().size(), 0);
772820
res.Extended.resize(options.Paths().size());
773821

774822
for (ui32 index = 0; index < options.Paths().size(); ++index) {
775-
const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]);
823+
const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index], generation);
776824
auto item = QContext_.GetReader()->Get({YtGateway_PathStat, key}).GetValueSync();
777825
if (!item) {
778826
throw yexception() << "Missing replay data";
@@ -789,7 +837,7 @@ class TGateway : public IYtGateway {
789837

790838
auto optionsDup = options;
791839
return Inner_->PathStat(std::move(options))
792-
.Subscribe([optionsDup, qContext = QContext_](const NThreading::TFuture<TPathStatResult>& future) {
840+
.Subscribe([optionsDup, qContext = QContext_, generation](const NThreading::TFuture<TPathStatResult>& future) {
793841
if (!qContext.CanWrite() || future.HasException()) {
794842
return;
795843
}
@@ -800,21 +848,26 @@ class TGateway : public IYtGateway {
800848
}
801849

802850
for (ui32 index = 0; index < optionsDup.Paths().size(); ++index) {
803-
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index]);
851+
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index], generation);
804852
auto value = SerializePathStat(res, index);
805853
qContext.GetWriter()->Put({YtGateway_PathStat, key}, value).GetValueSync();
806854
}
807855
});
808856
}
809857

810858
TPathStatResult TryPathStat(TPathStatOptions&& options) final {
859+
ui64 generation;
860+
with_lock(SessionGenerationsLock_) {
861+
generation = SessionGenerations_[options.SessionId()];
862+
}
863+
811864
if (QContext_.CanRead()) {
812865
TPathStatResult res;
813866
res.DataSize.resize(options.Paths().size(), 0);
814867
res.Extended.resize(options.Paths().size());
815868

816869
for (ui32 index = 0; index < options.Paths().size(); ++index) {
817-
const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index]);
870+
const auto& key = MakePathStatKey(options.Cluster(), options.Extended(), options.Paths()[index], generation);
818871
bool allow = false;
819872
if (PathStatKeys_.contains(key)) {
820873
allow = true;
@@ -851,15 +904,15 @@ class TGateway : public IYtGateway {
851904

852905
if (!res.Success()) {
853906
for (ui32 index = 0; index < optionsDup.Paths().size(); ++index) {
854-
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index]);
907+
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index], generation);
855908
QContext_.GetWriter()->Put({YtGateway_PathStatMissing, key}, "1").GetValueSync();
856909
}
857910

858911
return res;
859912
}
860913

861914
for (ui32 index = 0; index < optionsDup.Paths().size(); ++index) {
862-
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index]);
915+
const auto& key = MakePathStatKey(optionsDup.Cluster(), optionsDup.Extended(), optionsDup.Paths()[index], generation);
863916
auto value = SerializePathStat(res, index);
864917
QContext_.GetWriter()->Put({YtGateway_PathStat, key}, value).GetValueSync();
865918
}
@@ -937,6 +990,8 @@ class TGateway : public IYtGateway {
937990
const TIntrusivePtr<IRandomProvider> RandomProvider_;
938991
const TFileStoragePtr FileStorage_;
939992
THashSet<TString> PathStatKeys_;
993+
THashMap<TString, ui64> SessionGenerations_;
994+
TMutex SessionGenerationsLock_;
940995
};
941996

942997
}

0 commit comments

Comments
 (0)