Skip to content

Buffered producer thread safe #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 36 additions & 5 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,18 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib)
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/lib)

# Build output checks
option(CPPKAFKA_CMAKE_VERBOSE "Generate verbose output." OFF)
option(CPPKAFKA_BUILD_SHARED "Build cppkafka as a shared library." ON)
option(CPPKAFKA_DISABLE_TESTS "Disable build of cppkafka tests." OFF)
option(CPPKAFKA_DISABLE_EXAMPLES "Disable build of cppkafka examples." OFF)
option(CPPKAFKA_BOOST_STATIC_LIBS "Link with Boost static libraries." ON)
option(CPPKAFKA_BOOST_USE_MULTITHREADED "Use Boost multithreaded libraries." ON)

# Disable output from find_package macro
if (NOT CPPKAFKA_CMAKE_VERBOSE)
set(FIND_PACKAGE_QUIET QUIET)
endif()

if(CPPKAFKA_BUILD_SHARED)
message(STATUS "Build will generate a shared library. "
"Use CPPKAFKA_BUILD_SHARED=0 to perform a static build")
Expand All @@ -37,16 +47,36 @@ else()
endif()

# Look for Boost (just need boost.optional headers here)
find_package(Boost REQUIRED)
find_package(RdKafka REQUIRED)
find_package(Boost REQUIRED ${FIND_PACKAGE_QUIET})
find_package(RdKafka REQUIRED ${FIND_PACKAGE_QUIET})

if (Boost_FOUND)
find_package(Boost COMPONENTS program_options ${FIND_PACKAGE_QUIET})
set(Boost_USE_STATIC_LIBS ${CPPKAFKA_BOOST_STATIC_LIBS})
set(Boost_USE_MULTITHREADED ${CPPKAFKA_BOOST_USE_MULTITHREADED})
include_directories(${Boost_INCLUDE_DIRS})
link_directories(${Boost_LIBRARY_DIRS})
if (CPPKAFKA_CMAKE_VERBOSE)
message(STATUS "Boost include dir: ${Boost_INCLUDE_DIRS}")
message(STATUS "Boost library dir: ${Boost_LIBRARY_DIRS}")
message(STATUS "Boost use static libs: ${Boost_USE_STATIC_LIBS}")
message(STATUS "Boost is multi-threaded: ${CPPKAFKA_BOOST_USE_MULTITHREADED}")
message(STATUS "Boost libraries: ${Boost_LIBRARIES}")
endif()
endif()

add_subdirectory(src)
add_subdirectory(include)

add_subdirectory(examples)
# Examples target
if (NOT CPPKAFKA_DISABLE_EXAMPLES AND Boost_PROGRAM_OPTIONS_FOUND)
add_subdirectory(examples)
else()
message(STATUS "Disabling examples")
endif()

