diff --git a/spec/unit/scheduler.spec.ts b/spec/unit/scheduler.spec.ts index 122b71fde1d..5efd9ce1bc1 100644 --- a/spec/unit/scheduler.spec.ts +++ b/spec/unit/scheduler.spec.ts @@ -112,7 +112,7 @@ describe("MatrixScheduler", function () { expect(procCount).toEqual(2); }); - it("should give up if the retryFn on failure returns -1 and try the next event", async function () { + it("should give up if the retryFn on failure returns -1", async function () { // Queue A & B. // Reject A and return -1 on retry. // Expect B to be tried next and the promise for A to be rejected. @@ -139,19 +139,15 @@ describe("MatrixScheduler", function () { return new Promise>(() => {}); }); - const globalA = scheduler.queueEvent(eventA); - scheduler.queueEvent(eventB); + const queuedA = scheduler.queueEvent(eventA); + const queuedB = scheduler.queueEvent(eventB); + await Promise.resolve(); + deferA.reject(new Error("Testerror")); // as queueing doesn't start processing synchronously anymore (see commit bbdb5ac) // wait just long enough before it does - await Promise.resolve(); + await expect(queuedA).rejects.toThrow("Testerror"); + await expect(queuedB).rejects.toThrow("Testerror"); expect(procCount).toEqual(1); - deferA.reject({}); - try { - await globalA; - } catch (err) { - await Promise.resolve(); - expect(procCount).toEqual(2); - } }); it("should treat each queue separately", function (done) { diff --git a/src/models/room.ts b/src/models/room.ts index 8d478789b9d..da0eb08e4cc 100644 --- a/src/models/room.ts +++ b/src/models/room.ts @@ -3380,7 +3380,7 @@ export class Room extends ReadReceipt { const ALLOWED_TRANSITIONS: Record = { [EventStatus.ENCRYPTING]: [EventStatus.SENDING, EventStatus.NOT_SENT, EventStatus.CANCELLED], [EventStatus.SENDING]: [EventStatus.ENCRYPTING, EventStatus.QUEUED, EventStatus.NOT_SENT, EventStatus.SENT], - [EventStatus.QUEUED]: [EventStatus.SENDING, EventStatus.CANCELLED], + [EventStatus.QUEUED]: [EventStatus.SENDING, EventStatus.NOT_SENT, EventStatus.CANCELLED], [EventStatus.SENT]: [], [EventStatus.NOT_SENT]: [EventStatus.SENDING, EventStatus.QUEUED, EventStatus.CANCELLED], [EventStatus.CANCELLED]: [], diff --git a/src/scheduler.ts b/src/scheduler.ts index 16c06665eee..6b6bae1bc3f 100644 --- a/src/scheduler.ts +++ b/src/scheduler.ts @@ -245,12 +245,7 @@ export class MatrixScheduler { // get head of queue const obj = this.peekNextEvent(queueName); if (!obj) { - // queue is empty. Mark as inactive and stop recursing. - const index = this.activeQueues.indexOf(queueName); - if (index >= 0) { - this.activeQueues.splice(index, 1); - } - debuglog("Stopping queue '%s' as it is now empty", queueName); + this.disableQueue(queueName); return; } debuglog("Queue '%s' has %s pending events", queueName, this.queues[queueName].length); @@ -289,10 +284,7 @@ export class MatrixScheduler { // give up (you quitter!) debuglog("Queue '%s' giving up on event %s", queueName, obj.event.getId()); // remove this from the queue - this.removeNextEvent(queueName); - obj.defer.reject(err); - // process next event - this.processQueue(queueName); + this.clearQueue(queueName, err); } else { setTimeout(this.processQueue, waitTimeMs, queueName); } @@ -300,6 +292,24 @@ export class MatrixScheduler { ); }; + private disableQueue(queueName: string): void { + // queue is empty. Mark as inactive and stop recursing. + const index = this.activeQueues.indexOf(queueName); + if (index >= 0) { + this.activeQueues.splice(index, 1); + } + debuglog("Stopping queue '%s' as it is now empty", queueName); + } + + private clearQueue(queueName: string, err: unknown): void { + debuglog("clearing queue '%s'", queueName); + let obj: IQueueEntry | undefined; + while ((obj = this.removeNextEvent(queueName))) { + obj.defer.reject(err); + } + this.disableQueue(queueName); + } + private peekNextEvent(queueName: string): IQueueEntry | undefined { const queue = this.queues[queueName]; if (!Array.isArray(queue)) {