Skip to content

The DescripeTopic call with the IncludeStats flag freezes (#9392) #9414

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions ydb/core/persqueue/pq_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1511,9 +1511,10 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
ctx);
}
partition.PendingRequests.clear();
} else {
++PartitionsInited;
}

++PartitionsInited;
Y_ABORT_UNLESS(ConfigInited);//partitions are inited only after config

auto allInitialized = AllOriginalPartitionsInited();
Expand Down Expand Up @@ -1914,11 +1915,17 @@ void TPersQueue::ProcessStatusRequests(const TActorContext &ctx) {

void TPersQueue::Handle(TEvPersQueue::TEvStatus::TPtr& ev, const TActorContext& ctx)
{
PQ_LOG_D("Handle TEvPersQueue::TEvStatus");

ReadBalancerActorId = ev->Sender;

if (!ConfigInited || !AllOriginalPartitionsInited()) {
StatusRequests.push_back(ev);
return;
PQ_LOG_D("Postpone the request." <<
" ConfigInited " << static_cast<int>(ConfigInited) <<
", PartitionsInited " << PartitionsInited <<
", OriginalPartitionsCount " << OriginalPartitionsCount);
StatusRequests.push_back(ev);
return;
}

ui32 cnt = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class TFixture : public NUnitTest::TBaseFixture {
const TString& consumer = TEST_CONSUMER,
size_t partitionCount = 1,
std::optional<size_t> maxPartitionCount = std::nullopt);
void DescribeTopic(const TString& path);

void WriteToTopicWithInvalidTxId(bool invalidTxId);

Expand Down Expand Up @@ -322,6 +323,11 @@ void TFixture::CreateTopic(const TString& path,
Setup->CreateTopic(path, consumer, partitionCount, maxPartitionCount);
}

void TFixture::DescribeTopic(const TString& path)
{
Setup->DescribeTopic(path);
}

const TDriver& TFixture::GetDriver() const
{
return *Driver;
Expand Down Expand Up @@ -1106,6 +1112,8 @@ Y_UNIT_TEST_F(WriteToTopic_Demo_6, TFixture)
UNIT_ASSERT_VALUES_EQUAL(messages[0], "message #1");
UNIT_ASSERT_VALUES_EQUAL(messages[1], "message #2");
}

DescribeTopic("topic_A");
}

Y_UNIT_TEST_F(WriteToTopic_Demo_7, TFixture)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ void TTopicSdkTestSetup::CreateTopic(const TString& path, const TString& consume
Server.WaitInit(path);
}

void TTopicSdkTestSetup::DescribeTopic(const TString& path)
{
TTopicClient client(MakeDriver());

TDescribeTopicSettings settings;
settings.IncludeStats(true);
settings.IncludeLocation(true);

auto status = client.DescribeTopic(path, settings).GetValueSync();
UNIT_ASSERT(status.IsSuccess());
}

TString TTopicSdkTestSetup::GetEndpoint() const {
return "localhost:" + ToString(Server.GrpcPort);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ class TTopicSdkTestSetup {
void CreateTopicWithAutoscale(const TString& path = TEST_TOPIC, const TString& consumer = TEST_CONSUMER, size_t partitionCount = 1,
size_t maxPartitionCount = 100);

void DescribeTopic(const TString& path = TEST_TOPIC);

TString GetEndpoint() const;
TString GetTopicPath(const TString& name = TEST_TOPIC) const;
TString GetTopicParent() const;
Expand Down
Loading