# Add a target to generate API documentation using Doxygen
find_package(Doxygen QUIET)
find_package(Doxygen ${FIND_PACKAGE_QUIET})
if(DOXYGEN_FOUND)
configure_file(
${CMAKE_CURRENT_SOURCE_DIR}/docs/Doxyfile.in
Expand All @@ -65,12 +95,13 @@ if(NOT CPPKAFKA_DISABLE_TESTS)
set(CATCH_ROOT ${CMAKE_SOURCE_DIR}/third_party/Catch2)
if(EXISTS ${CATCH_ROOT}/CMakeLists.txt)
set(CATCH_INCLUDE ${CATCH_ROOT}/single_include)

enable_testing()
add_subdirectory(tests)
else()
message(STATUS "Disabling tests because submodule Catch2 isn't checked out")
endif()
else()
message(STATUS "Disabling tests")
endif()

if(NOT TARGET uninstall)
Expand Down
29 changes: 14 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,25 +55,32 @@ In order to compile _cppkafka_ you need:
* _CMake_
* A compiler with good C++11 support (e.g. gcc >= 4.8). This was tested successfully on
_g++ 4.8.3_.
* The boost library. _cppkafka_ only requires boost.optional, which is a header only library,
so this doesn't add any additional runtime dependencies.
* The boost library.

Now, in order to build, just run:

```Shell
mkdir build
cd build
cmake ..
cmake <OPTIONS> ..
make
```

## CMake options

If you have installed _librdkafka_ on a non standard directory, you can use the
`RDKAFKA_ROOT_DIR` cmake parameter when configuring the project:

The following cmake options can be specified:
* `RDKAFKA_ROOT_DIR` : Specify a different librdkafka install directory.
* `BOOST_ROOT` : Specify a different Boost install directory.
* `CPPKAFKA_CMAKE_VERBOSE` : Generate verbose output. Default is `OFF`.
* `CPPKAFKA_BUILD_SHARED` : Build cppkafka as a shared library. Default is `ON`.
* `CPPKAFKA_DISABLE_TESTS` : Disable build of cppkafka tests. Default is `OFF`.
* `CPPKAFKA_DISABLE_EXAMPLES` : Disable build of cppkafka examples. Default is `OFF`.
* `CPPKAFKA_BOOST_STATIC_LIBS` : Link with Boost static libraries. Default is `ON`.
* `CPPKAFKA_BOOST_USE_MULTITHREADED` : Use Boost multi-threaded libraries. Default is `ON`.

Example:
```Shell
cmake .. -DRDKAFKA_ROOT_DIR=/some/other/dir
cmake -DRDKAFKA_ROOT_DIR=/some/other/dir -DCPPKAFKA_BUILD_SHARED=OFF ...
```

Note that the `RDKAFKA_ROOT_DIR` must contain the following structure:
Expand All @@ -86,13 +93,6 @@ ${RDKAFKA_ROOT_DIR}/
+ lib/librdkafka.a
```

By default, a shared library will be built. If you want to perform a static build,
use the `CPPKAFKA_BUILD_SHARED` parameter:

```Shell
cmake .. -DCPPKAFKA_BUILD_SHARED=0
```

# Using

If you want to use _cppkafka_, you'll need to link your application with:
Expand All @@ -108,4 +108,3 @@ _Doxygen_ to be installed. The documentation will be written in html format at

Make sure to check the [wiki](https://github.com/mfontanini/cppkafka/wiki) which includes
some documentation about the project and some of its features.

37 changes: 15 additions & 22 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
find_package(Boost COMPONENTS program_options)

if (Boost_PROGRAM_OPTIONS_FOUND)
link_libraries(${Boost_LIBRARIES} cppkafka ${RDKAFKA_LIBRARY})

include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include)
include_directories(SYSTEM ${Boost_INCLUDE_DIRS} ${RDKAFKA_INCLUDE_DIR})

add_custom_target(examples)
macro(create_example example_name)
add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp")
add_dependencies(examples ${example_name})
endmacro()

create_example(kafka_producer)
create_example(kafka_consumer)
create_example(kafka_consumer_dispatcher)
create_example(metadata)
create_example(consumers_information)
else()
message(STATUS "Disabling examples since boost.program_options was not found")
endif()
link_libraries(cppkafka ${RDKAFKA_LIBRARY} ${Boost_LIBRARIES} pthread)
include_directories(${CMAKE_CURRENT_SOURCE_DIR}/../include)
include_directories(SYSTEM ${RDKAFKA_INCLUDE_DIR})

add_custom_target(examples)
macro(create_example example_name)
add_executable(${example_name} EXCLUDE_FROM_ALL "${example_name}.cpp")
add_dependencies(examples ${example_name})
endmacro()

create_example(kafka_producer)
create_example(kafka_consumer)
create_example(kafka_consumer_dispatcher)
create_example(metadata)
create_example(consumers_information)
18 changes: 18 additions & 0 deletions include/cppkafka/message_builder.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "buffer.h"
#include "topic.h"
#include "macros.h"
#include "message.h"

namespace cppkafka {

Expand All @@ -49,6 +50,11 @@ class BasicMessageBuilder {
* \param topic The topic into which this message would be produced
*/
BasicMessageBuilder(std::string topic);

/**
* Construct a BasicMessageBuilder from a Message object
*/
BasicMessageBuilder(const Message& message);

/**
* \brief Construct a message builder from another one that uses a different buffer type
Expand Down Expand Up @@ -177,6 +183,18 @@ BasicMessageBuilder<T, C>::BasicMessageBuilder(std::string topic)
: topic_(std::move(topic)) {
}

template <typename T, typename C>
BasicMessageBuilder<T, C>::BasicMessageBuilder(const Message& message)
: topic_(message.get_topic()),
key_(Buffer(message.get_key().get_data(), message.get_key().get_size())),
payload_(Buffer(message.get_payload().get_data(), message.get_payload().get_size())),
timestamp_(message.get_timestamp() ? message.get_timestamp().get().get_timestamp() :
std::chrono::milliseconds(0)),
user_data_(message.get_user_data())
{

}

template <typename T, typename C>
template <typename U, typename V>
BasicMessageBuilder<T, C>::BasicMessageBuilder(const BasicMessageBuilder<U, V>& rhs)
Expand Down
1 change: 1 addition & 0 deletions include/cppkafka/producer.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class CPPKAFKA_API Producer : public KafkaHandleBase {
* The policy to use for the payload. The default policy is COPY_PAYLOAD
*/
enum class PayloadPolicy {
PASSTHROUGH_PAYLOAD = 0, ///< Rdkafka will not copy nor free the payload.
COPY_PAYLOAD = RD_KAFKA_MSG_F_COPY, ///< Means RD_KAFKA_MSG_F_COPY
FREE_PAYLOAD = RD_KAFKA_MSG_F_FREE ///< Means RD_KAFKA_MSG_F_FREE
};
Expand Down
Loading