@@ -1206,6 +1206,7 @@ Y_UNIT_TEST_SUITE(Cdc) {
1206
1206
{
1207
1207
TTestYdsEnv env (tableDesc, streamDesc);
1208
1208
1209
+ DBGTRACE_LOG (" exec SQL" );
1209
1210
for (const auto & query : queries) {
1210
1211
ExecSQL (env.GetServer (), env.GetEdgeActor (), query);
1211
1212
}
@@ -1246,6 +1247,8 @@ Y_UNIT_TEST_SUITE(Cdc) {
1246
1247
1247
1248
// get records
1248
1249
{
1250
+ WaitForDataRecords (client, shardIt);
1251
+
1249
1252
auto res = client.GetRecords (shardIt).ExtractValueSync ();
1250
1253
UNIT_ASSERT_C (res.IsSuccess (), res.GetIssues ().ToString ());
1251
1254
UNIT_ASSERT_VALUES_EQUAL (res.GetResult ().records ().size (), records.size ());
@@ -1267,6 +1270,19 @@ Y_UNIT_TEST_SUITE(Cdc) {
1267
1270
}
1268
1271
}
1269
1272
1273
+ static void WaitForDataRecords (TDataStreamsClient& client, const TString& shardIt) {
1274
+ int n = 0 ;
1275
+ for (; n < 100 ; ++n) {
1276
+ auto res = client.GetRecords (shardIt).ExtractValueSync ();
1277
+ UNIT_ASSERT_C (res.IsSuccess (), res.GetIssues ().ToString ());
1278
+ if (res.GetResult ().records ().size ()) {
1279
+ break ;
1280
+ }
1281
+ Sleep (TDuration::MilliSeconds (100 ));
1282
+ }
1283
+ UNIT_ASSERT_VALUES_UNEQUAL (n, 100 );
1284
+ }
1285
+
1270
1286
static void Write (const TShardedTableOptions& tableDesc, const TCdcStream& streamDesc) {
1271
1287
TTestYdsEnv env (tableDesc, streamDesc);
1272
1288
0 commit comments