-
Notifications
You must be signed in to change notification settings - Fork 699
YQ-3322 Row dispatcher #5544
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
YQ-3322 Row dispatcher #5544
Changes from all commits
e8ff96c
f0d2a8b
367cf6c
ba23e97
970d57c
fb1188f
19b8a65
29381f5
c625f16
babc089
c4c4eb3
d411ae0
0f000c6
8b260f3
e8e3c5a
6a77924
1257ca7
347918c
38dd0dc
1129176
0806d18
49284d6
baa2abc
8cf0614
225a134
0ddcb81
f5a0fd7
dd7e9d0
44d7687
d6940c8
4c02a3d
4046a60
cc03d7a
b68328f
ea61f3f
e0b91d2
0251534
4d3983d
7961d42
ce87ae4
398eec4
e9627f7
ece33f2
f503836
84cf07a
23a11f1
3f4357e
f8b00ef
ecd9a4b
d425526
618be0e
ba8b4bf
0c01fcc
3b42cf4
34e966a
eea9e62
6532c0e
381c230
ded20f0
bc13257
a5119f3
628f7a6
0178b35
5f76adb
e81ab69
f32632e
b0ee2e8
d62d67d
f3e94b6
0a9c93c
b651727
d4fe9a5
26694ff
5285c87
0fcf377
66814e8
81a8da1
20cd878
fe4e233
5b0edee
610595e
277907c
31eafe0
14616d6
72fef54
5d82e50
90e3ace
e53a079
7f28f02
a256870
785e0a5
bd1e8c0
853022c
dac9cd6
afcc80c
1df7952
99b7b55
8ac6851
68fa7c4
80ab259
70639d9
a68fd63
7a1b661
3805510
aaaab26
5a1ae58
24c4733
01def22
1e21e87
3f6312d
7db5b1d
b31d821
be830e2
a2e2287
5519708
7ea2bc2
8839fa1
3a12fb4
6c948e8
2f54d22
6912f39
cf24a47
fb4ec01
ef310ad
bd7ab79
d3f1321
128d95e
b438167
5d38c1d
024cdb7
58fbd32
46f152c
48780cb
073f3ef
340f601
166a5ae
487865d
0e07ef3
7885f2b
39f94cd
8ecac82
71ca4cc
828a282
b52ca73
d8dd2f9
60c00d0
9b077db
b7db4b1
a6bd525
99ee101
f8d2c9e
b370392
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
syntax = "proto3"; | ||
option cc_enable_arenas = true; | ||
|
||
package NFq.NConfig; | ||
option java_package = "ru.yandex.kikimr.proto"; | ||
|
||
import "ydb/core/fq/libs/config/protos/storage.proto"; | ||
|
||
//////////////////////////////////////////////////////////// | ||
|
||
message TRowDispatcherCoordinatorConfig { | ||
TYdbStorageConfig Database = 1; | ||
string CoordinationNodePath = 2; | ||
} | ||
message TRowDispatcherConfig { | ||
bool Enabled = 1; | ||
uint64 TimeoutBeforeStartSessionSec = 2; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. У нас есть еще вариант через строчку задавать таймауты. Можно писать "5m" и т.д. https://a.yandex-team.ru/arcadia/contrib/ydb/core/fq/libs/config/protos/control_plane_storage.proto?rev=r14879806#L36 |
||
uint64 SendStatusPeriodSec = 3; | ||
uint64 MaxSessionUsedMemory = 4; | ||
bool WithoutConsumer = 5; | ||
TRowDispatcherCoordinatorConfig Coordinator = 6; | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
#include <ydb/core/fq/libs/row_dispatcher/actors_factory.h> | ||
|
||
#include <ydb/core/fq/libs/row_dispatcher/topic_session.h> | ||
|
||
namespace NFq::NRowDispatcher { | ||
|
||
|
||
struct TActorFactory : public IActorFactory { | ||
TActorFactory() {} | ||
|
||
NActors::TActorId RegisterTopicSession( | ||
const TString& topicPath, | ||
const NConfig::TRowDispatcherConfig& config, | ||
NActors::TActorId rowDispatcherActorId, | ||
ui32 partitionId, | ||
NYdb::TDriver driver, | ||
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, | ||
const ::NMonitoring::TDynamicCounterPtr& counters) const override { | ||
|
||
auto actorPtr = NFq::NewTopicSession( | ||
topicPath, | ||
config, | ||
rowDispatcherActorId, | ||
partitionId, | ||
std::move(driver), | ||
credentialsProviderFactory, | ||
counters | ||
); | ||
return NActors::TlsActivationContext->ExecutorThread.RegisterActor(actorPtr.release(), NActors::TMailboxType::HTSwap, Max<ui32>()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Штука конечно опасная. Если когда нибудь вызовут RegisterTopicSession не из AS, то будет краш There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. не лучше ли явным образом сюда передать as? в конструктор или в метод There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. В итоге у меня не получилось переделать (через |
||
} | ||
}; | ||
|
||
IActorFactory::TPtr CreateActorFactory() { | ||
return MakeIntrusive<TActorFactory>(); | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
#pragma once | ||
|
||
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h> | ||
#include <util/generic/ptr.h> | ||
#include <ydb/library/actors/core/actor.h> | ||
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h> | ||
|
||
namespace NFq::NRowDispatcher { | ||
|
||
struct IActorFactory : public TThrRefBase { | ||
using TPtr = TIntrusivePtr<IActorFactory>; | ||
|
||
virtual NActors::TActorId RegisterTopicSession( | ||
const TString& topicPath, | ||
const NConfig::TRowDispatcherConfig& config, | ||
NActors::TActorId rowDispatcherActorId, | ||
ui32 partitionId, | ||
NYdb::TDriver driver, | ||
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory, | ||
const ::NMonitoring::TDynamicCounterPtr& counters) const = 0; | ||
}; | ||
|
||
IActorFactory::TPtr CreateActorFactory(); | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
У нас уже есть похожий конфиг для kesus: https://a.yandex-team.ru/arcadia/contrib/ydb/core/fq/libs/config/protos/rate_limiter.proto?rev=r14879806#L11-21
Может по аналогии назвать, что думаешь? Database/CoordinationNodePath