@@ -113,24 +113,6 @@ def __init__(
             prefilled_cache=receipts_stream_prefill,
         )
 
-        self.db_pool.updates.register_background_index_update(
-            "receipts_linearized_unique_index",
-            index_name="receipts_linearized_unique_index",
-            table="receipts_linearized",
-            columns=["room_id", "receipt_type", "user_id"],
-            where_clause="thread_id IS NULL",
-            unique=True,
-        )
-
-        self.db_pool.updates.register_background_index_update(
-            "receipts_graph_unique_index",
-            index_name="receipts_graph_unique_index",
-            table="receipts_graph",
-            columns=["room_id", "receipt_type", "user_id"],
-            where_clause="thread_id IS NULL",
-            unique=True,
-        )
-
     def get_max_receipt_stream_id(self) -> int:
         """Get the current max stream ID for receipts stream"""
         return self._receipts_id_gen.get_current_token()
@@ -702,9 +684,6 @@ def _insert_linearized_receipt_txn(
                 "data": json_encoder.encode(data),
             },
             where_clause=where_clause,
-            # receipts_linearized has a unique constraint on
-            # (user_id, room_id, receipt_type), so no need to lock
-            lock=False,
         )
 
         return rx_ts
@@ -862,14 +841,13 @@ def _insert_graph_receipt_txn(
                 "data": json_encoder.encode(data),
             },
             where_clause=where_clause,
-            # receipts_graph has a unique constraint on
-            # (user_id, room_id, receipt_type), so no need to lock
-            lock=False,
         )
 
 
 class ReceiptsBackgroundUpdateStore(SQLBaseStore):
     POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
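+    # The names below must match the background updates scheduled by the
+    # accompanying schema delta (not shown in this diff).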
+    RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME = "receipts_linearized_unique_index"
+    RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME = "receipts_graph_unique_index"
 
     def __init__(
         self,
@@ -883,6 +861,14 @@ def __init__(
             self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
             self._populate_receipt_event_stream_ordering,
         )
+        self.db_pool.updates.register_background_update_handler(
+            self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
+            self._background_receipts_linearized_unique_index,
+        )
+        self.db_pool.updates.register_background_update_handler(
+            self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
+            self._background_receipts_graph_unique_index,
+        )
 
     async def _populate_receipt_event_stream_ordering(
         self, progress: JsonDict, batch_size: int
@@ -938,6 +924,143 @@ def _populate_receipt_event_stream_ordering_txn(
 
         return batch_size
 
+    async def _create_receipts_index(self, index_name: str, table: str) -> None:
+        """Adds a unique index on `(room_id, receipt_type, user_id)` to the given
+        receipts table, for non-thread receipts."""
+
+        def _create_index(conn: LoggingDatabaseConnection) -> None:
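+            # Make sure the connection is not inside a transaction: psycopg2
+            # refuses to change the session's autocommit mode while a
+            # transaction is in progress.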
+            conn.rollback()
+
+            # we have to set autocommit, because postgres refuses to
+            # CREATE INDEX CONCURRENTLY without it.
+            if isinstance(self.database_engine, PostgresEngine):
+                conn.set_session(autocommit=True)
+
+            try:
+                c = conn.cursor()
+
+                # Now that the duplicates are gone, we can create the index.
+                concurrently = (
+                    "CONCURRENTLY"
+                    if isinstance(self.database_engine, PostgresEngine)
+                    else ""
+                )
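+                # SQLite has no CONCURRENTLY option; there a plain
+                # CREATE UNIQUE INDEX is executed instead.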
+                sql = f"""
+                    CREATE UNIQUE INDEX {concurrently} {index_name}
+                    ON {table}(room_id, receipt_type, user_id)
+                    WHERE thread_id IS NULL
+                """
+                c.execute(sql)
+            finally:
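+                # Return the connection to its normal transactional mode
+                # before handing it back to the pool.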
+                if isinstance(self.database_engine, PostgresEngine):
+                    conn.set_session(autocommit=False)
+
+        await self.db_pool.runWithConnection(_create_index)
+
+    async def _background_receipts_linearized_unique_index(
+        self, progress: dict, batch_size: int
+    ) -> int:
+        """Removes duplicate receipts and adds a unique index on
+        `(room_id, receipt_type, user_id)` to `receipts_linearized`, for non-thread
+        receipts."""
+
+        def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
+            # Identify any duplicate receipts arising from
+            # https://github.com/matrix-org/synapse/issues/14406.
+            # We expect the following query to use the per-thread receipt index and take
+            # less than a minute.
+            sql = """
+                SELECT MAX(stream_id), room_id, receipt_type, user_id
+                FROM receipts_linearized
+                WHERE thread_id IS NULL
+                GROUP BY room_id, receipt_type, user_id
+                HAVING COUNT(*) > 1
+            """
+            txn.execute(sql)
+            duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn))
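+            # `list(txn)` materialises all rows up front, so the cursor can be
+            # reused for the DELETEs below.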
+
+            # Then remove duplicate receipts, keeping the one with the highest
+            # `stream_id`. There should only be a single receipt with any given
+            # `stream_id`.
+            for max_stream_id, room_id, receipt_type, user_id in duplicate_keys:
+                sql = """
+                    DELETE FROM receipts_linearized
+                    WHERE
+                        room_id = ? AND
+                        receipt_type = ? AND
+                        user_id = ? AND
+                        thread_id IS NULL AND
+                        stream_id < ?
+                """
+                txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id))
+
+        await self.db_pool.runInteraction(
+            self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
+            _remote_duplicate_receipts_txn,
+        )
+
+        await self._create_receipts_index(
+            "receipts_linearized_unique_index",
+            "receipts_linearized",
+        )
+
+        await self.db_pool.updates._end_background_update(
+            self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME
+        )
+
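+        # Handlers report how many items they processed in this batch; the
+        # update has just been marked complete, so report a nominal single item.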
+        return 1
+
+    async def _background_receipts_graph_unique_index(
+        self, progress: dict, batch_size: int
+    ) -> int:
+        """Removes duplicate receipts and adds a unique index on
+        `(room_id, receipt_type, user_id)` to `receipts_graph`, for non-thread
+        receipts."""
+
+        def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
+            # Identify any duplicate receipts arising from
+            # https://github.com/matrix-org/synapse/issues/14406.
+            # We expect the following query to use the per-thread receipt index and take
+            # less than a minute.
+            sql = """
+                SELECT room_id, receipt_type, user_id FROM receipts_graph
+                WHERE thread_id IS NULL
+                GROUP BY room_id, receipt_type, user_id
+                HAVING COUNT(*) > 1
+            """
+            txn.execute(sql)
+            duplicate_keys = cast(List[Tuple[str, str, str]], list(txn))
+
+            # Then remove all duplicate receipts.
+            # We could be clever and try to keep the latest receipt out of every set of
+            # duplicates, but it's far simpler to remove them all.
+            for room_id, receipt_type, user_id in duplicate_keys:
+                sql = """
+                    DELETE FROM receipts_graph
+                    WHERE
+                        room_id = ? AND
+                        receipt_type = ? AND
+                        user_id = ? AND
+                        thread_id IS NULL
+                """
+                txn.execute(sql, (room_id, receipt_type, user_id))
+
+        await self.db_pool.runInteraction(
+            self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME,
+            _remote_duplicate_receipts_txn,
+        )
+
+        await self._create_receipts_index(
+            "receipts_graph_unique_index",
+            "receipts_graph",
+        )
+
+        await self.db_pool.updates._end_background_update(
+            self.RECEIPTS_GRAPH_UNIQUE_INDEX_UPDATE_NAME
+        )
+
+        return 1
+
 
 class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
     pass