@@ -712,7 +712,6 @@ Y_UNIT_TEST_SUITE(TPersQueueTest) {
712
712
UNIT_ASSERT_C (resp.server_message_case () == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse , resp);
713
713
}
714
714
715
-
716
715
Y_UNIT_TEST (UpdatePartitionLocation) {
717
716
TPersQueueV1TestServer server;
718
717
SET_LOCALS;
@@ -6631,7 +6630,6 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
6631
6630
6632
6631
Y_UNIT_TEST (PartitionsMapping) {
6633
6632
NPersQueue::TTestServer server;
6634
-
6635
6633
TString topic = " topic1" ;
6636
6634
TString topicFullName = " rt3.dc1--" + topic;
6637
6635
@@ -6997,7 +6995,83 @@ TPersQueueV1TestServer server{{.CheckACL=true, .NodeCount=1}};
6997
6995
driver->Stop ();
6998
6996
}
6999
6997
7000
- Y_UNIT_TEST (ReadWithoutConsumer) {
6998
+ Y_UNIT_TEST (ReadWithoutConsumerFederation) {
6999
+ const ui32 partititonsCount = 5 ;
7000
+ const auto topic = " rt3.dc1--topic2" ;
7001
+
7002
+ TPersQueueV1TestServer server;
7003
+ server.Server ->AnnoyingClient ->CreateTopic (topic, partititonsCount);
7004
+
7005
+ NACLib::TDiffACL acl;
7006
+ acl.AddAccess (NACLib::EAccessType::Allow, NACLib::GenericFull, " user@" BUILTIN_ACL_DOMAIN);
7007
+ server.Server ->AnnoyingClient ->ModifyACL (" /Root/PQ" , topic, acl.SerializeAsString ());
7008
+
7009
+ auto writeSettings = NYdb::NPersQueue::TWriteSessionSettings ()
7010
+ .Path (topic)
7011
+ .MessageGroupId (" src_id" );
7012
+
7013
+ auto writer = server.PersQueueClient ->CreateSimpleBlockingWriteSession (writeSettings);
7014
+
7015
+ auto res = writer->Write (" some_data" );
7016
+ UNIT_ASSERT (res);
7017
+ writer->Close ();
7018
+
7019
+ std::shared_ptr<grpc::Channel> Channel_;
7020
+ std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> StubP_;
7021
+
7022
+ Channel_ = grpc::CreateChannel (" localhost:" + ToString (server.Server ->GrpcPort ), grpc::InsecureChannelCredentials ());
7023
+ StubP_ = Ydb::Topic::V1::TopicService::NewStub (Channel_);
7024
+
7025
+ grpc::ClientContext rcontext;
7026
+ rcontext.AddMetadata (" x-ydb-auth-ticket" , " user@" BUILTIN_ACL_DOMAIN);
7027
+ auto readStream = StubP_->StreamRead (&rcontext);
7028
+ UNIT_ASSERT (readStream);
7029
+
7030
+ {
7031
+ Ydb::Topic::StreamReadMessage::FromClient req;
7032
+ Ydb::Topic::StreamReadMessage::FromServer resp;
7033
+ auto topicReadSettings = req.mutable_init_request ()->add_topics_read_settings ();
7034
+ topicReadSettings->set_path (topic);
7035
+ for (ui32 i = 0 ; i < partititonsCount; i++) {
7036
+ topicReadSettings->add_partition_ids (i);
7037
+ }
7038
+
7039
+ req.mutable_init_request ()->set_consumer (" " );
7040
+
7041
+ if (!readStream->Write (req)) {
7042
+ ythrow yexception () << " write fail" ;
7043
+ }
7044
+
7045
+ UNIT_ASSERT (readStream->Read (&resp));
7046
+ UNIT_ASSERT (resp.server_message_case () == Ydb::Topic::StreamReadMessage::FromServer::kInitResponse );
7047
+ }
7048
+ ui32 partitionsSigned = 0 ;
7049
+
7050
+ while (partitionsSigned != partititonsCount) {
7051
+
7052
+ Ydb::Topic::StreamReadMessage::FromServer resp;
7053
+ UNIT_ASSERT (readStream->Read (&resp));
7054
+ UNIT_ASSERT_C (resp.server_message_case () == Ydb::Topic::StreamReadMessage::FromServer::kStartPartitionSessionRequest , resp);
7055
+ auto assignId = resp.start_partition_session_request ().partition_session ().partition_session_id ();
7056
+
7057
+ Ydb::Topic::StreamReadMessage::FromClient req;
7058
+ req.mutable_start_partition_session_response ()->set_partition_session_id (assignId);
7059
+ req.mutable_start_partition_session_response ()->set_read_offset (0 );
7060
+ auto res = readStream->Write (req);
7061
+ UNIT_ASSERT (res);
7062
+ partitionsSigned++;
7063
+ }
7064
+
7065
+ Ydb::Topic::StreamReadMessage::FromClient req;
7066
+ req.mutable_read_request ()->set_bytes_size (1 );
7067
+ readStream->Write (req);
7068
+
7069
+ Ydb::Topic::StreamReadMessage::FromServer resp;
7070
+ UNIT_ASSERT (readStream->Read (&resp));
7071
+ UNIT_ASSERT_C (resp.server_message_case () == Ydb::Topic::StreamReadMessage::FromServer::kReadResponse , resp);
7072
+ }
7073
+
7074
+ Y_UNIT_TEST (ReadWithoutConsumerFirstClassCitizen) {
7001
7075
auto readToEndThenCommit = [] (NPersQueue::TTestServer& server, ui32 partitions, ui32 maxOffset, TString consumer, ui32 readByBytes) {
7002
7076
std::shared_ptr<grpc::Channel> Channel_;
7003
7077
std::unique_ptr<Ydb::Topic::V1::TopicService::Stub> StubP_;
0 commit comments