Skip to content

Commit 09c88b0

Browse files
committed
yt/cpp/mapreduce: move Get, TryGet, Exists, MultisetAttributes to THttpRawClient
commit_hash:bd2228f98fa92de408ca850f9bc1608fdf99e7f5
1 parent 615edba commit 09c88b0

32 files changed

+376
-185
lines changed

yt/cpp/mapreduce/client/client.cpp

+47-13
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,22 @@ bool TClientBase::Exists(
126126
const TYPath& path,
127127
const TExistsOptions& options)
128128
{
129-
return NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
129+
return RequestWithRetry<bool>(
130+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
131+
[this, &path, &options] (TMutationId& mutationId) {
132+
return RawClient_->Exists(mutationId, TransactionId_, path, options);
133+
});
130134
}
131135

132136
TNode TClientBase::Get(
133137
const TYPath& path,
134138
const TGetOptions& options)
135139
{
136-
return NRawClient::Get(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, options);
140+
return RequestWithRetry<TNode>(
141+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
142+
[this, &path, &options] (TMutationId& mutationId) {
143+
return RawClient_->Get(mutationId, TransactionId_, path, options);
144+
});
137145
}
138146

139147
void TClientBase::Set(
@@ -149,12 +157,17 @@ void TClientBase::Set(
149157
}
150158

151159
void TClientBase::MultisetAttributes(
152-
const TYPath& path, const TNode::TMapType& value, const TMultisetAttributesOptions& options)
160+
const TYPath& path,
161+
const TNode::TMapType& value,
162+
const TMultisetAttributesOptions& options)
153163
{
154-
NRawClient::MultisetAttributes(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, path, value, options);
164+
RequestWithRetry<void>(
165+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
166+
[this, &path, &value, &options] (TMutationId& mutationId) {
167+
RawClient_->MultisetAttributes(mutationId, TransactionId_, path, value, options);
168+
});
155169
}
156170

