@@ -459,6 +459,32 @@ def f(txn: LoggingTransaction) -> List[str]:
459
459
460
460
return await self .db_pool .runInteraction ("get_push_action_users_in_range" , f )
461
461
462
+ def _get_receipts_by_room_txn (
463
+ self , txn : LoggingTransaction , user_id : str
464
+ ) -> List [Tuple [str , int ]]:
465
+ receipt_types_clause , args = make_in_list_sql_clause (
466
+ self .database_engine ,
467
+ "receipt_type" ,
468
+ (
469
+ ReceiptTypes .READ ,
470
+ ReceiptTypes .READ_PRIVATE ,
471
+ ReceiptTypes .UNSTABLE_READ_PRIVATE ,
472
+ ),
473
+ )
474
+
475
+ sql = f"""
476
+ SELECT room_id, MAX(stream_ordering)
477
+ FROM receipts_linearized
478
+ INNER JOIN events USING (room_id, event_id)
479
+ WHERE { receipt_types_clause }
480
+ AND user_id = ?
481
+ GROUP BY room_id
482
+ """
483
+
484
+ args .extend ((user_id ,))
485
+ txn .execute (sql , args )
486
+ return cast (List [Tuple [str , int ]], txn .fetchall ())
487
+
462
488
async def get_unread_push_actions_for_user_in_range_for_http (
463
489
self ,
464
490
user_id : str ,
@@ -482,106 +508,45 @@ async def get_unread_push_actions_for_user_in_range_for_http(
482
508
The list will have between 0~limit entries.
483
509
"""
484
510
485
- # find rooms that have a read receipt in them and return the next
486
- # push actions
487
- def get_after_receipt (
488
- txn : LoggingTransaction ,
489
- ) -> List [Tuple [str , str , int , str , bool ]]:
490
- # find rooms that have a read receipt in them and return the next
491
- # push actions
492
-
493
- receipt_types_clause , args = make_in_list_sql_clause (
494
- self .database_engine ,
495
- "receipt_type" ,
496
- (
497
- ReceiptTypes .READ ,
498
- ReceiptTypes .READ_PRIVATE ,
499
- ReceiptTypes .UNSTABLE_READ_PRIVATE ,
500
- ),
501
- )
502
-
503
- sql = f"""
504
- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
505
- ep.highlight
506
- FROM (
507
- SELECT room_id,
508
- MAX(stream_ordering) as stream_ordering
509
- FROM events
510
- INNER JOIN receipts_linearized USING (room_id, event_id)
511
- WHERE { receipt_types_clause } AND user_id = ?
512
- GROUP BY room_id
513
- ) AS rl,
514
- event_push_actions AS ep
515
- WHERE
516
- ep.room_id = rl.room_id
517
- AND ep.stream_ordering > rl.stream_ordering
518
- AND ep.user_id = ?
519
- AND ep.stream_ordering > ?
520
- AND ep.stream_ordering <= ?
521
- AND ep.notif = 1
522
- ORDER BY ep.stream_ordering ASC LIMIT ?
523
- """
524
- args .extend (
525
- (user_id , user_id , min_stream_ordering , max_stream_ordering , limit )
526
- )
527
- txn .execute (sql , args )
528
- return cast (List [Tuple [str , str , int , str , bool ]], txn .fetchall ())
529
-
530
- after_read_receipt = await self .db_pool .runInteraction (
531
- "get_unread_push_actions_for_user_in_range_http_arr" , get_after_receipt
511
+ receipts_by_room = dict (
512
+ await self .db_pool .runInteraction (
513
+ "get_unread_push_actions_for_user_in_range_http_receipts" ,
514
+ self ._get_receipts_by_room_txn ,
515
+ user_id = user_id ,
516
+ ),
532
517
)
533
518
534
- # There are rooms with push actions in them but you don't have a read receipt in
535
- # them e.g. rooms you've been invited to, so get push actions for rooms which do
536
- # not have read receipts in them too.
537
- def get_no_receipt (
519
+ def get_push_actions_txn (
538
520
txn : LoggingTransaction ,
539
521
) -> List [Tuple [str , str , int , str , bool ]]:
540
- receipt_types_clause , args = make_in_list_sql_clause (
541
- self .database_engine ,
542
- "receipt_type" ,
543
- (
544
- ReceiptTypes .READ ,
545
- ReceiptTypes .READ_PRIVATE ,
546
- ReceiptTypes .UNSTABLE_READ_PRIVATE ,
547
- ),
548
- )
549
-
550
- sql = f"""
551
- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
552
- ep.highlight
522
+ sql = """
523
+ SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions, ep.highlight
553
524
FROM event_push_actions AS ep
554
- INNER JOIN events AS e USING (room_id, event_id)
555
525
WHERE
556
- ep.room_id NOT IN (
557
- SELECT room_id FROM receipts_linearized
558
- WHERE { receipt_types_clause } AND user_id = ?
559
- GROUP BY room_id
560
- )
561
- AND ep.user_id = ?
526
+ ep.user_id = ?
562
527
AND ep.stream_ordering > ?
563
528
AND ep.stream_ordering <= ?
564
529
AND ep.notif = 1
565
530
ORDER BY ep.stream_ordering ASC LIMIT ?
566
531
"""
567
- args .extend (
568
- (user_id , user_id , min_stream_ordering , max_stream_ordering , limit )
569
- )
570
- txn .execute (sql , args )
532
+ txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
571
533
return cast (List [Tuple [str , str , int , str , bool ]], txn .fetchall ())
572
534
573
- no_read_receipt = await self .db_pool .runInteraction (
574
- "get_unread_push_actions_for_user_in_range_http_nrr " , get_no_receipt
535
+ push_actions = await self .db_pool .runInteraction (
536
+ "get_unread_push_actions_for_user_in_range_http " , get_push_actions_txn
575
537
)
576
538
577
539
notifs = [
578
540
HttpPushAction (
579
- event_id = row [ 0 ] ,
580
- room_id = row [ 1 ] ,
581
- stream_ordering = row [ 2 ] ,
582
- actions = _deserialize_action (row [ 3 ], row [ 4 ] ),
541
+ event_id = event_id ,
542
+ room_id = room_id ,
543
+ stream_ordering = stream_ordering ,
544
+ actions = _deserialize_action (actions , highlight ),
583
545
)
584
- for row in after_read_receipt + no_read_receipt
546
+ for event_id , room_id , stream_ordering , actions , highlight in push_actions
547
+ # Only include push actions with a stream ordering after any receipt, or without any
548
+ # receipt present (invited to but never read rooms).
549
+ if stream_ordering > receipts_by_room .get (room_id , 0 )
585
550
]
586
551
587
552
# Now sort it so it's ordered correctly, since currently it will
@@ -617,106 +582,49 @@ async def get_unread_push_actions_for_user_in_range_for_email(
617
582
The list will have between 0~limit entries.
618
583
"""
619
584
620
- # find rooms that have a read receipt in them and return the most recent
621
- # push actions
622
- def get_after_receipt (
623
- txn : LoggingTransaction ,
624
- ) -> List [Tuple [str , str , int , str , bool , int ]]:
625
- receipt_types_clause , args = make_in_list_sql_clause (
626
- self .database_engine ,
627
- "receipt_type" ,
628
- (
629
- ReceiptTypes .READ ,
630
- ReceiptTypes .READ_PRIVATE ,
631
- ReceiptTypes .UNSTABLE_READ_PRIVATE ,
632
- ),
633
- )
634
-
635
- sql = f"""
636
- SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
637
- ep.highlight, e.received_ts
638
- FROM (
639
- SELECT room_id,
640
- MAX(stream_ordering) as stream_ordering
641
- FROM events
642
- INNER JOIN receipts_linearized USING (room_id, event_id)
643
- WHERE { receipt_types_clause } AND user_id = ?
644
- GROUP BY room_id
645
- ) AS rl,
646
- event_push_actions AS ep
647
- INNER JOIN events AS e USING (room_id, event_id)
648
- WHERE
649
- ep.room_id = rl.room_id
650
- AND ep.stream_ordering > rl.stream_ordering
651
- AND ep.user_id = ?
652
- AND ep.stream_ordering > ?
653
- AND ep.stream_ordering <= ?
654
- AND ep.notif = 1
655
- ORDER BY ep.stream_ordering DESC LIMIT ?
656
- """
657
- args .extend (
658
- (user_id , user_id , min_stream_ordering , max_stream_ordering , limit )
659
- )
660
- txn .execute (sql , args )
661
- return cast (List [Tuple [str , str , int , str , bool , int ]], txn .fetchall ())
662
-
663
- after_read_receipt = await self .db_pool .runInteraction (
664
- "get_unread_push_actions_for_user_in_range_email_arr" , get_after_receipt
585
+ receipts_by_room = dict (
586
+ await self .db_pool .runInteraction (
587
+ "get_unread_push_actions_for_user_in_range_email_receipts" ,
588
+ self ._get_receipts_by_room_txn ,
589
+ user_id = user_id ,
590
+ ),
665
591
)
666
592
667
- # There are rooms with push actions in them but you don't have a read receipt in
668
- # them e.g. rooms you've been invited to, so get push actions for rooms which do
669
- # not have read receipts in them too.
670
- def get_no_receipt (
593
+ def get_push_actions_txn (
671
594
txn : LoggingTransaction ,
672
595
) -> List [Tuple [str , str , int , str , bool , int ]]:
673
- receipt_types_clause , args = make_in_list_sql_clause (
674
- self .database_engine ,
675
- "receipt_type" ,
676
- (
677
- ReceiptTypes .READ ,
678
- ReceiptTypes .READ_PRIVATE ,
679
- ReceiptTypes .UNSTABLE_READ_PRIVATE ,
680
- ),
681
- )
682
-
683
- sql = f"""
596
+ sql = """
684
597
SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,
685
598
ep.highlight, e.received_ts
686
599
FROM event_push_actions AS ep
687
600
INNER JOIN events AS e USING (room_id, event_id)
688
601
WHERE
689
- ep.room_id NOT IN (
690
- SELECT room_id FROM receipts_linearized
691
- WHERE { receipt_types_clause } AND user_id = ?
692
- GROUP BY room_id
693
- )
694
- AND ep.user_id = ?
602
+ ep.user_id = ?
695
603
AND ep.stream_ordering > ?
696
604
AND ep.stream_ordering <= ?
697
605
AND ep.notif = 1
698
606
ORDER BY ep.stream_ordering DESC LIMIT ?
699
607
"""
700
- args .extend (
701
- (user_id , user_id , min_stream_ordering , max_stream_ordering , limit )
702
- )
703
- txn .execute (sql , args )
608
+ txn .execute (sql , (user_id , min_stream_ordering , max_stream_ordering , limit ))
704
609
return cast (List [Tuple [str , str , int , str , bool , int ]], txn .fetchall ())
705
610
706
- no_read_receipt = await self .db_pool .runInteraction (
707
- "get_unread_push_actions_for_user_in_range_email_nrr " , get_no_receipt
611
+ push_actions = await self .db_pool .runInteraction (
612
+ "get_unread_push_actions_for_user_in_range_email " , get_push_actions_txn
708
613
)
709
614
710
615
# Make a list of dicts from the two sets of results.
711
616
notifs = [
712
617
EmailPushAction (
713
- event_id = row [ 0 ] ,
714
- room_id = row [ 1 ] ,
715
- stream_ordering = row [ 2 ] ,
716
- actions = _deserialize_action (row [ 3 ], row [ 4 ] ),
717
- received_ts = row [ 5 ] ,
618
+ event_id = event_id ,
619
+ room_id = room_id ,
620
+ stream_ordering = stream_ordering ,
621
+ actions = _deserialize_action (actions , highlight ),
622
+ received_ts = received_ts ,
718
623
)
719
- for row in after_read_receipt + no_read_receipt
624
+ for event_id , room_id , stream_ordering , actions , highlight , received_ts in push_actions
625
+ # Only include push actions with a stream ordering after any receipt, or without any
626
+ # receipt present (invited to but never read rooms).
627
+ if stream_ordering > receipts_by_room .get (room_id , 0 )
720
628
]
721
629
722
630
# Now sort it so it's ordered correctly, since currently it will
0 commit comments