Skip to content

Commit 4d7998d

Browse files
authored
YQ-3697 Add partition count to dqrun (#9837)
1 parent dbf26c5 commit 4d7998d

File tree

4 files changed

+33
-22
lines changed

4 files changed

+33
-22
lines changed

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ NThreading::TFuture<IPqGateway::TListStreams> TDummyPqGateway::ListStreams(const
5353
TDummyPqGateway& TDummyPqGateway::AddDummyTopic(const TDummyTopic& topic) {
5454
with_lock (Mutex) {
5555
Y_ENSURE(topic.Cluster);
56-
Y_ENSURE(topic.Path);
57-
const auto key = std::make_pair(topic.Cluster, topic.Path);
56+
Y_ENSURE(topic.TopicName);
57+
const auto key = std::make_pair(topic.Cluster, topic.TopicName);
5858
Y_ENSURE(Topics.emplace(key, topic).second, "Already inserted dummy topic {" << topic.Cluster << ", " << topic.Path << "}");
5959
return *this;
6060
}

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_dummy_gateway.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@
99
namespace NYql {
1010

1111
struct TDummyTopic {
12-
TDummyTopic(const TString& cluster, const TString& path, const TMaybe<TString>& filePath = {})
12+
TDummyTopic(const TString& cluster, const TString& topicName, const TMaybe<TString>& path = {}, size_t partitionCount = 1)
1313
: Cluster(cluster)
14+
, TopicName(topicName)
1415
, Path(path)
15-
, FilePath(filePath)
16+
, PartitionsCount(partitionCount)
1617
{
1718
}
1819

@@ -22,9 +23,9 @@ struct TDummyTopic {
2223
}
2324

2425
TString Cluster;
25-
TString Path;
26-
TMaybe<TString> FilePath;
27-
size_t PartitionsCount = 1;
26+
TString TopicName;
27+
TMaybe<TString> Path;
28+
size_t PartitionsCount;
2829
};
2930

3031
// Dummy Pq gateway for tests.

ydb/library/yql/providers/pq/gateway/dummy/yql_pq_file_topic_client.cpp

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <library/cpp/threading/future/async.h>
77

8+
#include <util/folder/path.h>
89
#include <util/system/file.h>
910
#include "yql_pq_blocking_queue.h"
1011

@@ -337,17 +338,24 @@ struct TDummyPartitionSession: public NYdb::NTopic::TPartitionSession {
337338

338339
std::shared_ptr<NYdb::NTopic::IReadSession> TFileTopicClient::CreateReadSession(const NYdb::NTopic::TReadSessionSettings& settings) {
339340
Y_ENSURE(!settings.Topics_.empty());
340-
auto topicPath = settings.Topics_.front().Path_;
341-
341+
const auto& topic = settings.Topics_.front();
342+
auto topicPath = topic.Path_;
343+
Y_ENSURE(topic.PartitionIds_.size() >= 1);
344+
ui64 partitionId = topic.PartitionIds_.front();
342345
auto topicsIt = Topics_.find(make_pair("pq", topicPath));
343346
Y_ENSURE(topicsIt != Topics_.end());
344-
auto filePath = topicsIt->second.FilePath;
347+
auto filePath = topicsIt->second.Path;
345348
Y_ENSURE(filePath);
346349

350+
TFsPath fsPath(*filePath);
351+
if (fsPath.IsDirectory()) {
352+
filePath = TStringBuilder() << *filePath << "/" << ToString(partitionId);
353+
} else if (!fsPath.Exists()) {
354+
filePath = TStringBuilder() << *filePath << "_" << partitionId;
355+
}
356+
347357
// TODO
348358
ui64 sessionId = 0;
349-
ui64 partitionId = 0;
350-
351359
return std::make_shared<TFileTopicReadSession>(
352360
TFile(*filePath, EOpenMode::TEnum::RdOnly),
353361
MakeIntrusive<TDummyPartitionSession>(sessionId, TString{topicPath}, partitionId)
@@ -411,7 +419,7 @@ std::shared_ptr<NYdb::NTopic::IWriteSession> TFileTopicClient::CreateWriteSessio
411419
auto topicPath = TString{settings.Path_};
412420
auto topicsIt = Topics_.find(make_pair("pq", topicPath));
413421
Y_ENSURE(topicsIt != Topics_.end());
414-
auto filePath = topicsIt->second.FilePath;
422+
auto filePath = topicsIt->second.Path;
415423
Y_ENSURE(filePath);
416424

417425
return std::make_shared<TFileTopicWriteSession>(TFile(*filePath, EOpenMode::TEnum::RdWr));

ydb/tests/tools/fqrun/fqrun.cpp

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -163,13 +163,15 @@ class TMain : public TMainBase {
163163
options.AddLongOption("emulate-pq", "Emulate YDS with local file, accepts list of tables to emulate with following format: topic@file (can be used in query from cluster `pq`)")
164164
.RequiredArgument("topic@file")
165165
.Handler1([this](const NLastGetopt::TOptsParser* option) {
166-
TStringBuf topicName;
167-
TStringBuf filePath;
168-
TStringBuf(option->CurVal()).Split('@', topicName, filePath);
169-
if (topicName.empty() || filePath.empty()) {
170-
ythrow yexception() << "Incorrect PQ file mapping, expected form topic@file";
166+
TStringBuf topicName, others;
167+
TStringBuf(option->CurVal()).Split('@', topicName, others);
168+
TStringBuf path, partitionCountStr;
169+
TStringBuf(others).Split(':', path, partitionCountStr);
170+
size_t partitionCount = !partitionCountStr.empty() ? FromString<size_t>(partitionCountStr) : 1;
171+
if (topicName.empty() || path.empty()) {
172+
ythrow yexception() << "Incorrect PQ file mapping, expected form topic@path[:partitions_count]" << Endl;
171173
}
172-
if (!PqFilesMapping.emplace(topicName, filePath).second) {
174+
if (!PqFilesMapping.emplace(topicName, NYql::TDummyTopic("pq", TString(topicName), TString(path), partitionCount)).second) {
173175
ythrow yexception() << "Got duplicated topic name: " << topicName;
174176
}
175177
});
@@ -222,8 +224,8 @@ class TMain : public TMainBase {
222224

223225
if (!PqFilesMapping.empty()) {
224226
auto fileGateway = MakeIntrusive<NYql::TDummyPqGateway>();
225-
for (const auto& [topic, file] : PqFilesMapping) {
226-
fileGateway->AddDummyTopic(NYql::TDummyTopic("pq", TString(topic), TString(file)));
227+
for (const auto& [_, topic] : PqFilesMapping) {
228+
fileGateway->AddDummyTopic(topic);
227229
}
228230
RunnerOptions.FqSettings.PqGateway = std::move(fileGateway);
229231
}
@@ -246,7 +248,7 @@ class TMain : public TMainBase {
246248
private:
247249
TExecutionOptions ExecutionOptions;
248250
TRunnerOptions RunnerOptions;
249-
std::unordered_map<TString, TString> PqFilesMapping;
251+
std::unordered_map<TString, NYql::TDummyTopic> PqFilesMapping;
250252
};
251253

252254
} // anonymous namespace

0 commit comments

Comments
 (0)