Skip to content

Commit 6e215d5

Browse files
authored
Emulate PQ events from file (#9349)
1 parent b53600a commit 6e215d5

20 files changed

+552
-22
lines changed

ydb/core/fq/libs/init/init.cpp

+11-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
#include <ydb/library/yql/providers/s3/proto/retry_config.pb.h>
4545
#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
4646
#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
47+
#include <ydb/library/yql/providers/pq/gateway/native/yql_pq_gateway.h>
4748
#include <ydb/library/yql/providers/solomon/async_io/dq_solomon_write_actor.h>
4849
#include <ydb/library/yql/providers/common/http_gateway/yql_http_default_retry_policy.h>
4950

@@ -207,7 +208,16 @@ void Init(
207208
NYql::NDq::TS3ReadActorFactoryConfig readActorFactoryCfg = NYql::NDq::CreateReadActorFactoryConfig(protoConfig.GetGateways().GetS3());
208209

209210
RegisterDqInputTransformLookupActorFactory(*asyncIoFactory);
210-
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
211+
212+
NYql::TPqGatewayServices pqServices(
213+
yqSharedResources->UserSpaceYdbDriver,
214+
pqCmConnections,
215+
credentialsFactory,
216+
std::make_shared<NYql::TPqGatewayConfig>(protoConfig.GetGateways().GetPq()),
217+
appData->FunctionRegistry
218+
);
219+
RegisterDqPqReadActorFactory(*asyncIoFactory, yqSharedResources->UserSpaceYdbDriver, credentialsFactory, NYql::CreatePqNativeGateway(std::move(pqServices)),
220+
yqCounters->GetSubgroup("subsystem", "DqSourceTracker"));
211221

212222
s3ActorsFactory->RegisterS3ReadActorFactory(*asyncIoFactory, credentialsFactory, httpGateway, s3HttpRetryPolicy, readActorFactoryCfg,
213223
yqCounters->GetSubgroup("subsystem", "S3ReadActor"), protoConfig.GetGateways().GetS3().GetAllowLocalFiles());

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.cpp

+14-8
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,16 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
128128
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory,
129129
const NActors::TActorId& computeActorId,
130130
const ::NMonitoring::TDynamicCounterPtr& counters,
131-
i64 bufferSize)
131+
i64 bufferSize,
132+
const IPqGateway::TPtr& pqGateway)
132133
: TActor<TDqPqReadActor>(&TDqPqReadActor::StateFunc)
133134
, TDqPqReadActorBase(inputIndex, taskId, this->SelfId(), txId, std::move(sourceParams), std::move(readParams), computeActorId)
134135
, Metrics(txId, taskId, counters)
135136
, BufferSize(bufferSize)
136137
, HolderFactory(holderFactory)
137138
, Driver(std::move(driver))
138139
, CredentialsProviderFactory(std::move(credentialsProviderFactory))
140+
, PqGateway(pqGateway)
139141
{
140142
MetadataFields.reserve(SourceParams.MetadataFieldsSize());
141143
TPqMetaExtractor fieldsExtractor;
@@ -185,9 +187,9 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
185187
}
186188
}
187189

188-
NYdb::NTopic::TTopicClient& GetTopicClient() {
190+
ITopicClient& GetTopicClient() {
189191
if (!TopicClient) {
190-
TopicClient = std::make_unique<NYdb::NTopic::TTopicClient>(Driver, GetTopicClientSettings());
192+
TopicClient = PqGateway->GetTopicClient(Driver, GetTopicClientSettings());
191193
}
192194
return *TopicClient;
193195
}
@@ -229,7 +231,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
229231
ReadSession->Close(TDuration::Zero());
230232
ReadSession.reset();
231233
}
232-
TopicClient.reset();
234+
TopicClient.Reset();
233235
TActor<TDqPqReadActor>::PassAway();
234236
}
235237

@@ -568,7 +570,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
568570
const THolderFactory& HolderFactory;
569571
NYdb::TDriver Driver;
570572
std::shared_ptr<NYdb::ICredentialsProviderFactory> CredentialsProviderFactory;
571-
std::unique_ptr<NYdb::NTopic::TTopicClient> TopicClient;
573+
ITopicClient::TPtr TopicClient;
572574
std::shared_ptr<NYdb::NTopic::IReadSession> ReadSession;
573575
NThreading::TFuture<void> EventFuture;
574576
std::queue<std::pair<ui64, NYdb::NTopic::TDeferredCommit>> DeferredCommits;
@@ -578,6 +580,7 @@ class TDqPqReadActor : public NActors::TActor<TDqPqReadActor>, public NYql::NDq:
578580
std::queue<TReadyBatch> ReadyBuffer;
579581
TMaybe<TDqSourceWatermarkTracker<TPartitionKey>> WatermarkTracker;
580582
TMaybe<TInstant> NextIdlenesCheckAt;
583+
IPqGateway::TPtr PqGateway;
581584
};
582585