157-
158171
TNode::TListType TClientBase::List(
159172
const TYPath& path,
160173
const TListOptions& options)
@@ -290,6 +303,7 @@ IFileReaderPtr TClientBase::CreateBlobTableReader(
290303
return new TBlobTableReader(
291304
path,
292305
key,
306+
RawClient_,
293307
ClientRetryPolicy_,
294308
GetTransactionPinger(),
295309
Context_,
@@ -303,6 +317,7 @@ IFileReaderPtr TClientBase::CreateFileReader(
303317
{
304318
return new TFileReader(
305319
CanonizeYPath(path),
320+
RawClient_,
306321
ClientRetryPolicy_,
307322
GetTransactionPinger(),
308323
Context_,
@@ -315,11 +330,18 @@ IFileWriterPtr TClientBase::CreateFileWriter(
315330
const TFileWriterOptions& options)
316331
{
317332
auto realPath = CanonizeYPath(path);
318-
if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
333+
334+
auto exists = RequestWithRetry<bool>(
335+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
336+
[this, &realPath] (TMutationId& mutationId) {
337+
return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_);
338+
});
339+
if (!exists) {
319340
NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_FILE,
320341
TCreateOptions().IgnoreExisting(true));
321342
}
322-
return new TFileWriter(realPath, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
343+
344+
return new TFileWriter(realPath, RawClient_, ClientRetryPolicy_, GetTransactionPinger(), Context_, TransactionId_, options);
323345
}
324346

325347
TTableWriterPtr<::google::protobuf::Message> TClientBase::CreateTableWriter(
@@ -343,6 +365,7 @@ TRawTableWriterPtr TClientBase::CreateRawWriter(
343365
const TTableWriterOptions& options)
344366
{
345367
return ::MakeIntrusive<TRetryfulWriter>(
368+
RawClient_,
346369
ClientRetryPolicy_,
347370
GetTransactionPinger(),
348371
Context_,
@@ -673,7 +696,7 @@ void TClientBase::CompleteOperation(const TOperationId& operationId)
673696

674697
void TClientBase::WaitForOperation(const TOperationId& operationId)
675698
{
676-
NYT::NDetail::WaitForOperation(ClientRetryPolicy_, Context_, operationId);
699+
NYT::NDetail::WaitForOperation(ClientRetryPolicy_, RawClient_, Context_, operationId);
677700
}
678701

679702
void TClientBase::AlterTable(
@@ -691,6 +714,7 @@ ::TIntrusivePtr<TClientReader> TClientBase::CreateClientReader(
691714
{
692715
return ::MakeIntrusive<TClientReader>(
693716
CanonizeYPath(path),
717+
RawClient_,
694718
ClientRetryPolicy_,
695719
GetTransactionPinger(),
696720
Context_,
@@ -706,12 +730,20 @@ THolder<TClientWriter> TClientBase::CreateClientWriter(
706730
const TTableWriterOptions& options)
707731
{
708732
auto realPath = CanonizeYPath(path);
709-
if (!NRawClient::Exists(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_)) {
733+
734+
auto exists = RequestWithRetry<bool>(
735+
ClientRetryPolicy_->CreatePolicyForGenericRequest(),
736+
[this, &realPath] (TMutationId& mutationId) {
737+
return RawClient_->Exists(mutationId, TransactionId_, realPath.Path_);
738+
});
739+
if (!exists) {
710740
NRawClient::Create(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, TransactionId_, realPath.Path_, NT_TABLE,
711741
TCreateOptions().IgnoreExisting(true));
712742
}
743+
713744
return MakeHolder<TClientWriter>(
714745
realPath,
746+
RawClient_,
715747
ClientRetryPolicy_,
716748
GetTransactionPinger(),
717749
Context_,
@@ -851,15 +883,16 @@ const IClientRetryPolicyPtr& TClientBase::GetRetryPolicy() const
851883
////////////////////////////////////////////////////////////////////////////////
852884

853885
TTransaction::TTransaction(
854-
IRawClientPtr rawClient,
886+
const IRawClientPtr& rawClient,
855887
TClientPtr parentClient,
856888
const TClientContext& context,
857889
const TTransactionId& parentTransactionId,
858890
const TStartTransactionOptions& options)
859-
: TClientBase(std::move(rawClient), context, parentTransactionId, parentClient->GetRetryPolicy())
891+
: TClientBase(rawClient, context, parentTransactionId, parentClient->GetRetryPolicy())
860892
, TransactionPinger_(parentClient->GetTransactionPinger())
861893
, PingableTx_(
862894
MakeHolder<TPingableTransaction>(
895+
rawClient,
863896
parentClient->GetRetryPolicy(),
864897
context,
865898
parentTransactionId,
@@ -871,15 +904,16 @@ TTransaction::TTransaction(
871904
}
872905

873906
TTransaction::TTransaction(
874-
IRawClientPtr rawClient,
907+
const IRawClientPtr& rawClient,
875908
TClientPtr parentClient,
876909
const TClientContext& context,
877910
const TTransactionId& transactionId,
878911
const TAttachTransactionOptions& options)
879-
: TClientBase(std::move(rawClient), context, transactionId, parentClient->GetRetryPolicy())
912+
: TClientBase(rawClient, context, transactionId, parentClient->GetRetryPolicy())
880913
, TransactionPinger_(parentClient->GetTransactionPinger())
881914
, PingableTx_(
882915
new TPingableTransaction(
916+
rawClient,
883917
parentClient->GetRetryPolicy(),
884918
context,
885919
transactionId,

yt/cpp/mapreduce/client/client.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ class TTransaction
292292
//
293293
// Start a new transaction.
294294
TTransaction(
295-
IRawClientPtr rawClient,
295+
const IRawClientPtr& rawClient,
296296
TClientPtr parentClient,
297297
const TClientContext& context,
298298
const TTransactionId& parentTransactionId,
@@ -301,7 +301,7 @@ class TTransaction
301301
//
302302
// Attach an existing transaction.
303303
TTransaction(
304-
IRawClientPtr rawClient,
304+
const IRawClientPtr& rawClient,
305305
TClientPtr parentClient,
306306
const TClientContext& context,
307307
const TTransactionId& transactionId,

yt/cpp/mapreduce/client/client_reader.cpp

+10-5
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
#include <yt/cpp/mapreduce/common/retry_lib.h>
99
#include <yt/cpp/mapreduce/common/wait_proxy.h>
1010

11+
#include <yt/cpp/mapreduce/http/helpers.h>
12+
#include <yt/cpp/mapreduce/http/requests.h>
13+
#include <yt/cpp/mapreduce/http/retry_request.h>
14+
1115
#include <yt/cpp/mapreduce/interface/config.h>
1216
#include <yt/cpp/mapreduce/interface/tvm.h>
1317

@@ -16,10 +20,7 @@
1620
#include <yt/cpp/mapreduce/io/helpers.h>
1721
#include <yt/cpp/mapreduce/io/yamr_table_reader.h>
1822

19-
#include <yt/cpp/mapreduce/http/helpers.h>
20-
#include <yt/cpp/mapreduce/http/requests.h>
21-
#include <yt/cpp/mapreduce/http/retry_request.h>
22-
23+
#include <yt/cpp/mapreduce/raw_client/raw_client.h>
2324
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
2425

2526
#include <library/cpp/yson/node/serialize.h>
@@ -38,6 +39,7 @@ using ::ToString;
3839

3940
TClientReader::TClientReader(
4041
const TRichYPath& path,
42+
const IRawClientPtr& rawClient,
4143
IClientRetryPolicyPtr clientRetryPolicy,
4244
ITransactionPingerPtr transactionPinger,
4345
const TClientContext& context,
@@ -46,6 +48,7 @@ TClientReader::TClientReader(
4648
const TTableReaderOptions& options,
4749
bool useFormatFromTableAttributes)
4850
: Path_(path)
51+
, RawClient_(rawClient)
4952
, ClientRetryPolicy_(std::move(clientRetryPolicy))
5053
, Context_(context)
5154
, ParentTransactionId_(transactionId)
@@ -56,12 +59,14 @@ TClientReader::TClientReader(
5659
if (options.CreateTransaction_) {
5760
Y_ABORT_UNLESS(transactionPinger, "Internal error: transactionPinger is null");
5861
ReadTransaction_ = MakeHolder<TPingableTransaction>(
62+
RawClient_,
5963
ClientRetryPolicy_,
6064
Context_,
6165
transactionId,
6266
transactionPinger->GetChildTxPinger(),
6367
TStartTransactionOptions());
6468
Path_.Path(Snapshot(
69+
RawClient_,
6570
ClientRetryPolicy_,
6671
Context_,
6772
ReadTransaction_->GetId(),
@@ -70,7 +75,7 @@ TClientReader::TClientReader(
7075

7176
if (useFormatFromTableAttributes) {
7277
auto transactionId2 = ReadTransaction_ ? ReadTransaction_->GetId() : ParentTransactionId_;
73-
auto newFormat = GetTableFormat(ClientRetryPolicy_, Context_, transactionId2, Path_);
78+
auto newFormat = GetTableFormat(ClientRetryPolicy_, RawClient_, transactionId2, Path_);
7479
if (newFormat) {
7580
Format_->Config = *newFormat;
7681
}

yt/cpp/mapreduce/client/client_reader.h

+4
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ class TClientReader
2121
public:
2222
TClientReader(
2323
const TRichYPath& path,
24+
const IRawClientPtr& rawClient,
2425
IClientRetryPolicyPtr clientRetryPolicy,
2526
ITransactionPingerPtr transactionPinger,
2627
const TClientContext& context,
@@ -43,8 +44,11 @@ class TClientReader
4344

4445
private:
4546
TRichYPath Path_;
47+
48+
const IRawClientPtr RawClient_;
4649
const IClientRetryPolicyPtr ClientRetryPolicy_;
4750
const TClientContext Context_;
51+
4852
TTransactionId ParentTransactionId_;
4953
TMaybe<TFormat> Format_;
5054
TTableReaderOptions Options_;

yt/cpp/mapreduce/client/client_writer.cpp

+8-2
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
11
#include "client_writer.h"
22

33
#include "retryful_writer.h"
4-
#include "retryless_writer.h"
54
#include "retryful_writer_v2.h"
5+
#include "retryless_writer.h"
66

7-
#include <yt/cpp/mapreduce/interface/io.h>
87
#include <yt/cpp/mapreduce/common/fwd.h>
98
#include <yt/cpp/mapreduce/common/helpers.h>
109

10+
#include <yt/cpp/mapreduce/interface/io.h>
11+
12+
#include <yt/cpp/mapreduce/raw_client/raw_client.h>
13+
1114
namespace NYT {
1215

1316
////////////////////////////////////////////////////////////////////////////////
1417

1518
TClientWriter::TClientWriter(
1619
const TRichYPath& path,
20+
const IRawClientPtr& rawClient,
1721
IClientRetryPolicyPtr clientRetryPolicy,
1822
ITransactionPingerPtr transactionPinger,
1923
const TClientContext& context,
@@ -38,6 +42,7 @@ TClientWriter::TClientWriter(
3842
auto serializedWriterOptions = FormIORequestParameters(options);
3943

4044
RawWriter_ = MakeIntrusive<NPrivate::TRetryfulWriterV2>(
45+
rawClient,
4146
std::move(clientRetryPolicy),
4247
std::move(transactionPinger),
4348
context,
@@ -50,6 +55,7 @@ TClientWriter::TClientWriter(
5055
options.CreateTransaction_);
5156
} else {
5257
RawWriter_.Reset(new TRetryfulWriter(
58+
rawClient,
5359
std::move(clientRetryPolicy),
5460
std::move(transactionPinger),
5561
context,

yt/cpp/mapreduce/client/client_writer.h

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ class TClientWriter
1717
public:
1818
TClientWriter(
1919
const TRichYPath& path,
20+
const IRawClientPtr& rawClient,
2021
IClientRetryPolicyPtr clientRetryPolicy,
2122
ITransactionPingerPtr transactionPinger,
2223
const TClientContext& context,

yt/cpp/mapreduce/client/file_reader.cpp

+10-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <yt/cpp/mapreduce/http/http_client.h>
2020
#include <yt/cpp/mapreduce/http/retry_request.h>
2121

22+
#include <yt/cpp/mapreduce/raw_client/raw_client.h>
2223
#include <yt/cpp/mapreduce/raw_client/raw_requests.h>
2324

2425
namespace NYT {
@@ -39,13 +40,16 @@ static TMaybe<ui64> GetEndOffset(const TFileReaderOptions& options) {
3940
////////////////////////////////////////////////////////////////////////////////
4041

4142
TStreamReaderBase::TStreamReaderBase(
43+
const IRawClientPtr& rawClient,
4244
IClientRetryPolicyPtr clientRetryPolicy,
4345
ITransactionPingerPtr transactionPinger,
4446
const TClientContext& context,
4547
const TTransactionId& transactionId)
46-
: Context_(context)
48+
: RawClient_(rawClient)
49+
, Context_(context)
4750
, ClientRetryPolicy_(std::move(clientRetryPolicy))
4851
, ReadTransaction_(MakeHolder<TPingableTransaction>(
52+
RawClient_,
4953
ClientRetryPolicy_,
5054
context,
5155
transactionId,
@@ -57,7 +61,7 @@ TStreamReaderBase::~TStreamReaderBase() = default;
5761

5862
TYPath TStreamReaderBase::Snapshot(const TYPath& path)
5963
{
60-
return NYT::Snapshot(ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path);
64+
return NYT::Snapshot(RawClient_, ClientRetryPolicy_, Context_, ReadTransaction_->GetId(), path);
6165
}
6266

6367
TString TStreamReaderBase::GetActiveRequestId() const
@@ -119,12 +123,13 @@ size_t TStreamReaderBase::DoRead(void* buf, size_t len)
119123

120124
TFileReader::TFileReader(
121125
const TRichYPath& path,
126+
const IRawClientPtr& rawClient,
122127
IClientRetryPolicyPtr clientRetryPolicy,
123128
ITransactionPingerPtr transactionPinger,
124129
const TClientContext& context,
125130
const TTransactionId& transactionId,
126131
const TFileReaderOptions& options)
127-
: TStreamReaderBase(std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
132+
: TStreamReaderBase(rawClient, std::move(clientRetryPolicy), std::move(transactionPinger), context, transactionId)
128133
, FileReaderOptions_(options)
129134
, Path_(path)
130135
, StartOffset_(FileReaderOptions_.Offset_.GetOrElse(0))
@@ -183,12 +188,13 @@ NHttpClient::IHttpResponsePtr TFileReader::Request(const TClientContext& context
183188
TBlobTableReader::TBlobTableReader(
184189
const TYPath& path,
185190
const TKey& key,
191+
const IRawClientPtr& rawClient,
186192
IClientRetryPolicyPtr retryPolicy,
187193
ITransactionPingerPtr transactionPinger,
188194
const TClientContext& context,
189195
const TTransactionId& transactionId,
190196
const TBlobTableReaderOptions& options)
191-
: TStreamReaderBase(std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
197+
: TStreamReaderBase(rawClient, std::move(retryPolicy), std::move(transactionPinger), context, transactionId)
192198
, Key_(key)
193199
, Options_(options)
194200
{

0 commit comments

Comments
 (0)