2
2
3
3
#include < ydb/core/engine/mkql_proto.h>
4
4
#include < ydb/core/engine/minikql/flat_local_tx_factory.h>
5
+ #include < ydb/core/tx/data_events/events.h>
6
+ #include < ydb/core/tx/data_events/payload_helper.h>
5
7
#include < ydb/core/tx/schemeshard/schemeshard.h>
6
8
#include < ydb/core/tx/tx_proxy/proxy.h>
7
9
#include < ydb/core/persqueue/events/global.h>
@@ -2256,31 +2258,31 @@ namespace NSchemeShardUT_Private {
2256
2258
NKikimr::NPQ::CmdWrite (&runtime, tabletId, edge, partitionId, " sourceid0" , msgSeqNo, data, false , {}, true , cookie, 0 );
2257
2259
}
2258
2260
2259
- void WriteRow (TTestActorRuntime& runtime, const TString& key, const TString& value, ui64 tabletId) {
2261
+ void UpdateRow (TTestActorRuntime& runtime, const TString& table, const ui32 key, const TString& value, ui64 tabletId) {
2260
2262
NKikimrMiniKQL::TResult result;
2261
2263
TString error;
2262
2264
NKikimrProto::EReplyStatus status = LocalMiniKQL (runtime, tabletId, Sprintf (R"(
2263
2265
(
2264
- (let key '( '('key (Utf8 '%s ) ) ) )
2266
+ (let key '( '('key (Uint32 '%d ) ) ) )
2265
2267
(let row '( '('value (Utf8 '%s) ) ) )
2266
- (return (AsList (UpdateRow '__user__Table key row) ))
2268
+ (return (AsList (UpdateRow '__user__%s key row) ))
2267
2269
)
2268
- )" , key.c_str (), value .c_str ()), result, error);
2270
+ )" , key, value .c_str (), table .c_str ()), result, error);
2269
2271
2270
2272
UNIT_ASSERT_VALUES_EQUAL_C (status, NKikimrProto::EReplyStatus::OK, error);
2271
2273
UNIT_ASSERT_VALUES_EQUAL (error, " " );
2272
2274
}
2273
2275
2274
- void WriteRowPg (TTestActorRuntime& runtime, const TString& key, ui32 value, ui64 tabletId) {
2276
+ void UpdateRowPg (TTestActorRuntime& runtime, const TString& table, const ui32 key, ui32 value, ui64 tabletId) {
2275
2277
NKikimrMiniKQL::TResult result;
2276
2278
TString error;
2277
2279
NKikimrProto::EReplyStatus status = LocalMiniKQL (runtime, tabletId, Sprintf (R"(
2278
2280
(
2279
- (let key '( '('key (Utf8 '%s ) ) ) )
2281
+ (let key '( '('key (Utf8 '%d ) ) ) )
2280
2282
(let row '( '('value (PgConst '%u (PgType 'int4)) ) ) )
2281
- (return (AsList (UpdateRow '__user__Table key row) ))
2283
+ (return (AsList (UpdateRow '__user__%s key row) ))
2282
2284
)
2283
- )" , key.c_str (), value ), result, error);
2285
+ )" , key, value, table .c_str ()), result, error);
2284
2286
2285
2287
UNIT_ASSERT_VALUES_EQUAL_C (status, NKikimrProto::EReplyStatus::OK, error);
2286
2288
UNIT_ASSERT_VALUES_EQUAL (error, " " );
@@ -2291,6 +2293,7 @@ namespace NSchemeShardUT_Private {
2291
2293
auto tableDesc = DescribePath (runtime, tablePath, true , true );
2292
2294
const auto & tablePartitions = tableDesc.GetPathDescription ().GetTablePartitions ();
2293
2295
UNIT_ASSERT (partitionIdx < tablePartitions.size ());
2296
+ const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId ();
2294
2297
2295
2298
auto ev = MakeHolder<TEvDataShard::TEvUploadRowsRequest>();
2296
2299
ev->Record .SetTableId (tableDesc.GetPathId ());
@@ -2314,7 +2317,34 @@ namespace NSchemeShardUT_Private {
2314
2317
}
2315
2318
2316
2319
const auto & sender = runtime.AllocateEdgeActor ();
2317
- ForwardToTablet (runtime, tablePartitions[partitionIdx]. GetDatashardId () , sender, ev.Release ());
2320
+ ForwardToTablet (runtime, datashardTabletId , sender, ev.Release ());
2318
2321
runtime.GrabEdgeEvent <TEvDataShard::TEvUploadRowsResponse>(sender);
2319
2322
}
2323
+
2324
+ void WriteRow (TTestActorRuntime& runtime, const ui64 txId, const TString& tablePath, int partitionIdx, const ui32 key, const TString& value, bool successIsExpected) {
2325
+ auto tableDesc = DescribePath (runtime, tablePath, true , true );
2326
+ const auto & tablePartitions = tableDesc.GetPathDescription ().GetTablePartitions ();
2327
+ UNIT_ASSERT (partitionIdx < tablePartitions.size ());
2328
+ const ui64 tableId = tableDesc.GetPathId ();
2329
+ const ui64 datashardTabletId = tablePartitions[partitionIdx].GetDatashardId ();
2330
+
2331
+ const auto & sender = runtime.AllocateEdgeActor ();
2332
+
2333
+ std::vector<ui32> columnIds{1 , 2 };
2334
+
2335
+ TVector<TCell> cells{TCell ((const char *)&key, sizeof (ui32)), TCell (value.c_str (), value.size ())};
2336
+
2337
+ TSerializedCellMatrix matrix (cells, 1 , 2 );
2338
+
2339
+ auto evWrite = std::make_unique<NKikimr::NEvents::TDataEvents::TEvWrite>(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE);
2340
+ ui64 payloadIndex = NKikimr::NEvWrite::TPayloadHelper<NKikimr::NEvents::TDataEvents::TEvWrite>(*evWrite).AddDataToPayload (std::move (matrix.ReleaseBuffer ()));
2341
+ evWrite->AddOperation (NKikimrDataEvents::TEvWrite::TOperation::OPERATION_UPSERT, tableId, 1 , columnIds, payloadIndex, NKikimrDataEvents::FORMAT_CELLVEC);
2342
+
2343
+ ForwardToTablet (runtime, datashardTabletId, sender, evWrite.release ());
2344
+
2345
+ auto ev = runtime.GrabEdgeEventRethrow <NEvents::TDataEvents::TEvWriteResult>(sender);
2346
+ auto status = ev->Get ()->Record .GetStatus ();
2347
+
2348
+ UNIT_ASSERT_C (successIsExpected == (status == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED), " Status: " << ev->Get ()->Record .GetStatus () << " Issues: " << ev->Get ()->Record .GetIssues ());
2349
+ }
2320
2350
}
0 commit comments