@@ -15,9 +15,10 @@ limitations under the License.
15
15
*/
16
16
17
17
import { logger } from "./logger" ;
18
- import { MatrixError , MatrixClient } from "./matrix" ;
18
+ import { MatrixError , MatrixClient , ClientEvent } from "./matrix" ;
19
19
import { IndexedToDeviceBatch , ToDeviceBatch , ToDeviceBatchWithTxnId , ToDevicePayload } from "./models/ToDeviceMessage" ;
20
20
import { MatrixScheduler } from "./scheduler" ;
21
+ import { SyncState } from "./sync" ;
21
22
22
23
const MAX_BATCH_SIZE = 20 ;
23
24
@@ -37,12 +38,14 @@ export class ToDeviceMessageQueue {
37
38
public start ( ) : void {
38
39
this . running = true ;
39
40
this . sendQueue ( ) ;
41
+ this . client . on ( ClientEvent . Sync , this . onResumedSync ) ;
40
42
}
41
43
42
44
public stop ( ) : void {
43
45
this . running = false ;
44
46
if ( this . retryTimeout !== null ) clearTimeout ( this . retryTimeout ) ;
45
47
this . retryTimeout = null ;
48
+ this . client . removeListener ( ClientEvent . Sync , this . onResumedSync ) ;
46
49
}
47
50
48
51
public async queueBatch ( batch : ToDeviceBatch ) : Promise < void > {
@@ -128,4 +131,15 @@ export class ToDeviceMessageQueue {
128
131
129
132
await this . client . sendToDevice ( batch . eventType , contentMap , batch . txnId ) ;
130
133
}
134
+
135
+ /**
136
+ * Listen to sync state changes and automatically resend any pending events
137
+ * once syncing is resumed
138
+ */
139
+ private onResumedSync = ( state : SyncState | null , oldState : SyncState | null ) : void => {
140
+ if ( state === SyncState . Syncing && oldState !== SyncState . Syncing ) {
141
+ logger . info ( `Resuming queue after resumed sync` ) ;
142
+ this . sendQueue ( ) ;
143
+ }
144
+ } ;
131
145
}
0 commit comments