583586
std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
@@ -593,6 +596,7 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
593596
const NActors::TActorId& computeActorId,
594597
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
595598
const ::NMonitoring::TDynamicCounterPtr& counters,
599+
IPqGateway::TPtr pqGateway,
596600
i64 bufferSize
597601
)
598602
{
@@ -618,15 +622,16 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
618622
CreateCredentialsProviderFactoryForStructuredToken(credentialsFactory, token, addBearerToToken),
619623
computeActorId,
620624
counters,
621-
bufferSize
625+
bufferSize,
626+
pqGateway
622627
);
623628

624629
return {actor, actor};
625630
}
626631

627-
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters) {
632+
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters) {
628633
factory.RegisterSource<NPq::NProto::TDqPqTopicSource>("PqSource",
629-
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters](
634+
[driver = std::move(driver), credentialsFactory = std::move(credentialsFactory), counters, pqGateway](
630635
NPq::NProto::TDqPqTopicSource&& settings,
631636
IDqAsyncIoFactory::TSourceArguments&& args)
632637
{
@@ -646,6 +651,7 @@ void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driv
646651
args.ComputeActorId,
647652
args.HolderFactory,
648653
counters,
654+
pqGateway,
649655
PQReadDefaultFreeSpace);
650656
}
651657

ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
55

66
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
7+
#include <ydb/library/yql/providers/pq/provider/yql_pq_gateway.h>
78
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
89

910
#include <ydb/library/yql/providers/pq/proto/dq_io.pb.h>
@@ -35,9 +36,10 @@ std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqReadActor(
3536
const NActors::TActorId& computeActorId,
3637
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
3738
const ::NMonitoring::TDynamicCounterPtr& counters,
39+
IPqGateway::TPtr pqGateway,
3840
i64 bufferSize = PQReadDefaultFreeSpace
3941
);
4042

41-
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());
43+
void RegisterDqPqReadActorFactory(TDqAsyncIoFactory& factory, NYdb::TDriver driver, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, const IPqGateway::TPtr& pqGateway, const ::NMonitoring::TDynamicCounterPtr& counters = MakeIntrusive<::NMonitoring::TDynamicCounters>());
4244

4345
} // namespace NYql::NDq

ydb/library/yql/providers/pq/gateway/dummy/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ LIBRARY()
22

33
SRCS(
44
yql_pq_dummy_gateway.cpp
5+
yql_pq_file_topic_client.cpp
56
)
67

78
PEERDIR(

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "yql_pq_dummy_gateway.h"
2+
#include "yql_pq_file_topic_client.h"
23

34
#include <util/generic/is_in.h>
45
#include <util/generic/yexception.h>
@@ -59,6 +60,14 @@ TDummyPqGateway& TDummyPqGateway::AddDummyTopic(const TDummyTopic& topic) {
5960
}
6061
}
6162

63+
IPqGateway::TPtr CreatePqFileGateway() {
64+
return MakeIntrusive<TDummyPqGateway>();
65+
}
66+
67+
ITopicClient::TPtr TDummyPqGateway::GetTopicClient(const NYdb::TDriver&, const NYdb::NTopic::TTopicClientSettings&) {
68+
return MakeIntrusive<TFileTopicClient>(Topics);
69+
}
70+
6271
void TDummyPqGateway::UpdateClusterConfigs(
6372
const TString& clusterName,
6473
const TString& endpoint,

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h

+11-3
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@
99
namespace NYql {
1010

1111
struct TDummyTopic {
12-
TDummyTopic(const TString& cluster, const TString& path)
12+
TDummyTopic(const TString& cluster, const TString& path, const TMaybe<TString>& filePath = {})
1313
: Cluster(cluster)
1414
, Path(path)
15+
, FilePath(filePath)
1516
{
1617
}
1718

@@ -22,15 +23,16 @@ struct TDummyTopic {
2223

2324
TString Cluster;
2425
TString Path;
26+
TMaybe<TString> FilePath;
2527
size_t PartitionsCount = 1;
2628
};
2729

2830
// Dummy Pq gateway for tests.
2931
class TDummyPqGateway : public IPqGateway {
3032
public:
3133
TDummyPqGateway& AddDummyTopic(const TDummyTopic& topic);
34+
~TDummyPqGateway() {}
3235

33-
public:
3436
NThreading::TFuture<void> OpenSession(const TString& sessionId, const TString& username) override;
3537
NThreading::TFuture<void> CloseSession(const TString& sessionId) override;
3638

@@ -54,11 +56,17 @@ class TDummyPqGateway : public IPqGateway {
5456
const TString& endpoint,
5557
const TString& database,
5658
bool secure) override;
59+
60+
ITopicClient::TPtr GetTopicClient(const NYdb::TDriver& driver, const NYdb::NTopic::TTopicClientSettings& settings) override;
5761

62+
using TClusterNPath = std::pair<TString, TString>;
5863
private:
5964
mutable TMutex Mutex;
60-
THashMap<std::pair<TString, TString>, TDummyTopic> Topics;
65+
THashMap<TClusterNPath, TDummyTopic> Topics;
66+
6167
THashSet<TString> OpenedSessions;
6268
};
6369

70+
IPqGateway::TPtr CreatePqFileGateway();
71+
6472
} // namespace NYql

0 commit comments

Comments
 (0)