Skip to content

[ML] Threading optimisations #433

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions include/core/CConcurrentQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,11 @@ class CConcurrentQueue final : private CNonCopyable {

private:
//! Block on \p lock until the queue has at least one item to pop.
//! Uses the predicate overload of wait, which re-checks the condition
//! after every wake-up and so is robust to spurious wake-ups.
void waitWhileEmpty(std::unique_lock<std::mutex>& lock) {
    m_ConsumerCondition.wait(lock, [this] { return !m_Queue.empty(); });
}
//! Block on \p lock until the queue has spare capacity to push into.
//! Uses the predicate overload of wait, which re-checks the condition
//! after every wake-up and so is robust to spurious wake-ups.
void waitWhileFull(std::unique_lock<std::mutex>& lock) {
    m_ProducerCondition.wait(
        lock, [this] { return m_Queue.size() < QUEUE_CAPACITY; });
}

void notifyIfNoLongerFull(std::unique_lock<std::mutex>& lock, std::size_t oldSize) {
Expand Down
2 changes: 1 addition & 1 deletion include/core/CDataFrame.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ class CORE_EXPORT CDataFrame final {
TWriteSliceToStoreFunc m_WriteSliceToStore;
TFloatVec m_RowsOfSliceBeingWritten;
TInt32Vec m_DocHashesOfSliceBeingWritten;
future<TRowSlicePtr> m_SliceWrittenAsyncToStore;
std::future<TRowSlicePtr> m_SliceWrittenAsyncToStore;
TRowSlicePtrVec m_SlicesWrittenToStore;
};
using TRowSliceWriterPtr = std::unique_ptr<CDataFrameRowSliceWriter>;
Expand Down
10 changes: 3 additions & 7 deletions include/core/CStaticThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
#include <core/Concurrency.h>
#include <core/ImportExport.h>

#include <boost/any.hpp>
#include <boost/optional.hpp>

#include <atomic>
Expand All @@ -26,11 +25,11 @@ namespace core {
//!
//! IMPLEMENTATION:\n
//! This purposely has very limited interface and is intended to mainly support
CThreadPoolExecutor, which provides the mechanism by which we expose the thread
//! pool to the rest of the code via calls core::async.
class CORE_EXPORT CStaticThreadPool {
public:
using TTask = std::packaged_task<boost::any()>;
using TTask = std::function<void()>;

public:
explicit CStaticThreadPool(std::size_t size);
Expand All @@ -49,10 +48,7 @@ class CORE_EXPORT CStaticThreadPool {
and is suitable for our use case where we don't need to guarantee that this
//! always returns immediately and instead want to exert back pressure on the
//! thread scheduling tasks if the pool can't keep up.
void schedule(std::packaged_task<boost::any()>&& task);

//! Executes the specified function in the thread pool.
void schedule(std::function<void()>&& f);
void schedule(TTask&& task);

//! Check if the thread pool has been marked as busy.
bool busy() const;
Expand Down
72 changes: 30 additions & 42 deletions include/core/Concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
#include <core/CLoopProgress.h>
#include <core/ImportExport.h>

#include <boost/any.hpp>

#include <algorithm>
#include <functional>
#include <future>
Expand Down Expand Up @@ -72,7 +70,7 @@ auto bindRetrievableState(FUNCTION&& function, STATE&& state) {
class CExecutor {
public:
virtual ~CExecutor() = default;
virtual void schedule(std::packaged_task<boost::any()>&& f) = 0;
virtual void schedule(std::function<void()>&& f) = 0;
virtual bool busy() const = 0;
virtual void busy(bool value) = 0;
};
Expand Down Expand Up @@ -105,35 +103,21 @@ CORE_EXPORT
std::size_t defaultAsyncThreadPoolSize();

namespace concurrency_detail {
template<typename F>
boost::any resultToAny(F& f, const std::false_type&) {
return boost::any{f()};
template<typename F, typename P>
void invokeAndWriteResultToPromise(F& f, P& promise, const std::false_type&) {
try {
promise->set_value(f());
} catch (...) { promise->set_exception(std::current_exception()); }
}
template<typename F>
boost::any resultToAny(F& f, const std::true_type&) {
f();
return boost::any{};
template<typename F, typename P>
void invokeAndWriteResultToPromise(F& f, P& promise, const std::true_type&) {
try {
f();
promise->set_value();
} catch (...) { promise->set_exception(std::current_exception()); }
}

template<typename R>
class CTypedFutureAnyWrapper {
public:
CTypedFutureAnyWrapper() = default;
CTypedFutureAnyWrapper(std::future<boost::any>&& future)
: m_Future{std::forward<std::future<boost::any>>(future)} {}

bool valid() const { return m_Future.valid(); }
void wait() const { m_Future.wait(); }
R get() { return boost::any_cast<R>(m_Future.get()); }

private:
std::future<boost::any> m_Future;
};
}

template<typename R>
using future = concurrency_detail::CTypedFutureAnyWrapper<R>;

//! An version of std::async which uses a specified executor.
//!
//! \note f must be copy constructible.
Expand All @@ -146,18 +130,22 @@ using future = concurrency_detail::CTypedFutureAnyWrapper<R>;
//! them. Prefer using high level primitives, such as parallel_for_each, which are
//! safer.
template<typename FUNCTION, typename... ARGS>
future<std::result_of_t<std::decay_t<FUNCTION>(std::decay_t<ARGS>...)>>
std::future<std::result_of_t<std::decay_t<FUNCTION>(std::decay_t<ARGS>...)>>
async(CExecutor& executor, FUNCTION&& f, ARGS&&... args) {
using R = std::result_of_t<std::decay_t<FUNCTION>(std::decay_t<ARGS>...)>;

// Note g stores copies of the arguments in the pack, which are moved into place
// if possible, so this is safe to invoke later in the context of a packaged task.
auto g = std::bind<R>(std::forward<FUNCTION>(f), std::forward<ARGS>(args)...);

std::packaged_task<boost::any()> task([g_ = std::move(g)]() mutable {
return concurrency_detail::resultToAny(g_, std::is_same<R, void>{});
});
auto result = task.get_future();
auto promise = std::make_shared<std::promise<R>>();
auto result = promise->get_future();

std::function<void()> task(
[ g_ = std::move(g), promise_ = std::move(promise) ]() mutable {
concurrency_detail::invokeAndWriteResultToPromise(
g_, promise_, std::is_same<R, void>{});
});

// Schedule the task to compute the result.
executor.schedule(std::move(task));
Expand All @@ -167,41 +155,41 @@ async(CExecutor& executor, FUNCTION&& f, ARGS&&... args) {

//! Wait for all \p futures to be available.
template<typename T>
void wait_for_all(const std::vector<future<T>>& futures) {
void wait_for_all(const std::vector<std::future<T>>& futures) {
std::for_each(futures.begin(), futures.end(),
[](const future<T>& future) { future.wait(); });
[](const std::future<T>& future) { future.wait(); });
}

//! Wait for a valid future to be available otherwise return immediately.
template<typename T>
void wait_for_valid(const future<T>& future) {
void wait_for_valid(const std::future<T>& future) {
if (future.valid()) {
future.wait();
}
}

//! Wait for all valid \p futures to be available.
template<typename T>
void wait_for_all_valid(const std::vector<future<T>>& futures) {
void wait_for_all_valid(const std::vector<std::future<T>>& futures) {
std::for_each(futures.begin(), futures.end(), wait_for_valid);
}

//! \brief Waits for a future to complete when the object is destroyed.
template<typename T>
class CWaitIfValidWhenExitingScope {
public:
CWaitIfValidWhenExitingScope(future<T>& future) : m_Future{future} {}
CWaitIfValidWhenExitingScope(std::future<T>& future) : m_Future{future} {}
~CWaitIfValidWhenExitingScope() { wait_for_valid(m_Future); }
CWaitIfValidWhenExitingScope(const CWaitIfValidWhenExitingScope&) = delete;
CWaitIfValidWhenExitingScope& operator=(const CWaitIfValidWhenExitingScope&) = delete;

private:
future<T>& m_Future;
std::future<T>& m_Future;
};

//! Get the conjunction of all \p futures.
CORE_EXPORT
bool get_conjunction_of_all(std::vector<future<bool>>& futures);
bool get_conjunction_of_all(std::vector<std::future<bool>>& futures);

namespace concurrency_detail {
class CORE_EXPORT CDefaultAsyncExecutorBusyForScope {
Expand Down Expand Up @@ -287,7 +275,7 @@ parallel_for_each(std::size_t partitions,
// ensure the best possible locality of reference for reads which occur
// at a similar time in the different threads.

std::vector<future<bool>> tasks;
std::vector<std::future<bool>> tasks;

for (std::size_t offset = 0; offset < partitions; ++offset, ++start) {
// Note there is one copy of g for each thread so capture by reference
Expand Down Expand Up @@ -371,7 +359,7 @@ parallel_for_each(std::size_t partitions,

// See above for the rationale for this access pattern.

std::vector<future<bool>> tasks;
std::vector<std::future<bool>> tasks;

for (std::size_t offset = 0; offset < partitions; ++offset, ++start) {
// Note there is one copy of g for each thread so capture by reference
Expand Down
2 changes: 1 addition & 1 deletion lib/core/CDataFrame.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ CDataFrame::TRowFuncVecBoolPr CDataFrame::sequentialApplyToAllRows(std::size_t b
// function each slice is then concurrently read by the callback
// on a worker thread.

future<void> backgroundApply;
std::future<void> backgroundApply;

// We need to wait and this isn't guaranteed by the future destructor.
CWaitIfValidWhenExitingScope<void> waitFor(backgroundApply);
Expand Down
38 changes: 20 additions & 18 deletions lib/core/CStaticThreadPool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,18 @@ void CStaticThreadPool::schedule(TTask&& task_) {
if (i == end) {
m_TaskQueues[i % size].push(std::move(task));
}
m_Cursor.store(i + 1);
}

void CStaticThreadPool::schedule(std::function<void()>&& f) {
this->schedule(TTask([g = std::move(f)] {
g();
return boost::any{};
}));
// For many small tasks the best strategy for minimising contention between the
// producers and consumers is to 1) not yield, 2) set the cursor to add tasks on
the queue for the thread on which a task is popped. That alternative gives about
twice the throughput of the strategy used here. However, if there are a small number of large
// tasks they must be added to different queues to ensure the work is parallelised.
// For a general purpose thread pool we must avoid that pathology. If we need a
// better pool for very fine grained threading we'd be better investing in a
// somewhat lock free bounded queue: for a fixed memory buffer it is possible to
// safely add and remove elements at different ends of a queue of length greater
// than one.
m_Cursor.store(i + 1);
}

bool CStaticThreadPool::busy() const {
Expand All @@ -75,10 +79,7 @@ void CStaticThreadPool::shutdown() {
Signal to each thread that it is finished. We bind each task to a thread so
that each thread executes exactly one shutdown task.
for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) {
TTask done{[&] {
m_Done = true;
return boost::any{};
}};
TTask done{[&] { m_Done = true; }};
m_TaskQueues[id].push(CWrappedTask{std::move(done), id});
}

Expand All @@ -99,6 +100,7 @@ void CStaticThreadPool::worker(std::size_t id) {
};

TOptionalTask task;
std::size_t size{m_TaskQueues.size()};

while (m_Done == false) {
// We maintain "worker count" queues and each worker has an affinity to a
Expand All @@ -108,7 +110,6 @@ void CStaticThreadPool::worker(std::size_t id) {
// if everything is working well we have essentially no contention between
// workers on queue reads.

std::size_t size{m_TaskQueues.size()};
for (std::size_t i = 0; i < size; ++i) {
task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed);
if (task != boost::none) {
Expand All @@ -126,7 +127,7 @@ void CStaticThreadPool::worker(std::size_t id) {
// and the producer(s) will be waiting to add a task as each one is consumed.
// By switching to work on a new queue here we minimise contention between the
// producers and consumers. Testing on bare metal (OSX) the overhead per task
// dropped from around 2.2 microseconds to 1.5 microseconds by yielding here.
dropped by around 30% (from roughly 2.2 to 1.5 microseconds) by yielding here.
std::this_thread::yield();
}
}
Expand Down Expand Up @@ -156,11 +157,12 @@ bool CStaticThreadPool::CWrappedTask::executableOnThread(std::size_t id) const {
}

void CStaticThreadPool::CWrappedTask::operator()() {
    // The wrapped std::function can be empty, e.g. after being moved from;
    // invoking an empty std::function throws std::bad_function_call, so guard.
    if (m_Task != nullptr) {
        try {
            m_Task();
        } catch (const std::exception& e) {
            // Catch std::exception (not just std::future_error) so no task
            // failure can escape and terminate the worker thread.
            LOG_ERROR(<< "Failed executing task with error '" << e.what() << "'");
        }
    }
}
}
Expand Down
18 changes: 9 additions & 9 deletions lib/core/Concurrency.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ namespace {
//! \brief Executes a function immediately (on the calling thread).
class CImmediateExecutor final : public CExecutor {
public:
virtual void schedule(std::packaged_task<boost::any()>&& f) { f(); }
virtual bool busy() const { return false; }
virtual void busy(bool) {}
void schedule(std::function<void()>&& f) override { f(); }
bool busy() const override { return false; }
void busy(bool) override {}
};

//! \brief Executes a function in a thread pool.
class CThreadPoolExecutor final : public CExecutor {
public:
explicit CThreadPoolExecutor(std::size_t size) : m_ThreadPool{size} {}

virtual void schedule(std::packaged_task<boost::any()>&& f) {
m_ThreadPool.schedule(std::forward<std::packaged_task<boost::any()>>(f));
void schedule(std::function<void()>&& f) override {
m_ThreadPool.schedule(std::forward<std::function<void()>>(f));
}
virtual bool busy() const { return m_ThreadPool.busy(); }
virtual void busy(bool value) { return m_ThreadPool.busy(value); }
bool busy() const override { return m_ThreadPool.busy(); }
void busy(bool value) override { return m_ThreadPool.busy(value); }

private:
CStaticThreadPool m_ThreadPool;
Expand Down Expand Up @@ -115,14 +115,14 @@ CExecutor& defaultAsyncExecutor() {
return singletonExecutor.get();
}

bool get_conjunction_of_all(std::vector<future<bool>>& futures) {
bool get_conjunction_of_all(std::vector<std::future<bool>>& futures) {

// This waits until results are present. If we get an exception we still want
// to wait until all results are ready in case continuing destroys state access
// which a worker thread reads. We just rethrow the _last_ exception we received.
std::exception_ptr e;
bool result{std::accumulate(futures.begin(), futures.end(), true,
[&e](bool conjunction, future<bool>& future) {
[&e](bool conjunction, std::future<bool>& future) {
try {
// Don't shortcircuit
bool value = future.get();
Expand Down
Loading