From af9603171ce65c30246e1ddf7c6452c2b5428ac8 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 15 Nov 2022 16:23:31 +0800 Subject: [PATCH 1/3] Migrate from Boost.Python to PyBind11 Fixes https://github.com/apache/pulsar-client-python/issues/24 ### Motivation Remove the Boost.Python dependency so that on Windows there will be no DLL dependencies because PyBind11 is header only. ### Modifications Since PyBind11 can perform type conversions between C++ types (STL, function, etc.) and Python types (list, dict, lambda, etc.), some wrapper classes are replaced with the classes in the Pulsar C++ library. The only API changes are related to the `_pulsar` module, which should not be used directly. The authentication related classes were wrapper classes with constructors before, now they are created by the static `create` methods from Pulsar C++ API. Fix the CMakeLists.txt and the workflows to build Python wheels on Linux, macOS and Windows. Finally add a workflow to build Windows wheels during a release. --- .../workflows/ci-build-release-wheels.yaml | 59 ++++ .github/workflows/ci-pr-validation.yaml | 38 +-- .gitmodules | 6 +- CMakeLists.txt | 46 +-- README.md | 42 ++- build-support/copy-deps-versionfile.sh | 1 - build-support/dep-version.py | 12 +- ...-cpp-client.sh => install-dependencies.sh} | 12 +- dependencies.yaml | 4 +- pkg/build-wheel-inside-docker.sh | 6 + pkg/mac/build-dependencies.sh | 47 +-- pkg/mac/build-mac-wheels.sh | 3 +- pkg/mac/build-pulsar-cpp.sh | 2 +- pkg/manylinux2014/Dockerfile | 9 +- pkg/manylinux_musl/Dockerfile | 10 +- pulsar-client-cpp-version.txt | 1 - pulsar/__init__.py | 16 +- pybind11 | 1 + src/authentication.cc | 146 +++------- src/client.cc | 31 +- src/config.cc | 274 ++++++++---------- src/consumer.cc | 20 +- src/cryptoKeyReader.cc | 32 -- src/enums.cc | 27 +- src/exceptions.cc | 222 ++++++++------ src/exceptions.h | 86 ++++++ src/message.cc | 188 ++++-------- src/producer.cc | 52 +--- src/pulsar.cc | 60 ++-- src/reader.cc | 11 +- src/schema.cc | 14 +- src/utils.h | 21 +- tests/pulsar_test.py | 3 +- vcpkg | 1 - vcpkg-3.10.json | 12 - vcpkg-3.7.json | 18 -- vcpkg-3.8.json | 18 -- vcpkg-3.9.json | 12 - vcpkg.json | 6 - 39 files changed, 697 insertions(+), 872 deletions(-) rename build-support/{install-cpp-client.sh => install-dependencies.sh} (82%) delete mode 100644 pulsar-client-cpp-version.txt create mode 160000 pybind11 delete mode 100644 src/cryptoKeyReader.cc create mode 100644 src/exceptions.h delete mode 160000 vcpkg delete mode 100644 vcpkg-3.10.json delete mode 100644 vcpkg-3.7.json delete mode 100644 vcpkg-3.8.json delete mode 100644 vcpkg-3.9.json delete mode 100644 vcpkg.json diff --git a/.github/workflows/ci-build-release-wheels.yaml b/.github/workflows/ci-build-release-wheels.yaml index 4f890b5..7ddf8ec 100644 --- a/.github/workflows/ci-build-release-wheels.yaml +++ b/.github/workflows/ci-build-release-wheels.yaml @@ -141,3 +141,62 @@ jobs: name: wheel-mac-py${{matrix.py.version}} path: dist/*.whl + windows-wheels: + name: Wheel Windows - Py ${{matrix.py.version}} + runs-on: windows-2019 + env: + PULSAR_CPP_DIR: 'C:\\pulsar-cpp' + timeout-minutes: 300 + + strategy: + fail-fast: false + matrix: + windows: + - name: 'Windows x64' + os: windows-2019 + arch: '-A x64' + triplet: 'x64-windows' + python: + - {version: '3.7'} + - {version: '3.8'} + - {version: '3.9'} + - {version: '3.10'} + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python.version }} + + - name: Download Pulsar C++ client on Windows + shell: bash + run: | + mkdir -p ${{ env.PULSAR_CPP_DIR }} + cd ${{ env.PULSAR_CPP_DIR }} + # TODO: switch to official releases + curl -O -L https://github.com/BewareMyPower/pulsar-client-cpp/releases/download/v3.1.0-rc-20221028/x64-windows-static.zip + unzip -q x64-windows-static.zip + ls -l ${{ env.PULSAR_CPP_DIR }} + + - name: Configure CMake + shell: bash + run: | + pip3 install pyyaml + export PYBIND11_VERSION=$(./build-support/dep-version.py pybind11) + curl -L -O https://github.com/pybind/pybind11/archive/refs/tags/v${PYBIND11_VERSION}.tar.gz + tar zxf v${PYBIND11_VERSION}.tar.gz + rm -rf pybind11 + mv pybind11-${PYBIND11_VERSION} pybind11 + cmake -B build ${{ matrix.windows.arch }} \ + -DCMAKE_PREFIX_PATH=${{ env.PULSAR_CPP_DIR }} \ + -DLINK_STATIC=ON + + - name: Build Python wheel + shell: bash + run: | + cmake --build build --config Release --target install + python -m pip install wheel + python setup.py bdist_wheel + python -m pip install ./dist/*.whl + python -c 'import pulsar; c = pulsar.Client("pulsar://localhost:6650"); c.close()' diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index 132ece7..13b93ad 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -55,11 +55,8 @@ jobs: - name: checkout uses: actions/checkout@v3 - - name: Install deps - run: sudo apt-get install -y libboost-python-dev - - name: Install Pulsar C++ client - run: build-support/install-cpp-client.sh + run: build-support/install-dependencies.sh - name: CMake run: cmake . @@ -199,20 +196,11 @@ jobs: steps: - uses: actions/checkout@v3 - with: - submodules: true - uses: actions/setup-python@v4 with: python-version: ${{ matrix.python.version }} - - name: Prepare vcpkg.json - shell: bash - run: | - python --version - cp -f vcpkg-${{ matrix.python.version }}.json vcpkg.json - cat vcpkg.json - - name: Download Pulsar C++ client on Windows shell: bash run: | @@ -223,24 +211,17 @@ jobs: unzip -q ${{ matrix.windows.triplet }}-static.zip ls -l ${{ env.PULSAR_CPP_DIR }} - - name: Cache Vcpkg - uses: actions/cache@v3 - id: cache-vcpkg - with: - path: build/vcpkg_installed - key: ${{ matrix.python.version }}-${{ hashFiles(format('vcpkg-{0}.json', matrix.python.version)) }} - - - name: Install dependencies and configure CMake + - name: Configure CMake shell: bash run: | - COMMIT_ID=$(grep baseline vcpkg.json | sed 's/[",]//g' | awk '{print $2}') - cd vcpkg - echo "git fetch origin $COMMIT_ID" - git fetch origin $COMMIT_ID - cd - + pip3 install pyyaml + export PYBIND11_VERSION=$(./build-support/dep-version.py pybind11) + curl -L -O https://github.com/pybind/pybind11/archive/refs/tags/v${PYBIND11_VERSION}.tar.gz + tar zxf v${PYBIND11_VERSION}.tar.gz + rm -rf pybind11 + mv pybind11-${PYBIND11_VERSION} pybind11 cmake -B build ${{ matrix.windows.arch }} \ -DCMAKE_PREFIX_PATH=${{ env.PULSAR_CPP_DIR }} \ - -DUSE_VCPKG=ON \ -DLINK_STATIC=ON - name: Build Python wheel @@ -250,9 +231,6 @@ jobs: python -m pip install wheel python setup.py bdist_wheel python -m pip install ./dist/*.whl - cp ./build/Release/boost_python*.dll . - echo "The extra DLLs:" - ls -l *.dll python -c 'import pulsar; c = pulsar.Client("pulsar://localhost:6650"); c.close()' check-completion: diff --git a/.gitmodules b/.gitmodules index a0a57f3..a90eb86 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,3 @@ -[submodule "vcpkg"] - path = vcpkg - url = https://github.com/microsoft/vcpkg.git +[submodule "pybind11"] + path = pybind11 + url = https://github.com/pybind/pybind11.git diff --git a/CMakeLists.txt b/CMakeLists.txt index a60e1c2..98c1f62 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,12 +20,8 @@ cmake_minimum_required(VERSION 3.18) set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/cmake_modules") -option(USE_VCPKG "Use Vcpkg to install dependencies" OFF) -if (USE_VCPKG) - set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake" - CACHE STRING "Vcpkg toolchain file") -endif () project (pulsar-client-python) +set(CMAKE_PREFIX_PATH ${PROJECT_SOURCE_DIR}/pybind11/include ${CMAKE_PREFIX_PATH}) option(LINK_STATIC "Link against static libraries" OFF) MESSAGE(STATUS "LINK_STATIC: " ${LINK_STATIC}) @@ -58,43 +54,15 @@ SET(CMAKE_CXX_STANDARD 11) find_package (Python3 REQUIRED COMPONENTS Development.Module) MESSAGE(STATUS "PYTHON: " ${Python3_VERSION} " - " ${Python3_INCLUDE_DIRS}) -find_package(Boost REQUIRED ${Boost_INCLUDE_DIRS}) -message(STATUS "Boost_INCLUDE_DIRS: ${Boost_INCLUDE_DIRS}") - -SET(Boost_USE_STATIC_LIBS ${LINK_STATIC}) - -set(BOOST_PYTHON_NAME_LIST python3 python310 python39 python38 python37) -foreach (BOOST_PYTHON_NAME IN LISTS BOOST_PYTHON_NAME_LIST) - find_package(Boost QUIET COMPONENTS ${BOOST_PYTHON_NAME}) - if (${Boost_FOUND}) - set(BOOST_PYTHON_COMPONENT_FOUND ${BOOST_PYTHON_NAME}) - message(STATUS "Found Boost COMPONENTS " ${BOOST_PYTHON_COMPONENT_FOUND}) - break () - endif () -endforeach () -if (NOT BOOST_PYTHON_COMPONENT_FOUND) - message(FATAL_ERROR "Could not find Boost Python library") -endif () +find_path(PYBIND11_INCLUDE_DIRS NAMES "pybind11/pybind11.h") +message(STATUS "PYBIND11_INCLUDE_DIRS: " ${PYBIND11_INCLUDE_DIRS}) ######################################################################################################################## -INCLUDE_DIRECTORIES(${PULSAR_INCLUDE} "${Boost_INCLUDE_DIRS}" "${Python3_INCLUDE_DIRS}") - -ADD_LIBRARY(_pulsar SHARED src/pulsar.cc - src/producer.cc - src/consumer.cc - src/config.cc - src/enums.cc - src/client.cc - src/message.cc - src/authentication.cc - src/reader.cc - src/schema.cc - src/cryptoKeyReader.cc - src/exceptions.cc - src/utils.cc - ) +INCLUDE_DIRECTORIES(${PULSAR_INCLUDE} ${PYBIND11_INCLUDE_DIRS} ${Python3_INCLUDE_DIRS}) +file(GLOB SOURCES src/*.cc) +ADD_LIBRARY(_pulsar SHARED ${SOURCES}) if (MSVC) set(CMAKE_SHARED_LIBRARY_SUFFIX .pyd) else () @@ -109,10 +77,8 @@ if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") set(CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS "${CMAKE_SHARED_LIBRARY_CREATE_CXX_FLAGS} -Qunused-arguments -undefined dynamic_lookup") endif() -# Try all possible boost-python variable namings set(PYTHON_WRAPPER_LIBS ${PULSAR_LIBRARY} - Boost::${BOOST_PYTHON_COMPONENT_FOUND} ) if (MSVC) set(PYTHON_WRAPPER_LIBS ${PYTHON_WRAPPER_LIBS} Python3::Module) diff --git a/README.md b/README.md index c25c98f..42420f2 100644 --- a/README.md +++ b/README.md @@ -27,49 +27,43 @@ - A C++ compiler that supports C++11 - CMake >= 3.18 - [Pulsar C++ client library](https://github.com/apache/pulsar-client-cpp) -- [Boost.Python](https://github.com/boostorg/python) +- [PyBind11](https://github.com/pybind/pybind11) -## Install the Python wheel - -### Windows (with Vcpkg) +PyBind11 is a header-only library and a submodule, so you can simply download the submodule so that CMake can find this dependency. -First, install the dependencies via [Vcpkg](https://github.com/microsoft/vcpkg). - -```PowerShell -vcpkg install --feature-flags=manifests --triplet x64-windows +```bash +git submodule update --init ``` -> NOTE: For Windows 32-bit library, change `x64-windows` to `x86-windows`, see [here](https://github.com/microsoft/vcpkg/tree/master/triplets) for all available triplets. +You can also download the pybind11 directly like: -Then, build and install the Python wheel. - -```PowerShell -# Assuming the Pulsar C++ client has been installed under the `PULSAR_CPP` directory. -cmake -B build -DUSE_VCPKG=ON -DCMAKE_PREFIX_PATH="$env:PULSAR_CPP" -DLINK_STATIC=ON -cmake --build build --config Release -cmake --install build -py setup.py bdist_wheel -py -m pip install ./dist/pulsar_client-*.whl +```bash +pip3 install pyyaml +export PYBIND11_VERSION=$(./build-support/dep-version.py pybind11) +curl -L -O https://github.com/pybind/pybind11/archive/refs/tags/v${PYBIND11_VERSION}.tar.gz +tar zxf v${PYBIND11_VERSION}.tar.gz +mv pybind11-${PYBIND11_VERSION} pybind11 ``` -Since the Python client links to Boost.Python dynamically, you have to copy the dll (e.g. `boost_python310-vc142-mt-x64-1_80.dll`) into the system path (the `PATH` environment variable). If the `-DLINK_STATIC=ON` option is not specified, you have to copy the `pulsar.dll` into the system path as well. +After that, you only need to install the Pulsar C++ client dependency into the system path. You can [install the pre-built binaries](https://pulsar.apache.org/docs/next/client-libraries-cpp/#installation) or [build from source](https://github.com/apache/pulsar-client-cpp#compilation). -### Linux or macOS +## Install the Python wheel -Assuming the Pulsar C++ client and Boost.Python have been installed under the system path. +Make sure the PyBind11 submodule has been downloaded and the Pulsar C++ client has been installed. Then run the following commands: ```bash cmake -B build -cmake --build build -j8 +cmake --build build cmake --install build -./setup.py bdist_wheel -pip3 install dist/pulsar_client-*.whl --force-reinstall +python3 ./setup.py bdist_wheel +python3 -m pip install dist/pulsar_client-*.whl --force-reinstall ``` > **NOTE** > > 1. Here a separate `build` directory is created to store all CMake temporary files. However, the `setup.py` requires the `_pulsar.so` is under the project directory. > 2. Add the `--force-reinstall` option to overwrite the existing Python wheel in case your system has already installed a wheel before. +> 3. On Windows, the Python command is `py` instead of `python3`. ## Running examples diff --git a/build-support/copy-deps-versionfile.sh b/build-support/copy-deps-versionfile.sh index 3e42105..0a36d10 100755 --- a/build-support/copy-deps-versionfile.sh +++ b/build-support/copy-deps-versionfile.sh @@ -25,6 +25,5 @@ ROOT_DIR=$(git rev-parse --show-toplevel) for dir in manylinux2014 manylinux_musl; do mkdir -p pkg/$dir/.build cp $ROOT_DIR/dependencies.yaml pkg/$dir/.build - cp $ROOT_DIR/pulsar-client-cpp-version.txt pkg/$dir/.build cp $ROOT_DIR/build-support/dep-version.py pkg/$dir/.build done diff --git a/build-support/dep-version.py b/build-support/dep-version.py index 31200cd..8fb0467 100755 --- a/build-support/dep-version.py +++ b/build-support/dep-version.py @@ -20,5 +20,15 @@ import yaml, sys -deps = yaml.safe_load(open("dependencies.yaml")) +if len(sys.argv) < 2: + print(f'''Usage: {sys.argv[0]} dependency-name [dependency-file] + + The dependency file is "dependencies.yaml" by default.''') + sys.exit(1) + +if len(sys.argv) > 2: + dependency_file = sys.argv[2] +else: + dependency_file = 'dependencies.yaml' +deps = yaml.safe_load(open(dependency_file)) print(deps[sys.argv[1]]) diff --git a/build-support/install-cpp-client.sh b/build-support/install-dependencies.sh similarity index 82% rename from build-support/install-cpp-client.sh rename to build-support/install-dependencies.sh index fcc56f9..71596e8 100755 --- a/build-support/install-cpp-client.sh +++ b/build-support/install-dependencies.sh @@ -20,8 +20,10 @@ set -e -x -ROOT_DIR=$(dirname $(dirname $0)) -CPP_CLIENT_VERSION=$(cat $ROOT_DIR/pulsar-client-cpp-version.txt | xargs) +cd `dirname $0` + +CPP_CLIENT_VERSION=$(./dep-version.py pulsar-cpp ../dependencies.yaml) +PYBIND11_VERSION=$(./dep-version.py pybind11 ../dependencies.yaml) if [ $USER != "root" ]; then SUDO="sudo" @@ -62,5 +64,7 @@ else exit 1 fi - - +curl -L -O https://github.com/pybind/pybind11/archive/refs/tags/v${PYBIND11_VERSION}.tar.gz +tar zxf v${PYBIND11_VERSION}.tar.gz +$SUDO cp -rf pybind11-${PYBIND11_VERSION}/include/pybind11 /usr/include/ +rm -rf pybind11-${PYBIND11_VERSION} v${PYBIND11_VERSION}.tar.gz diff --git a/dependencies.yaml b/dependencies.yaml index 3d76f96..53d5667 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -17,8 +17,10 @@ # under the License. # -boost: 1.80.0 cmake: 3.24.2 +pulsar-cpp: 3.0.0 +pybind11: 2.10.1 +boost: 1.80.0 protobuf: 3.20.0 zlib: 1.2.13 zstd: 1.5.2 diff --git a/pkg/build-wheel-inside-docker.sh b/pkg/build-wheel-inside-docker.sh index 09b4304..c062607 100755 --- a/pkg/build-wheel-inside-docker.sh +++ b/pkg/build-wheel-inside-docker.sh @@ -22,6 +22,12 @@ set -e -x cd /pulsar-client-python +PYBIND11_VERSION=$(./build-support/dep-version.py pybind11) +curl -L -O https://github.com/pybind/pybind11/archive/refs/tags/v${PYBIND11_VERSION}.tar.gz +tar zxf v${PYBIND11_VERSION}.tar.gz +rm -rf pybind11 +mv pybind11-${PYBIND11_VERSION} pybind11 + rm -f CMakeCache.txt CMakeFiles cmake . \ diff --git a/pkg/mac/build-dependencies.sh b/pkg/mac/build-dependencies.sh index 07c8987..a817cc1 100755 --- a/pkg/mac/build-dependencies.sh +++ b/pkg/mac/build-dependencies.sh @@ -31,9 +31,10 @@ source pkg/mac/common.sh pip3 install pyyaml dep=$ROOT_DIR/build-support/dep-version.py +PYBIND11_VERSION=$($dep pybind11) +BOOST_VERSION=$($dep boost) ZLIB_VERSION=$($dep zlib) OPENSSL_VERSION=$($dep openssl) -BOOST_VERSION=$($dep boost) PROTOBUF_VERSION=$($dep protobuf) ZSTD_VERSION=$($dep zstd) SNAPPY_VERSION=$($dep snappy) @@ -46,6 +47,15 @@ cd $CACHE_DIR PREFIX=$CACHE_DIR/install +############################################################################### +if [ ! -f pybind11/.done ]; then + curl -L -O https://github.com/pybind/pybind11/archive/refs/tags/v${PYBIND11_VERSION}.tar.gz + tar zxf v${PYBIND11_VERSION}.tar.gz + mkdir -p $PREFIX/include/ + cp -rf pybind11-${PYBIND11_VERSION}/include/pybind11 $PREFIX/include/ + mkdir -p pybind11 + touch pybind11/.done +fi ############################################################################### if [ ! -f zlib-${ZLIB_VERSION}/.done ]; then @@ -128,42 +138,15 @@ fi ############################################################################### BOOST_VERSION_=${BOOST_VERSION//./_} -DIR=boost-src-${BOOST_VERSION} -if [ ! -f $DIR/.done ]; then +if [ ! -f boost/.done ]; then echo "Building Boost for Py $PYTHON_VERSION" curl -O -L https://boostorg.jfrog.io/artifactory/main/release/${BOOST_VERSION}/source/boost_${BOOST_VERSION_}.tar.gz tar xfz boost_${BOOST_VERSION_}.tar.gz - mv boost_${BOOST_VERSION_} $DIR - - PY_INCLUDE_DIR=${PREFIX}/include/python${PYTHON_VERSION} - if [ $PYTHON_VERSION = '3.7' ]; then - PY_INCLUDE_DIR=${PY_INCLUDE_DIR}m - fi - - pushd $DIR - cat < user-config.jam - using python : $PYTHON_VERSION - : python3 - : ${PY_INCLUDE_DIR} - : ${PREFIX}/lib - ; -EOF - ./bootstrap.sh --with-libraries=python --with-python=python3 --with-python-root=$PREFIX \ - --prefix=${PREFIX} - ./b2 -d0 address-model=64 cxxflags="-fPIC -arch arm64 -arch x86_64 -mmacosx-version-min=${MACOSX_DEPLOYMENT_TARGET}" \ - link=static threading=multi \ - --user-config=./user-config.jam \ - variant=release python=${PYTHON_VERSION} \ - -j16 \ - install - touch .done - popd -else - echo "Using cached Boost for Py $PYTHON_VERSION" + cp -rf boost_${BOOST_VERSION_}/boost $PREFIX/include/ + mkdir -p boost + touch .done fi - - ############################################################################### if [ ! -f protobuf-${PROTOBUF_VERSION}/.done ]; then echo "Building Protobuf" diff --git a/pkg/mac/build-mac-wheels.sh b/pkg/mac/build-mac-wheels.sh index b311465..22a3b31 100755 --- a/pkg/mac/build-mac-wheels.sh +++ b/pkg/mac/build-mac-wheels.sh @@ -59,8 +59,7 @@ cmake . \ -DLINK_STATIC=ON \ -DPULSAR_LIBRARY=${CACHE_DIR_CPP_CLIENT}/install/lib/libpulsar.a \ -DPULSAR_INCLUDE=${CACHE_DIR_CPP_CLIENT}/install/include \ - -DPython3_ROOT_DIR=$PREFIX \ - -DBOOST_ROOT=${PREFIX} + -DPython3_ROOT_DIR=$PREFIX make clean make -j16 diff --git a/pkg/mac/build-pulsar-cpp.sh b/pkg/mac/build-pulsar-cpp.sh index cf71545..fada73f 100755 --- a/pkg/mac/build-pulsar-cpp.sh +++ b/pkg/mac/build-pulsar-cpp.sh @@ -25,7 +25,7 @@ cd "${ROOT_DIR}" source pkg/mac/common.sh -PULSAR_CPP_VERSION=$(cat pulsar-client-cpp-version.txt | xargs) +PULSAR_CPP_VERSION=$(./build-support/dep-version.py pulsar-cpp) # Compile and cache dependencies mkdir -p $CACHE_DIR_CPP_CLIENT diff --git a/pkg/manylinux2014/Dockerfile b/pkg/manylinux2014/Dockerfile index 70f5e31..df1ba84 100644 --- a/pkg/manylinux2014/Dockerfile +++ b/pkg/manylinux2014/Dockerfile @@ -35,17 +35,14 @@ ENV PYTHON_LIBRARIES /opt/python/${PYTHON_SPEC}/lib/python${PYTHON_VERSION} RUN pip3 install pyyaml ADD .build/dependencies.yaml / -ADD .build/pulsar-client-cpp-version.txt / ADD .build/dep-version.py /usr/local/bin -# Download and compile boost +# Download and install boost RUN BOOST_VERSION=$(dep-version.py boost) && \ BOOST_VERSION_UNDESRSCORE=$(echo $BOOST_VERSION | sed 's/\./_/g') && \ curl -O -L https://boostorg.jfrog.io/artifactory/main/release/${BOOST_VERSION}/source/boost_${BOOST_VERSION_UNDESRSCORE}.tar.gz && \ tar xfz boost_${BOOST_VERSION_UNDESRSCORE}.tar.gz && \ - cd boost_${BOOST_VERSION_UNDESRSCORE} && \ - ./bootstrap.sh --with-libraries=python && \ - ./b2 -d0 address-model=64 cxxflags=-fPIC link=shared threading=multi variant=release install && \ + cp -r boost_${BOOST_VERSION_UNDESRSCORE}/boost /usr/include/ && \ rm -rf /boost_${BOOST_VERSION_UNDESRSCORE}.tar.gz /boost_${BOOST_VERSION_UNDESRSCORE} RUN CMAKE_VERSION=$(dep-version.py cmake) && \ @@ -114,7 +111,7 @@ RUN CURL_VERSION=$(dep-version.py curl) && \ rm -rf /curl-${CURL_VERSION}.tar.gz /curl-${CURL_VERSION} # Pulsar client C++ -RUN PULSAR_CPP_VERSION=$(cat pulsar-client-cpp-version.txt | xargs) && \ +RUN PULSAR_CPP_VERSION=$(dep-version.py pulsar-cpp) && \ curl -O -L https://archive.apache.org/dist/pulsar/pulsar-client-cpp-${PULSAR_CPP_VERSION}/apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz && \ tar xfz apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz && \ cd apache-pulsar-client-cpp-${PULSAR_CPP_VERSION} && \ diff --git a/pkg/manylinux_musl/Dockerfile b/pkg/manylinux_musl/Dockerfile index 796d355..a2af2bf 100644 --- a/pkg/manylinux_musl/Dockerfile +++ b/pkg/manylinux_musl/Dockerfile @@ -38,20 +38,16 @@ RUN pip install pyyaml RUN apk add cmake ADD .build/dependencies.yaml / -ADD .build/pulsar-client-cpp-version.txt / ADD .build/dep-version.py /usr/local/bin -# Download and compile boost +# Download and install boost RUN BOOST_VERSION=$(dep-version.py boost) && \ BOOST_VERSION_UNDESRSCORE=$(echo $BOOST_VERSION | sed 's/\./_/g') && \ curl -O -L https://boostorg.jfrog.io/artifactory/main/release/${BOOST_VERSION}/source/boost_${BOOST_VERSION_UNDESRSCORE}.tar.gz && \ tar xfz boost_${BOOST_VERSION_UNDESRSCORE}.tar.gz && \ - cd boost_${BOOST_VERSION_UNDESRSCORE} && \ - ./bootstrap.sh --with-libraries=python && \ - ./b2 -d0 address-model=64 cxxflags=-fPIC link=shared threading=multi variant=release install && \ + cp -r boost_${BOOST_VERSION_UNDESRSCORE}/boost /usr/include/ && \ rm -rf /boost_${BOOST_VERSION_UNDESRSCORE}.tar.gz /boost_${BOOST_VERSION_UNDESRSCORE} - # Download and compile protobuf RUN PROTOBUF_VERSION=$(dep-version.py protobuf) && \ curl -O -L https://github.com/google/protobuf/releases/download/v${PROTOBUF_VERSION}/protobuf-cpp-${PROTOBUF_VERSION}.tar.gz && \ @@ -111,7 +107,7 @@ RUN CURL_VERSION=$(dep-version.py curl) && \ rm -rf /curl-${CURL_VERSION}.tar.gz /curl-${CURL_VERSION} # Pulsar client C++ -RUN PULSAR_CPP_VERSION=$(cat pulsar-client-cpp-version.txt | xargs) && \ +RUN PULSAR_CPP_VERSION=$(dep-version.py pulsar-cpp) && \ curl -O -L https://archive.apache.org/dist/pulsar/pulsar-client-cpp-${PULSAR_CPP_VERSION}/apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz && \ tar xfz apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz && \ cd apache-pulsar-client-cpp-${PULSAR_CPP_VERSION} && \ diff --git a/pulsar-client-cpp-version.txt b/pulsar-client-cpp-version.txt deleted file mode 100644 index 4a36342..0000000 --- a/pulsar-client-cpp-version.txt +++ /dev/null @@ -1 +0,0 @@ -3.0.0 diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 0007e61..ec119e4 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -223,7 +223,7 @@ def __init__(self, dynamicLibPath, authParamsString): """ _check_type(str, dynamicLibPath, 'dynamicLibPath') _check_type(str, authParamsString, 'authParamsString') - self.auth = _pulsar.Authentication(dynamicLibPath, authParamsString) + self.auth = _pulsar.Authentication.create(dynamicLibPath, authParamsString) class AuthenticationTLS(Authentication): @@ -244,7 +244,7 @@ def __init__(self, certificate_path, private_key_path): """ _check_type(str, certificate_path, 'certificate_path') _check_type(str, private_key_path, 'private_key_path') - self.auth = _pulsar.AuthenticationTLS(certificate_path, private_key_path) + self.auth = _pulsar.AuthenticationTLS.create(certificate_path, private_key_path) class AuthenticationToken(Authentication): @@ -263,7 +263,7 @@ def __init__(self, token): """ if not (isinstance(token, str) or callable(token)): raise ValueError("Argument token is expected to be of type 'str' or a function returning 'str'") - self.auth = _pulsar.AuthenticationToken(token) + self.auth = _pulsar.AuthenticationToken.create(token) class AuthenticationAthenz(Authentication): @@ -281,7 +281,7 @@ def __init__(self, auth_params_string): JSON encoded configuration for Athenz client """ _check_type(str, auth_params_string, 'auth_params_string') - self.auth = _pulsar.AuthenticationAthenz(auth_params_string) + self.auth = _pulsar.AuthenticationAthenz.create(auth_params_string) class AuthenticationOauth2(Authentication): """ @@ -298,7 +298,7 @@ def __init__(self, auth_params_string): JSON encoded configuration for Oauth2 client """ _check_type(str, auth_params_string, 'auth_params_string') - self.auth = _pulsar.AuthenticationOauth2(auth_params_string) + self.auth = _pulsar.AuthenticationOauth2.create(auth_params_string) class AuthenticationBasic(Authentication): """ @@ -334,12 +334,12 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str """ if auth_params_string is not None: _check_type(str, auth_params_string, 'auth_params_string') - self.auth = _pulsar.AuthenticationBasic('', '', '', auth_params_string) + self.auth = _pulsar.AuthenticationBasic.create(auth_params_string) else: _check_type(str, username, 'username') _check_type(str, password, 'password') _check_type(str, method, 'method') - self.auth = _pulsar.AuthenticationBasic(username, password, method, '') + self.auth = _pulsar.AuthenticationBasic.create(username, password, method) class Client: """ @@ -1037,7 +1037,7 @@ def send(self, content, msg = self._build_msg(content, properties, partition_key, sequence_id, replication_clusters, disable_replication, event_timestamp, deliver_at, deliver_after) - return MessageId.deserialize(self._producer.send(msg)) + return self._producer.send(msg) def send_async(self, content, callback, properties=None, diff --git a/pybind11 b/pybind11 new file mode 160000 index 0000000..9c18a74 --- /dev/null +++ b/pybind11 @@ -0,0 +1 @@ +Subproject commit 9c18a74e377dece2f2acd22c2c6e63ecb2a59c77 diff --git a/src/authentication.cc b/src/authentication.cc index 1bec468..45941ea 100644 --- a/src/authentication.cc +++ b/src/authentication.cc @@ -17,109 +17,45 @@ * under the License. */ #include "utils.h" - -AuthenticationWrapper::AuthenticationWrapper() {} - -AuthenticationWrapper::AuthenticationWrapper(const std::string& dynamicLibPath, - const std::string& authParamsString) { - this->auth = AuthFactory::create(dynamicLibPath, authParamsString); -} - -struct AuthenticationTlsWrapper : public AuthenticationWrapper { - AuthenticationTlsWrapper(const std::string& certificatePath, const std::string& privateKeyPath) - : AuthenticationWrapper() { - this->auth = AuthTls::create(certificatePath, privateKeyPath); - } -}; - -struct TokenSupplierWrapper { - PyObject* _pySupplier; - - TokenSupplierWrapper(py::object pySupplier) : _pySupplier(pySupplier.ptr()) { Py_XINCREF(_pySupplier); } - - TokenSupplierWrapper(const TokenSupplierWrapper& other) { - _pySupplier = other._pySupplier; - Py_XINCREF(_pySupplier); - } - - TokenSupplierWrapper& operator=(const TokenSupplierWrapper& other) { - _pySupplier = other._pySupplier; - Py_XINCREF(_pySupplier); - return *this; - } - - virtual ~TokenSupplierWrapper() { Py_XDECREF(_pySupplier); } - - std::string operator()() { - PyGILState_STATE state = PyGILState_Ensure(); - - std::string token; - try { - token = py::call(_pySupplier); - } catch (const py::error_already_set& e) { - PyErr_Print(); - } - - PyGILState_Release(state); - return token; - } -}; - -struct AuthenticationTokenWrapper : public AuthenticationWrapper { - AuthenticationTokenWrapper(py::object token) : AuthenticationWrapper() { - if (py::extract(token).check()) { - // It's a string - std::string tokenStr = py::extract(token); - this->auth = AuthToken::createWithToken(tokenStr); - } else { - // It's a function object - this->auth = AuthToken::create(TokenSupplierWrapper(token)); - } - } -}; - -struct AuthenticationAthenzWrapper : public AuthenticationWrapper { - AuthenticationAthenzWrapper(const std::string& authParamsString) : AuthenticationWrapper() { - this->auth = AuthAthenz::create(authParamsString); - } -}; - -struct AuthenticationOauth2Wrapper : public AuthenticationWrapper { - AuthenticationOauth2Wrapper(const std::string& authParamsString) : AuthenticationWrapper() { - this->auth = AuthOauth2::create(authParamsString); - } -}; - -struct AuthenticationBasicWrapper : public AuthenticationWrapper { - AuthenticationBasicWrapper(const std::string& username, const std::string& password, - const std::string& method, const std::string& authParamsString) - : AuthenticationWrapper() { - if (authParamsString.empty()) { - this->auth = AuthBasic::create(username, password, method); - } else { - this->auth = AuthBasic::create(authParamsString); - } - } -}; - -void export_authentication() { - using namespace boost::python; - - class_("Authentication", init()); - - class_ >( - "AuthenticationTLS", init()); - - class_ >("AuthenticationToken", - init()); - - class_ >("AuthenticationAthenz", - init()); - - class_ >("AuthenticationOauth2", - init()); - - class_ >( - "AuthenticationBasic", - init()); +#include +#include +#include +#include + +namespace py = pybind11; +using namespace pulsar; + +void export_authentication(py::module_& m) { + using namespace py; + + class_>(m, "Authentication") + .def("getAuthMethodName", &Authentication::getAuthMethodName) + .def("getAuthData", &Authentication::getAuthData) + .def_static("create", static_cast( + &AuthFactory::create)); + + class_>(m, "AuthenticationTLS") + .def(init()) + .def_static("create", static_cast( + &AuthTls::create)); + + class_>(m, "AuthenticationToken") + .def(init()) + .def_static("create", static_cast(&AuthToken::create)) + .def_static("create", static_cast(&AuthToken::create)); + + class_>(m, "AuthenticationAthenz") + .def(init()) + .def_static("create", static_cast(&AuthAthenz::create)); + + class_>(m, "AuthenticationOauth2") + .def(init()) + .def_static("create", static_cast(&AuthOauth2::create)); + + class_>(m, "AuthenticationBasic") + .def(init()) + .def_static("create", static_cast(&AuthBasic::create)) + .def_static("create", static_cast(&AuthBasic::create)); } diff --git a/src/client.cc b/src/client.cc index 701578d..206c4e2 100644 --- a/src/client.cc +++ b/src/client.cc @@ -18,6 +18,11 @@ */ #include "utils.h" +#include +#include + +namespace py = pybind11; + Producer Client_createProducer(Client& client, const std::string& topic, const ProducerConfiguration& conf) { Producer producer; @@ -41,18 +46,12 @@ Consumer Client_subscribe(Client& client, const std::string& topic, const std::s return consumer; } -Consumer Client_subscribe_topics(Client& client, boost::python::list& topics, +Consumer Client_subscribe_topics(Client& client, const std::vector& topics, const std::string& subscriptionName, const ConsumerConfiguration& conf) { - std::vector topics_vector; - for (int i = 0; i < len(topics); i++) { - std::string content = boost::python::extract(topics[i]); - topics_vector.push_back(content); - } - Consumer consumer; waitForAsyncValue(std::function([&](SubscribeCallback callback) { - client.subscribeAsync(topics_vector, subscriptionName, conf, callback); + client.subscribeAsync(topics, subscriptionName, conf, callback); }), consumer); @@ -83,7 +82,7 @@ Reader Client_createReader(Client& client, const std::string& topic, const Messa return reader; } -boost::python::list Client_getTopicPartitions(Client& client, const std::string& topic) { +std::vector Client_getTopicPartitions(Client& client, const std::string& topic) { std::vector partitions; waitForAsyncValue(std::function([&](GetPartitionsCallback callback) { @@ -91,22 +90,16 @@ boost::python::list Client_getTopicPartitions(Client& client, const std::string& }), partitions); - boost::python::list pyList; - for (int i = 0; i < partitions.size(); i++) { - pyList.append(boost::python::object(partitions[i])); - } - - return pyList; + return partitions; } void Client_close(Client& client) { waitForAsyncResult([&](ResultCallback callback) { client.closeAsync(callback); }); } -void export_client() { - using namespace boost::python; - - class_("Client", init()) +void export_client(py::module_& m) { + py::class_>(m, "Client") + .def(py::init()) .def("create_producer", &Client_createProducer) .def("subscribe", &Client_subscribe) .def("subscribe_topics", &Client_subscribe_topics) diff --git a/src/config.cc b/src/config.cc index f2b7187..71795dd 100644 --- a/src/config.cc +++ b/src/config.cc @@ -17,87 +17,29 @@ * under the License. */ #include "utils.h" +#include #include +#include +#include +#include +#include #include -template -struct ListenerWrapper { - PyObject* _pyListener; +namespace py = pybind11; - ListenerWrapper(py::object pyListener) : _pyListener(pyListener.ptr()) { Py_XINCREF(_pyListener); } +#ifdef __GNUC__ +#define HIDDEN __attribute__((visibility("hidden"))) +#else +#define HIDDEN +#endif - ListenerWrapper(const ListenerWrapper& other) { - _pyListener = other._pyListener; - Py_XINCREF(_pyListener); - } - - ListenerWrapper& operator=(const ListenerWrapper& other) { - _pyListener = other._pyListener; - Py_XINCREF(_pyListener); - return *this; - } - - virtual ~ListenerWrapper() { Py_XDECREF(_pyListener); } - - void operator()(T consumer, const Message& msg) { - PyGILState_STATE state = PyGILState_Ensure(); - - try { - py::call(_pyListener, py::object(&consumer), py::object(&msg)); - } catch (const py::error_already_set& e) { - PyErr_Print(); - } - - PyGILState_Release(state); - } -}; - -static ConsumerConfiguration& ConsumerConfiguration_setMessageListener(ConsumerConfiguration& conf, - py::object pyListener) { - conf.setMessageListener(ListenerWrapper(pyListener)); - return conf; -} - -static ReaderConfiguration& ReaderConfiguration_setReaderListener(ReaderConfiguration& conf, - py::object pyListener) { - conf.setReaderListener(ListenerWrapper(pyListener)); - return conf; -} - -static ClientConfiguration& ClientConfiguration_setAuthentication(ClientConfiguration& conf, - py::object authentication) { - AuthenticationWrapper wrapper = py::extract(authentication); - conf.setAuth(wrapper.auth); - return conf; -} - -static ConsumerConfiguration& ConsumerConfiguration_setCryptoKeyReader(ConsumerConfiguration& conf, - py::object cryptoKeyReader) { - CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract(cryptoKeyReader); - conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); - return conf; -} - -static ProducerConfiguration& ProducerConfiguration_setCryptoKeyReader(ProducerConfiguration& conf, - py::object cryptoKeyReader) { - CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract(cryptoKeyReader); - conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); - return conf; -} - -static ReaderConfiguration& ReaderConfiguration_setCryptoKeyReader(ReaderConfiguration& conf, - py::object cryptoKeyReader) { - CryptoKeyReaderWrapper cryptoKeyReaderWrapper = py::extract(cryptoKeyReader); - conf.setCryptoKeyReader(cryptoKeyReaderWrapper.cryptoKeyReader); - return conf; -} - -class LoggerWrapper : public Logger, public CaptivePythonObjectMixin { +class HIDDEN LoggerWrapper : public Logger, public CaptivePythonObjectMixin { const std::unique_ptr _fallbackLogger; + py::object _pyLogger; public: - LoggerWrapper(PyObject* pyLogger, Logger* fallbackLogger) - : CaptivePythonObjectMixin(pyLogger), _fallbackLogger(fallbackLogger) {} + LoggerWrapper(PyObject* pyLoggerPtr, Logger* fallbackLogger, py::object pyLogger) + : CaptivePythonObjectMixin(pyLoggerPtr), _fallbackLogger(fallbackLogger), _pyLogger(pyLogger) {} LoggerWrapper(const LoggerWrapper&) = delete; LoggerWrapper(LoggerWrapper&&) noexcept = delete; @@ -119,16 +61,16 @@ class LoggerWrapper : public Logger, public CaptivePythonObjectMixin { try { switch (level) { case Logger::LEVEL_DEBUG: - py::call(_captive, "DEBUG", message.c_str()); + _pyLogger(py::str("DEBUG"), message); break; case Logger::LEVEL_INFO: - py::call(_captive, "INFO", message.c_str()); + _pyLogger(py::str("INFO"), message); break; case Logger::LEVEL_WARN: - py::call(_captive, "WARNING", message.c_str()); + _pyLogger(py::str("WARNING"), message); break; case Logger::LEVEL_ERROR: - py::call(_captive, "ERROR", message.c_str()); + _pyLogger(py::str("ERROR"), message); break; } } catch (const py::error_already_set& e) { @@ -141,18 +83,20 @@ class LoggerWrapper : public Logger, public CaptivePythonObjectMixin { } }; -class LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin { +class HIDDEN LoggerWrapperFactory : public LoggerFactory, public CaptivePythonObjectMixin { + py::object _pyLogger; std::unique_ptr _fallbackLoggerFactory{new ConsoleLoggerFactory}; public: - LoggerWrapperFactory(py::object pyLogger) : CaptivePythonObjectMixin(pyLogger.ptr()) {} + LoggerWrapperFactory(py::object pyLogger) + : CaptivePythonObjectMixin(pyLogger.ptr()), _pyLogger(pyLogger) {} Logger* getLogger(const std::string& fileName) { const auto fallbackLogger = _fallbackLoggerFactory->getLogger(fileName); if (_captive == py::object().ptr()) { return fallbackLogger; } else { - return new LoggerWrapper(_captive, fallbackLogger); + return new LoggerWrapper(_captive, fallbackLogger, _pyLogger); } } }; @@ -162,7 +106,8 @@ static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& c return conf; } -static ClientConfiguration& ClientConfiguration_setConsoleLogger(ClientConfiguration& conf, Logger::Level level) { +static ClientConfiguration& ClientConfiguration_setConsoleLogger(ClientConfiguration& conf, + Logger::Level level) { conf.setLogger(new ConsoleLoggerFactory(level)); return conf; } @@ -173,107 +118,124 @@ static ClientConfiguration& ClientConfiguration_setFileLogger(ClientConfiguratio return conf; } -void export_config() { - using namespace boost::python; +void export_config(py::module_& m) { + using namespace py; + + class_>(m, "AbstractCryptoKeyReader") + .def("getPublicKey", &CryptoKeyReader::getPublicKey) + .def("getPrivateKey", &CryptoKeyReader::getPrivateKey); - class_("ClientConfiguration") - .def("authentication", &ClientConfiguration_setAuthentication, return_self<>()) + class_>( + m, "CryptoKeyReader") + .def(init()); + + class_>(m, "ClientConfiguration") + .def(init<>()) + .def("authentication", &ClientConfiguration::setAuth, return_value_policy::reference) .def("operation_timeout_seconds", &ClientConfiguration::getOperationTimeoutSeconds) - .def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, return_self<>()) + .def("operation_timeout_seconds", &ClientConfiguration::setOperationTimeoutSeconds, + return_value_policy::reference) .def("connection_timeout", &ClientConfiguration::getConnectionTimeout) - .def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_self<>()) + .def("connection_timeout", &ClientConfiguration::setConnectionTimeout, return_value_policy::reference) .def("io_threads", &ClientConfiguration::getIOThreads) - .def("io_threads", &ClientConfiguration::setIOThreads, return_self<>()) + .def("io_threads", &ClientConfiguration::setIOThreads, return_value_policy::reference) .def("message_listener_threads", &ClientConfiguration::getMessageListenerThreads) - .def("message_listener_threads", &ClientConfiguration::setMessageListenerThreads, return_self<>()) + .def("message_listener_threads", &ClientConfiguration::setMessageListenerThreads, + return_value_policy::reference) .def("concurrent_lookup_requests", &ClientConfiguration::getConcurrentLookupRequest) - .def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, return_self<>()) - .def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath, - return_value_policy()) - .def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_self<>()) + .def("concurrent_lookup_requests", &ClientConfiguration::setConcurrentLookupRequest, + return_value_policy::reference) + .def("log_conf_file_path", &ClientConfiguration::getLogConfFilePath, return_value_policy::copy) + .def("log_conf_file_path", &ClientConfiguration::setLogConfFilePath, return_value_policy::reference) .def("use_tls", &ClientConfiguration::isUseTls) - .def("use_tls", &ClientConfiguration::setUseTls, return_self<>()) + .def("use_tls", &ClientConfiguration::setUseTls, return_value_policy::reference) .def("tls_trust_certs_file_path", &ClientConfiguration::getTlsTrustCertsFilePath, - return_value_policy()) - .def("tls_trust_certs_file_path", &ClientConfiguration::setTlsTrustCertsFilePath, return_self<>()) + return_value_policy::copy) + .def("tls_trust_certs_file_path", &ClientConfiguration::setTlsTrustCertsFilePath, + return_value_policy::reference) .def("tls_allow_insecure_connection", &ClientConfiguration::isTlsAllowInsecureConnection) .def("tls_allow_insecure_connection", &ClientConfiguration::setTlsAllowInsecureConnection, - return_self<>()) - .def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, return_self<>()) - .def("listener_name", &ClientConfiguration::setListenerName, return_self<>()) - .def("set_logger", &ClientConfiguration_setLogger, return_self<>()) - .def("set_console_logger", &ClientConfiguration_setConsoleLogger, return_self<>()) - .def("set_file_logger", &ClientConfiguration_setFileLogger, return_self<>()); - - - class_("ProducerConfiguration") - .def("producer_name", &ProducerConfiguration::getProducerName, - return_value_policy()) - .def("producer_name", &ProducerConfiguration::setProducerName, return_self<>()) - .def("schema", &ProducerConfiguration::getSchema, return_value_policy()) - .def("schema", &ProducerConfiguration::setSchema, return_self<>()) + return_value_policy::reference) + .def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, + return_value_policy::reference) + .def("listener_name", &ClientConfiguration::setListenerName, return_value_policy::reference) + .def("set_logger", &ClientConfiguration_setLogger, return_value_policy::reference) + .def("set_console_logger", &ClientConfiguration_setConsoleLogger, return_value_policy::reference) + .def("set_file_logger", &ClientConfiguration_setFileLogger, return_value_policy::reference); + + class_>(m, "ProducerConfiguration") + .def(init<>()) + .def("producer_name", &ProducerConfiguration::getProducerName, return_value_policy::copy) + .def("producer_name", &ProducerConfiguration::setProducerName, return_value_policy::reference) + .def("schema", &ProducerConfiguration::getSchema, return_value_policy::copy) + .def("schema", &ProducerConfiguration::setSchema, return_value_policy::reference) .def("send_timeout_millis", &ProducerConfiguration::getSendTimeout) - .def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, return_self<>()) + .def("send_timeout_millis", &ProducerConfiguration::setSendTimeout, return_value_policy::reference) .def("initial_sequence_id", &ProducerConfiguration::getInitialSequenceId) - .def("initial_sequence_id", &ProducerConfiguration::setInitialSequenceId, return_self<>()) + .def("initial_sequence_id", &ProducerConfiguration::setInitialSequenceId, + return_value_policy::reference) .def("compression_type", &ProducerConfiguration::getCompressionType) - .def("compression_type", &ProducerConfiguration::setCompressionType, return_self<>()) + .def("compression_type", &ProducerConfiguration::setCompressionType, return_value_policy::reference) .def("max_pending_messages", &ProducerConfiguration::getMaxPendingMessages) - .def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages, return_self<>()) + .def("max_pending_messages", &ProducerConfiguration::setMaxPendingMessages, + return_value_policy::reference) .def("max_pending_messages_across_partitions", &ProducerConfiguration::getMaxPendingMessagesAcrossPartitions) .def("max_pending_messages_across_partitions", - &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_self<>()) + &ProducerConfiguration::setMaxPendingMessagesAcrossPartitions, return_value_policy::reference) .def("block_if_queue_full", &ProducerConfiguration::getBlockIfQueueFull) - .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>()) + .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, + return_value_policy::reference) .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode) - .def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>()) + .def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, + return_value_policy::reference) .def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers) .def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers, - return_self<>()) - .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, - return_value_policy()) - .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>()) + return_value_policy::reference) + .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, return_value_policy::copy) + .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_value_policy::reference) .def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages, - return_value_policy()) - .def("batching_max_messages", &ProducerConfiguration::setBatchingMaxMessages, return_self<>()) + return_value_policy::copy) + .def("batching_max_messages", &ProducerConfiguration::setBatchingMaxMessages, + return_value_policy::reference) .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::getBatchingMaxAllowedSizeInBytes, - return_value_policy()) + return_value_policy::copy) .def("batching_max_allowed_size_in_bytes", &ProducerConfiguration::setBatchingMaxAllowedSizeInBytes, - return_self<>()) + return_value_policy::reference) .def("batching_max_publish_delay_ms", &ProducerConfiguration::getBatchingMaxPublishDelayMs, - return_value_policy()) + return_value_policy::copy) .def("batching_max_publish_delay_ms", &ProducerConfiguration::setBatchingMaxPublishDelayMs, - return_self<>()) + return_value_policy::reference) .def("chunking_enabled", &ProducerConfiguration::isChunkingEnabled) - .def("chunking_enabled", &ProducerConfiguration::setChunkingEnabled, return_self<>()) - .def("property", &ProducerConfiguration::setProperty, return_self<>()) - .def("batching_type", &ProducerConfiguration::setBatchingType, return_self<>()) + .def("chunking_enabled", &ProducerConfiguration::setChunkingEnabled, return_value_policy::reference) + .def("property", &ProducerConfiguration::setProperty, return_value_policy::reference) + .def("batching_type", &ProducerConfiguration::setBatchingType, return_value_policy::reference) .def("batching_type", &ProducerConfiguration::getBatchingType) - .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_self<>()) - .def("crypto_key_reader", &ProducerConfiguration_setCryptoKeyReader, return_self<>()); + .def("encryption_key", &ProducerConfiguration::addEncryptionKey, return_value_policy::reference) + .def("crypto_key_reader", &ProducerConfiguration::setCryptoKeyReader, return_value_policy::reference); - class_("BatchReceivePolicy", init()) + class_(m, "BatchReceivePolicy") + .def(init()) .def("getTimeoutMs", &BatchReceivePolicy::getTimeoutMs) .def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages) .def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes); - class_("ConsumerConfiguration") + class_>(m, "ConsumerConfiguration") + .def(init<>()) .def("consumer_type", &ConsumerConfiguration::getConsumerType) - .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_self<>()) - .def("schema", &ConsumerConfiguration::getSchema, return_value_policy()) - .def("schema", &ConsumerConfiguration::setSchema, return_self<>()) - .def("message_listener", &ConsumerConfiguration_setMessageListener, return_self<>()) + .def("consumer_type", &ConsumerConfiguration::setConsumerType, return_value_policy::reference) + .def("schema", &ConsumerConfiguration::getSchema, return_value_policy::copy) + .def("schema", &ConsumerConfiguration::setSchema, return_value_policy::reference) + .def("message_listener", &ConsumerConfiguration::setMessageListener, return_value_policy::reference) .def("receiver_queue_size", &ConsumerConfiguration::getReceiverQueueSize) .def("receiver_queue_size", &ConsumerConfiguration::setReceiverQueueSize) .def("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::getMaxTotalReceiverQueueSizeAcrossPartitions) .def("max_total_receiver_queue_size_across_partitions", &ConsumerConfiguration::setMaxTotalReceiverQueueSizeAcrossPartitions) - .def("consumer_name", &ConsumerConfiguration::getConsumerName, - return_value_policy()) - .def("batch_receive_policy", &ConsumerConfiguration::getBatchReceivePolicy, return_value_policy()) + .def("batch_receive_policy", &ConsumerConfiguration::getBatchReceivePolicy, return_value_policy::copy) .def("batch_receive_policy", &ConsumerConfiguration::setBatchReceivePolicy) + .def("consumer_name", &ConsumerConfiguration::getConsumerName, return_value_policy::copy) .def("consumer_name", &ConsumerConfiguration::setConsumerName) .def("unacked_messages_timeout_ms", &ConsumerConfiguration::getUnAckedMessagesTimeoutMs) .def("unacked_messages_timeout_ms", &ConsumerConfiguration::setUnAckedMessagesTimeoutMs) @@ -287,37 +249,39 @@ void export_config() { .def("pattern_auto_discovery_period", &ConsumerConfiguration::setPatternAutoDiscoveryPeriod) .def("read_compacted", &ConsumerConfiguration::isReadCompacted) .def("read_compacted", &ConsumerConfiguration::setReadCompacted) - .def("property", &ConsumerConfiguration::setProperty, return_self<>()) + .def("property", &ConsumerConfiguration::setProperty, return_value_policy::reference) .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition) .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition) - .def("crypto_key_reader", &ConsumerConfiguration_setCryptoKeyReader, return_self<>()) + .def("crypto_key_reader", &ConsumerConfiguration::setCryptoKeyReader, return_value_policy::reference) .def("replicate_subscription_state_enabled", &ConsumerConfiguration::setReplicateSubscriptionStateEnabled) .def("replicate_subscription_state_enabled", &ConsumerConfiguration::isReplicateSubscriptionStateEnabled) .def("max_pending_chunked_message", &ConsumerConfiguration::getMaxPendingChunkedMessage) .def("max_pending_chunked_message", &ConsumerConfiguration::setMaxPendingChunkedMessage, - return_self<>()) + return_value_policy::reference) .def("auto_ack_oldest_chunked_message_on_queue_full", &ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull) .def("auto_ack_oldest_chunked_message_on_queue_full", - &ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, return_self<>()) + &ConsumerConfiguration::setAutoAckOldestChunkedMessageOnQueueFull, + return_value_policy::reference) .def("start_message_id_inclusive", &ConsumerConfiguration::isStartMessageIdInclusive) - .def("start_message_id_inclusive",&ConsumerConfiguration::setStartMessageIdInclusive, - return_self<>()); - - class_("ReaderConfiguration") - .def("reader_listener", &ReaderConfiguration_setReaderListener, return_self<>()) - .def("schema", &ReaderConfiguration::getSchema, return_value_policy()) - .def("schema", &ReaderConfiguration::setSchema, return_self<>()) + .def("start_message_id_inclusive", &ConsumerConfiguration::setStartMessageIdInclusive, + return_value_policy::reference); + + class_>(m, "ReaderConfiguration") + .def(init<>()) + .def("reader_listener", &ReaderConfiguration::setReaderListener, return_value_policy::reference) + .def("schema", &ReaderConfiguration::getSchema, return_value_policy::copy) + .def("schema", &ReaderConfiguration::setSchema, return_value_policy::reference) .def("receiver_queue_size", &ReaderConfiguration::getReceiverQueueSize) .def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize) - .def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy()) + .def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy::copy) .def("reader_name", &ReaderConfiguration::setReaderName) .def("subscription_role_prefix", &ReaderConfiguration::getSubscriptionRolePrefix, - return_value_policy()) + return_value_policy::copy) .def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix) .def("read_compacted", &ReaderConfiguration::isReadCompacted) .def("read_compacted", &ReaderConfiguration::setReadCompacted) - .def("crypto_key_reader", &ReaderConfiguration_setCryptoKeyReader, return_self<>()); + .def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference); } diff --git a/src/consumer.cc b/src/consumer.cc index 5298fae..a77bb50 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -18,6 +18,12 @@ */ #include "utils.h" +#include +#include +#include + +namespace py = pybind11; + void Consumer_unsubscribe(Consumer& consumer) { waitForAsyncResult([&consumer](ResultCallback callback) { consumer.unsubscribeAsync(callback); }); } @@ -46,8 +52,7 @@ Messages Consumer_batch_receive(Consumer& consumer) { Messages msgs; Result res; Py_BEGIN_ALLOW_THREADS res = consumer.batchReceive(msgs); - Py_END_ALLOW_THREADS - CHECK_RESULT(res); + Py_END_ALLOW_THREADS CHECK_RESULT(res); return msgs; } @@ -102,13 +107,12 @@ MessageId Consumer_get_last_message_id(Consumer& consumer) { return msgId; } -void export_consumer() { - using namespace boost::python; - - class_("Consumer", no_init) +void export_consumer(py::module_& m) { + py::class_(m, "Consumer") + .def(py::init<>()) .def("topic", &Consumer::getTopic, "return the topic this consumer is subscribed to", - return_value_policy()) - .def("subscription_name", &Consumer::getSubscriptionName, return_value_policy()) + py::return_value_policy::copy) + .def("subscription_name", &Consumer::getSubscriptionName, py::return_value_policy::copy) .def("unsubscribe", &Consumer_unsubscribe) .def("receive", &Consumer_receive) .def("receive", &Consumer_receive_timeout) diff --git a/src/cryptoKeyReader.cc b/src/cryptoKeyReader.cc deleted file mode 100644 index 2c46b6f..0000000 --- a/src/cryptoKeyReader.cc +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -#include "utils.h" - -CryptoKeyReaderWrapper::CryptoKeyReaderWrapper() {} - -CryptoKeyReaderWrapper::CryptoKeyReaderWrapper(const std::string& publicKeyPath, - const std::string& privateKeyPath) { - this->cryptoKeyReader = DefaultCryptoKeyReader::create(publicKeyPath, privateKeyPath); -} - -void export_cryptoKeyReader() { - using namespace boost::python; - - class_("CryptoKeyReader", init()); -} \ No newline at end of file diff --git a/src/enums.cc b/src/enums.cc index 733ee3e..f61011f 100644 --- a/src/enums.cc +++ b/src/enums.cc @@ -17,29 +17,36 @@ * under the License. */ #include "utils.h" +#include +#include +#include +#include -void export_enums() { - using namespace boost::python; +using namespace pulsar; +namespace py = pybind11; - enum_("PartitionsRoutingMode") +void export_enums(py::module_& m) { + using namespace py; + + enum_(m, "PartitionsRoutingMode") .value("UseSinglePartition", ProducerConfiguration::UseSinglePartition) .value("RoundRobinDistribution", ProducerConfiguration::RoundRobinDistribution) .value("CustomPartition", ProducerConfiguration::CustomPartition); - enum_("CompressionType") + enum_(m, "CompressionType") .value("NONE", CompressionNone) // Don't use 'None' since it's a keyword in py3 .value("LZ4", CompressionLZ4) .value("ZLib", CompressionZLib) .value("ZSTD", CompressionZSTD) .value("SNAPPY", CompressionSNAPPY); - enum_("ConsumerType") + enum_(m, "ConsumerType") .value("Exclusive", ConsumerExclusive) .value("Shared", ConsumerShared) .value("Failover", ConsumerFailover) .value("KeyShared", ConsumerKeyShared); - enum_("Result", "Collection of return codes") + enum_(m, "Result", "Collection of return codes") .value("Ok", ResultOk) .value("UnknownError", ResultUnknownError) .value("InvalidConfiguration", ResultInvalidConfiguration) @@ -87,7 +94,7 @@ void export_enums() { .value("MemoryBufferIsFull", ResultMemoryBufferIsFull) .value("Interrupted", pulsar::ResultInterrupted); - enum_("SchemaType", "Supported schema types") + enum_(m, "SchemaType", "Supported schema types") .value("NONE", pulsar::NONE) .value("STRING", pulsar::STRING) .value("INT8", pulsar::INT8) @@ -104,15 +111,15 @@ void export_enums() { .value("AUTO_PUBLISH", pulsar::AUTO_PUBLISH) .value("KEY_VALUE", pulsar::KEY_VALUE); - enum_("InitialPosition", "Supported initial position") + enum_(m, "InitialPosition", "Supported initial position") .value("Latest", InitialPositionLatest) .value("Earliest", InitialPositionEarliest); - enum_("BatchingType", "Supported batching types") + enum_(m, "BatchingType", "Supported batching types") .value("Default", ProducerConfiguration::DefaultBatching) .value("KeyBased", ProducerConfiguration::KeyBasedBatching); - enum_("LoggerLevel") + enum_(m, "LoggerLevel") .value("Debug", Logger::LEVEL_DEBUG) .value("Info", Logger::LEVEL_INFO) .value("Warn", Logger::LEVEL_WARN) diff --git a/src/exceptions.cc b/src/exceptions.cc index efca661..b3ec775 100644 --- a/src/exceptions.cc +++ b/src/exceptions.cc @@ -16,97 +16,149 @@ * specific language governing permissions and limitations * under the License. */ -#include +#include "exceptions.h" +#include +#include -#include "utils.h" +using namespace pulsar; +namespace py = pybind11; -static PyObject* basePulsarException = nullptr; -std::map exceptions; +#define CASE_RESULT(className) \ + case pulsar::Result##className: \ + throw className{pulsar::Result##className}; -PyObject* createExceptionClass(const char* name, PyObject* baseTypeObj = PyExc_Exception) { - using namespace boost::python; - - std::string fullName = "_pulsar."; - fullName += name; - - PyObject* typeObj = PyErr_NewException(const_cast(fullName.c_str()), baseTypeObj, nullptr); - if (!typeObj) throw_error_already_set(); - scope().attr(name) = handle<>(borrowed(typeObj)); - return typeObj; -} - -PyObject* get_exception_class(Result result) { - auto it = exceptions.find(result); - if (it != exceptions.end()) { - return it->second; - } else { - std::cerr << "Error result exception not found: " << result << std::endl; - abort(); +void raiseException(pulsar::Result result) { + switch (result) { + CASE_RESULT(UnknownError) + CASE_RESULT(InvalidConfiguration) + CASE_RESULT(Timeout) + CASE_RESULT(LookupError) + CASE_RESULT(ConnectError) + CASE_RESULT(ReadError) + CASE_RESULT(AuthenticationError) + CASE_RESULT(AuthorizationError) + CASE_RESULT(ErrorGettingAuthenticationData) + CASE_RESULT(BrokerMetadataError) + CASE_RESULT(BrokerPersistenceError) + CASE_RESULT(ChecksumError) + CASE_RESULT(ConsumerBusy) + CASE_RESULT(NotConnected) + CASE_RESULT(AlreadyClosed) + CASE_RESULT(InvalidMessage) + CASE_RESULT(ConsumerNotInitialized) + CASE_RESULT(ProducerNotInitialized) + CASE_RESULT(ProducerBusy) + CASE_RESULT(TooManyLookupRequestException) + CASE_RESULT(InvalidTopicName) + CASE_RESULT(InvalidUrl) + CASE_RESULT(ServiceUnitNotReady) + CASE_RESULT(OperationNotSupported) + CASE_RESULT(ProducerBlockedQuotaExceededError) + CASE_RESULT(ProducerBlockedQuotaExceededException) + CASE_RESULT(ProducerQueueIsFull) + CASE_RESULT(MessageTooBig) + CASE_RESULT(TopicNotFound) + CASE_RESULT(SubscriptionNotFound) + CASE_RESULT(ConsumerNotFound) + CASE_RESULT(UnsupportedVersionError) + CASE_RESULT(TopicTerminated) + CASE_RESULT(CryptoError) + CASE_RESULT(IncompatibleSchema) + CASE_RESULT(ConsumerAssignError) + CASE_RESULT(CumulativeAcknowledgementNotAllowedError) + CASE_RESULT(TransactionCoordinatorNotFoundError) + CASE_RESULT(InvalidTxnStatusError) + CASE_RESULT(NotAllowedError) + CASE_RESULT(TransactionConflict) + CASE_RESULT(TransactionNotFound) + CASE_RESULT(ProducerFenced) + CASE_RESULT(MemoryBufferIsFull) + CASE_RESULT(Interrupted) + default: + return; } } -void export_exceptions() { - using namespace boost::python; +// There is no std::hash specification for an enum in Clang compiler of macOS for C++11 +template <> +struct std::hash { + std::size_t operator()(const Result& result) const noexcept { + return std::hash()(static_cast(result)); + } +}; - basePulsarException = createExceptionClass("PulsarException"); +using PythonExceptionMap = std::unordered_map>; +static PythonExceptionMap createPythonExceptionMap(py::module_& m, py::exception& base) { + PythonExceptionMap exceptions; + exceptions[ResultUnknownError] = {m, "UnknownError", base}; + exceptions[ResultInvalidConfiguration] = {m, "InvalidConfiguration", base}; + exceptions[ResultTimeout] = {m, "Timeout", base}; + exceptions[ResultLookupError] = {m, "LookupError", base}; + exceptions[ResultConnectError] = {m, "ConnectError", base}; + exceptions[ResultReadError] = {m, "ReadError", base}; + exceptions[ResultAuthenticationError] = {m, "AuthenticationError", base}; + exceptions[ResultAuthorizationError] = {m, "AuthorizationError", base}; + exceptions[ResultErrorGettingAuthenticationData] = {m, "ErrorGettingAuthenticationData", base}; + exceptions[ResultBrokerMetadataError] = {m, "BrokerMetadataError", base}; + exceptions[ResultBrokerPersistenceError] = {m, "BrokerPersistenceError", base}; + exceptions[ResultChecksumError] = {m, "ChecksumError", base}; + exceptions[ResultConsumerBusy] = {m, "ConsumerBusy", base}; + exceptions[ResultNotConnected] = {m, "NotConnected", base}; + exceptions[ResultAlreadyClosed] = {m, "AlreadyClosed", base}; + exceptions[ResultInvalidMessage] = {m, "InvalidMessage", base}; + exceptions[ResultConsumerNotInitialized] = {m, "ConsumerNotInitialized", base}; + exceptions[ResultProducerNotInitialized] = {m, "ProducerNotInitialized", base}; + exceptions[ResultProducerBusy] = {m, "ProducerBusy", base}; + exceptions[ResultTooManyLookupRequestException] = {m, "TooManyLookupRequestException", base}; + exceptions[ResultInvalidTopicName] = {m, "InvalidTopicName", base}; + exceptions[ResultInvalidUrl] = {m, "InvalidUrl", base}; + exceptions[ResultServiceUnitNotReady] = {m, "ServiceUnitNotReady", base}; + exceptions[ResultOperationNotSupported] = {m, "OperationNotSupported", base}; + exceptions[ResultProducerBlockedQuotaExceededError] = {m, "ProducerBlockedQuotaExceededError", base}; + exceptions[ResultProducerBlockedQuotaExceededException] = {m, "ProducerBlockedQuotaExceededException", + base}; + exceptions[ResultProducerQueueIsFull] = {m, "ProducerQueueIsFull", base}; + exceptions[ResultMessageTooBig] = {m, "MessageTooBig", base}; + exceptions[ResultTopicNotFound] = {m, "TopicNotFound", base}; + exceptions[ResultSubscriptionNotFound] = {m, "SubscriptionNotFound", base}; + exceptions[ResultConsumerNotFound] = {m, "ConsumerNotFound", base}; + exceptions[ResultUnsupportedVersionError] = {m, "UnsupportedVersionError", base}; + exceptions[ResultTopicTerminated] = {m, "TopicTerminated", base}; + exceptions[ResultCryptoError] = {m, "CryptoError", base}; + exceptions[ResultIncompatibleSchema] = {m, "IncompatibleSchema", base}; + exceptions[ResultConsumerAssignError] = {m, "ConsumerAssignError", base}; + exceptions[ResultCumulativeAcknowledgementNotAllowedError] = { + m, "CumulativeAcknowledgementNotAllowedError", base}; + exceptions[ResultTransactionCoordinatorNotFoundError] = {m, "TransactionCoordinatorNotFoundError", base}; + exceptions[ResultInvalidTxnStatusError] = {m, "InvalidTxnStatusError", base}; + exceptions[ResultNotAllowedError] = {m, "NotAllowedError", base}; + exceptions[ResultTransactionConflict] = {m, "TransactionConflict", base}; + exceptions[ResultTransactionNotFound] = {m, "TransactionNotFound", base}; + exceptions[ResultProducerFenced] = {m, "ProducerFenced", base}; + exceptions[ResultMemoryBufferIsFull] = {m, "MemoryBufferIsFull", base}; + exceptions[ResultInterrupted] = {m, "Interrupted", base}; + return exceptions; +} - exceptions[ResultUnknownError] = createExceptionClass("UnknownError", basePulsarException); - exceptions[ResultInvalidConfiguration] = - createExceptionClass("InvalidConfiguration", basePulsarException); - exceptions[ResultTimeout] = createExceptionClass("Timeout", basePulsarException); - exceptions[ResultLookupError] = createExceptionClass("LookupError", basePulsarException); - exceptions[ResultConnectError] = createExceptionClass("ConnectError", basePulsarException); - exceptions[ResultReadError] = createExceptionClass("ReadError", basePulsarException); - exceptions[ResultAuthenticationError] = createExceptionClass("AuthenticationError", basePulsarException); - exceptions[ResultAuthorizationError] = createExceptionClass("AuthorizationError", basePulsarException); - exceptions[ResultErrorGettingAuthenticationData] = - createExceptionClass("ErrorGettingAuthenticationData", basePulsarException); - exceptions[ResultBrokerMetadataError] = createExceptionClass("BrokerMetadataError", basePulsarException); - exceptions[ResultBrokerPersistenceError] = - createExceptionClass("BrokerPersistenceError", basePulsarException); - exceptions[ResultChecksumError] = createExceptionClass("ChecksumError", basePulsarException); - exceptions[ResultConsumerBusy] = createExceptionClass("ConsumerBusy", basePulsarException); - exceptions[ResultNotConnected] = createExceptionClass("NotConnected", basePulsarException); - exceptions[ResultAlreadyClosed] = createExceptionClass("AlreadyClosed", basePulsarException); - exceptions[ResultInvalidMessage] = createExceptionClass("InvalidMessage", basePulsarException); - exceptions[ResultConsumerNotInitialized] = - createExceptionClass("ConsumerNotInitialized", basePulsarException); - exceptions[ResultProducerNotInitialized] = - createExceptionClass("ProducerNotInitialized", basePulsarException); - exceptions[ResultProducerBusy] = createExceptionClass("ProducerBusy", basePulsarException); - exceptions[ResultTooManyLookupRequestException] = - createExceptionClass("TooManyLookupRequestException", basePulsarException); - exceptions[ResultInvalidTopicName] = createExceptionClass("InvalidTopicName", basePulsarException); - exceptions[ResultInvalidUrl] = createExceptionClass("InvalidUrl", basePulsarException); - exceptions[ResultServiceUnitNotReady] = createExceptionClass("ServiceUnitNotReady", basePulsarException); - exceptions[ResultOperationNotSupported] = - createExceptionClass("OperationNotSupported", basePulsarException); - exceptions[ResultProducerBlockedQuotaExceededError] = - createExceptionClass("ProducerBlockedQuotaExceededError", basePulsarException); - exceptions[ResultProducerBlockedQuotaExceededException] = - createExceptionClass("ProducerBlockedQuotaExceededException", basePulsarException); - exceptions[ResultProducerQueueIsFull] = createExceptionClass("ProducerQueueIsFull", basePulsarException); - exceptions[ResultMessageTooBig] = createExceptionClass("MessageTooBig", basePulsarException); - exceptions[ResultTopicNotFound] = createExceptionClass("TopicNotFound", basePulsarException); - exceptions[ResultSubscriptionNotFound] = - createExceptionClass("SubscriptionNotFound", basePulsarException); - exceptions[ResultConsumerNotFound] = createExceptionClass("ConsumerNotFound", basePulsarException); - exceptions[ResultUnsupportedVersionError] = - createExceptionClass("UnsupportedVersionError", basePulsarException); - exceptions[ResultTopicTerminated] = createExceptionClass("TopicTerminated", basePulsarException); - exceptions[ResultCryptoError] = createExceptionClass("CryptoError", basePulsarException); - exceptions[ResultIncompatibleSchema] = createExceptionClass("IncompatibleSchema", basePulsarException); - exceptions[ResultConsumerAssignError] = createExceptionClass("ConsumerAssignError", basePulsarException); - exceptions[ResultCumulativeAcknowledgementNotAllowedError] = - createExceptionClass("CumulativeAcknowledgementNotAllowedError", basePulsarException); - exceptions[ResultTransactionCoordinatorNotFoundError] = - createExceptionClass("TransactionCoordinatorNotFoundError", basePulsarException); - exceptions[ResultInvalidTxnStatusError] = - createExceptionClass("InvalidTxnStatusError", basePulsarException); - exceptions[ResultNotAllowedError] = createExceptionClass("NotAllowedError", basePulsarException); - exceptions[ResultTransactionConflict] = createExceptionClass("TransactionConflict", basePulsarException); - exceptions[ResultTransactionNotFound] = createExceptionClass("TransactionNotFound", basePulsarException); - exceptions[ResultProducerFenced] = createExceptionClass("ProducerFenced", basePulsarException); - exceptions[ResultMemoryBufferIsFull] = createExceptionClass("MemoryBufferIsFull", basePulsarException); - exceptions[ResultInterrupted] = createExceptionClass("Interrupted", basePulsarException); +void export_exceptions(py::module_& m) { + static py::exception base{m, "PulsarException"}; + static auto exceptions = createPythonExceptionMap(m, base); + py::register_exception_translator([](std::exception_ptr e) { + try { + if (e) { + std::rethrow_exception(e); + } + } catch (const PulsarException& e) { + auto it = exceptions.find(e._result); + if (it != exceptions.end()) { + PyErr_SetString(it->second.ptr(), e.what()); + } else { + base(e.what()); + } + } catch (const std::invalid_argument& e) { + PyErr_SetString(PyExc_ValueError, e.what()); + } catch (const std::exception& e) { + PyErr_SetString(PyExc_RuntimeError, e.what()); + } + }); } diff --git a/src/exceptions.h b/src/exceptions.h new file mode 100644 index 0000000..06e3f28 --- /dev/null +++ b/src/exceptions.h @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#pragma once + +#include + +#include +#include + +struct PulsarException : std::exception { + const pulsar::Result _result; + std::string _msg = "Pulsar error: "; + PulsarException(pulsar::Result res) : _result(res) { _msg += strResult(res); } + const char* what() const noexcept override { return _msg.c_str(); } +}; + +void raiseException(pulsar::Result result); + +#define INHERIT_PULSAR_EXCEPTION(name) \ + struct name : PulsarException { \ + using PulsarException::PulsarException; \ + }; + +INHERIT_PULSAR_EXCEPTION(UnknownError) +INHERIT_PULSAR_EXCEPTION(InvalidConfiguration) +INHERIT_PULSAR_EXCEPTION(Timeout) +INHERIT_PULSAR_EXCEPTION(LookupError) +INHERIT_PULSAR_EXCEPTION(ConnectError) +INHERIT_PULSAR_EXCEPTION(ReadError) +INHERIT_PULSAR_EXCEPTION(AuthenticationError) +INHERIT_PULSAR_EXCEPTION(AuthorizationError) +INHERIT_PULSAR_EXCEPTION(ErrorGettingAuthenticationData) +INHERIT_PULSAR_EXCEPTION(BrokerMetadataError) +INHERIT_PULSAR_EXCEPTION(BrokerPersistenceError) +INHERIT_PULSAR_EXCEPTION(ChecksumError) +INHERIT_PULSAR_EXCEPTION(ConsumerBusy) +INHERIT_PULSAR_EXCEPTION(NotConnected) +INHERIT_PULSAR_EXCEPTION(AlreadyClosed) +INHERIT_PULSAR_EXCEPTION(InvalidMessage) +INHERIT_PULSAR_EXCEPTION(ConsumerNotInitialized) +INHERIT_PULSAR_EXCEPTION(ProducerNotInitialized) +INHERIT_PULSAR_EXCEPTION(ProducerBusy) +INHERIT_PULSAR_EXCEPTION(TooManyLookupRequestException) +INHERIT_PULSAR_EXCEPTION(InvalidTopicName) +INHERIT_PULSAR_EXCEPTION(InvalidUrl) +INHERIT_PULSAR_EXCEPTION(ServiceUnitNotReady) +INHERIT_PULSAR_EXCEPTION(OperationNotSupported) +INHERIT_PULSAR_EXCEPTION(ProducerBlockedQuotaExceededError) +INHERIT_PULSAR_EXCEPTION(ProducerBlockedQuotaExceededException) +INHERIT_PULSAR_EXCEPTION(ProducerQueueIsFull) +INHERIT_PULSAR_EXCEPTION(MessageTooBig) +INHERIT_PULSAR_EXCEPTION(TopicNotFound) +INHERIT_PULSAR_EXCEPTION(SubscriptionNotFound) +INHERIT_PULSAR_EXCEPTION(ConsumerNotFound) +INHERIT_PULSAR_EXCEPTION(UnsupportedVersionError) +INHERIT_PULSAR_EXCEPTION(TopicTerminated) +INHERIT_PULSAR_EXCEPTION(CryptoError) +INHERIT_PULSAR_EXCEPTION(IncompatibleSchema) +INHERIT_PULSAR_EXCEPTION(ConsumerAssignError) +INHERIT_PULSAR_EXCEPTION(CumulativeAcknowledgementNotAllowedError) +INHERIT_PULSAR_EXCEPTION(TransactionCoordinatorNotFoundError) +INHERIT_PULSAR_EXCEPTION(InvalidTxnStatusError) +INHERIT_PULSAR_EXCEPTION(NotAllowedError) +INHERIT_PULSAR_EXCEPTION(TransactionConflict) +INHERIT_PULSAR_EXCEPTION(TransactionNotFound) +INHERIT_PULSAR_EXCEPTION(ProducerFenced) +INHERIT_PULSAR_EXCEPTION(MemoryBufferIsFull) +INHERIT_PULSAR_EXCEPTION(Interrupted) + +#undef INHERIT_PULSAR_EXCEPTION diff --git a/src/message.cc b/src/message.cc index b93380b..1924bc2 100644 --- a/src/message.cc +++ b/src/message.cc @@ -18,154 +18,88 @@ */ #include "utils.h" -#include -#include -#include +#include +#include +#include +#include -std::string MessageId_str(const MessageId& msgId) { - std::stringstream ss; - ss << msgId; - return ss.str(); -} - -bool MessageId_eq(const MessageId& a, const MessageId& b) { return a == b; } - -bool MessageId_ne(const MessageId& a, const MessageId& b) { return a != b; } - -bool MessageId_lt(const MessageId& a, const MessageId& b) { return a < b; } - -bool MessageId_le(const MessageId& a, const MessageId& b) { return a <= b; } - -bool MessageId_gt(const MessageId& a, const MessageId& b) { return a > b; } - -bool MessageId_ge(const MessageId& a, const MessageId& b) { return a >= b; } - -boost::python::object MessageId_serialize(const MessageId& msgId) { - std::string serialized; - msgId.serialize(serialized); - return boost::python::object( - boost::python::handle<>(PyBytes_FromStringAndSize(serialized.c_str(), serialized.length()))); -} - -std::string Message_str(const Message& msg) { - std::stringstream ss; - ss << msg; - return ss.str(); -} - -boost::python::object Message_data(const Message& msg) { - return boost::python::object( - boost::python::handle<>(PyBytes_FromStringAndSize((const char*)msg.getData(), msg.getLength()))); -} - -boost::python::object Message_properties(const Message& msg) { - boost::python::dict pyProperties; - for (const auto& item : msg.getProperties()) { - pyProperties[item.first] = item.second; - } - return boost::python::object(std::move(pyProperties)); -} +namespace py = pybind11; -std::string Topic_name_str(const Message& msg) { - std::stringstream ss; - ss << msg.getTopicName(); - return ss.str(); -} - -std::string schema_version_str(const Message& msg) { - std::stringstream ss; - ss << msg.getSchemaVersion(); - return ss.str(); -} - -const MessageId& Message_getMessageId(const Message& msg) { return msg.getMessageId(); } - -void deliverAfter(MessageBuilder* const builder, PyObject* obj_delta) { - PyDateTime_Delta const* pydelta = reinterpret_cast(obj_delta); - - long days = pydelta->days; - const bool is_negative = days < 0; - if (is_negative) { - days = -days; - } - - // Create chrono duration object - std::chrono::milliseconds duration = std::chrono::duration_cast( - std::chrono::hours(24) * days + std::chrono::seconds(pydelta->seconds) + - std::chrono::microseconds(pydelta->microseconds)); - - if (is_negative) { - duration = duration * -1; - } - - builder->setDeliverAfter(duration); -} - -void export_message() { - using namespace boost::python; +void export_message(py::module_& m) { + using namespace py; PyDateTime_IMPORT; MessageBuilder& (MessageBuilder::*MessageBuilderSetContentString)(const std::string&) = &MessageBuilder::setContent; - class_("MessageBuilder") - .def("content", MessageBuilderSetContentString, return_self<>()) - .def("property", &MessageBuilder::setProperty, return_self<>()) - .def("properties", &MessageBuilder::setProperties, return_self<>()) - .def("sequence_id", &MessageBuilder::setSequenceId, return_self<>()) - .def("deliver_after", &deliverAfter, return_self<>()) - .def("deliver_at", &MessageBuilder::setDeliverAt, return_self<>()) - .def("partition_key", &MessageBuilder::setPartitionKey, return_self<>()) - .def("event_timestamp", &MessageBuilder::setEventTimestamp, return_self<>()) - .def("replication_clusters", &MessageBuilder::setReplicationClusters, return_self<>()) - .def("disable_replication", &MessageBuilder::disableReplication, return_self<>()) + class_(m, "MessageBuilder") + .def(init<>()) + .def("content", MessageBuilderSetContentString, return_value_policy::reference) + .def("property", &MessageBuilder::setProperty, return_value_policy::reference) + .def("properties", &MessageBuilder::setProperties, return_value_policy::reference) + .def("sequence_id", &MessageBuilder::setSequenceId, return_value_policy::reference) + .def("deliver_after", &MessageBuilder::setDeliverAfter, return_value_policy::reference) + .def("deliver_at", &MessageBuilder::setDeliverAt, return_value_policy::reference) + .def("partition_key", &MessageBuilder::setPartitionKey, return_value_policy::reference) + .def("event_timestamp", &MessageBuilder::setEventTimestamp, return_value_policy::reference) + .def("replication_clusters", &MessageBuilder::setReplicationClusters, return_value_policy::reference) + .def("disable_replication", &MessageBuilder::disableReplication, return_value_policy::reference) .def("build", &MessageBuilder::build); - class_("MessageStringMap").def(map_indexing_suite()); - - static const MessageId& _MessageId_earliest = MessageId::earliest(); - static const MessageId& _MessageId_latest = MessageId::latest(); - - class_("MessageId") + class_(m, "MessageId") .def(init()) - .def("__str__", &MessageId_str) - .def("__eq__", &MessageId_eq) - .def("__ne__", &MessageId_ne) - .def("__le__", &MessageId_le) - .def("__lt__", &MessageId_lt) - .def("__ge__", &MessageId_ge) - .def("__gt__", &MessageId_gt) + .def("__str__", + [](const MessageId& msgId) { + std::ostringstream oss; + oss << msgId; + return oss.str(); + }) + .def("__eq__", &MessageId::operator==) + .def("__ne__", &MessageId::operator!=) + .def("__le__", &MessageId::operator<=) + .def("__lt__", &MessageId::operator<) + .def("__ge__", &MessageId::operator>=) + .def("__gt__", &MessageId::operator>) .def("ledger_id", &MessageId::ledgerId) .def("entry_id", &MessageId::entryId) .def("batch_index", &MessageId::batchIndex) .def("partition", &MessageId::partition) - .add_static_property("earliest", make_getter(&_MessageId_earliest)) - .add_static_property("latest", make_getter(&_MessageId_latest)) - .def("serialize", &MessageId_serialize) - .def("deserialize", &MessageId::deserialize) - .staticmethod("deserialize"); - - class_("Message") - .def("properties", &Message_properties) - .def("data", &Message_data) + .def_property_readonly_static("earliest", [](object) { return MessageId::earliest(); }) + .def_property_readonly_static("latest", [](object) { return MessageId::latest(); }) + .def("serialize", + [](const MessageId& msgId) { + std::string serialized; + msgId.serialize(serialized); + return bytes(serialized); + }) + .def_static("deserialize", &MessageId::deserialize); + + class_(m, "Message") + .def(init<>()) + .def("properties", &Message::getProperties) + .def("data", [](const Message& msg) { return bytes(msg.getDataAsString()); }) .def("length", &Message::getLength) - .def("partition_key", &Message::getPartitionKey, return_value_policy()) + .def("partition_key", &Message::getPartitionKey, return_value_policy::copy) .def("publish_timestamp", &Message::getPublishTimestamp) .def("event_timestamp", &Message::getEventTimestamp) - .def("message_id", &Message_getMessageId, return_value_policy()) - .def("__str__", &Message_str) - .def("topic_name", &Topic_name_str) + .def("message_id", &Message::getMessageId, return_value_policy::copy) + .def("__str__", + [](const Message& msg) { + std::ostringstream oss; + oss << msg; + return oss.str(); + }) + .def("topic_name", &Message::getTopicName, return_value_policy::copy) .def("redelivery_count", &Message::getRedeliveryCount) - .def("schema_version", &schema_version_str); + .def("schema_version", &Message::getSchemaVersion, return_value_policy::copy); MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom; - class_("MessageBatch") - .def("with_message_id", &MessageBatch::withMessageId, return_self<>()) - .def("parse_from", MessageBatchParseFromString, return_self<>()) - .def("messages", &MessageBatch::messages, return_value_policy()); - - class_ >("Messages").def(vector_indexing_suite >()); + class_(m, "MessageBatch") + .def(init<>()) + .def("with_message_id", &MessageBatch::withMessageId, return_value_policy::reference) + .def("parse_from", MessageBatchParseFromString, return_value_policy::reference) + .def("messages", &MessageBatch::messages, return_value_policy::copy); } diff --git a/src/producer.cc b/src/producer.cc index d1a11cf..bba262a 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -19,44 +19,19 @@ #include "utils.h" #include +#include +#include -extern boost::python::object MessageId_serialize(const MessageId& msgId); +namespace py = pybind11; -boost::python::object Producer_send(Producer& producer, const Message& message) { +MessageId Producer_send(Producer& producer, const Message& message) { MessageId messageId; waitForAsyncValue(std::function( [&](SendCallback callback) { producer.sendAsync(message, callback); }), messageId); - return MessageId_serialize(messageId); -} - -void Producer_sendAsyncCallback(PyObject* callback, Result res, const MessageId& msgId) { - if (callback == Py_None) { - return; - } - - PyGILState_STATE state = PyGILState_Ensure(); - - try { - py::call(callback, res, py::object(&msgId)); - } catch (const py::error_already_set& e) { - PyErr_Print(); - } - - Py_XDECREF(callback); - PyGILState_Release(state); -} - -void Producer_sendAsync(Producer& producer, const Message& message, py::object callback) { - PyObject* pyCallback = callback.ptr(); - Py_XINCREF(pyCallback); - - Py_BEGIN_ALLOW_THREADS producer.sendAsync( - message, - std::bind(Producer_sendAsyncCallback, pyCallback, std::placeholders::_1, std::placeholders::_2)); - Py_END_ALLOW_THREADS + return messageId; } void Producer_flush(Producer& producer) { @@ -67,18 +42,17 @@ void Producer_close(Producer& producer) { waitForAsyncResult([&](ResultCallback callback) { producer.closeAsync(callback); }); } -bool Producer_is_connected(Producer& producer) { return producer.isConnected(); } - -void export_producer() { - using namespace boost::python; +void export_producer(py::module_& m) { + using namespace py; - class_("Producer", no_init) + class_(m, "Producer") + .def(init<>()) .def("topic", &Producer::getTopic, "return the topic to which producer is publishing to", - return_value_policy()) + return_value_policy::copy) .def("producer_name", &Producer::getProducerName, "return the producer name which could have been assigned by the system or specified by the " "client", - return_value_policy()) + return_value_policy::copy) .def("last_sequence_id", &Producer::getLastSequenceId) .def("send", &Producer_send, "Publish a message on the topic associated with this Producer.\n" @@ -93,10 +67,10 @@ void export_producer() { "This method is equivalent to asyncSend() and wait until the callback is triggered.\n" "\n" "@param msg message to publish\n") - .def("send_async", &Producer_sendAsync) + .def("send_async", &Producer::sendAsync) .def("flush", &Producer_flush, "Flush all the messages buffered in the client and wait until all messages have been\n" "successfully persisted\n") .def("close", &Producer_close) - .def("is_connected", &Producer_is_connected); + .def("is_connected", &Producer::isConnected); } diff --git a/src/pulsar.cc b/src/pulsar.cc index a82a533..9bfeb59 100644 --- a/src/pulsar.cc +++ b/src/pulsar.cc @@ -17,43 +17,31 @@ * under the License. */ #include "utils.h" +#include +namespace py = pybind11; -void export_client(); -void export_message(); -void export_producer(); -void export_consumer(); -void export_reader(); -void export_config(); -void export_enums(); -void export_authentication(); -void export_schema(); -void export_cryptoKeyReader(); -void export_exceptions(); +using Module = py::module_; -PyObject* get_exception_class(Result result); +void export_client(Module& m); +void export_message(Module& m); +void export_producer(Module& m); +void export_consumer(Module& m); +void export_reader(Module& m); +void export_config(Module& m); +void export_enums(Module& m); +void export_authentication(Module& m); +void export_schema(Module& m); +void export_exceptions(Module& m); -static void translateException(const PulsarException& ex) { - std::string err = "Pulsar error: "; - err += strResult(ex._result); - PyErr_SetString(get_exception_class(ex._result), err.c_str()); -} - -BOOST_PYTHON_MODULE(_pulsar) { - py::register_exception_translator(translateException); - - // Initialize thread support so that we can grab the GIL mutex - // from pulsar library threads - PyEval_InitThreads(); - - export_client(); - export_message(); - export_producer(); - export_consumer(); - export_reader(); - export_config(); - export_enums(); - export_authentication(); - export_schema(); - export_cryptoKeyReader(); - export_exceptions(); +PYBIND11_MODULE(_pulsar, m) { + export_exceptions(m); + export_client(m); + export_message(m); + export_producer(m); + export_consumer(m); + export_reader(m); + export_config(m); + export_enums(m); + export_authentication(m); + export_schema(m); } diff --git a/src/reader.cc b/src/reader.cc index 70873c8..7194c29 100644 --- a/src/reader.cc +++ b/src/reader.cc @@ -17,6 +17,9 @@ * under the License. */ #include "utils.h" +#include + +namespace py = pybind11; Message Reader_readNext(Reader& reader) { Message msg; @@ -83,11 +86,11 @@ void Reader_seek_timestamp(Reader& reader, uint64_t timestamp) { bool Reader_is_connected(Reader& reader) { return reader.isConnected(); } -void export_reader() { - using namespace boost::python; +void export_reader(py::module_& m) { + using namespace py; - class_("Reader", no_init) - .def("topic", &Reader::getTopic, return_value_policy()) + class_(m, "Reader") + .def("topic", &Reader::getTopic, return_value_policy::copy) .def("read_next", &Reader_readNext) .def("read_next", &Reader_readNextTimeout) .def("has_message_available", &Reader_hasMessageAvailable) diff --git a/src/schema.cc b/src/schema.cc index cdfcda6..94c6774 100644 --- a/src/schema.cc +++ b/src/schema.cc @@ -17,12 +17,16 @@ * under the License. */ #include "utils.h" +#include -void export_schema() { - using namespace boost::python; +namespace py = pybind11; - class_("SchemaInfo", init()) +void export_schema(py::module_& m) { + using namespace py; + + class_(m, "SchemaInfo") + .def(init()) .def("schema_type", &SchemaInfo::getSchemaType) - .def("name", &SchemaInfo::getName, return_value_policy()) - .def("schema", &SchemaInfo::getSchema, return_value_policy()); + .def("name", &SchemaInfo::getName, return_value_policy::copy) + .def("schema", &SchemaInfo::getSchema, return_value_policy::copy); } diff --git a/src/utils.h b/src/utils.h index 3cbf98a..fb700c6 100644 --- a/src/utils.h +++ b/src/utils.h @@ -19,24 +19,18 @@ #pragma once -#include - #include #include +#include +#include +#include "exceptions.h" #include "future.h" using namespace pulsar; -namespace py = boost::python; - -struct PulsarException { - Result _result; - PulsarException(Result res) : _result(res) {} -}; - inline void CHECK_RESULT(Result res) { if (res != ResultOk) { - throw PulsarException(res); + raiseException(res); } } @@ -92,13 +86,6 @@ inline void waitForAsyncValue(std::function func, T& value) { } } -struct AuthenticationWrapper { - AuthenticationPtr auth; - - AuthenticationWrapper(); - AuthenticationWrapper(const std::string& dynamicLibPath, const std::string& authParamsString); -}; - struct CryptoKeyReaderWrapper { CryptoKeyReaderPtr cryptoKeyReader; diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 30f451d..feba877 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -50,8 +50,7 @@ from urllib.request import urlopen, Request TM = 10000 # Do not wait forever in tests -CERTS_DIR = os.path.dirname(__file__) + "/test-conf/" - +CERTS_DIR = os.path.dirname(os.path.abspath(__file__)) + "/test-conf/" def doHttpPost(url, data): req = Request(url, data.encode()) diff --git a/vcpkg b/vcpkg deleted file mode 160000 index 2537044..0000000 --- a/vcpkg +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 253704407ae68efa37bf8f5b59b3e06dd40d3d3f diff --git a/vcpkg-3.10.json b/vcpkg-3.10.json deleted file mode 100644 index fdb1128..0000000 --- a/vcpkg-3.10.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "name": "pulsar-python", - "version": "3.0.0", - "description": "Pulsar Python SDK (Python 3.10)", - "dependencies": [ - { - "name": "boost-python", - "version>=": "1.79.0" - } - ], - "builtin-baseline": "c266859544a3cdcfd952d218039c55a268863740" -} diff --git a/vcpkg-3.7.json b/vcpkg-3.7.json deleted file mode 100644 index 1b7b846..0000000 --- a/vcpkg-3.7.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "name": "pulsar-python", - "version": "3.0.0", - "description": "Pulsar Python SDK (Python 3.7)", - "dependencies": [ - { - "name": "boost-python", - "version>=": "1.76.0" - } - ], - "builtin-baseline": "35312384e7701760ed7855961eff41a63f9cc379", - "overrides": [ - { - "name": "python3", - "version": "3.7.3" - } - ] -} diff --git a/vcpkg-3.8.json b/vcpkg-3.8.json deleted file mode 100644 index b210e87..0000000 --- a/vcpkg-3.8.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "name": "pulsar-python", - "version": "3.0.0", - "description": "Pulsar Python SDK (Python 3.8)", - "dependencies": [ - { - "name": "boost-python", - "version>=": "1.76.0" - } - ], - "builtin-baseline": "35312384e7701760ed7855961eff41a63f9cc379", - "overrides": [ - { - "name": "python3", - "version": "3.8.3" - } - ] -} diff --git a/vcpkg-3.9.json b/vcpkg-3.9.json deleted file mode 100644 index 250a1ee..0000000 --- a/vcpkg-3.9.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "name": "pulsar-python", - "version": "3.0.0", - "description": "Pulsar Python SDK (Python 3.9)", - "dependencies": [ - { - "name": "boost-python", - "version>=": "1.76.0" - } - ], - "builtin-baseline": "35312384e7701760ed7855961eff41a63f9cc379" -} diff --git a/vcpkg.json b/vcpkg.json deleted file mode 100644 index ef5c7c2..0000000 --- a/vcpkg.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name": "pulsar-python", - "version": "3.0.0", - "description": "Pulsar Python SDK", - "dependencies": ["boost-python"] -} From 0b76a69320c1dba8bf8e1b9a1e0a6c1754e484c1 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 24 Nov 2022 10:56:32 +0800 Subject: [PATCH 2/3] Use Pulsar C++ client 3.1.0-candidate-1 --- .github/workflows/ci-build-release-wheels.yaml | 17 +++++++++-------- .github/workflows/ci-pr-validation.yaml | 15 +++++---------- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/.github/workflows/ci-build-release-wheels.yaml b/.github/workflows/ci-build-release-wheels.yaml index 7ddf8ec..f0b2e9b 100644 --- a/.github/workflows/ci-build-release-wheels.yaml +++ b/.github/workflows/ci-build-release-wheels.yaml @@ -151,11 +151,6 @@ jobs: strategy: fail-fast: false matrix: - windows: - - name: 'Windows x64' - os: windows-2019 - arch: '-A x64' - triplet: 'x64-windows' python: - {version: '3.7'} - {version: '3.8'} @@ -175,8 +170,8 @@ jobs: mkdir -p ${{ env.PULSAR_CPP_DIR }} cd ${{ env.PULSAR_CPP_DIR }} # TODO: switch to official releases - curl -O -L https://github.com/BewareMyPower/pulsar-client-cpp/releases/download/v3.1.0-rc-20221028/x64-windows-static.zip - unzip -q x64-windows-static.zip + curl -O -L https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-3.1.0-candidate-1/x64-windows-static.tar.gz + tar zxf x64-windows-static.tar.gz ls -l ${{ env.PULSAR_CPP_DIR }} - name: Configure CMake @@ -188,7 +183,7 @@ jobs: tar zxf v${PYBIND11_VERSION}.tar.gz rm -rf pybind11 mv pybind11-${PYBIND11_VERSION} pybind11 - cmake -B build ${{ matrix.windows.arch }} \ + cmake -B build -A x64 \ -DCMAKE_PREFIX_PATH=${{ env.PULSAR_CPP_DIR }} \ -DLINK_STATIC=ON @@ -200,3 +195,9 @@ jobs: python setup.py bdist_wheel python -m pip install ./dist/*.whl python -c 'import pulsar; c = pulsar.Client("pulsar://localhost:6650"); c.close()' + + - name: Upload artifacts + uses: actions/upload-artifact@v3 + with: + name: wheel-windows-py${{matrix.python.version}} + path: dist/*.whl diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index 13b93ad..0e8ad4f 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -173,9 +173,9 @@ jobs: run: pkg/mac/build-mac-wheels.sh ${{matrix.py.version}} windows-wheels: - name: "Python ${{ matrix.python.version }} Wheel on ${{ matrix.windows.name }}" + name: "Python ${{ matrix.python.version }} Wheel on Windows x64" needs: unit-tests - runs-on: ${{ matrix.windows.os }} + runs-on: windows-2019 timeout-minutes: 120 env: @@ -183,11 +183,6 @@ jobs: strategy: fail-fast: false matrix: - windows: - - name: 'Windows x64' - os: windows-2022 - arch: '-A x64' - triplet: 'x64-windows' python: - version: '3.7' - version: '3.8' @@ -207,8 +202,8 @@ jobs: mkdir -p ${{ env.PULSAR_CPP_DIR }} cd ${{ env.PULSAR_CPP_DIR }} # TODO: switch to official releases - curl -O -L https://github.com/BewareMyPower/pulsar-client-cpp/releases/download/v3.1.0-rc-20221028/${{ matrix.windows.triplet }}-static.zip - unzip -q ${{ matrix.windows.triplet }}-static.zip + curl -O -L https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-3.1.0-candidate-1/x64-windows-static.tar.gz + tar zxf x64-windows-static.tar.gz ls -l ${{ env.PULSAR_CPP_DIR }} - name: Configure CMake @@ -220,7 +215,7 @@ jobs: tar zxf v${PYBIND11_VERSION}.tar.gz rm -rf pybind11 mv pybind11-${PYBIND11_VERSION} pybind11 - cmake -B build ${{ matrix.windows.arch }} \ + cmake -B build -A x64 \ -DCMAKE_PREFIX_PATH=${{ env.PULSAR_CPP_DIR }} \ -DLINK_STATIC=ON From 5c87f087556e41ba21c7bc4b3229b09a6a6bd58e Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Thu, 24 Nov 2022 12:21:12 +0800 Subject: [PATCH 3/3] fix ci --- .github/workflows/ci-build-release-wheels.yaml | 1 + .github/workflows/ci-pr-validation.yaml | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/ci-build-release-wheels.yaml b/.github/workflows/ci-build-release-wheels.yaml index f0b2e9b..7998398 100644 --- a/.github/workflows/ci-build-release-wheels.yaml +++ b/.github/workflows/ci-build-release-wheels.yaml @@ -172,6 +172,7 @@ jobs: # TODO: switch to official releases curl -O -L https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-3.1.0-candidate-1/x64-windows-static.tar.gz tar zxf x64-windows-static.tar.gz + mv x64-windows-static/* . ls -l ${{ env.PULSAR_CPP_DIR }} - name: Configure CMake diff --git a/.github/workflows/ci-pr-validation.yaml b/.github/workflows/ci-pr-validation.yaml index 0e8ad4f..5bba956 100644 --- a/.github/workflows/ci-pr-validation.yaml +++ b/.github/workflows/ci-pr-validation.yaml @@ -204,6 +204,7 @@ jobs: # TODO: switch to official releases curl -O -L https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp/pulsar-client-cpp-3.1.0-candidate-1/x64-windows-static.tar.gz tar zxf x64-windows-static.tar.gz + mv x64-windows-static/* . ls -l ${{ env.PULSAR_CPP_DIR }} - name: Configure CMake