
Commit 40252ec

[ML] Threading optimisations (elastic#433)
1 parent 482ff86 commit 40252ec

8 files changed, +80 -90 lines changed

include/core/CConcurrentQueue.h

Lines changed: 3 additions & 6 deletions
@@ -132,14 +132,11 @@ class CConcurrentQueue final : private CNonCopyable {
 
 private:
     void waitWhileEmpty(std::unique_lock<std::mutex>& lock) {
-        while (m_Queue.empty()) {
-            m_ConsumerCondition.wait(lock);
-        }
+        m_ConsumerCondition.wait(lock, [this] { return m_Queue.size() > 0; });
     }
     void waitWhileFull(std::unique_lock<std::mutex>& lock) {
-        while (m_Queue.size() >= QUEUE_CAPACITY) {
-            m_ProducerCondition.wait(lock);
-        }
+        m_ProducerCondition.wait(
+            lock, [this] { return m_Queue.size() < QUEUE_CAPACITY; });
     }
 
     void notifyIfNoLongerFull(std::unique_lock<std::mutex>& lock, std::size_t oldSize) {
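
Replacing the hand-rolled wait loops with the predicate overload of std::condition_variable::wait is behaviour-preserving: the overload re-checks the predicate around every wakeup exactly as the while loop did, so spurious wakeups are still handled. A minimal self-contained sketch of the pattern (a hypothetical BoundedQueue, not the real CConcurrentQueue, which is templated on capacity and has a richer interface):

    #include <condition_variable>
    #include <cstddef>
    #include <mutex>
    #include <queue>

    // Sketch of the predicate-wait pattern adopted above.
    template<typename T, std::size_t CAPACITY>
    class BoundedQueue {
    public:
        void push(T value) {
            std::unique_lock<std::mutex> lock{m_Mutex};
            // Equivalent to: while (m_Queue.size() >= CAPACITY) { wait(lock); }
            m_ProducerCondition.wait(lock, [this] { return m_Queue.size() < CAPACITY; });
            m_Queue.push(std::move(value));
            m_ConsumerCondition.notify_one();
        }

        T pop() {
            std::unique_lock<std::mutex> lock{m_Mutex};
            // Equivalent to: while (m_Queue.empty()) { wait(lock); }
            m_ConsumerCondition.wait(lock, [this] { return m_Queue.size() > 0; });
            T value{std::move(m_Queue.front())};
            m_Queue.pop();
            m_ProducerCondition.notify_one();
            return value;
        }

    private:
        std::mutex m_Mutex;
        std::condition_variable m_ConsumerCondition;
        std::condition_variable m_ProducerCondition;
        std::queue<T> m_Queue;
    };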

include/core/CDataFrame.h

Lines changed: 1 addition & 1 deletion
@@ -403,7 +403,7 @@ class CORE_EXPORT CDataFrame final {
     TWriteSliceToStoreFunc m_WriteSliceToStore;
     TFloatVec m_RowsOfSliceBeingWritten;
     TInt32Vec m_DocHashesOfSliceBeingWritten;
-    future<TRowSlicePtr> m_SliceWrittenAsyncToStore;
+    std::future<TRowSlicePtr> m_SliceWrittenAsyncToStore;
     TRowSlicePtrVec m_SlicesWrittenToStore;
 };
 using TRowSliceWriterPtr = std::unique_ptr<CDataFrameRowSliceWriter>;

include/core/CStaticThreadPool.h

Lines changed: 3 additions & 7 deletions
@@ -10,7 +10,6 @@
 #include <core/Concurrency.h>
 #include <core/ImportExport.h>
 
-#include <boost/any.hpp>
 #include <boost/optional.hpp>
 
 #include <atomic>
@@ -26,11 +25,11 @@ namespace core {
 //!
 //! IMPLEMENTATION:\n
 //! This purposely has a very limited interface and is intended mainly to support
-//! CThreadPoolExecutor. This provides the mechanism by which we expose the thread
+//! CThreadPoolExecutor, which provides the mechanism by which we expose the thread
 //! pool to the rest of the code via calls to core::async.
 class CORE_EXPORT CStaticThreadPool {
 public:
-    using TTask = std::packaged_task<boost::any()>;
+    using TTask = std::function<void()>;
 
 public:
     explicit CStaticThreadPool(std::size_t size);
@@ -49,10 +48,7 @@ class CORE_EXPORT CStaticThreadPool {
     //! and is suitable for our use case where we don't need to guarantee that this
     //! always returns immediately and instead want to exert back pressure on the
     //! thread scheduling tasks if the pool can't keep up.
-    void schedule(std::packaged_task<boost::any()>&& task);
-
-    //! Executes the specified function in the thread pool.
-    void schedule(std::function<void()>&& f);
+    void schedule(TTask&& task);
 
     //! Check if the thread pool has been marked as busy.
     bool busy() const;
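
The TTask change is what lets boost/any.hpp be dropped: std::packaged_task is move-only, so it cannot be stored in a std::function, and the old code worked around that by erasing every result to boost::any. A sketch of the constraint and of the shared-promise replacement the commit adopts (illustrative names, not library code):

    #include <functional>
    #include <future>
    #include <memory>

    int main() {
        // std::packaged_task is move-only, so the old TTask could not live in a
        // std::function, which requires a copy-constructible callable:
        //     std::packaged_task<int()> pt{[] { return 42; }};
        //     std::function<void()> bad{std::move(pt)}; // does not compile
        //
        // The replacement: the result travels through a promise shared with the
        // task, so the task itself is a plain copyable std::function<void()>.
        auto promise = std::make_shared<std::promise<int>>();
        std::future<int> result{promise->get_future()};
        std::function<void()> task{[promise] { promise->set_value(42); }};

        task(); // a pool thread would execute this
        return result.get() == 42 ? 0 : 1;
    }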

include/core/Concurrency.h

Lines changed: 30 additions & 42 deletions
@@ -11,8 +11,6 @@
 #include <core/CLoopProgress.h>
 #include <core/ImportExport.h>
 
-#include <boost/any.hpp>
-
 #include <algorithm>
 #include <functional>
 #include <future>
@@ -72,7 +70,7 @@ auto bindRetrievableState(FUNCTION&& function, STATE&& state) {
 class CExecutor {
 public:
     virtual ~CExecutor() = default;
-    virtual void schedule(std::packaged_task<boost::any()>&& f) = 0;
+    virtual void schedule(std::function<void()>&& f) = 0;
     virtual bool busy() const = 0;
     virtual void busy(bool value) = 0;
 };
@@ -105,35 +103,21 @@ CORE_EXPORT
 std::size_t defaultAsyncThreadPoolSize();
 
 namespace concurrency_detail {
-template<typename F>
-boost::any resultToAny(F& f, const std::false_type&) {
-    return boost::any{f()};
+template<typename F, typename P>
+void invokeAndWriteResultToPromise(F& f, P& promise, const std::false_type&) {
+    try {
+        promise->set_value(f());
+    } catch (...) { promise->set_exception(std::current_exception()); }
 }
-template<typename F>
-boost::any resultToAny(F& f, const std::true_type&) {
-    f();
-    return boost::any{};
+template<typename F, typename P>
+void invokeAndWriteResultToPromise(F& f, P& promise, const std::true_type&) {
+    try {
+        f();
+        promise->set_value();
+    } catch (...) { promise->set_exception(std::current_exception()); }
 }
-
-template<typename R>
-class CTypedFutureAnyWrapper {
-public:
-    CTypedFutureAnyWrapper() = default;
-    CTypedFutureAnyWrapper(std::future<boost::any>&& future)
-        : m_Future{std::forward<std::future<boost::any>>(future)} {}
-
-    bool valid() const { return m_Future.valid(); }
-    void wait() const { m_Future.wait(); }
-    R get() { return boost::any_cast<R>(m_Future.get()); }
-
-private:
-    std::future<boost::any> m_Future;
-};
 }
 
-template<typename R>
-using future = concurrency_detail::CTypedFutureAnyWrapper<R>;
-
 //! A version of std::async which uses a specified executor.
 //!
 //! \note f must be copy constructible.
@@ -146,18 +130,22 @@ using future = concurrency_detail::CTypedFutureAnyWrapper<R>;
 //! them. Prefer using high level primitives, such as parallel_for_each, which are
 //! safer.
 template<typename FUNCTION, typename... ARGS>
-future<std::result_of_t<std::decay_t<FUNCTION>(std::decay_t<ARGS>...)>>
+std::future<std::result_of_t<std::decay_t<FUNCTION>(std::decay_t<ARGS>...)>>
 async(CExecutor& executor, FUNCTION&& f, ARGS&&... args) {
     using R = std::result_of_t<std::decay_t<FUNCTION>(std::decay_t<ARGS>...)>;
 
     // Note g stores copies of the arguments in the pack, which are moved into place
     // if possible, so this is safe to invoke later in the context of a packaged task.
     auto g = std::bind<R>(std::forward<FUNCTION>(f), std::forward<ARGS>(args)...);
 
-    std::packaged_task<boost::any()> task([g_ = std::move(g)]() mutable {
-        return concurrency_detail::resultToAny(g_, std::is_same<R, void>{});
-    });
-    auto result = task.get_future();
+    auto promise = std::make_shared<std::promise<R>>();
+    auto result = promise->get_future();
+
+    std::function<void()> task(
+        [ g_ = std::move(g), promise_ = std::move(promise) ]() mutable {
+            concurrency_detail::invokeAndWriteResultToPromise(
+                g_, promise_, std::is_same<R, void>{});
+        });
 
     // Schedule the task to compute the result.
     executor.schedule(std::move(task));
@@ -167,41 +155,41 @@ async(CExecutor& executor, FUNCTION&& f, ARGS&&... args) {
 
 //! Wait for all \p futures to be available.
 template<typename T>
-void wait_for_all(const std::vector<future<T>>& futures) {
+void wait_for_all(const std::vector<std::future<T>>& futures) {
     std::for_each(futures.begin(), futures.end(),
-                  [](const future<T>& future) { future.wait(); });
+                  [](const std::future<T>& future) { future.wait(); });
 }
 
 //! Wait for a valid future to be available otherwise return immediately.
 template<typename T>
-void wait_for_valid(const future<T>& future) {
+void wait_for_valid(const std::future<T>& future) {
     if (future.valid()) {
         future.wait();
     }
 }
 
 //! Wait for all valid \p futures to be available.
 template<typename T>
-void wait_for_all_valid(const std::vector<future<T>>& futures) {
+void wait_for_all_valid(const std::vector<std::future<T>>& futures) {
     std::for_each(futures.begin(), futures.end(), wait_for_valid);
 }
 
 //! \brief Waits for a future to complete when the object is destroyed.
 template<typename T>
 class CWaitIfValidWhenExitingScope {
 public:
-    CWaitIfValidWhenExitingScope(future<T>& future) : m_Future{future} {}
+    CWaitIfValidWhenExitingScope(std::future<T>& future) : m_Future{future} {}
     ~CWaitIfValidWhenExitingScope() { wait_for_valid(m_Future); }
     CWaitIfValidWhenExitingScope(const CWaitIfValidWhenExitingScope&) = delete;
     CWaitIfValidWhenExitingScope& operator=(const CWaitIfValidWhenExitingScope&) = delete;
 
 private:
-    future<T>& m_Future;
+    std::future<T>& m_Future;
 };
 
 //! Get the conjunction of all \p futures.
 CORE_EXPORT
-bool get_conjunction_of_all(std::vector<future<bool>>& futures);
+bool get_conjunction_of_all(std::vector<std::future<bool>>& futures);
 
 namespace concurrency_detail {
 class CORE_EXPORT CDefaultAsyncExecutorBusyForScope {
@@ -287,7 +275,7 @@ parallel_for_each(std::size_t partitions,
     // ensure the best possible locality of reference for reads which occur
     // at a similar time in the different threads.
 
-    std::vector<future<bool>> tasks;
+    std::vector<std::future<bool>> tasks;
 
     for (std::size_t offset = 0; offset < partitions; ++offset, ++start) {
         // Note there is one copy of g for each thread so capture by reference
@@ -371,7 +359,7 @@ parallel_for_each(std::size_t partitions,
 
     // See above for the rationale for this access pattern.
 
-    std::vector<future<bool>> tasks;
+    std::vector<std::future<bool>> tasks;
 
     for (std::size_t offset = 0; offset < partitions; ++offset, ++start) {
         // Note there is one copy of g for each thread so capture by reference
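
Putting the pieces together, the new async builds a plain std::function task around a shared promise and hands the matching std::future back to the caller; exceptions cross the thread boundary via set_exception and resurface from future::get. A standalone sketch of the same technique (hypothetical asyncViaPromise, non-void results only, since the library handles void with the std::is_same<R, void> tag dispatch above, and run inline rather than on an executor):

    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <stdexcept>

    // Hypothetical stand-in for core::async to exercise the promise pattern.
    template<typename F>
    auto asyncViaPromise(F f) -> std::future<decltype(f())> {
        using R = decltype(f());
        auto promise = std::make_shared<std::promise<R>>();
        std::future<R> result{promise->get_future()};
        std::function<void()> task{[f, promise]() mutable {
            try {
                promise->set_value(f());
            } catch (...) {
                promise->set_exception(std::current_exception());
            }
        }};
        task(); // a real executor would schedule this on a pool thread
        return result;
    }

    int main() {
        auto ok = asyncViaPromise([] { return 7; });
        std::cout << ok.get() << '\n'; // prints 7

        auto bad = asyncViaPromise([]() -> int { throw std::runtime_error{"boom"}; });
        try {
            bad.get();
        } catch (const std::exception& e) {
            std::cout << e.what() << '\n'; // prints boom: the exception crossed over
        }
    }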

lib/core/CDataFrame.cc

Lines changed: 1 addition & 1 deletion
@@ -291,7 +291,7 @@ CDataFrame::TRowFuncVecBoolPr CDataFrame::sequentialApplyToAllRows(std::size_t b
     // function each slice is then concurrently read by the callback
     // on a worker thread.
 
-    future<void> backgroundApply;
+    std::future<void> backgroundApply;
 
     // We need to wait and this isn't guaranteed by the future destructor.
     CWaitIfValidWhenExitingScope<void> waitFor(backgroundApply);
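
The comment about the future destructor is load-bearing: only a future returned by std::async blocks on destruction, and a promise-backed future, which core::async now returns, does not. A small sketch (not library code) of why the explicit scope-exit wait is still needed:

    #include <chrono>
    #include <future>
    #include <thread>

    // A promise-backed std::future does not block in its destructor, so the
    // wait at scope exit must be explicit, as CWaitIfValidWhenExitingScope does.
    int main() {
        std::promise<void> promise;
        std::thread worker{[&promise] {
            std::this_thread::sleep_for(std::chrono::milliseconds{50});
            promise.set_value();
        }};
        {
            std::future<void> backgroundApply{promise.get_future()};
            // Without this wait, the scope would unwind while the worker still runs.
            if (backgroundApply.valid()) {
                backgroundApply.wait();
            }
        }
        worker.join();
        return 0;
    }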

lib/core/CStaticThreadPool.cc

Lines changed: 20 additions & 18 deletions
@@ -49,14 +49,18 @@ void CStaticThreadPool::schedule(TTask&& task_) {
     if (i == end) {
         m_TaskQueues[i % size].push(std::move(task));
     }
-    m_Cursor.store(i + 1);
-}
 
-void CStaticThreadPool::schedule(std::function<void()>&& f) {
-    this->schedule(TTask([g = std::move(f)] {
-        g();
-        return boost::any{};
-    }));
+    // For many small tasks the best strategy for minimising contention between the
+    // producers and consumers is to 1) not yield, 2) set the cursor to add tasks on
+    // the queue for the thread on which a task is popped. That alternative gives
+    // about twice the throughput of this strategy. However, if there are a small
+    // number of large tasks they must be added to different queues to ensure the
+    // work is parallelised. For a general purpose thread pool we must avoid that
+    // pathology. If we need a better pool for very fine grained threading we'd be
+    // better off investing in a somewhat lock free bounded queue: for a fixed
+    // memory buffer it is possible to safely add and remove elements at different
+    // ends of a queue of length greater than one.
+    m_Cursor.store(i + 1);
 }
 
 bool CStaticThreadPool::busy() const {
@@ -75,10 +79,7 @@ void CStaticThreadPool::shutdown() {
     // Signal to each thread that it is finished. We bind each task to a thread so
     // each thread executes exactly one shutdown task.
     for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) {
-        TTask done{[&] {
-            m_Done = true;
-            return boost::any{};
-        }};
+        TTask done{[&] { m_Done = true; }};
         m_TaskQueues[id].push(CWrappedTask{std::move(done), id});
     }
 
@@ -99,6 +100,7 @@ void CStaticThreadPool::worker(std::size_t id) {
     };
 
     TOptionalTask task;
+    std::size_t size{m_TaskQueues.size()};
 
     while (m_Done == false) {
         // We maintain "worker count" queues and each worker has an affinity to a
@@ -108,7 +110,6 @@ void CStaticThreadPool::worker(std::size_t id) {
         // if everything is working well we have essentially no contention between
         // workers on queue reads.
 
-        std::size_t size{m_TaskQueues.size()};
        for (std::size_t i = 0; i < size; ++i) {
            task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed);
            if (task != boost::none) {
@@ -126,7 +127,7 @@ void CStaticThreadPool::worker(std::size_t id) {
         // and the producer(s) will be waiting to add a task as each one is consumed.
         // By switching to work on a new queue here we minimise contention between the
         // producers and consumers. Testing on bare metal (OSX) the overhead per task
-        // dropped from around 2.2 microseconds to 1.5 microseconds by yielding here.
+        // dropped by around 30% by yielding here.
         std::this_thread::yield();
     }
 }
@@ -156,11 +157,12 @@ bool CStaticThreadPool::CWrappedTask::executableOnThread(std::size_t id) const {
 }
 
 void CStaticThreadPool::CWrappedTask::operator()() {
-    try {
-        m_Task();
-    } catch (const std::future_error& e) {
-        LOG_ERROR(<< "Failed executing packaged task: '" << e.code() << "' "
-                  << "with error '" << e.what() << "'");
+    if (m_Task != nullptr) {
+        try {
+            m_Task();
+        } catch (const std::exception& e) {
+            LOG_ERROR(<< "Failed executing task with error '" << e.what() << "'");
+        }
     }
 }
 }
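
For context, the worker loop scans every queue starting from the one matching its own id and wraps around, so under steady load each worker mostly pops from its own queue and read contention stays low. A hedged sketch of that access pattern with hypothetical LockedQueue and popAndRunOne stand-ins (the real pool uses CConcurrentQueue and the ifAllowed affinity check shown above):

    #include <cstddef>
    #include <deque>
    #include <functional>
    #include <mutex>
    #include <optional>
    #include <vector>

    using Task = std::function<void()>;

    // Hypothetical stand-in for the pool's per-thread task queues.
    struct LockedQueue {
        std::optional<Task> tryPop() {
            std::lock_guard<std::mutex> lock{mutex};
            if (tasks.empty()) {
                return std::nullopt;
            }
            Task task{std::move(tasks.front())};
            tasks.pop_front();
            return task;
        }
        std::mutex mutex;
        std::deque<Task> tasks;
    };

    // Worker "id" starts at its own queue and wraps around the others.
    bool popAndRunOne(std::vector<LockedQueue>& queues, std::size_t id) {
        std::size_t size{queues.size()};
        for (std::size_t i = 0; i < size; ++i) {
            if (auto task = queues[(id + i) % size].tryPop()) {
                (*task)();
                return true;
            }
        }
        return false; // every queue was empty: the caller yields, as worker() does
    }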

lib/core/Concurrency.cc

Lines changed: 9 additions & 9 deletions
@@ -19,21 +19,21 @@ namespace {
 //! \brief Executes a function immediately (on the calling thread).
 class CImmediateExecutor final : public CExecutor {
 public:
-    virtual void schedule(std::packaged_task<boost::any()>&& f) { f(); }
-    virtual bool busy() const { return false; }
-    virtual void busy(bool) {}
+    void schedule(std::function<void()>&& f) override { f(); }
+    bool busy() const override { return false; }
+    void busy(bool) override {}
 };
 
 //! \brief Executes a function in a thread pool.
 class CThreadPoolExecutor final : public CExecutor {
 public:
     explicit CThreadPoolExecutor(std::size_t size) : m_ThreadPool{size} {}
 
-    virtual void schedule(std::packaged_task<boost::any()>&& f) {
-        m_ThreadPool.schedule(std::forward<std::packaged_task<boost::any()>>(f));
+    void schedule(std::function<void()>&& f) override {
+        m_ThreadPool.schedule(std::forward<std::function<void()>>(f));
     }
-    virtual bool busy() const { return m_ThreadPool.busy(); }
-    virtual void busy(bool value) { return m_ThreadPool.busy(value); }
+    bool busy() const override { return m_ThreadPool.busy(); }
+    void busy(bool value) override { return m_ThreadPool.busy(value); }
 
 private:
     CStaticThreadPool m_ThreadPool;
@@ -115,14 +115,14 @@ CExecutor& defaultAsyncExecutor() {
     return singletonExecutor.get();
 }
 
-bool get_conjunction_of_all(std::vector<future<bool>>& futures) {
+bool get_conjunction_of_all(std::vector<std::future<bool>>& futures) {
 
     // This waits until results are present. If we get an exception we still want
     // to wait until all results are ready in case continuing destroys state
     // which a worker thread reads. We just rethrow the _last_ exception we received.
     std::exception_ptr e;
     bool result{std::accumulate(futures.begin(), futures.end(), true,
-                                [&e](bool conjunction, future<bool>& future) {
+                                [&e](bool conjunction, std::future<bool>& future) {
                                     try {
                                         // Don't short-circuit
                                         bool value = future.get();
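
The truncated hunk above is the heart of get_conjunction_of_all: every future is waited on even after a failure, and the last captured exception is rethrown once all workers have finished. A self-contained sketch consistent with that logic (hypothetical getConjunctionOfAll, since the original body is cut off here):

    #include <exception>
    #include <future>
    #include <numeric>
    #include <vector>

    // Wait on every future even after a failure, remember the last exception,
    // and rethrow it once all results have been collected.
    bool getConjunctionOfAll(std::vector<std::future<bool>>& futures) {
        std::exception_ptr e;
        bool result{std::accumulate(futures.begin(), futures.end(), true,
                                    [&e](bool conjunction, std::future<bool>& future) {
                                        try {
                                            bool value = future.get(); // don't short-circuit
                                            conjunction = conjunction && value;
                                        } catch (...) {
                                            e = std::current_exception();
                                        }
                                        return conjunction;
                                    })};
        if (e != nullptr) {
            std::rethrow_exception(e);
        }
        return result;
    }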
