@@ -130,7 +130,7 @@ async def _persist_events_and_state_updates(
130
130
* ,
131
131
current_state_for_room : Dict [str , StateMap [str ]],
132
132
state_delta_for_room : Dict [str , DeltaState ],
133
- new_forward_extremeties : Dict [str , List [str ]],
133
+ new_forward_extremities : Dict [str , Set [str ]],
134
134
use_negative_stream_ordering : bool = False ,
135
135
inhibit_local_membership_updates : bool = False ,
136
136
) -> None :
@@ -143,7 +143,7 @@ async def _persist_events_and_state_updates(
143
143
the room based on forward extremities
144
144
state_delta_for_room: Map from room_id to the delta to apply to
145
145
room state
146
- new_forward_extremities: Map from room_id to list of event IDs
146
+ new_forward_extremities: Map from room_id to set of event IDs
147
147
that are the new forward extremities of the room.
148
148
use_negative_stream_ordering: Whether to start stream_ordering on
149
149
the negative side and decrement. This should be set as True
@@ -193,7 +193,7 @@ async def _persist_events_and_state_updates(
193
193
events_and_contexts = events_and_contexts ,
194
194
inhibit_local_membership_updates = inhibit_local_membership_updates ,
195
195
state_delta_for_room = state_delta_for_room ,
196
- new_forward_extremeties = new_forward_extremeties ,
196
+ new_forward_extremities = new_forward_extremities ,
197
197
)
198
198
persist_event_counter .inc (len (events_and_contexts ))
199
199
@@ -220,7 +220,7 @@ async def _persist_events_and_state_updates(
220
220
for room_id , new_state in current_state_for_room .items ():
221
221
self .store .get_current_state_ids .prefill ((room_id ,), new_state )
222
222
223
- for room_id , latest_event_ids in new_forward_extremeties .items ():
223
+ for room_id , latest_event_ids in new_forward_extremities .items ():
224
224
self .store .get_latest_event_ids_in_room .prefill (
225
225
(room_id ,), list (latest_event_ids )
226
226
)
@@ -334,8 +334,8 @@ def _persist_events_txn(
334
334
events_and_contexts : List [Tuple [EventBase , EventContext ]],
335
335
inhibit_local_membership_updates : bool = False ,
336
336
state_delta_for_room : Optional [Dict [str , DeltaState ]] = None ,
337
- new_forward_extremeties : Optional [Dict [str , List [str ]]] = None ,
338
- ):
337
+ new_forward_extremities : Optional [Dict [str , Set [str ]]] = None ,
338
+ ) -> None :
339
339
"""Insert some number of room events into the necessary database tables.
340
340
341
341
Rejected events are only inserted into the events table, the events_json table,
@@ -353,13 +353,13 @@ def _persist_events_txn(
353
353
from the database. This is useful when retrying due to
354
354
IntegrityError.
355
355
state_delta_for_room: The current-state delta for each room.
356
- new_forward_extremetie : The new forward extremities for each room.
356
+ new_forward_extremities : The new forward extremities for each room.
357
357
For each room, a list of the event ids which are the forward
358
358
extremities.
359
359
360
360
"""
361
361
state_delta_for_room = state_delta_for_room or {}
362
- new_forward_extremeties = new_forward_extremeties or {}
362
+ new_forward_extremities = new_forward_extremities or {}
363
363
364
364
all_events_and_contexts = events_and_contexts
365
365
@@ -372,7 +372,7 @@ def _persist_events_txn(
372
372
373
373
self ._update_forward_extremities_txn (
374
374
txn ,
375
- new_forward_extremities = new_forward_extremeties ,
375
+ new_forward_extremities = new_forward_extremities ,
376
376
max_stream_order = max_stream_order ,
377
377
)
378
378
@@ -1158,7 +1158,10 @@ def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
1158
1158
)
1159
1159
1160
1160
def _update_forward_extremities_txn (
1161
- self , txn , new_forward_extremities , max_stream_order
1161
+ self ,
1162
+ txn : LoggingTransaction ,
1163
+ new_forward_extremities : Dict [str , Set [str ]],
1164
+ max_stream_order : int ,
1162
1165
):
1163
1166
for room_id in new_forward_extremities .keys ():
1164
1167
self .db_pool .simple_delete_txn (
0 commit comments