diff --git a/include/core/CConcurrentQueue.h b/include/core/CConcurrentQueue.h
index 8e48f6f8b2..6d8006d0b3 100644
--- a/include/core/CConcurrentQueue.h
+++ b/include/core/CConcurrentQueue.h
@@ -132,14 +132,11 @@ class CConcurrentQueue final : private CNonCopyable {
 
 private:
     void waitWhileEmpty(std::unique_lock<std::mutex>& lock) {
-        while (m_Queue.empty()) {
-            m_ConsumerCondition.wait(lock);
-        }
+        m_ConsumerCondition.wait(lock, [this] { return m_Queue.size() > 0; });
     }
     void waitWhileFull(std::unique_lock<std::mutex>& lock) {
-        while (m_Queue.size() >= QUEUE_CAPACITY) {
-            m_ProducerCondition.wait(lock);
-        }
+        m_ProducerCondition.wait(
+            lock, [this] { return m_Queue.size() < QUEUE_CAPACITY; });
     }
 
     void notifyIfNoLongerFull(std::unique_lock<std::mutex>& lock, std::size_t oldSize) {
diff --git a/include/core/CDataFrame.h b/include/core/CDataFrame.h
index 3349520e66..f6a632ffaa 100644
--- a/include/core/CDataFrame.h
+++ b/include/core/CDataFrame.h
@@ -403,7 +403,7 @@ class CORE_EXPORT CDataFrame final {
         TWriteSliceToStoreFunc m_WriteSliceToStore;
         TFloatVec m_RowsOfSliceBeingWritten;
         TInt32Vec m_DocHashesOfSliceBeingWritten;
-        future<void> m_SliceWrittenAsyncToStore;
+        std::future<void> m_SliceWrittenAsyncToStore;
        TRowSlicePtrVec m_SlicesWrittenToStore;
     };
     using TRowSliceWriterPtr = std::unique_ptr<CDataFrameRowSliceWriter>;
diff --git a/include/core/CStaticThreadPool.h b/include/core/CStaticThreadPool.h
index 266e1c09d8..1f2bc6ba17 100644
--- a/include/core/CStaticThreadPool.h
+++ b/include/core/CStaticThreadPool.h
@@ -10,7 +10,6 @@
 
 #include <cstddef>
 #include <functional>
-#include <future>
 #include <thread>
 #include <vector>
 
@@ -26,11 +25,11 @@ namespace core {
 //!
 //! IMPLEMENTATION:\n
 //! This purposely has a very limited interface and is intended to mainly support
-//! CThreadPoolExecutor. This provides the mechanism by which we expose the thread
+//! CThreadPoolExecutor which provides the mechanism by which we expose the thread
 //! pool to the rest of the code via calls to core::async.
 class CORE_EXPORT CStaticThreadPool {
 public:
-    using TTask = std::packaged_task<boost::any()>;
+    using TTask = std::function<void()>;
 
 public:
     explicit CStaticThreadPool(std::size_t size);
@@ -49,10 +48,7 @@ class CORE_EXPORT CStaticThreadPool {
     //! and is suitable for our use case where we don't need to guarantee that this
     //! always returns immediately and instead want to exert back pressure on the
     //! thread scheduling tasks if the pool can't keep up.
-    void schedule(std::packaged_task<boost::any()>&& task);
-
-    //! Executes the specified function in the thread pool.
-    void schedule(std::function<void()>&& f);
+    void schedule(TTask&& task);
 
     //! Check if the thread pool has been marked as busy.
     bool busy() const;
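The `TTask` change above swaps a typed `std::packaged_task<boost::any()>` for a type-erased `std::function<void()>`. The result channel does not disappear: a closure can own a `std::promise` while the caller keeps the matching `std::future`, which is the mechanism the reworked `core::async` in Concurrency.h (next section) relies on. A minimal standalone sketch of the idea, with illustrative names only, not code from this patch:

```cpp
#include <functional>
#include <future>
#include <iostream>
#include <memory>

int main() {
    // The shared promise owns the result channel; the task itself is
    // type-erased down to a plain void() callable.
    auto promise = std::make_shared<std::promise<int>>();
    std::future<int> result{promise->get_future()};

    std::function<void()> task{[promise] { promise->set_value(6 * 7); }};

    task();                            // a pool thread would run this
    std::cout << result.get() << '\n'; // prints 42
}
```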
diff --git a/include/core/Concurrency.h b/include/core/Concurrency.h
index 5b2e74c673..d46e8db637 100644
--- a/include/core/Concurrency.h
+++ b/include/core/Concurrency.h
@@ -11,8 +11,6 @@
 #include <core/CLogger.h>
 #include <core/ImportExport.h>
 
-#include <boost/any.hpp>
-
 #include <algorithm>
 #include <functional>
 #include <future>
@@ -72,7 +70,7 @@ auto bindRetrievableState(FUNCTION&& function, STATE&& state) {
 class CExecutor {
 public:
     virtual ~CExecutor() = default;
-    virtual void schedule(std::packaged_task<boost::any()>&& f) = 0;
+    virtual void schedule(std::function<void()>&& f) = 0;
     virtual bool busy() const = 0;
     virtual void busy(bool value) = 0;
 };
@@ -105,35 +103,21 @@ CORE_EXPORT
 std::size_t defaultAsyncThreadPoolSize();
 
 namespace concurrency_detail {
-template<typename F>
-boost::any resultToAny(F& f, const std::false_type&) {
-    return boost::any{f()};
+template<typename F, typename P>
+void invokeAndWriteResultToPromise(F& f, P& promise, const std::false_type&) {
+    try {
+        promise->set_value(f());
+    } catch (...) { promise->set_exception(std::current_exception()); }
 }
-template<typename F>
-boost::any resultToAny(F& f, const std::true_type&) {
-    f();
-    return boost::any{};
+template<typename F, typename P>
+void invokeAndWriteResultToPromise(F& f, P& promise, const std::true_type&) {
+    try {
+        f();
+        promise->set_value();
+    } catch (...) { promise->set_exception(std::current_exception()); }
 }
-
-template<typename R>
-class CTypedFutureAnyWrapper {
-public:
-    CTypedFutureAnyWrapper() = default;
-    CTypedFutureAnyWrapper(std::future<boost::any>&& future)
-        : m_Future{std::forward<std::future<boost::any>>(future)} {}
-
-    bool valid() const { return m_Future.valid(); }
-    void wait() const { m_Future.wait(); }
-    R get() { return boost::any_cast<R>(m_Future.get()); }
-
-private:
-    std::future<boost::any> m_Future;
-};
 }
 
-template<typename T>
-using future = concurrency_detail::CTypedFutureAnyWrapper<T>;
-
 //! A version of std::async which uses a specified executor.
 //!
 //! \note f must be copy constructible.
@@ -146,7 +130,7 @@
 //! ...
 //! them. Prefer using high level primitives, such as parallel_for_each, which are
 //! safer.
 template<typename FUNCTION, typename... ARGS>
-future<std::result_of_t<std::decay_t<FUNCTION>(std::decay_t<ARGS>...)>>
+std::future<std::result_of_t<std::decay_t<FUNCTION>(std::decay_t<ARGS>...)>>
 async(CExecutor& executor, FUNCTION&& f, ARGS&&... args) {
     using R = std::result_of_t<std::decay_t<FUNCTION>(std::decay_t<ARGS>...)>;
@@ -154,10 +138,14 @@ async(CExecutor& executor, FUNCTION&& f, ARGS&&... args) {
     // if possible, so this is safe to invoke later in the context of a packaged task.
     auto g = std::bind(std::forward<FUNCTION>(f), std::forward<ARGS>(args)...);
 
-    std::packaged_task<boost::any()> task([g_ = std::move(g)]() mutable {
-        return concurrency_detail::resultToAny(g_, std::is_same<R, void>{});
-    });
-    auto result = task.get_future();
+    auto promise = std::make_shared<std::promise<R>>();
+    auto result = promise->get_future();
+
+    std::function<void()> task(
+        [ g_ = std::move(g), promise_ = std::move(promise) ]() mutable {
+            concurrency_detail::invokeAndWriteResultToPromise(
+                g_, promise_, std::is_same<R, void>{});
+        });
 
     // Schedule the task to compute the result.
     executor.schedule(std::move(task));
@@ -167,14 +155,14 @@ async(CExecutor& executor, FUNCTION&& f, ARGS&&... args) {
 
 //! Wait for all \p futures to be available.
 template<typename T>
-void wait_for_all(const std::vector<future<T>>& futures) {
+void wait_for_all(const std::vector<std::future<T>>& futures) {
     std::for_each(futures.begin(), futures.end(),
-                  [](const future<T>& future) { future.wait(); });
+                  [](const std::future<T>& future) { future.wait(); });
 }
 
 //! Wait for a valid future to be available otherwise return immediately.
 template<typename T>
-void wait_for_valid(const future<T>& future) {
+void wait_for_valid(const std::future<T>& future) {
     if (future.valid()) {
         future.wait();
     }
@@ -182,7 +170,7 @@ void wait_for_valid(const future<T>& future) {
 }
 
 //! Wait for all valid \p futures to be available.
 template<typename T>
-void wait_for_all_valid(const std::vector<future<T>>& futures) {
+void wait_for_all_valid(const std::vector<std::future<T>>& futures) {
     std::for_each(futures.begin(), futures.end(), wait_for_valid<T>);
 }
 
@@ -190,18 +178,18 @@ void wait_for_all_valid(const std::vector<future<T>>& futures) {
 template<typename T>
 class CWaitIfValidWhenExitingScope {
 public:
-    CWaitIfValidWhenExitingScope(future<T>& future) : m_Future{future} {}
+    CWaitIfValidWhenExitingScope(std::future<T>& future) : m_Future{future} {}
     ~CWaitIfValidWhenExitingScope() { wait_for_valid(m_Future); }
 
     CWaitIfValidWhenExitingScope(const CWaitIfValidWhenExitingScope&) = delete;
     CWaitIfValidWhenExitingScope& operator=(const CWaitIfValidWhenExitingScope&) = delete;
 
 private:
-    future<T>& m_Future;
+    std::future<T>& m_Future;
 };
 
 //! Get the conjunction of all \p futures.
 CORE_EXPORT
-bool get_conjunction_of_all(std::vector<future<bool>>& futures);
+bool get_conjunction_of_all(std::vector<std::future<bool>>& futures);
 
 namespace concurrency_detail {
 class CORE_EXPORT CDefaultAsyncExecutorBusyForScope {
@@ -287,7 +275,7 @@ parallel_for_each(std::size_t partitions,
     // ensure the best possible locality of reference for reads which occur
     // at a similar time in the different threads.
 
-    std::vector<future<bool>> tasks;
+    std::vector<std::future<bool>> tasks;
 
     for (std::size_t offset = 0; offset < partitions; ++offset, ++start) {
         // Note there is one copy of g for each thread so capture by reference
@@ -371,7 +359,7 @@ parallel_for_each(std::size_t partitions,
 
     // See above for the rationale for this access pattern.
 
-    std::vector<future<bool>> tasks;
+    std::vector<std::future<bool>> tasks;
 
     for (std::size_t offset = 0; offset < partitions; ++offset, ++start) {
         // Note there is one copy of g for each thread so capture by reference
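The new helpers lean entirely on standard promise/future plumbing for errors: `std::promise::set_exception` stores the in-flight exception and the matching `std::future::get` rethrows it on the waiting thread, so a task failure inside the pool surfaces at the call site of `get` rather than escaping a worker. A self-contained sketch of just that mechanism, independent of the ml-cpp code:

```cpp
#include <exception>
#include <future>
#include <iostream>
#include <memory>
#include <stdexcept>

int main() {
    auto promise = std::make_shared<std::promise<int>>();
    std::future<int> result{promise->get_future()};

    // What invokeAndWriteResultToPromise does on failure: capture the
    // exception instead of letting it escape the worker thread.
    try {
        throw std::runtime_error{"task failed"};
    } catch (...) {
        promise->set_exception(std::current_exception());
    }

    // The waiting thread sees the exception rethrown from get().
    try {
        result.get();
    } catch (const std::exception& e) {
        std::cout << "caught: " << e.what() << '\n'; // caught: task failed
    }
}
```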
diff --git a/lib/core/CDataFrame.cc b/lib/core/CDataFrame.cc
index cffb57c16e..c759c05b53 100644
--- a/lib/core/CDataFrame.cc
+++ b/lib/core/CDataFrame.cc
@@ -291,7 +291,7 @@ CDataFrame::TRowFuncVecBoolPr CDataFrame::sequentialApplyToAllRows(std::size_t b
     // function each slice is then concurrently read by the callback
     // on a worker thread.
 
-    future<void> backgroundApply;
+    std::future<void> backgroundApply;
 
     // We need to wait and this isn't guaranteed by the future destructor.
     CWaitIfValidWhenExitingScope<void> waitFor(backgroundApply);
diff --git a/lib/core/CStaticThreadPool.cc b/lib/core/CStaticThreadPool.cc
index 16afc65d32..b4bb267e7d 100644
--- a/lib/core/CStaticThreadPool.cc
+++ b/lib/core/CStaticThreadPool.cc
@@ -49,14 +49,18 @@ void CStaticThreadPool::schedule(TTask&& task_) {
     if (i == end) {
         m_TaskQueues[i % size].push(std::move(task));
     }
-    m_Cursor.store(i + 1);
-}
 
-void CStaticThreadPool::schedule(std::function<void()>&& f) {
-    this->schedule(TTask([g = std::move(f)] {
-        g();
-        return boost::any{};
-    }));
+    // For many small tasks the best strategy for minimising contention between the
+    // producers and consumers is to 1) not yield and 2) set the cursor to add tasks
+    // to the queue of the thread on which a task was last popped. That approach
+    // gives about twice the throughput of the strategy used here. However, if there
+    // are a small number of large tasks they must be added to different queues to
+    // ensure the work is parallelised. For a general purpose thread pool we must
+    // avoid that pathology. If we need a better pool for very fine grained threading
+    // we'd be better off investing in a somewhat lock free bounded queue: for a
+    // fixed memory buffer it is possible to safely add and remove elements at
+    // different ends of a queue of length greater than one.
+    m_Cursor.store(i + 1);
 }
 
 bool CStaticThreadPool::busy() const {
@@ -75,10 +79,7 @@ void CStaticThreadPool::shutdown() {
 
     // Signal to each thread that it is finished. We bind each task to a thread
     // so each thread executes exactly one shutdown task.
     for (std::size_t id = 0; id < m_TaskQueues.size(); ++id) {
-        TTask done{[&] {
-            m_Done = true;
-            return boost::any{};
-        }};
+        TTask done{[&] { m_Done = true; }};
         m_TaskQueues[id].push(CWrappedTask{std::move(done), id});
     }
@@ -99,6 +100,7 @@ void CStaticThreadPool::worker(std::size_t id) {
     };
 
     TOptionalTask task;
+    std::size_t size{m_TaskQueues.size()};
 
     while (m_Done == false) {
         // We maintain "worker count" queues and each worker has an affinity to a
@@ -108,7 +110,6 @@ void CStaticThreadPool::worker(std::size_t id) {
         // if everything is working well we have essentially no contention between
         // workers on queue reads.
 
-        std::size_t size{m_TaskQueues.size()};
         for (std::size_t i = 0; i < size; ++i) {
             task = m_TaskQueues[(id + i) % size].tryPop(ifAllowed);
             if (task != boost::none) {
@@ -126,7 +127,7 @@ void CStaticThreadPool::worker(std::size_t id) {
         // and the producer(s) will be waiting to add a task as each one is consumed.
         // By switching to work on a new queue here we minimise contention between the
         // producers and consumers. Testing on bare metal (OSX) the overhead per task
-        // dropped from around 2.2 microseconds to 1.5 microseconds by yielding here.
+        // was around 120% higher without yielding here.
         std::this_thread::yield();
     }
 }
@@ -156,11 +157,12 @@ bool CStaticThreadPool::CWrappedTask::executableOnThread(std::size_t id) const {
 }
 
 void CStaticThreadPool::CWrappedTask::operator()() {
-    try {
-        m_Task();
-    } catch (const std::future_error& e) {
-        LOG_ERROR(<< "Failed executing packaged task: '" << e.code() << "' "
-                  << "with error '" << e.what() << "'");
+    if (m_Task != nullptr) {
+        try {
+            m_Task();
+        } catch (const std::exception& e) {
+            LOG_ERROR(<< "Failed executing task with error '" << e.what() << "'");
+        }
     }
 }
 }
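The comments in `schedule` and `worker` describe a queue-per-worker design: producers spread tasks over the queues with an atomic round-robin cursor, and each consumer scans starting from its own queue so that, under steady load, reads rarely contend. A toy model of that pattern under assumed names (`TinyTaskQueues` is hypothetical, uses `std::optional` where the pool uses `boost::optional`, and omits the capacity bound, back pressure, shutdown and thread affinity handling of the real pool):

```cpp
#include <atomic>
#include <cstddef>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>

class TinyTaskQueues {
public:
    explicit TinyTaskQueues(std::size_t size) : m_Queues(size) {}

    // Producer side: an atomic cursor round robins tasks over the queues.
    void push(std::function<void()> task) {
        std::size_t i{m_Cursor.fetch_add(1) % m_Queues.size()};
        std::lock_guard<std::mutex> lock{m_Queues[i].s_Mutex};
        m_Queues[i].s_Tasks.push_back(std::move(task));
    }

    // Consumer side: worker 'id' checks its own queue first and only then
    // steals from the others, mirroring m_TaskQueues[(id + i) % size].
    std::optional<std::function<void()>> pop(std::size_t id) {
        for (std::size_t i = 0; i < m_Queues.size(); ++i) {
            auto& queue = m_Queues[(id + i) % m_Queues.size()];
            std::lock_guard<std::mutex> lock{queue.s_Mutex};
            if (queue.s_Tasks.empty() == false) {
                auto task = std::move(queue.s_Tasks.front());
                queue.s_Tasks.pop_front();
                return task;
            }
        }
        return std::nullopt;
    }

private:
    struct SQueue {
        std::mutex s_Mutex;
        std::deque<std::function<void()>> s_Tasks;
    };

private:
    std::atomic<std::size_t> m_Cursor{0};
    std::deque<SQueue> m_Queues;
};
```

The real pool additionally blocks producers when a queue is at capacity, which is where the back-pressure behaviour documented in CStaticThreadPool.h comes from.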
diff --git a/lib/core/Concurrency.cc b/lib/core/Concurrency.cc
index 640408d51b..7519e5721f 100644
--- a/lib/core/Concurrency.cc
+++ b/lib/core/Concurrency.cc
@@ -19,9 +19,9 @@ namespace {
 //! \brief Executes a function immediately (on the calling thread).
 class CImmediateExecutor final : public CExecutor {
 public:
-    virtual void schedule(std::packaged_task<boost::any()>&& f) { f(); }
-    virtual bool busy() const { return false; }
-    virtual void busy(bool) {}
+    void schedule(std::function<void()>&& f) override { f(); }
+    bool busy() const override { return false; }
+    void busy(bool) override {}
 };
 
 //! \brief Executes a function in a thread pool.
@@ -29,11 +29,11 @@ class CThreadPoolExecutor final : public CExecutor {
 public:
     explicit CThreadPoolExecutor(std::size_t size) : m_ThreadPool{size} {}
 
-    virtual void schedule(std::packaged_task<boost::any()>&& f) {
-        m_ThreadPool.schedule(std::forward<std::packaged_task<boost::any()>>(f));
+    void schedule(std::function<void()>&& f) override {
+        m_ThreadPool.schedule(std::forward<std::function<void()>>(f));
     }
-    virtual bool busy() const { return m_ThreadPool.busy(); }
-    virtual void busy(bool value) { return m_ThreadPool.busy(value); }
+    bool busy() const override { return m_ThreadPool.busy(); }
+    void busy(bool value) override { return m_ThreadPool.busy(value); }
 
 private:
     CStaticThreadPool m_ThreadPool;
@@ -115,14 +115,14 @@ CExecutor& defaultAsyncExecutor() {
     return singletonExecutor.get();
 }
 
-bool get_conjunction_of_all(std::vector<future<bool>>& futures) {
+bool get_conjunction_of_all(std::vector<std::future<bool>>& futures) {
 
     // This waits until results are present. If we get an exception we still want
     // to wait until all results are ready in case continuing destroys state access
     // which a worker thread reads. We just rethrow the _last_ exception we received.
 
     std::exception_ptr e;
     bool result{std::accumulate(futures.begin(), futures.end(), true,
-                                [&e](bool conjunction, future<bool>& future) {
+                                [&e](bool conjunction, std::future<bool>& future) {
                                     try {
                                         // Don't shortcircuit
                                         bool value = future.get();
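`get_conjunction_of_all` folds the futures with `std::accumulate`, deliberately draining every future even after a failure and remembering only the last exception to rethrow once all workers have finished. The same pattern in standalone form, using `std::launch::deferred` tasks so the sketch runs without a pool:

```cpp
#include <exception>
#include <future>
#include <iostream>
#include <numeric>
#include <stdexcept>
#include <vector>

int main() {
    std::vector<std::future<bool>> futures;
    futures.push_back(std::async(std::launch::deferred, [] { return true; }));
    futures.push_back(std::async(std::launch::deferred, []() -> bool {
        throw std::runtime_error{"worker failed"};
    }));
    futures.push_back(std::async(std::launch::deferred, [] { return false; }));

    std::exception_ptr e;
    bool result{std::accumulate(futures.begin(), futures.end(), true,
                                [&e](bool conjunction, std::future<bool>& future) {
                                    try {
                                        // Don't shortcircuit: get() every future.
                                        bool value = future.get();
                                        return conjunction && value;
                                    } catch (...) {
                                        e = std::current_exception();
                                        return false;
                                    }
                                })};

    if (e != nullptr) {
        try {
            std::rethrow_exception(e);
        } catch (const std::exception& ex) {
            std::cout << "rethrown: " << ex.what() << '\n'; // rethrown: worker failed
        }
    }
    std::cout << "conjunction = " << result << '\n'; // conjunction = 0
}
```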
diff --git a/lib/core/unittest/CStaticThreadPoolTest.cc b/lib/core/unittest/CStaticThreadPoolTest.cc
index 72020e5f3b..b12bcf6d0f 100644
--- a/lib/core/unittest/CStaticThreadPoolTest.cc
+++ b/lib/core/unittest/CStaticThreadPoolTest.cc
@@ -9,6 +9,8 @@
 #include <core/CStaticThreadPool.h>
 #include <core/CStopWatch.h>
 
+#include <stdexcept>
+
 #include <atomic>
 #include <chrono>
 #include <thread>
@@ -149,21 +151,24 @@ void CStaticThreadPoolTest::testManyTasksThroughput() {
 }
 
 void CStaticThreadPoolTest::testSchedulingOverhead() {
 
-    // Test the overhead per task is less than 1.6 microseconds.
+    // Test the overhead per task is less than 0.7 microseconds.
 
     core::CStaticThreadPool pool{4};
 
     core::CStopWatch watch{true};
-    for (std::size_t i = 0; i < 1000000; ++i) {
+    for (std::size_t i = 0; i < 2000000; ++i) {
         if (i % 100000 == 0) {
             LOG_DEBUG(<< i);
         }
-        pool.schedule([]() {});
+        pool.schedule([ j = std::size_t{0}, count = i % 1000 ]() mutable {
+            for (j = 0; j < count; ++j) {
+            }
+        });
     }
 
     double overhead{static_cast<double>(watch.stop()) / 1000.0};
     LOG_DEBUG(<< "Total time = " << overhead);
-    //CPPUNIT_ASSERT(overhead < 1.6);
+    //CPPUNIT_ASSERT(overhead < 1.4);
 }
 
 void CStaticThreadPoolTest::testWithExceptions() {
@@ -175,8 +180,10 @@ void CStaticThreadPoolTest::testWithExceptions() {
 
     {
         core::CStaticThreadPool pool{2};
         for (std::size_t i = 0; i < 5; ++i) {
-            core::CStaticThreadPool::TTask bad;
-            pool.schedule(std::move(bad));
+            core::CStaticThreadPool::TTask null;
+            pool.schedule(std::move(null));
+            auto throws = []() { throw std::runtime_error{"bad"}; };
+            pool.schedule(throws);
         }
 
         // This would deadlock due to lack of consumers if we'd killed our workers.
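The null-task case in `testWithExceptions` pairs with the `m_Task != nullptr` guard added to `CWrappedTask::operator()`: invoking an empty `std::function` throws `std::bad_function_call`, which the old `std::future_error`-only handler would not have caught, so a scheduled default-constructed task could have killed a worker thread. The new code avoids the throw entirely with the null check and catches `std::exception` for everything else. A minimal illustration of what an unguarded call does:

```cpp
#include <functional>
#include <iostream>

int main() {
    std::function<void()> null; // like the default-constructed TTask in the test
    try {
        null(); // calling an empty std::function throws
    } catch (const std::bad_function_call& e) {
        std::cout << "caught: " << e.what() << '\n';
    }
}
```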