Skip to content

Commit f02f39a

Browse files
GrigoriyPAblinkov
authored andcommitted
YQ-4133 fix race on RM initialization (#14931)
1 parent 03e85c6 commit f02f39a

File tree

2 files changed

+21
-8
lines changed

2 files changed

+21
-8
lines changed

ydb/core/kqp/rm_service/kqp_rm_service.cpp

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,14 @@ using namespace NResourceBroker;
3636
#define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
3737
#define LOG_N(stream) LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
3838

39-
#define LOG_AS_C(stream) LOG_CRIT_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
40-
#define LOG_AS_D(stream) LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
41-
#define LOG_AS_I(stream) LOG_INFO_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
42-
#define LOG_AS_E(stream) LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
43-
#define LOG_AS_W(stream) LOG_WARN_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
44-
#define LOG_AS_N(stream) LOG_NOTICE_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream)
39+
#define LOG_AS_SAFE(log) {if (ActorSystem) { log; }}
40+
41+
#define LOG_AS_C(stream) LOG_AS_SAFE(LOG_CRIT_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
42+
#define LOG_AS_D(stream) LOG_AS_SAFE(LOG_DEBUG_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
43+
#define LOG_AS_I(stream) LOG_AS_SAFE(LOG_INFO_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
44+
#define LOG_AS_E(stream) LOG_AS_SAFE(LOG_ERROR_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
45+
#define LOG_AS_W(stream) LOG_AS_SAFE(LOG_WARN_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
46+
#define LOG_AS_N(stream) LOG_AS_SAFE(LOG_NOTICE_S(*ActorSystem, NKikimrServices::KQP_RESOURCE_MANAGER, stream))
4547

4648
namespace {
4749

@@ -165,6 +167,7 @@ class TKqpResourceManager : public IKqpResourceManager {
165167
, TotalMemoryResource(MakeIntrusive<TMemoryResource>(config.GetQueryMemoryLimit(), (double)100, config.GetSpillingPercent()))
166168
, ResourceSnapshotState(std::make_shared<TResourceSnapshotState>())
167169
{
170+
PublishAfterBootstrap.clear();
168171
SetConfigValues(config);
169172
}
170173

@@ -179,6 +182,11 @@ class TKqpResourceManager : public IKqpResourceManager {
179182
config.GetKqpPatternCachePatternAccessTimesBeforeTryToCompile());
180183

181184
CreateResourceInfoExchanger(config.GetInfoExchangerSettings());
185+
186+
if (PublishAfterBootstrap.test()) {
187+
FireResourcesPublishing();
188+
PublishAfterBootstrap.clear();
189+
}
182190
}
183191

184192
const TIntrusivePtr<TKqpCounters>& GetCounters() const override {
@@ -489,7 +497,11 @@ class TKqpResourceManager : public IKqpResourceManager {
489497
void FireResourcesPublishing() {
490498
bool prev = PublishScheduled.test_and_set();
491499
if (!prev) {
492-
ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources);
500+
if (Y_LIKELY(ActorSystem)) {
501+
ActorSystem->Send(SelfId, new TEvPrivate::TEvSchedulePublishResources);
502+
} else {
503+
PublishAfterBootstrap.test_and_set();
504+
}
493505
}
494506
}
495507

@@ -535,6 +547,7 @@ class TKqpResourceManager : public IKqpResourceManager {
535547
// current state
536548
std::atomic<ui64> LastResourceBrokerTaskId = 0;
537549

550+
std::atomic_flag PublishAfterBootstrap;
538551
std::atomic_flag PublishScheduled;
539552
// pattern cache for different actors
540553
std::shared_ptr<NMiniKQL::TComputationPatternLRUCache> PatternCache;

ydb/tests/tools/kqprun/configuration/app_config.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
ActorSystemConfig {
22
Executor {
33
Type: BASIC
4-
Threads: 1
4+
Threads: 2
55
SpinThreshold: 10
66
Name: "System"
77
}

0 commit comments

Comments
 (0)