diff --git a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift index 651e7be2..b51445cb 100644 --- a/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift +++ b/Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift @@ -87,6 +87,10 @@ import WASILibc /// } /// ``` /// +/// ## Scheduling invariants +/// +/// * Jobs enqueued on a worker are guaranteed to run within the same macrotask in which they were scheduled. +/// /// ## Known limitations /// /// Currently, the Cooperative Global Executor of Swift runtime has a bug around @@ -135,22 +139,26 @@ public final class WebWorkerTaskExecutor: TaskExecutor { /// +---------+ +------------+ /// +----->| Idle |--[terminate]-->| Terminated | /// | +---+-----+ +------------+ - /// | | - /// | [enqueue] - /// | | - /// [no more job] | - /// | v - /// | +---------+ - /// +------| Running | - /// +---------+ + /// | | \ + /// | | \------------------+ + /// | | | + /// | [enqueue] [enqueue] (on other thread) + /// | | | + /// [no more job] | | + /// | v v + /// | +---------+ +---------+ + /// +------| Running |<--[wake]--| Ready | + /// +---------+ +---------+ /// enum State: UInt32, AtomicRepresentable { /// The worker is idle and waiting for a new job. case idle = 0 + /// A wake message is sent to the worker, but it has not been received it yet + case ready = 1 /// The worker is processing a job. - case running = 1 + case running = 2 /// The worker is terminated. - case terminated = 2 + case terminated = 3 } let state: Atomic = Atomic(.idle) /// TODO: Rewrite it to use real queue :-) @@ -197,32 +205,46 @@ public final class WebWorkerTaskExecutor: TaskExecutor { func enqueue(_ job: UnownedJob) { statsIncrement(\.enqueuedJobs) var locked: Bool + let onTargetThread = Self.currentThread === self + // If it's on the thread and it's idle, we can directly schedule a `Worker/run` microtask. + let desiredState: State = onTargetThread ? .running : .ready repeat { let result: Void? = jobQueue.withLockIfAvailable { queue in queue.append(job) + trace("Worker.enqueue idle -> running") // Wake up the worker to process a job. - switch state.exchange(.running, ordering: .sequentiallyConsistent) { - case .idle: - if Self.currentThread === self { + trace("Worker.enqueue idle -> \(desiredState)") + switch state.compareExchange( + expected: .idle, + desired: desiredState, + ordering: .sequentiallyConsistent + ) { + case (true, _): + if onTargetThread { // Enqueueing a new job to the current worker thread, but it's idle now. // This is usually the case when a continuation is resumed by JS events // like `setTimeout` or `addEventListener`. // We can run the job and subsequently spawned jobs immediately. - // JSPromise.resolve(JSValue.undefined).then { _ in - _ = JSObject.global.queueMicrotask!( - JSOneshotClosure { _ in - self.run() - return JSValue.undefined - } - ) + scheduleRunWithinMacroTask() } else { let tid = self.tid.load(ordering: .sequentiallyConsistent) swjs_wake_up_worker_thread(tid) } - case .running: + case (false, .idle): + preconditionFailure("unreachable: idle -> \(desiredState) should return exchanged=true") + case (false, .ready): + // A wake message is sent to the worker, but it has not been received it yet + if onTargetThread { + // This means the job is enqueued outside of `Worker/run` (typically triggered + // JS microtasks not awaited by Swift), then schedule a `Worker/run` within + // the same macrotask. + state.store(.running, ordering: .sequentiallyConsistent) + scheduleRunWithinMacroTask() + } + case (false, .running): // The worker is already running, no need to wake up. break - case .terminated: + case (false, .terminated): // Will not wake up the worker because it's already terminated. break } @@ -231,7 +253,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor { } while !locked } - func scheduleNextRun() { + func scheduleRunWithinMacroTask() { _ = JSObject.global.queueMicrotask!( JSOneshotClosure { _ in self.run() @@ -265,12 +287,27 @@ public final class WebWorkerTaskExecutor: TaskExecutor { trace("Worker.start tid=\(tid)") } + /// On receiving a wake-up message from other thread + func wakeUpFromOtherThread() { + let (exchanged, _) = state.compareExchange( + expected: .ready, + desired: .running, + ordering: .sequentiallyConsistent + ) + guard exchanged else { + // `Worker/run` was scheduled on the thread before JS event loop starts + // a macrotask handling wake-up message. + return + } + run() + } + /// Process jobs in the queue. /// /// Return when the worker has no more jobs to run or terminated. /// This method must be called from the worker thread after the worker /// is started by `start(executor:)`. - func run() { + private func run() { trace("Worker.run") guard let executor = parentTaskExecutor else { preconditionFailure("The worker must be started with a parent executor.") @@ -290,7 +327,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor { queue.removeFirst() return job } - // No more jobs to run now. Wait for a new job to be enqueued. + // No more jobs to run now. let (exchanged, original) = state.compareExchange( expected: .running, desired: .idle, @@ -301,7 +338,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor { case (true, _): trace("Worker.run exited \(original) -> idle") return nil // Regular case - case (false, .idle): + case (false, .idle), (false, .ready): preconditionFailure("unreachable: Worker/run running in multiple threads!?") case (false, .running): preconditionFailure("unreachable: running -> idle should return exchanged=true") @@ -657,12 +694,12 @@ func _swjs_enqueue_main_job_from_worker(_ job: UnownedJob) { @_expose(wasm, "swjs_wake_worker_thread") #endif func _swjs_wake_worker_thread() { - WebWorkerTaskExecutor.Worker.currentThread!.run() + WebWorkerTaskExecutor.Worker.currentThread!.wakeUpFromOtherThread() } private func trace(_ message: String) { #if JAVASCRIPTKIT_TRACE - JSObject.global.process.stdout.write("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n") + _ = JSObject.global.console.warn("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n") #endif } diff --git a/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift index b9c42c02..1d1e82a6 100644 --- a/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift +++ b/Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift @@ -1,4 +1,5 @@ #if compiler(>=6.1) && _runtime(_multithreaded) +import Synchronization import XCTest import _CJavaScriptKit // For swjs_get_worker_thread_id @testable import JavaScriptKit @@ -22,6 +23,7 @@ func pthread_mutex_lock(_ mutex: UnsafeMutablePointer) -> Int32 } #endif +@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *) final class WebWorkerTaskExecutorTests: XCTestCase { func testTaskRunOnMainThread() async throws { let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) @@ -97,6 +99,182 @@ final class WebWorkerTaskExecutorTests: XCTestCase { executor.terminate() } + func testScheduleJobWithinMacroTask1() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + defer { executor.terminate() } + + final class Context: @unchecked Sendable { + let hasEndedFirstWorkerWakeLoop = Atomic(false) + let hasEnqueuedFromMain = Atomic(false) + let hasReachedNextMacroTask = Atomic(false) + let hasJobBEnded = Atomic(false) + let hasJobCEnded = Atomic(false) + } + + // Scenario 1. + // | Main | Worker | + // | +---------------------+--------------------------+ + // | | | Start JS macrotask | + // | | | Start 1st wake-loop | + // | | | Enq JS microtask A | + // | | | End 1st wake-loop | + // | | | Start a JS microtask A | + // time | Enq job B to Worker | [PAUSE] | + // | | | Enq Swift job C | + // | | | End JS microtask A | + // | | | Start 2nd wake-loop | + // | | | Run Swift job B | + // | | | Run Swift job C | + // | | | End 2nd wake-loop | + // v | | End JS macrotask | + // +---------------------+--------------------------+ + + let context = Context() + Task { + while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) { + try! await Task.sleep(nanoseconds: 1_000) + } + // Enqueue job B to Worker + Task(executorPreference: executor) { + XCTAssertFalse(isMainThread()) + XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)) + context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent) + } + XCTAssertTrue(isMainThread()) + // Resume worker thread to let it enqueue job C + context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent) + } + + // Start worker + await Task(executorPreference: executor) { + // Schedule a new macrotask to detect if the current macrotask has completed + JSObject.global.setTimeout.function!( + JSOneshotClosure { _ in + context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent) + return .undefined + }, + 0 + ) + + // Enqueue a microtask, not managed by WebWorkerTaskExecutor + JSObject.global.queueMicrotask.function!( + JSOneshotClosure { _ in + // Resume the main thread and let it enqueue job B + context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent) + // Wait until the enqueue has completed + while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {} + // Should be still in the same macrotask + XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)) + // Enqueue job C + Task(executorPreference: executor) { + // Should be still in the same macrotask + XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)) + // Notify that job C has completed + context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent) + } + return .undefined + }, + 0 + ) + // Wait until job B, C and the next macrotask have completed + while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent) + || !context.hasJobCEnded.load(ordering: .sequentiallyConsistent) + || !context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent) + { + try! await Task.sleep(nanoseconds: 1_000) + } + }.value + } + + func testScheduleJobWithinMacroTask2() async throws { + let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1) + defer { executor.terminate() } + + final class Context: @unchecked Sendable { + let hasEndedFirstWorkerWakeLoop = Atomic(false) + let hasEnqueuedFromMain = Atomic(false) + let hasReachedNextMacroTask = Atomic(false) + let hasJobBEnded = Atomic(false) + let hasJobCEnded = Atomic(false) + } + + // Scenario 2. + // (The order of enqueue of job B and C are reversed from Scenario 1) + // + // | Main | Worker | + // | +---------------------+--------------------------+ + // | | | Start JS macrotask | + // | | | Start 1st wake-loop | + // | | | Enq JS microtask A | + // | | | End 1st wake-loop | + // | | | Start a JS microtask A | + // | | | Enq Swift job C | + // time | Enq job B to Worker | [PAUSE] | + // | | | End JS microtask A | + // | | | Start 2nd wake-loop | + // | | | Run Swift job B | + // | | | Run Swift job C | + // | | | End 2nd wake-loop | + // v | | End JS macrotask | + // +---------------------+--------------------------+ + + let context = Context() + Task { + while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) { + try! await Task.sleep(nanoseconds: 1_000) + } + // Enqueue job B to Worker + Task(executorPreference: executor) { + XCTAssertFalse(isMainThread()) + XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)) + context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent) + } + XCTAssertTrue(isMainThread()) + // Resume worker thread to let it enqueue job C + context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent) + } + + // Start worker + await Task(executorPreference: executor) { + // Schedule a new macrotask to detect if the current macrotask has completed + JSObject.global.setTimeout.function!( + JSOneshotClosure { _ in + context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent) + return .undefined + }, + 0 + ) + + // Enqueue a microtask, not managed by WebWorkerTaskExecutor + JSObject.global.queueMicrotask.function!( + JSOneshotClosure { _ in + // Enqueue job C + Task(executorPreference: executor) { + // Should be still in the same macrotask + XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)) + // Notify that job C has completed + context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent) + } + // Resume the main thread and let it enqueue job B + context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent) + // Wait until the enqueue has completed + while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {} + // Should be still in the same macrotask + XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)) + return .undefined + }, + 0 + ) + // Wait until job B, C and the next macrotask have completed + while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent) + || !context.hasJobCEnded.load(ordering: .sequentiallyConsistent) + || !context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent) + { + try! await Task.sleep(nanoseconds: 1_000) + } + }.value + } + func testTaskGroupRunOnSameThread() async throws { let executor = try await WebWorkerTaskExecutor(numberOfThreads: 3)