Skip to content

Ensure a job enqueued on a worker must be run within the same macro task #348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 65 additions & 28 deletions Sources/JavaScriptEventLoop/WebWorkerTaskExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ import WASILibc
/// }
/// ```
///
/// ## Scheduling invariants
///
/// * Jobs enqueued on a worker are guaranteed to run within the same macrotask in which they were scheduled.
///
/// ## Known limitations
///
/// Currently, the Cooperative Global Executor of Swift runtime has a bug around
Expand Down Expand Up @@ -135,22 +139,26 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
/// +---------+ +------------+
/// +----->| Idle |--[terminate]-->| Terminated |
/// | +---+-----+ +------------+
/// | |
/// | [enqueue]
/// | |
/// [no more job] |
/// | v
/// | +---------+
/// +------| Running |
/// +---------+
/// | | \
/// | | \------------------+
/// | | |
/// | [enqueue] [enqueue] (on other thread)
/// | | |
/// [no more job] | |
/// | v v
/// | +---------+ +---------+
/// +------| Running |<--[wake]--| Ready |
/// +---------+ +---------+
///
enum State: UInt32, AtomicRepresentable {
/// The worker is idle and waiting for a new job.
case idle = 0
/// A wake message is sent to the worker, but it has not been received it yet
case ready = 1
/// The worker is processing a job.
case running = 1
case running = 2
/// The worker is terminated.
case terminated = 2
case terminated = 3
}
let state: Atomic<State> = Atomic(.idle)
/// TODO: Rewrite it to use real queue :-)
Expand Down Expand Up @@ -197,32 +205,46 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
func enqueue(_ job: UnownedJob) {
statsIncrement(\.enqueuedJobs)
var locked: Bool
let onTargetThread = Self.currentThread === self
// If it's on the thread and it's idle, we can directly schedule a `Worker/run` microtask.
let desiredState: State = onTargetThread ? .running : .ready
repeat {
let result: Void? = jobQueue.withLockIfAvailable { queue in
queue.append(job)
trace("Worker.enqueue idle -> running")
// Wake up the worker to process a job.
switch state.exchange(.running, ordering: .sequentiallyConsistent) {
case .idle:
if Self.currentThread === self {
trace("Worker.enqueue idle -> \(desiredState)")
switch state.compareExchange(
expected: .idle,
desired: desiredState,
ordering: .sequentiallyConsistent
) {
case (true, _):
if onTargetThread {
// Enqueueing a new job to the current worker thread, but it's idle now.
// This is usually the case when a continuation is resumed by JS events
// like `setTimeout` or `addEventListener`.
// We can run the job and subsequently spawned jobs immediately.
// JSPromise.resolve(JSValue.undefined).then { _ in
_ = JSObject.global.queueMicrotask!(
JSOneshotClosure { _ in
self.run()
return JSValue.undefined
}
)
scheduleRunWithinMacroTask()
} else {
let tid = self.tid.load(ordering: .sequentiallyConsistent)
swjs_wake_up_worker_thread(tid)
}
case .running:
case (false, .idle):
preconditionFailure("unreachable: idle -> \(desiredState) should return exchanged=true")
case (false, .ready):
// A wake message is sent to the worker, but it has not been received it yet
if onTargetThread {
// This means the job is enqueued outside of `Worker/run` (typically triggered
// JS microtasks not awaited by Swift), then schedule a `Worker/run` within
// the same macrotask.
state.store(.running, ordering: .sequentiallyConsistent)
scheduleRunWithinMacroTask()
}
case (false, .running):
// The worker is already running, no need to wake up.
break
case .terminated:
case (false, .terminated):
// Will not wake up the worker because it's already terminated.
break
}
Expand All @@ -231,7 +253,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
} while !locked
}

func scheduleNextRun() {
func scheduleRunWithinMacroTask() {
_ = JSObject.global.queueMicrotask!(
JSOneshotClosure { _ in
self.run()
Expand Down Expand Up @@ -265,12 +287,27 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
trace("Worker.start tid=\(tid)")
}

/// On receiving a wake-up message from other thread
func wakeUpFromOtherThread() {
let (exchanged, _) = state.compareExchange(
expected: .ready,
desired: .running,
ordering: .sequentiallyConsistent
)
guard exchanged else {
// `Worker/run` was scheduled on the thread before JS event loop starts
// a macrotask handling wake-up message.
return
}
run()
}

/// Process jobs in the queue.
///
/// Return when the worker has no more jobs to run or terminated.
/// This method must be called from the worker thread after the worker
/// is started by `start(executor:)`.
func run() {
private func run() {
trace("Worker.run")
guard let executor = parentTaskExecutor else {
preconditionFailure("The worker must be started with a parent executor.")
Expand All @@ -290,7 +327,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
queue.removeFirst()
return job
}
// No more jobs to run now. Wait for a new job to be enqueued.
// No more jobs to run now.
let (exchanged, original) = state.compareExchange(
expected: .running,
desired: .idle,
Expand All @@ -301,7 +338,7 @@ public final class WebWorkerTaskExecutor: TaskExecutor {
case (true, _):
trace("Worker.run exited \(original) -> idle")
return nil // Regular case
case (false, .idle):
case (false, .idle), (false, .ready):
preconditionFailure("unreachable: Worker/run running in multiple threads!?")
case (false, .running):
preconditionFailure("unreachable: running -> idle should return exchanged=true")
Expand Down Expand Up @@ -657,12 +694,12 @@ func _swjs_enqueue_main_job_from_worker(_ job: UnownedJob) {
@_expose(wasm, "swjs_wake_worker_thread")
#endif
func _swjs_wake_worker_thread() {
WebWorkerTaskExecutor.Worker.currentThread!.run()
WebWorkerTaskExecutor.Worker.currentThread!.wakeUpFromOtherThread()
}

private func trace(_ message: String) {
#if JAVASCRIPTKIT_TRACE
JSObject.global.process.stdout.write("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n")
_ = JSObject.global.console.warn("[trace tid=\(swjs_get_worker_thread_id())] \(message)\n")
#endif
}

Expand Down
178 changes: 178 additions & 0 deletions Tests/JavaScriptEventLoopTests/WebWorkerTaskExecutorTests.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#if compiler(>=6.1) && _runtime(_multithreaded)
import Synchronization
import XCTest
import _CJavaScriptKit // For swjs_get_worker_thread_id
@testable import JavaScriptKit
Expand All @@ -22,6 +23,7 @@ func pthread_mutex_lock(_ mutex: UnsafeMutablePointer<pthread_mutex_t>) -> Int32
}
#endif

@available(macOS 15.0, iOS 18.0, watchOS 11.0, tvOS 18.0, visionOS 2.0, *)
final class WebWorkerTaskExecutorTests: XCTestCase {
func testTaskRunOnMainThread() async throws {
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
Expand Down Expand Up @@ -97,6 +99,182 @@ final class WebWorkerTaskExecutorTests: XCTestCase {
executor.terminate()
}

func testScheduleJobWithinMacroTask1() async throws {
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
defer { executor.terminate() }

final class Context: @unchecked Sendable {
let hasEndedFirstWorkerWakeLoop = Atomic<Bool>(false)
let hasEnqueuedFromMain = Atomic<Bool>(false)
let hasReachedNextMacroTask = Atomic<Bool>(false)
let hasJobBEnded = Atomic<Bool>(false)
let hasJobCEnded = Atomic<Bool>(false)
}

// Scenario 1.
// | Main | Worker |
// | +---------------------+--------------------------+
// | | | Start JS macrotask |
// | | | Start 1st wake-loop |
// | | | Enq JS microtask A |
// | | | End 1st wake-loop |
// | | | Start a JS microtask A |
// time | Enq job B to Worker | [PAUSE] |
// | | | Enq Swift job C |
// | | | End JS microtask A |
// | | | Start 2nd wake-loop |
// | | | Run Swift job B |
// | | | Run Swift job C |
// | | | End 2nd wake-loop |
// v | | End JS macrotask |
// +---------------------+--------------------------+

let context = Context()
Task {
while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) {
try! await Task.sleep(nanoseconds: 1_000)
}
// Enqueue job B to Worker
Task(executorPreference: executor) {
XCTAssertFalse(isMainThread())
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent)
}
XCTAssertTrue(isMainThread())
// Resume worker thread to let it enqueue job C
context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent)
}

// Start worker
await Task(executorPreference: executor) {
// Schedule a new macrotask to detect if the current macrotask has completed
JSObject.global.setTimeout.function!(
JSOneshotClosure { _ in
context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent)
return .undefined
},
0
)

// Enqueue a microtask, not managed by WebWorkerTaskExecutor
JSObject.global.queueMicrotask.function!(
JSOneshotClosure { _ in
// Resume the main thread and let it enqueue job B
context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent)
// Wait until the enqueue has completed
while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {}
// Should be still in the same macrotask
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
// Enqueue job C
Task(executorPreference: executor) {
// Should be still in the same macrotask
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
// Notify that job C has completed
context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent)
}
return .undefined
},
0
)
// Wait until job B, C and the next macrotask have completed
while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent)
|| !context.hasJobCEnded.load(ordering: .sequentiallyConsistent)
|| !context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)
{
try! await Task.sleep(nanoseconds: 1_000)
}
}.value
}

func testScheduleJobWithinMacroTask2() async throws {
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 1)
defer { executor.terminate() }

final class Context: @unchecked Sendable {
let hasEndedFirstWorkerWakeLoop = Atomic<Bool>(false)
let hasEnqueuedFromMain = Atomic<Bool>(false)
let hasReachedNextMacroTask = Atomic<Bool>(false)
let hasJobBEnded = Atomic<Bool>(false)
let hasJobCEnded = Atomic<Bool>(false)
}

// Scenario 2.
// (The order of enqueue of job B and C are reversed from Scenario 1)
//
// | Main | Worker |
// | +---------------------+--------------------------+
// | | | Start JS macrotask |
// | | | Start 1st wake-loop |
// | | | Enq JS microtask A |
// | | | End 1st wake-loop |
// | | | Start a JS microtask A |
// | | | Enq Swift job C |
// time | Enq job B to Worker | [PAUSE] |
// | | | End JS microtask A |
// | | | Start 2nd wake-loop |
// | | | Run Swift job B |
// | | | Run Swift job C |
// | | | End 2nd wake-loop |
// v | | End JS macrotask |
// +---------------------+--------------------------+

let context = Context()
Task {
while !context.hasEndedFirstWorkerWakeLoop.load(ordering: .sequentiallyConsistent) {
try! await Task.sleep(nanoseconds: 1_000)
}
// Enqueue job B to Worker
Task(executorPreference: executor) {
XCTAssertFalse(isMainThread())
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
context.hasJobBEnded.store(true, ordering: .sequentiallyConsistent)
}
XCTAssertTrue(isMainThread())
// Resume worker thread to let it enqueue job C
context.hasEnqueuedFromMain.store(true, ordering: .sequentiallyConsistent)
}

// Start worker
await Task(executorPreference: executor) {
// Schedule a new macrotask to detect if the current macrotask has completed
JSObject.global.setTimeout.function!(
JSOneshotClosure { _ in
context.hasReachedNextMacroTask.store(true, ordering: .sequentiallyConsistent)
return .undefined
},
0
)

// Enqueue a microtask, not managed by WebWorkerTaskExecutor
JSObject.global.queueMicrotask.function!(
JSOneshotClosure { _ in
// Enqueue job C
Task(executorPreference: executor) {
// Should be still in the same macrotask
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
// Notify that job C has completed
context.hasJobCEnded.store(true, ordering: .sequentiallyConsistent)
}
// Resume the main thread and let it enqueue job B
context.hasEndedFirstWorkerWakeLoop.store(true, ordering: .sequentiallyConsistent)
// Wait until the enqueue has completed
while !context.hasEnqueuedFromMain.load(ordering: .sequentiallyConsistent) {}
// Should be still in the same macrotask
XCTAssertFalse(context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent))
return .undefined
},
0
)
// Wait until job B, C and the next macrotask have completed
while !context.hasJobBEnded.load(ordering: .sequentiallyConsistent)
|| !context.hasJobCEnded.load(ordering: .sequentiallyConsistent)
|| !context.hasReachedNextMacroTask.load(ordering: .sequentiallyConsistent)
{
try! await Task.sleep(nanoseconds: 1_000)
}
}.value
}

func testTaskGroupRunOnSameThread() async throws {
let executor = try await WebWorkerTaskExecutor(numberOfThreads: 3)

Expand Down