@@ -142,35 +142,43 @@ namespace NKikimr::NPersQueueTests {
142
142
TTestServer server (false );
143
143
server.ServerSettings .PQConfig .SetTopicsAreFirstClassCitizen (true );
144
144
server.StartServer ();
145
+
145
146
server.EnableLogs ({NKikimrServices::PQ_READ_PROXY, NKikimrServices::TX_PROXY_SCHEME_CACHE});
147
+
148
+ Cerr << " >>>>> Prepare scheme" << Endl;
146
149
PrepareForGrpcNoDC (*server.AnnoyingClient );
147
- NYdb::TDriverConfig driverCfg;
148
150
149
- driverCfg.SetEndpoint (TStringBuilder () << " localhost:" << server.GrpcPort ).SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG)).SetDatabase (" /Root" );
151
+ Cerr << " >>>>> Create PersQueue client" << Endl;
152
+ NYdb::TDriverConfig driverCfg;
153
+ driverCfg.SetEndpoint (TStringBuilder () << " localhost:" << server.GrpcPort )
154
+ .SetLog (CreateLogBackend (" cerr" , ELogPriority::TLOG_DEBUG))
155
+ .SetDatabase (" /Root" );
150
156
151
157
auto ydbDriver = MakeHolder<NYdb::TDriver>(driverCfg);
152
158
auto persqueueClient = MakeHolder<NYdb::NPersQueue::TPersQueueClient>(*ydbDriver);
153
159
154
- TString topic = " account2/topic2" ;
155
- server.EnableLogs ({ NKikimrServices::PQ_READ_PROXY});
156
-
157
- NYdb::NPersQueue::TWriteSessionSettings writeSessionSettings;
158
- writeSessionSettings.ClusterDiscoveryMode (NYdb::NPersQueue::EClusterDiscoveryMode::Off)
159
- .Path (topic)
160
- .MessageGroupId (topic)
161
- .Codec (NYdb::NPersQueue::ECodec::RAW);
160
+ // Topic was created in PrepareForGrpcNoDC
161
+ const TString topic = " account2/topic2" ;
162
+ const TString consumerName = " userx" ;
162
163
163
164
{
165
+ Cerr << " >>>>> Create consumer '" << consumerName << " '" << Endl;
164
166
auto res = persqueueClient->AddReadRule (" /Root/" + topic,
165
- NYdb::NPersQueue::TAddReadRuleSettings ().ReadRule (NYdb::NPersQueue::TReadRuleSettings ().ConsumerName (" userx" )));
167
+ NYdb::NPersQueue::TAddReadRuleSettings ()
168
+ .ReadRule (NYdb::NPersQueue::TReadRuleSettings ()
169
+ .ConsumerName (consumerName)));
166
170
res.Wait ();
167
171
UNIT_ASSERT (res.GetValue ().IsSuccess ());
168
172
}
169
173
170
- TVector<TInstant> ts;
171
- TVector<ui32> firstOffset;
172
-
174
+ Cerr << " >>>>> Create writeSession" << Endl;
175
+ auto writeSessionSettings = NYdb::NPersQueue::TWriteSessionSettings ()
176
+ .ClusterDiscoveryMode (NYdb::NPersQueue::EClusterDiscoveryMode::Off)
177
+ .Path (topic)
178
+ .MessageGroupId (topic)
179
+ .Codec (NYdb::NPersQueue::ECodec::RAW);
173
180
auto writeSession = persqueueClient->CreateWriteSession (writeSessionSettings);
181
+
174
182
TMaybe<TContinuationToken> continuationToken = Nothing ();
175
183
ui32 messagesAcked = 0 ;
176
184
auto processEvent = [&](TWriteSessionEvent::TEvent& event) {
@@ -194,11 +202,13 @@ namespace NKikimr::NPersQueueTests {
194
202
}, event);
195
203
};
196
204
205
+ Cerr << " >>>>> Receiving continuationToken" << Endl;
197
206
for (auto & event: writeSession->GetEvents (true )) {
198
207
processEvent (event);
199
208
}
200
209
UNIT_ASSERT (continuationToken.Defined ());
201
210
211
+ Cerr << " >>>>> Write messages" << Endl;
202
212
for (ui32 i = 0 ; i < maxMessagesCount; ++i) {
203
213
TString message = generateMessage (i);
204
214
Cerr << " WRITTEN message " << i << " \n " ;
@@ -212,71 +222,86 @@ namespace NKikimr::NPersQueueTests {
212
222
}
213
223
}
214
224
215
- // TODO check skip inside big blob
216
- ui32 tsIt = 0 ;
217
- while (true ) {
225
+ // Ts and firstOffset and expectingQuantities will be set in first iteration of reading by received messages.
226
+ // Each will contains shifts from the message: before, equals and after.
227
+ // It allow check reading from different shift. First iteration read from zero.
228
+ TVector<TInstant> ts { TInstant::Zero () };
229
+ TVector<ui32> firstOffset { 0 };
230
+ TVector<size_t > expectingQuantities { maxMessagesCount };
231
+
232
+ // Start test scenario
233
+
234
+ Cerr << " >>>>> Start reading" << Endl << Flush;
235
+ for (size_t i = 0 ; i < ts.size (); ++i) {
236
+ TInstant curTs = ts[i];
237
+ size_t expectingQuantity = expectingQuantities[i];
238
+
239
+ Cerr << " >>>>> Iteration: " << i << " Start reading from " << curTs << " . ExpectingQuantity" << expectingQuantity << Endl << Flush;
240
+
241
+ // Accumulate received messages
242
+ // Key is unique message body
243
+ // Value is quantity of received messages with it body
244
+ TMap<TString, size_t > map;
245
+
218
246
std::shared_ptr<NYdb::NPersQueue::IReadSession> reader;
219
- TInstant curTs = tsIt == 0 ? TInstant::Zero () : (ts[tsIt]);
220
247
auto settings = NYdb::NPersQueue::TReadSessionSettings ()
221
248
.AppendTopics (topic)
222
- .ConsumerName (" userx " )
249
+ .ConsumerName (consumerName )
223
250
.StartingMessageTimestamp (curTs)
224
251
.ReadOnlyOriginal (true );
225
252
226
- TMap<TString, ui32> map;
227
- ui32 messagesReceived = 0 ;
228
253
settings.EventHandlers_ .SimpleDataHandlers ([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& event) mutable {
229
254
for (const auto & msg : event.GetMessages ()) {
230
- Cerr << " TS: " << curTs << " Got message: " << msg.DebugString (false ) << Endl;
231
- Cout.Flush ();
232
- auto count = ++map[msg.GetData ()];
233
- UNIT_ASSERT (count == 1 );
234
- if (tsIt == 0 ) {
235
- if (ts.empty ()) {
236
- ts.push_back (TInstant::Zero ());
237
- firstOffset.push_back (0 );
238
- }
255
+ Cerr << " Iteration: " << curTs << " Got message: " << msg.DebugString (false ) << Endl << Flush;
239
256
257
+ auto count = ++map[msg.GetData ()];
258
+ UNIT_ASSERT_C (count == 1 , " Each message must be received once" );
259
+ if (i == 0 ) {
260
+ // First iteration. Filling ts and firstOffset vectors from received messages
240
261
ts.push_back (msg.GetWriteTime () - TDuration::MilliSeconds (1 ));
241
262
ts.push_back (msg.GetWriteTime ());
242
263
ts.push_back (msg.GetWriteTime () + TDuration::MilliSeconds (1 ));
264
+
243
265
firstOffset.push_back (msg.GetOffset ());
244
266
firstOffset.push_back (msg.GetOffset ());
245
267
firstOffset.push_back (msg.GetOffset () + 1 );
246
268
247
- Cerr << " GOT MESSAGE TIMESTAMP " << ts.back () << " \n " ;
248
- } else {
249
- Cerr << " WAITING FIRST MESSAGE " << firstOffset[tsIt] << " got " << msg.GetOffset () << " \n " ;
269
+ size_t prevQuantity = expectingQuantities.back ();
270
+ expectingQuantities.push_back (prevQuantity);
271
+ expectingQuantities.push_back (prevQuantity);
272
+ expectingQuantities.push_back (prevQuantity - 1 );
250
273
251
- UNIT_ASSERT (messagesReceived > 0 || msg.GetOffset () == firstOffset[tsIt]);
274
+ Cerr << " Iteration: " << i << " GOT MESSAGE TIMESTAMP " << ts.back () << Endl << Flush;
275
+ } else if (map.size () == 1 ) {
276
+ auto expectedOffset = firstOffset[i];
277
+ UNIT_ASSERT_EQUAL_C (msg.GetOffset (), expectedOffset,
278
+ " Iteration: " << i << " Expected first message offset " << expectedOffset << " but got " << msg.GetOffset ());
252
279
}
253
- messagesReceived = msg.GetOffset () + 1 ;
254
280
}
255
281
}, false );
256
282
reader = CreateReader (*ydbDriver, settings);
257
283
258
- Cout << " Created reader \n " ;
284
+ Cerr << " >>>>> Iteration: " << i << " Reader was created " << Endl << Flush ;
259
285
260
- Cout.Flush ();
261
- while (messagesReceived < maxMessagesCount) Sleep (TDuration::MilliSeconds (10 ));
262
- Cerr << " Closing session. Got " << messagesReceived << " messages" << Endl;
263
- reader->Close (TDuration::Seconds (0 ));
264
- Cerr << " Session closed" << Endl;
286
+ Cerr << " >>>>> Iteration: " << i << " Wait receiving all messages" << Endl << Flush;
287
+ Sleep (TDuration::MilliSeconds (10 ));
288
+ while (map.size () < expectingQuantity) Sleep (TDuration::MilliSeconds (10 ));
265
289
266
- if (tsIt == 0 ) {
267
- for (ui32 i = 0 ; i < ts.size (); ++i) {
268
- Cout << " TS " << ts[i] << " OFFSET " << firstOffset[i] << " \n " ;
290
+ Cerr << " >>>>> Iteration: " << i << " Closing session. Got " << map.size () << " messages" << Endl << Flush;
291
+ reader->Close (TDuration::Seconds (0 ));
292
+ Cerr << " >>>>> Iteration: " << i << " Session closed" << Endl << Flush;
293
+
294
+ if (i == 0 ) {
295
+ for (ui32 j = 1 ; j < ts.size (); ++j) {
296
+ Cerr << " >>>>> Planed iteration: " << j
297
+ << " . Start reading from time: " << ts[j]
298
+ << " . Expected first message offset: " << firstOffset[j]
299
+ << " . Expected message quantity: " << expectingQuantities[j] << Endl;
269
300
}
270
301
}
271
-
272
-
273
- tsIt++;
274
- if (tsIt == ts.size ()) break ;
275
- if (firstOffset[tsIt] >= messagesReceived) break ;
276
302
}
277
303
}
278
304
279
-
280
305
Y_UNIT_TEST (TestReadAtTimestamp) {
281
306
auto generate1 = [](ui32 messageId) {
282
307
Y_UNUSED (messageId);
@@ -288,12 +313,11 @@ namespace NKikimr::NPersQueueTests {
288
313
289
314
auto generate2 = [](ui32 messageId) {
290
315
Y_UNUSED (messageId);
291
- TString message = " Hello___" + CreateGuidAsString () + TString (1_MB , ' a ' );
316
+ TString message = " Hello___" + CreateGuidAsString () + TString (10_MB , ' b ' );
292
317
return message;
293
318
};
294
319
295
320
TestReadAtTimestampImpl (3 , generate2);
296
-
297
321
}
298
322
299
323
Y_UNIT_TEST (TestWriteStat1stClass) {
0 commit comments