Skip to content

Commit 1e15d50

Browse files
faucctKamil Khamitov
authored and
Kamil Khamitov
committed
Allow sending/reading values larger than 16MB to RPC proxy via wire protocol to/from methods dealing with static tables
Historically, it data written through RPC proxies had pass dynamic table row validation standards, like max 16MB value size. This is inconvenient for static tables. SPYT needs to write large values – ytsaurus/ytsaurus-spyt#43. Disable wire input Max*ValueLength validation on RPC proxies. The methods affected by this change are WriteTable and ReadShuffleData in rpc proxy server and TTableReader in rpc proxy client. * Changelog entry Type: feature Component: proxy Allow sending/reading values larger than 16MB to RPC proxy via wire protocol to/from methods dealing with static tables --- Pull Request resolved: ytsaurus/ytsaurus#1019 commit_hash:264beb456994ba459ef06d8f5c25bf6d52abc08b
1 parent d51e1dc commit 1e15d50

File tree

5 files changed

+28
-7
lines changed

5 files changed

+28
-7
lines changed

yt/yt/client/api/rpc_proxy/row_batch_reader.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ TRowBatchReader::TRowBatchReader(
1818
IAsyncZeroCopyInputStreamPtr underlying,
1919
bool isStreamWithStatistics)
2020
: Underlying_(std::move(underlying))
21-
, Decoder_(CreateWireRowStreamDecoder(NameTable_))
21+
, Decoder_(CreateWireRowStreamDecoder(NameTable_, CreateUnlimitedWireProtocolOptions()))
2222
, IsStreamWithStatistics_(isStreamWithStatistics)
2323
{
2424
YT_VERIFY(Underlying_);

yt/yt/client/api/rpc_proxy/wire_row_stream.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,11 @@ class TWireRowStreamDecoder
7474
: public IRowStreamDecoder
7575
{
7676
public:
77-
explicit TWireRowStreamDecoder(TNameTablePtr nameTable)
77+
explicit TWireRowStreamDecoder(
78+
TNameTablePtr nameTable,
79+
TWireProtocolOptions wireProtocolOptions = {})
7880
: NameTable_(std::move(nameTable))
81+
, WireProtocolOptions_(std::move(wireProtocolOptions))
7982
{
8083
Descriptor_.set_wire_format_version(NApi::NRpcProxy::CurrentWireFormatVersion);
8184
Descriptor_.set_rowset_kind(NApi::NRpcProxy::NProto::RK_UNVERSIONED);
@@ -86,7 +89,7 @@ class TWireRowStreamDecoder
8689
const NProto::TRowsetDescriptor& descriptorDelta) override
8790
{
8891
struct TWireRowStreamDecoderTag { };
89-
auto reader = CreateWireProtocolReader(payloadRef, New<TRowBuffer>(TWireRowStreamDecoderTag()));
92+
auto reader = CreateWireProtocolReader(payloadRef, New<TRowBuffer>(TWireRowStreamDecoderTag()), WireProtocolOptions_);
9093
auto rows = reader->ReadUnversionedRowset(true);
9194

9295
auto oldNameTableSize = Descriptor_.name_table_entries_size();
@@ -125,18 +128,20 @@ class TWireRowStreamDecoder
125128

126129
private:
127130
const TNameTablePtr NameTable_;
131+
const TWireProtocolOptions WireProtocolOptions_;
128132

129133
NApi::NRpcProxy::NProto::TRowsetDescriptor Descriptor_;
130134
TNameTableToSchemaIdMapping IdMapping_;
131135
bool HasNontrivialIdMapping_ = false;
132136
};
133137

134-
IRowStreamDecoderPtr CreateWireRowStreamDecoder(TNameTablePtr nameTable)
138+
IRowStreamDecoderPtr CreateWireRowStreamDecoder(
139+
TNameTablePtr nameTable,
140+
TWireProtocolOptions wireProtocolOptions)
135141
{
136-
return New<TWireRowStreamDecoder>(std::move(nameTable));
142+
return New<TWireRowStreamDecoder>(std::move(nameTable), std::move(wireProtocolOptions));
137143
}
138144

139145
////////////////////////////////////////////////////////////////////////////////
140146

141147
} // namespace NYT::NApi::NRpcProxy
142-

yt/yt/client/api/rpc_proxy/wire_row_stream.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@
33
#include "public.h"
44

55
#include <yt/yt/client/table_client/public.h>
6+
#include <yt/yt/client/table_client/wire_protocol.h>
67

78
namespace NYT::NApi::NRpcProxy {
89

910
////////////////////////////////////////////////////////////////////////////////
1011

1112
IRowStreamEncoderPtr CreateWireRowStreamEncoder(NTableClient::TNameTablePtr nameTable);
12-
IRowStreamDecoderPtr CreateWireRowStreamDecoder(NTableClient::TNameTablePtr nameTable);
13+
IRowStreamDecoderPtr CreateWireRowStreamDecoder(
14+
NTableClient::TNameTablePtr nameTable,
15+
NTableClient::TWireProtocolOptions wireProtocolOptions = {});
1316

1417
////////////////////////////////////////////////////////////////////////////////
1518

yt/yt/client/table_client/wire_protocol.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1055,6 +1055,17 @@ auto IWireProtocolReader::GetSchemaData(const TTableSchema& schema) -> TSchemaDa
10551055

10561056
////////////////////////////////////////////////////////////////////////////////
10571057

1058+
TWireProtocolOptions CreateUnlimitedWireProtocolOptions()
1059+
{
1060+
return {
1061+
.MaxStringValueLength = std::numeric_limits<i64>::max(),
1062+
.MaxAnyValueLength = std::numeric_limits<i64>::max(),
1063+
.MaxCompositeValueLength = std::numeric_limits<i64>::max(),
1064+
};
1065+
}
1066+
1067+
////////////////////////////////////////////////////////////////////////////////
1068+
10581069
std::unique_ptr<IWireProtocolReader> CreateWireProtocolReader(TSharedRef data, TRowBufferPtr rowBuffer, TWireProtocolOptions options)
10591070
{
10601071
return std::make_unique<TWireProtocolReader>(std::move(data), std::move(rowBuffer), std::move(options));

yt/yt/client/table_client/wire_protocol.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,8 @@ struct TWireProtocolOptions
288288
i64 MaxVersionedRowDataWeight = NTableClient::MaxServerVersionedRowDataWeight;
289289
};
290290

291+
TWireProtocolOptions CreateUnlimitedWireProtocolOptions();
292+
291293
////////////////////////////////////////////////////////////////////////////////
292294

293295
//! Creates wire protocol reader.

0 commit comments

Comments
 (0)