Skip to content

Commit adf0c57

Browse files
author
accelerated
committed
Using single mutex version
1 parent eb774bc commit adf0c57

File tree

3 files changed

+14
-17
lines changed

3 files changed

+14
-17
lines changed

Diff for: CMakeLists.txt

+2-2
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET})
5151
find_package(RdKafka REQUIRED ${FIND_PACKAGE_QUIET})
5252

5353
if (Boost_FOUND)
54-
find_package(Boost COMPONENTS thread program_options ${FIND_PACKAGE_QUIET})
54+
find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET})
5555
set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS})
5656
set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED})
5757
include_directories(${Boost_INCLUDE_DIRS})
@@ -91,7 +91,7 @@ if(DOXYGEN_FOUND)
9191
)
9292
endif(DOXYGEN_FOUND)
9393

94-
if(NOT CPPKAFKA_DISABLE_TESTS AND Boost_THREAD_FOUND)
94+
if(NOT CPPKAFKA_DISABLE_TESTS)
9595
set(CATCH_ROOT ${CMAKE_SOURCE_DIR}/third_party/Catch2)
9696
if(EXISTS ${CATCH_ROOT}/CMakeLists.txt)
9797
set(CATCH_INCLUDE ${CATCH_ROOT}/single_include)

Diff for: include/cppkafka/utils/buffered_producer.h

+11-14
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,7 @@
3737
#include <unordered_set>
3838
#include <unordered_map>
3939
#include <map>
40-
#include <boost/thread/mutex.hpp>
41-
#include <boost/thread/shared_mutex.hpp>
40+
#include <mutex>
4241
#include <atomic>
4342
#include <boost/optional.hpp>
4443
#include "../producer.h"
@@ -190,8 +189,7 @@ class CPPKAFKA_API BufferedProducer {
190189

191190
Producer producer_;
192191
QueueType messages_;
193-
mutable boost::mutex exclusive_access_;
194-
mutable boost::shared_mutex shared_access_;
192+
mutable std::mutex mutex_;
195193
ProduceFailureCallback produce_failure_callback_;
196194
std::atomic_ulong expected_acks_{0};
197195
std::atomic_ullong total_messages_acked_{0};
@@ -221,14 +219,14 @@ void BufferedProducer<BufferType>::produce(const MessageBuilder& builder) {
221219

222220
template <typename BufferType>
223221
void BufferedProducer<BufferType>::flush() {
224-
{
225-
boost::shared_lock<boost::shared_mutex> grant(shared_access_);
226-
size_t num_messages = messages_.size();
227-
while (num_messages--) {
228-
produce_message(messages_.front());
229-
boost::lock_guard<boost::mutex> require(exclusive_access_);
230-
messages_.pop();
222+
size_t num_messages = messages_.size();
223+
while (num_messages--) {
224+
std::lock_guard<std::mutex> lock(mutex_);
225+
if (messages_.empty()) {
226+
break; //perhaps clear() was called
231227
}
228+
produce_message(messages_.front());
229+
messages_.pop();
232230
}
233231
wait_for_acks();
234232
}
@@ -253,16 +251,15 @@ void BufferedProducer<BufferType>::wait_for_acks() {
253251

254252
template <typename BufferType>
255253
void BufferedProducer<BufferType>::clear() {
256-
boost::unique_lock<boost::shared_mutex> restrict(shared_access_);
254+
std::lock_guard<std::mutex> lock(mutex_);
257255
QueueType tmp;
258256
std::swap(tmp, messages_);
259257
}
260258

261259
template <typename BufferType>
262260
template <typename BuilderType>
263261
void BufferedProducer<BufferType>::do_add_message(BuilderType&& builder) {
264-
boost::shared_lock<boost::shared_mutex> grant(shared_access_);
265-
boost::lock_guard<boost::mutex> require(exclusive_access_);
262+
std::lock_guard<std::mutex> lock(mutex_);
266263
messages_.push(std::move(builder));
267264
}
268265

Diff for: tests/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ add_custom_target(tests)
88

99
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
1010
add_library(cppkafka-test EXCLUDE_FROM_ALL test_utils.cpp)
11-
target_link_libraries(cppkafka-test cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread)
11+
target_link_libraries(cppkafka-test cppkafka ${RDKAFKA_LIBRARY} pthread)
1212

1313
add_definitions("-DKAFKA_TEST_INSTANCE=\"${KAFKA_TEST_INSTANCE}\"")
1414

0 commit comments

Comments
 (0)