Skip to content

Commit 030e3a8

Browse files
authored
Fix sink write to secondary index (#7897)
1 parent c68f701 commit 030e3a8

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

ydb/core/kqp/runtime/kqp_write_actor.cpp

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
228228
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
229229
IgnoreFunc(TEvTxUserProxy::TEvAllocateTxIdResult);
230230
hFunc(TEvPrivate::TEvShardRequestTimeout, Handle);
231+
hFunc(TEvPrivate::TEvResolveRequestPlanned, Handle);
231232
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
232233
IgnoreFunc(TEvTxProxySchemeCache::TEvInvalidateTableResult);
233234
}
@@ -247,21 +248,26 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
247248
}
248249

249250
void PlanResolveTable() {
251+
CA_LOG_D("Plan resolve with delay " << CalculateNextAttemptDelay(ResolveAttempts));
250252
TlsActivationContext->Schedule(
251253
CalculateNextAttemptDelay(ResolveAttempts),
252254
new IEventHandle(SelfId(), SelfId(), new TEvPrivate::TEvResolveRequestPlanned{}, 0, 0));
253255
}
254256

257+
void Handle(TEvPrivate::TEvResolveRequestPlanned::TPtr&) {
258+
ResolveTable();
259+
}
260+
255261
void ResolveTable() {
256262
SchemeEntry.reset();
257263
SchemeRequest.reset();
258264

259265
if (ResolveAttempts++ >= BackoffSettings()->MaxResolveAttempts) {
260-
const auto error = TStringBuilder()
261-
<< "Too many table resolve attempts for Sink=" << this->SelfId() << ".";
262-
CA_LOG_E(error);
266+
CA_LOG_E(TStringBuilder()
267+
<< "Too many table resolve attempts for table " << TableId << ".");
263268
RuntimeError(
264-
error,
269+
TStringBuilder()
270+
<< "Too many table resolve attempts for table `" << Settings.GetTable().GetPath() << "`.",
265271
NYql::NDqProto::StatusIds::SCHEME_ERROR);
266272
return;
267273
}
@@ -273,21 +279,24 @@ class TKqpDirectWriteActor : public TActorBootstrapped<TKqpDirectWriteActor>, pu
273279
entry.RequestType = NSchemeCache::TSchemeCacheNavigate::TEntry::ERequestType::ByTableId;
274280
entry.Operation = NSchemeCache::TSchemeCacheNavigate::OpTable;
275281
entry.SyncVersion = false;
282+
entry.ShowPrivatePath = true;
276283
request->ResultSet.emplace_back(entry);
277284

278285
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(TableId, {}));
279286
Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvNavigateKeySet(request));
280287
}
281288

282289
void Handle(TEvTxProxySchemeCache::TEvNavigateKeySetResult::TPtr& ev) {
290+
auto& resultSet = ev->Get()->Request->ResultSet;
291+
YQL_ENSURE(resultSet.size() == 1);
292+
283293
if (ev->Get()->Request->ErrorCount > 0) {
284294
CA_LOG_E(TStringBuilder() << "Failed to get table: "
285-
<< TableId << "'");
295+
<< TableId << "'. Entry: " << resultSet[0].ToString());
286296
PlanResolveTable();
287297
return;
288298
}
289-
auto& resultSet = ev->Get()->Request->ResultSet;
290-
YQL_ENSURE(resultSet.size() == 1);
299+
291300
SchemeEntry = resultSet[0];
292301

293302
CA_LOG_D("Resolved TableId=" << TableId << " ("

ydb/core/kqp/ut/service/kqp_qs_queries_ut.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3183,7 +3183,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
31833183
CompareYson(output, R"([[1u;"test1";[10];["1"]];[2u;"test2";#;["2"]];[3u;"test3";[12];#];[4u;"test4";#;#];[100u;"test100";[1000];["100"]]])");
31843184
}
31853185

3186-
Y_UNIT_TEST(TableSink_OltpReplace) {
3186+
Y_UNIT_TEST_TWIN(TableSink_OltpReplace, HasSecondaryIndex) {
31873187
NKikimrConfig::TAppConfig appConfig;
31883188
appConfig.MutableTableServiceConfig()->SetEnableOltpSink(true);
31893189
auto settings = TKikimrSettings()
@@ -3195,14 +3195,15 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
31953195

31963196
auto session = kikimr.GetTableClient().CreateSession().GetValueSync().GetSession();
31973197

3198-
const TString query = R"(
3198+
const TString query = Sprintf(R"(
31993199
CREATE TABLE `/Root/DataShard` (
32003200
Col1 Uint64 NOT NULL,
32013201
Col2 Int32,
32023202
Col3 String,
3203+
%s
32033204
PRIMARY KEY (Col1)
32043205
);
3205-
)";
3206+
)", (HasSecondaryIndex ? "INDEX idx_2 GLOBAL ON (Col2)," : ""));
32063207

32073208
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
32083209
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

0 commit comments

Comments
 (0)