@@ -154,6 +154,9 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
154
154
155
155
}
156
156
157
+ srcReader->Close (TDuration::Zero ());
158
+ dstReader->Close (TDuration::Zero ());
159
+
157
160
// write to source topic
158
161
TVector<ui32> messagesPerPartition (partitionsCount, 0 );
159
162
for (ui32 partition = 0 ; partition < partitionsCount; ++partition) {
@@ -163,7 +166,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
163
166
{" some_extra_field2" , " another_value" + ToString (partition)},
164
167
{" file" , " /home/user/log" + ToString (partition)}
165
168
};
166
- auto writer = CreateSimpleWriter (*driver, srcTopic, sourceId, partition + 1 , std::nullopt, std::nullopt, sessionMeta);
169
+ auto writer = CreateSimpleWriter (*driver, srcTopic, sourceId, partition + 1 , std::nullopt, std::nullopt, sessionMeta);
167
170
168
171
ui64 seqNo = writer->GetInitSeqNo ();
169
172
@@ -211,10 +214,10 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
211
214
auto dstReader = createReader (dstTopic, partition);
212
215
213
216
for (ui32 i = 0 ; i < messagesPerPartition[partition]; ++i) {
214
- auto dstEvent = GetNextMessageSkipAssignment (dstReader);
217
+ auto dstEvent = GetNextMessageSkipAssignment (dstReader, TDuration::Seconds ( 1 ) );
215
218
UNIT_ASSERT (dstEvent);
216
219
Cerr << " Destination read message: " << dstEvent->DebugString () << " \n " ;
217
- auto srcEvent = GetNextMessageSkipAssignment (srcReader);
220
+ auto srcEvent = GetNextMessageSkipAssignment (srcReader, TDuration::Seconds ( 1 ) );
218
221
UNIT_ASSERT (srcEvent);
219
222
Cerr << " Source read message: " << srcEvent->DebugString () << " \n " ;
220
223
@@ -263,7 +266,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
263
266
server.AnnoyingClient ->CreateTopic (topicFullName, 1 );
264
267
265
268
auto driver = server.AnnoyingClient ->GetDriver ();
266
- auto writer = CreateSimpleWriter (*driver, topic, " src-id-test" );
269
+ auto writer = CreateSimpleWriter (*driver, topic, " src-id-test" );
267
270
for (auto i = 0u ; i < 5 ; i++) {
268
271
auto res = writer->Write (TString (10 , ' a' ));
269
272
UNIT_ASSERT (res);
@@ -299,7 +302,7 @@ Y_UNIT_TEST_SUITE(TPersQueueMirrorer) {
299
302
break ;
300
303
}
301
304
}
302
-
305
+
303
306
for (auto i = 0u ; i < 5 ; i++) {
304
307
auto res = writer->Write (TString (10 , ' b' ));
305
308
UNIT_ASSERT (res);
0 commit comments