|
34 | 34 | - topological tokems: "t%d-%d", where the integers map to the topological
|
35 | 35 | and stream ordering columns respectively.
|
36 | 36 | """
|
37 |
| -import abc |
| 37 | + |
38 | 38 | import logging
|
39 | 39 | from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
|
40 | 40 |
|
@@ -336,12 +336,7 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
|
336 | 336 | return " AND ".join(clauses), args
|
337 | 337 |
|
338 | 338 |
|
339 |
| -class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta): |
340 |
| - """This is an abstract base class where subclasses must implement |
341 |
| - `get_room_max_stream_ordering` and `get_room_min_stream_ordering` |
342 |
| - which can be called in the initializer. |
343 |
| - """ |
344 |
| - |
| 339 | +class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): |
345 | 340 | def __init__(
|
346 | 341 | self,
|
347 | 342 | database: DatabasePool,
|
@@ -379,13 +374,22 @@ def __init__(
|
379 | 374 |
|
380 | 375 | self._stream_order_on_start = self.get_room_max_stream_ordering()
|
381 | 376 |
|
382 |
| - @abc.abstractmethod |
383 | 377 | def get_room_max_stream_ordering(self) -> int:
|
384 |
| - raise NotImplementedError() |
| 378 | + """Get the stream_ordering of regular events that we have committed up to |
| 379 | +
|
| 380 | + Returns the maximum stream id such that all stream ids less than or |
| 381 | + equal to it have been successfully persisted. |
| 382 | + """ |
| 383 | + return self._stream_id_gen.get_current_token() |
385 | 384 |
|
386 |
| - @abc.abstractmethod |
387 | 385 | def get_room_min_stream_ordering(self) -> int:
|
388 |
| - raise NotImplementedError() |
| 386 | + """Get the stream_ordering of backfilled events that we have committed up to |
| 387 | +
|
| 388 | + Backfilled events use *negative* stream orderings, so this returns the |
| 389 | + minimum negative stream id such that all stream ids greater than or |
| 390 | + equal to it have been successfully persisted. |
| 391 | + """ |
| 392 | + return self._backfill_id_gen.get_current_token() |
389 | 393 |
|
390 | 394 | def get_room_max_token(self) -> RoomStreamToken:
|
391 | 395 | """Get a `RoomStreamToken` that marks the current maximum persisted
|
@@ -1351,11 +1355,3 @@ async def get_name_from_instance_id(self, instance_id: int) -> str:
|
1351 | 1355 | retcol="instance_name",
|
1352 | 1356 | desc="get_name_from_instance_id",
|
1353 | 1357 | )
|
1354 |
| - |
1355 |
| - |
1356 |
| -class StreamStore(StreamWorkerStore): |
1357 |
| - def get_room_max_stream_ordering(self) -> int: |
1358 |
| - return self._stream_id_gen.get_current_token() |
1359 |
| - |
1360 |
| - def get_room_min_stream_ordering(self) -> int: |
1361 |
| - return self._backfill_id_gen.get_current_token() |
0 commit comments