Skip to content

Commit 350eccc

Browse files
authored
YQ-3322 Row dispatcher (#5544)
1 parent b69d555 commit 350eccc

File tree

99 files changed

+7519
-246
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+7519
-246
lines changed

ydb/core/fq/libs/actors/clusters_from_connections.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ void FillPqClusterConfig(NYql::TPqClusterConfig& clusterConfig,
5151
clusterConfig.SetUseSsl(ds.secure());
5252
clusterConfig.SetAddBearerToToken(useBearerForYdb);
5353
clusterConfig.SetClusterType(TPqClusterConfig::CT_DATA_STREAMS);
54+
clusterConfig.SetSharedReading(ds.shared_reading());
5455
FillClusterAuth(clusterConfig, ds.auth(), authToken, accountIdSignatures);
5556
}
5657

ydb/core/fq/libs/actors/logging/log.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@
4747
#define LOG_STREAMS_STORAGE_SERVICE_AS_WARN(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, WARN, STREAMS_STORAGE_SERVICE, logRecordStream)
4848
#define LOG_STREAMS_STORAGE_SERVICE_AS_ERROR(actorSystem, logRecordStream) LOG_STREAMS_IMPL_AS(actorSystem, ERROR, STREAMS_STORAGE_SERVICE, logRecordStream)
4949

50+
// Component: ROW_DISPATCHER.
51+
#define LOG_ROW_DISPATCHER_TRACE(logRecordStream) LOG_STREAMS_IMPL(TRACE, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)
52+
#define LOG_ROW_DISPATCHER_DEBUG(logRecordStream) LOG_STREAMS_IMPL(DEBUG, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)
53+
#define LOG_ROW_DISPATCHER_INFO(logRecordStream) LOG_STREAMS_IMPL(INFO, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)
54+
#define LOG_ROW_DISPATCHER_WARN(logRecordStream) LOG_STREAMS_IMPL(WARN, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)
55+
#define LOG_ROW_DISPATCHER_ERROR(logRecordStream) LOG_STREAMS_IMPL(ERROR, FQ_ROW_DISPATCHER, LogPrefix << logRecordStream)
56+
5057
// Component: STREAMS_SCHEDULER_SERVICE.
5158
#define LOG_STREAMS_SCHEDULER_SERVICE_EMERG(logRecordStream) LOG_STREAMS_IMPL(EMERG, STREAMS_SCHEDULER_SERVICE, logRecordStream)
5259
#define LOG_STREAMS_SCHEDULER_SERVICE_ALERT(logRecordStream) LOG_STREAMS_IMPL(ALERT, STREAMS_SCHEDULER_SERVICE, logRecordStream)

ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#include <ydb/public/api/protos/draft/fq.pb.h>
1212

1313
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor.h>
14-
#include <ydb/library/yql/dq/actors/compute/retry_queue.h>
14+
#include <ydb/library/yql/dq/actors/common/retry_queue.h>
1515
#include <ydb/library/yql/providers/dq/actors/events.h>
1616
#include <ydb/library/yql/providers/dq/actors/task_controller_impl.h>
1717

ydb/core/fq/libs/config/protos/fq_config.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import "ydb/core/fq/libs/config/protos/quotas_manager.proto";
2222
import "ydb/core/fq/libs/config/protos/rate_limiter.proto";
2323
import "ydb/core/fq/libs/config/protos/read_actors_factory.proto";
2424
import "ydb/core/fq/libs/config/protos/resource_manager.proto";
25+
import "ydb/core/fq/libs/config/protos/row_dispatcher.proto";
2526
import "ydb/core/fq/libs/config/protos/test_connection.proto";
2627
import "ydb/core/fq/libs/config/protos/token_accessor.proto";
2728
import "ydb/library/folder_service/proto/config.proto";
@@ -53,4 +54,5 @@ message TConfig {
5354
TRateLimiterConfig RateLimiter = 22;
5455
bool EnableTaskCounters = 23;
5556
TComputeConfig Compute = 24;
57+
TRowDispatcherConfig RowDispatcher = 25;
5658
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
syntax = "proto3";
2+
option cc_enable_arenas = true;
3+
4+
package NFq.NConfig;
5+
option java_package = "ru.yandex.kikimr.proto";
6+
7+
import "ydb/core/fq/libs/config/protos/storage.proto";
8+
9+
////////////////////////////////////////////////////////////
10+
11+
message TRowDispatcherCoordinatorConfig {
12+
TYdbStorageConfig Database = 1;
13+
string CoordinationNodePath = 2;
14+
}
15+
message TRowDispatcherConfig {
16+
bool Enabled = 1;
17+
uint64 TimeoutBeforeStartSessionSec = 2;
18+
uint64 SendStatusPeriodSec = 3;
19+
uint64 MaxSessionUsedMemory = 4;
20+
bool WithoutConsumer = 5;
21+
TRowDispatcherCoordinatorConfig Coordinator = 6;
22+
23+
}

ydb/core/fq/libs/config/protos/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ SRCS(
2222
rate_limiter.proto
2323
read_actors_factory.proto
2424
resource_manager.proto
25+
row_dispatcher.proto
2526
storage.proto
2627
test_connection.proto
2728
token_accessor.proto

ydb/core/fq/libs/events/event_subspace.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ struct TYqEventSubspace {
3232
ControlPlaneConfig,
3333
YdbCompute,
3434
TableOverFq,
35-
35+
RowDispatcher,
3636
SubspacesEnd,
3737
};
3838

ydb/core/fq/libs/events/ya.make

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@ PEERDIR(
88
ydb/library/actors/core
99
ydb/core/fq/libs/graph_params/proto
1010
ydb/core/fq/libs/protos
11+
ydb/core/fq/libs/row_dispatcher/protos
1112
ydb/library/yql/core/facade
1213
ydb/library/yql/providers/common/db_id_async_resolver
1314
ydb/library/yql/providers/dq/provider
15+
ydb/library/yql/providers/pq/proto
1416
ydb/library/yql/public/issue
1517
ydb/public/api/protos
1618
ydb/public/sdk/cpp/client/ydb_table

ydb/core/fq/libs/init/init.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <ydb/core/fq/libs/rate_limiter/events/control_plane_events.h>
1818
#include <ydb/core/fq/libs/rate_limiter/events/data_plane.h>
1919
#include <ydb/core/fq/libs/rate_limiter/quoter_service/quoter_service.h>
20+
#include <ydb/core/fq/libs/row_dispatcher/row_dispatcher_service.h>
2021
#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
2122
#include <ydb/core/fq/libs/test_connection/test_connection.h>
2223

@@ -187,6 +188,18 @@ void Init(
187188
credentialsFactory = NYql::CreateSecuredServiceAccountCredentialsOverTokenAccessorFactory(tokenAccessorConfig.GetEndpoint(), tokenAccessorConfig.GetUseSsl(), caContent, tokenAccessorConfig.GetConnectionPoolSize());
188189
}
189190

191+
if (protoConfig.GetRowDispatcher().GetEnabled()) {
192+
auto rowDispatcher = NFq::NewRowDispatcherService(
193+
protoConfig.GetRowDispatcher(),
194+
protoConfig.GetCommon(),
195+
NKikimr::CreateYdbCredentialsProviderFactory,
196+
yqSharedResources,
197+
credentialsFactory,
198+
tenant,
199+
yqCounters->GetSubgroup("subsystem", "row_dispatcher"));
200+
actorRegistrator(NFq::RowDispatcherServiceActorId(), rowDispatcher.release());
201+
}
202+
190203
auto s3ActorsFactory = NYql::NDq::CreateS3ActorsFactory();
191204

192205
if (protoConfig.GetPrivateApi().GetEnabled()) {

ydb/core/fq/libs/init/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ PEERDIR(
2222
ydb/core/fq/libs/quota_manager
2323
ydb/core/fq/libs/rate_limiter/control_plane_service
2424
ydb/core/fq/libs/rate_limiter/quoter_service
25+
ydb/core/fq/libs/row_dispatcher
2526
ydb/core/fq/libs/shared_resources
2627
ydb/core/fq/libs/test_connection
2728
ydb/core/protos
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
#include <ydb/core/fq/libs/row_dispatcher/actors_factory.h>
2+
3+
#include <ydb/core/fq/libs/row_dispatcher/topic_session.h>
4+
5+
namespace NFq::NRowDispatcher {
6+
7+
8+
struct TActorFactory : public IActorFactory {
9+
TActorFactory() {}
10+
11+
NActors::TActorId RegisterTopicSession(
12+
const TString& topicPath,
13+
const NConfig::TRowDispatcherConfig& config,
14+
NActors::TActorId rowDispatcherActorId,
15+
ui32 partitionId,
16+
NYdb::TDriver driver,
17+
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
18+
const ::NMonitoring::TDynamicCounterPtr& counters) const override {
19+
20+
auto actorPtr = NFq::NewTopicSession(
21+
topicPath,
22+
config,
23+
rowDispatcherActorId,
24+
partitionId,
25+
std::move(driver),
26+
credentialsProviderFactory,
27+
counters
28+
);
29+
return NActors::TlsActivationContext->ExecutorThread.RegisterActor(actorPtr.release(), NActors::TMailboxType::HTSwap, Max<ui32>());
30+
}
31+
};
32+
33+
IActorFactory::TPtr CreateActorFactory() {
34+
return MakeIntrusive<TActorFactory>();
35+
}
36+
37+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#pragma once
2+
3+
#include <ydb/core/fq/libs/config/protos/row_dispatcher.pb.h>
4+
#include <util/generic/ptr.h>
5+
#include <ydb/library/actors/core/actor.h>
6+
#include <ydb/public/sdk/cpp/client/ydb_driver/driver.h>
7+
8+
namespace NFq::NRowDispatcher {
9+
10+
struct IActorFactory : public TThrRefBase {
11+
using TPtr = TIntrusivePtr<IActorFactory>;
12+
13+
virtual NActors::TActorId RegisterTopicSession(
14+
const TString& topicPath,
15+
const NConfig::TRowDispatcherConfig& config,
16+
NActors::TActorId rowDispatcherActorId,
17+
ui32 partitionId,
18+
NYdb::TDriver driver,
19+
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
20+
const ::NMonitoring::TDynamicCounterPtr& counters) const = 0;
21+
};
22+
23+
IActorFactory::TPtr CreateActorFactory();
24+
25+
}

0 commit comments

Comments
 (0)