@@ -12,6 +12,11 @@ use anyhow::Context;
12
12
use common:: {
13
13
components:: ComponentPath ,
14
14
execution_context:: ExecutionId ,
15
+ knobs:: {
16
+ FUNCTION_LIMIT_WARNING_RATIO ,
17
+ TRANSACTION_MAX_READ_SIZE_BYTES ,
18
+ TRANSACTION_MAX_READ_SIZE_ROWS ,
19
+ } ,
15
20
types:: {
16
21
ModuleEnvironment ,
17
22
StorageUuid ,
@@ -21,6 +26,7 @@ use common::{
21
26
} ;
22
27
use events:: usage:: {
23
28
FunctionCallUsageFields ,
29
+ InsightReadLimitCall ,
24
30
UsageEvent ,
25
31
UsageEventLogger ,
26
32
} ;
@@ -253,6 +259,7 @@ impl UsageCounter {
253
259
stats,
254
260
execution_id,
255
261
request_id,
262
+ success,
256
263
& mut usage_metrics,
257
264
) ;
258
265
self . usage_logger . record ( usage_metrics) ;
@@ -276,6 +283,7 @@ impl UsageCounter {
276
283
stats,
277
284
execution_id,
278
285
request_id,
286
+ true ,
279
287
& mut usage_metrics,
280
288
) ;
281
289
self . usage_logger . record ( usage_metrics) ;
@@ -287,6 +295,7 @@ impl UsageCounter {
287
295
stats : FunctionUsageStats ,
288
296
execution_id : ExecutionId ,
289
297
request_id : RequestId ,
298
+ success : bool ,
290
299
usage_metrics : & mut Vec < UsageEvent > ,
291
300
) {
292
301
// Merge the storage stats.
@@ -332,7 +341,7 @@ impl UsageCounter {
332
341
egress_rows : 0 ,
333
342
} ) ;
334
343
}
335
- for ( ( component_path, table_name) , egress_size) in stats. database_egress_size {
344
+ for ( ( component_path, table_name) , egress_size) in stats. database_egress_size . clone ( ) {
336
345
let rows = stats
337
346
. database_egress_rows
338
347
. get ( & ( component_path. clone ( ) , table_name. clone ( ) ) )
@@ -348,6 +357,53 @@ impl UsageCounter {
348
357
egress_rows : * rows,
349
358
} ) ;
350
359
}
360
+
361
+ // Check read limits and add InsightReadLimit event if thresholds are exceeded
362
+ let total_rows: u64 = stats. database_egress_rows . values ( ) . sum ( ) ;
363
+ let total_bytes: u64 = stats. database_egress_size . values ( ) . sum ( ) ;
364
+
365
+ let row_threshold =
366
+ ( * TRANSACTION_MAX_READ_SIZE_ROWS as f64 * * FUNCTION_LIMIT_WARNING_RATIO ) as u64 ;
367
+ let byte_threshold =
368
+ ( * TRANSACTION_MAX_READ_SIZE_BYTES as f64 * * FUNCTION_LIMIT_WARNING_RATIO ) as u64 ;
369
+
370
+ let did_exceed_document_threshold = total_rows >= row_threshold;
371
+ let did_exceed_byte_threshold = total_bytes >= byte_threshold;
372
+
373
+ if did_exceed_document_threshold || did_exceed_byte_threshold {
374
+ let mut calls = Vec :: new ( ) ;
375
+ let component_path: ComponentPath = stats
376
+ . database_egress_rows
377
+ . clone ( )
378
+ . into_iter ( )
379
+ . next ( )
380
+ . map ( |( ( cp, _) , _) | cp)
381
+ . expect ( "Expected at least one database egress row since thresholds were exceeded" ) ;
382
+
383
+ for ( ( cp, table_name) , egress_rows) in stats. database_egress_rows . into_iter ( ) {
384
+ let egress = stats
385
+ . database_egress_size
386
+ . get ( & ( cp, table_name. clone ( ) ) )
387
+ . copied ( )
388
+ . unwrap_or ( 0 ) ;
389
+
390
+ calls. push ( InsightReadLimitCall {
391
+ table_name,
392
+ bytes_read : egress,
393
+ documents_read : egress_rows,
394
+ } ) ;
395
+ }
396
+
397
+ usage_metrics. push ( UsageEvent :: InsightReadLimit {
398
+ id : execution_id. to_string ( ) ,
399
+ request_id : request_id. to_string ( ) ,
400
+ udf_id : udf_id. clone ( ) ,
401
+ component_path : component_path. serialize ( ) ,
402
+ calls,
403
+ success,
404
+ } ) ;
405
+ }
406
+
351
407
for ( ( component_path, table_name) , ingress_size) in stats. vector_ingress_size {
352
408
usage_metrics. push ( UsageEvent :: VectorBandwidth {
353
409
id : execution_id. to_string ( ) ,
0 commit comments