Skip to content

Commit 88b3f6d

Browse files
authored
[WASMFS] Proxied Backend Structure (#15609)
Relevant Issue: #15041 Introduce new proxied backend structure for the new file system. Proxying will be refactored soon using on this PR: #15681
1 parent 67da98e commit 88b3f6d

File tree

7 files changed

+349
-72
lines changed

7 files changed

+349
-72
lines changed

system/include/emscripten/wasmfs.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ backend_t wasmfs_get_backend_by_fd(int fd);
2525
// Creates a JSFile Backend in the new file system.
2626
backend_t wasmfs_create_js_file_backend();
2727

28-
// Creates a file in a specific backend and returns an fd to an open file.
28+
// Creates a Proxied Backend in the new file system.
29+
backend_t wasmfs_create_proxied_backend(backend_t backend);
30+
31+
// Creates a new file in the new file system under a specific backend.
2932
uint32_t wasmfs_create_file(char* pathname, mode_t mode, backend_t backend);
3033

3134
// Creates a new directory in the new file system under a specific backend.
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2021 The Emscripten Authors. All rights reserved.
2+
// Emscripten is available under two separate licenses, the MIT license and the
3+
// University of Illinois/NCSA Open Source License. Both these licenses can be
4+
// found in the LICENSE file.
5+
6+
// This file defines the Proxied File and Proxied Backend of the new file
7+
// system. Current Status: Work in Progress. See
8+
// https://github.com/emscripten-core/emscripten/issues/15041.
9+
10+
#include "backend.h"
11+
#include "file.h"
12+
#include "thread_utils.h"
13+
#include "wasmfs.h"
14+
15+
namespace wasmfs {
16+
17+
// This class represents a file that forwards all file operations to a thread.
18+
class ProxiedFile : public DataFile {
19+
20+
emscripten::SyncToAsync& proxy;
21+
std::shared_ptr<DataFile> baseFile;
22+
23+
// Read and write operations are forwarded via the proxying mechanism.
24+
__wasi_errno_t write(const uint8_t* buf, size_t len, off_t offset) override {
25+
__wasi_errno_t result;
26+
proxy.invoke([&](emscripten::SyncToAsync::Callback resume) {
27+
result = baseFile->locked().write(buf, len, offset);
28+
(*resume)();
29+
});
30+
return result;
31+
}
32+
33+
__wasi_errno_t read(uint8_t* buf, size_t len, off_t offset) override {
34+
__wasi_errno_t result;
35+
proxy.invoke([&](emscripten::SyncToAsync::Callback resume) {
36+
result = baseFile->locked().read(buf, len, offset);
37+
(*resume)();
38+
});
39+
return result;
40+
}
41+
42+
// Querying the size of the Proxied File returns the size of the underlying
43+
// file given by the proxying mechanism.
44+
size_t getSize() override {
45+
size_t result;
46+
proxy.invoke([&](emscripten::SyncToAsync::Callback resume) {
47+
result = baseFile->locked().getSize();
48+
(*resume)();
49+
});
50+
return result;
51+
}
52+
53+
public:
54+
// A file with the chosen destination backend is created on a thread via
55+
// the ProxiedFile's proxy.
56+
ProxiedFile(mode_t mode,
57+
backend_t proxyBackend,
58+
backend_t underlyingBackend,
59+
emscripten::SyncToAsync& proxy)
60+
: DataFile(mode, proxyBackend), proxy(proxy) {
61+
proxy.invoke([&](emscripten::SyncToAsync::Callback resume) {
62+
baseFile = underlyingBackend->createFile(mode);
63+
(*resume)();
64+
});
65+
}
66+
67+
// The destructor must use the proxy to forward notification that the Proxied
68+
// File resource has been destroyed. Proxying is necessary because the
69+
// underlying thread may need to free resources on the proxied thread.
70+
// Ex. A JSFile will need to proxy so that it can free its underlying JS array
71+
// on that thread.
72+
~ProxiedFile() {
73+
proxy.invoke([&](emscripten::SyncToAsync::Callback resume) {
74+
baseFile = nullptr;
75+
(*resume)();
76+
});
77+
}
78+
};
79+
class ProxiedBackend : public Backend {
80+
backend_t backend;
81+
// ProxiedBackend uses the proxy member to create files on a thread.
82+
emscripten::SyncToAsync proxy;
83+
84+
public:
85+
ProxiedBackend(backend_t backend) : backend(backend) {}
86+
87+
std::shared_ptr<DataFile> createFile(mode_t mode) override {
88+
// This creates a file on a thread specified by the proxy member.
89+
return std::make_shared<ProxiedFile>(mode, this, backend, proxy);
90+
}
91+
92+
std::shared_ptr<Directory> createDirectory(mode_t mode) override {
93+
return std::make_shared<Directory>(mode, this);
94+
}
95+
};
96+
97+
// Create a proxied backend by supplying another backend.
98+
extern "C" backend_t wasmfs_create_proxied_backend(backend_t backend) {
99+
return wasmFS.addBackend(std::make_unique<ProxiedBackend>(backend));
100+
}
101+
102+
} // namespace wasmfs

system/lib/wasmfs/thread_utils.h

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
/*
2+
* Copyright 2021 The Emscripten Authors. All rights reserved.
3+
* Emscripten is available under two separate licenses, the MIT license and the
4+
* University of Illinois/NCSA Open Source License. Both these licenses can be
5+
* found in the LICENSE file.
6+
*/
7+
8+
#pragma once
9+
10+
#include <assert.h>
11+
#include <emscripten.h>
12+
#include <emscripten/threading.h>
13+
#include <pthread.h>
14+
15+
#include <functional>
16+
#include <thread>
17+
#include <utility>
18+
19+
// TODO: This will be updated with:
20+
// https://github.com/emscripten-core/emscripten/pull/15681
21+
22+
namespace emscripten {
23+
24+
// Helper class for generic sync-to-async conversion. Creating an instance of
25+
// this class will spin up a pthread. You can then call invoke() to run code
26+
// on that pthread. The work done on the pthread receives a callback method
27+
// which lets you indicate when it finished working. The call to invoke() is
28+
// synchronous, while the work done on the other thread can be asynchronous,
29+
// which allows bridging async JS APIs to sync C++ code.
30+
//
31+
// This can be useful if you are in a location where blocking is possible (like
32+
// a thread, or when using PROXY_TO_PTHREAD), but you have code that is hard to
33+
// refactor to be async, but that requires some async operation (like waiting
34+
// for a JS event).
35+
class SyncToAsync {
36+
37+
// Public API
38+
//==============================================================================
39+
public:
40+
// Pass around the callback as a pointer to a std::function. Using a pointer
41+
// means that it can be sent easily to JS, as a void* parameter to a C API,
42+
// etc., and also means we do not need to worry about the lifetime of the
43+
// std::function in user code.
44+
using Callback = std::function<void()>*;
45+
46+
//
47+
// Run some work on thread. This is a synchronous (blocking) call. The thread
48+
// where the work actually runs can do async work for us - all it needs to do
49+
// is call the given callback function when it is done.
50+
//
51+
// Note that you need to call the callback even if you are not async, as the
52+
// code here does not know if you are async or not. For example,
53+
//
54+
// instance.invoke([](emscripten::SyncToAsync::Callback resume) {
55+
// std::cout << "Hello from sync C++ on the pthread\n";
56+
// (*resume)();
57+
// });
58+
//
59+
// In the async case, you would call resume() at some later time.
60+
//
61+
// It is safe to call this method from multiple threads, as it locks itself.
62+
// That is, you can create an instance of this and call it from multiple
63+
// threads freely.
64+
//
65+
void invoke(std::function<void(Callback)> newWork);
66+
67+
//==============================================================================
68+
// End Public API
69+
70+
private:
71+
// The dedicated worker thread.
72+
std::thread thread;
73+
74+
// Condition variable used for bidirectional communication between the worker
75+
// thread and invoking threads.
76+
std::condition_variable condition;
77+
std::mutex mutex;
78+
79+
// The current state of the worker thread. New work can only be submitted when
80+
// in the `Waiting` state.
81+
enum State {
82+
Waiting,
83+
WorkAvailable,
84+
ShouldExit,
85+
} state = Waiting;
86+
87+
// Increment the count every time work is finished. This will allow invokers
88+
// to detect that their particular work has been completed even if some other
89+
// invoker wins the race and submits new work before the original invoker can
90+
// check for completion.
91+
std::atomic<uint32_t> workCount{0};
92+
93+
// The work that the dedicated worker thread should perform and the callback
94+
// that needs to be called when the work is finished.
95+
std::function<void(Callback)> work;
96+
std::function<void()> resume;
97+
98+
static void* threadMain(void* arg) {
99+
// Schedule ourselves to start processing incoming work requests.
100+
emscripten_async_call(threadIter, arg, 0);
101+
return 0;
102+
}
103+
104+
// The main worker thread routine that waits for work, wakes up when work is
105+
// available, executes the work, then schedules itself again.
106+
static void threadIter(void* arg) {
107+
auto* parent = (SyncToAsync*)arg;
108+
109+
std::function<void(Callback)> currentWork;
110+
{
111+
// Wait until we get something to do.
112+
std::unique_lock<std::mutex> lock(parent->mutex);
113+
parent->condition.wait(lock, [&]() {
114+
return parent->state == WorkAvailable || parent->state == ShouldExit;
115+
});
116+
117+
if (parent->state == ShouldExit) {
118+
pthread_exit(0);
119+
}
120+
121+
assert(parent->state == WorkAvailable);
122+
currentWork = parent->work;
123+
124+
// Now that we have a local copy of the work, it is ok for new invokers to
125+
// queue up more work for us to do, so go back to `Waiting` and release
126+
// the lock.
127+
parent->state = Waiting;
128+
}
129+
130+
// Allocate a resume function that will wake the invoker and schedule us to
131+
// wait for more work.
132+
parent->resume = [parent, arg]() {
133+
// We are called, so the work was finished. Notify the invoker so it will
134+
// wake up and continue once we resume waiting. There might be other
135+
// invokers waiting to give us work, so `notify_all` to make sure our
136+
// invoker wakes up. Don't worry about overflow because it's a reasonable
137+
// assumption that no invoker will continue losing wake up races for a
138+
// full cycle.
139+
parent->workCount++;
140+
parent->condition.notify_all();
141+
142+
// Look for more work. Doing this asynchronously ensures that we continue
143+
// after the current call stack unwinds (avoiding constantly adding to the
144+
// stack, and also running any remaining code the caller had, like
145+
// destructors). TODO: add an option to do a synchronous call here in some
146+
// cases, which would avoid the time delay caused by a browser setTimeout.
147+
emscripten_async_call(threadIter, arg, 0);
148+
};
149+
150+
// Run the work function the user gave us. Give it a pointer to the resume
151+
// function, which it will be responsible for calling when it's done.
152+
currentWork(&parent->resume);
153+
}
154+
155+
public:
156+
// Spawn the worker thread. It starts in the `Waiting` state, so it is ready
157+
// to accept work requests from invokers even before it starts up.
158+
SyncToAsync() : thread(threadMain, this) {}
159+
160+
~SyncToAsync() {
161+
std::unique_lock<std::mutex> lock(mutex);
162+
163+
// We are destructing the SyncToAsync object, so we should not be racing
164+
// with other threads trying to perform more `invoke`s. There should
165+
// therefore not be any work available.
166+
assert(state == Waiting);
167+
168+
// Wake the worker and tell it to quit. Be ready to join it when it does.
169+
// There shouldn't be other invokers waiting to send work since we are
170+
// destructing the SyncToAsync, so just use `notify_one`.
171+
state = ShouldExit;
172+
condition.notify_one();
173+
174+
// Unlock to allow the worker to wake up and exit.
175+
lock.unlock();
176+
thread.join();
177+
}
178+
};
179+
180+
void SyncToAsync::invoke(std::function<void(Callback)> newWork) {
181+
// The worker might not be waiting for work if some other invoker has already
182+
// sent work. Wait for the worker to be done with that work and ready for new
183+
// work.
184+
std::unique_lock<std::mutex> lock(mutex);
185+
condition.wait(lock, [&]() { return state == Waiting; });
186+
187+
// Now the worker is definitely waiting for our work. Send it over.
188+
assert(state == Waiting);
189+
uint32_t workID = workCount;
190+
work = newWork;
191+
state = WorkAvailable;
192+
193+
// Wake the worker and wait for it to finish the work. There might be other
194+
// invokers waiting to send work as well, so `notify_all` to ensure the worker
195+
// wakes up. Wait for `workCount` to increase rather than for the state to
196+
// return to `Waiting` to make sure we wake up even if some other invoker wins
197+
// the race and submits more work before we acquire the lock.
198+
condition.notify_all();
199+
condition.wait(lock, [&]() { return workCount != workID; });
200+
}
201+
202+
} // namespace emscripten

tests/test_other.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def also_with_wasmfs(f):
120120
def metafunc(self, wasmfs):
121121
if wasmfs:
122122
self.set_setting('WASMFS')
123-
self.emcc_args = self.emcc_args.copy() + ['-DWASMFS']
123+
self.emcc_args.append('-DWASMFS')
124124
f(self)
125125
else:
126126
f(self)
@@ -11256,6 +11256,14 @@ def test_wasmfs_jsfile(self):
1125611256
self.set_setting('WASMFS')
1125711257
self.do_run_in_out_file_test('wasmfs/wasmfs_jsfile.c')
1125811258

11259+
@node_pthreads
11260+
def test_wasmfs_jsfile_proxying_backend(self):
11261+
self.emcc_args.append('-DPROXYING')
11262+
self.set_setting('USE_PTHREADS')
11263+
self.set_setting('PROXY_TO_PTHREAD')
11264+
self.set_setting('EXIT_RUNTIME')
11265+
self.test_wasmfs_jsfile()
11266+
1125911267
@disabled('Running with initial >2GB heaps is not currently supported on the CI version of Node')
1126011268
def test_hello_world_above_2gb(self):
1126111269
self.run_process([EMCC, test_file('hello_world.c'), '-sGLOBAL_BASE=2147483648', '-sINITIAL_MEMORY=3GB'])

0 commit comments

Comments
 (0)