diff --git a/build-support/install-cpp-client.sh b/build-support/install-cpp-client.sh index ac493c1..4c4608f 100755 --- a/build-support/install-cpp-client.sh +++ b/build-support/install-cpp-client.sh @@ -34,7 +34,7 @@ cd /tmp # Fetch the client binaries ## TODO: Fetch from official release once it's available -BASE_URL=https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp-${CPP_CLIENT_VERSION}-candidate-1 +BASE_URL=https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp-${CPP_CLIENT_VERSION}-candidate-2 UNAME_ARCH=$(uname -m) if [ $UNAME_ARCH == 'aarch64' ]; then diff --git a/pkg/mac/build-pulsar-cpp.sh b/pkg/mac/build-pulsar-cpp.sh index c1eb514..ccf7eb4 100755 --- a/pkg/mac/build-pulsar-cpp.sh +++ b/pkg/mac/build-pulsar-cpp.sh @@ -38,7 +38,7 @@ DEPS_PREFIX=${CACHE_DIR_DEPS}/install ############################################################################### ## TODO: Fetch from official release -curl -O -L https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp-${PULSAR_CPP_VERSION}-candidate-1/apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz +curl -O -L https://dist.apache.org/repos/dist/dev/pulsar/pulsar-client-cpp-${PULSAR_CPP_VERSION}-candidate-2/apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz tar xfz apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}.tar.gz if [ ! -f apache-pulsar-client-cpp-${PULSAR_CPP_VERSION}/.done ]; then diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 1d29d90..5474c16 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -103,7 +103,8 @@ def send_callback(res, msg_id): import logging import _pulsar -from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType # noqa: F401 +from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ + LoggerLevel # noqa: F401 from pulsar.exceptions import * @@ -468,7 +469,6 @@ def __init__(self, service_url, _check_type_or_none(str, tls_trust_certs_file_path, 'tls_trust_certs_file_path') _check_type(bool, tls_allow_insecure_connection, 'tls_allow_insecure_connection') _check_type(bool, tls_validate_hostname, 'tls_validate_hostname') - _check_type_or_none(logging.Logger, logger, 'logger') _check_type_or_none(str, listener_name, 'listener_name') conf = _pulsar.ClientConfiguration() @@ -481,7 +481,16 @@ def __init__(self, service_url, conf.concurrent_lookup_requests(concurrent_lookup_requests) if log_conf_file_path: conf.log_conf_file_path(log_conf_file_path) - conf.set_logger(self._prepare_logger(logger) if logger else None) + + if isinstance(logger, logging.Logger): + conf.set_logger(self._prepare_logger(logger)) + elif isinstance(logger, ConsoleLogger): + conf.set_console_logger(logger.log_level) + elif isinstance(logger, FileLogger): + conf.set_file_logger(logger.log_level, logger.log_file) + elif logger is not None: + raise ValueError("Logger is expected to be either None, logger.Logger, pulsar.ConsoleLogger or pulsar.FileLogger") + if listener_name: conf.listener_name(listener_name) if use_tls or service_url.startswith('pulsar+ssl://') or service_url.startswith('https://'): @@ -1426,6 +1435,36 @@ def __init__(self, public_key_path, private_key_path): _check_type(str, private_key_path, 'private_key_path') self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) + +class ConsoleLogger: + """ + Logger that writes on standard output + + **Args** + + * `log_level`: The logging level. eg: `pulsar.LoggerLevel.Info` + """ + def __init__(self, log_level=_pulsar.LoggerLevel.Info): + _check_type(_pulsar.LoggerLevel, log_level, 'log_level') + self.log_level = log_level + + +class FileLogger: + """ + Logger that writes into a file + + **Args** + + * `log_level`: The logging level. eg: `pulsar.LoggerLevel.Info` + * `log_file`: The file where to write the logs + """ + def __init__(self, log_level, log_file): + _check_type(_pulsar.LoggerLevel, log_level, 'log_level') + _check_type(str, log_file, 'log_file') + self.log_level = log_level + self.log_file = log_file + + def _check_type(var_type, var, name): if not isinstance(var, var_type): raise ValueError("Argument %s is expected to be of type '%s' and not '%s'" diff --git a/src/config.cc b/src/config.cc index c312648..3a9c14b 100644 --- a/src/config.cc +++ b/src/config.cc @@ -162,6 +162,17 @@ static ClientConfiguration& ClientConfiguration_setLogger(ClientConfiguration& c return conf; } +static ClientConfiguration& ClientConfiguration_setConsoleLogger(ClientConfiguration& conf, Logger::Level level) { + conf.setLogger(new ConsoleLoggerFactory(level)); + return conf; +} + +static ClientConfiguration& ClientConfiguration_setFileLogger(ClientConfiguration& conf, Logger::Level level, + const std::string& logFile) { + conf.setLogger(new FileLoggerFactory(level, logFile)); + return conf; +} + void export_config() { using namespace boost::python; @@ -190,7 +201,10 @@ void export_config() { return_self<>()) .def("tls_validate_hostname", &ClientConfiguration::setValidateHostName, return_self<>()) .def("listener_name", &ClientConfiguration::setListenerName, return_self<>()) - .def("set_logger", &ClientConfiguration_setLogger, return_self<>()); + .def("set_logger", &ClientConfiguration_setLogger, return_self<>()) + .def("set_console_logger", &ClientConfiguration_setConsoleLogger, return_self<>()) + .def("set_file_logger", &ClientConfiguration_setFileLogger, return_self<>()); + class_("ProducerConfiguration") .def("producer_name", &ProducerConfiguration::getProducerName, diff --git a/src/enums.cc b/src/enums.cc index 92f08a1..733ee3e 100644 --- a/src/enums.cc +++ b/src/enums.cc @@ -111,4 +111,10 @@ void export_enums() { enum_("BatchingType", "Supported batching types") .value("Default", ProducerConfiguration::DefaultBatching) .value("KeyBased", ProducerConfiguration::KeyBasedBatching); + + enum_("LoggerLevel") + .value("Debug", Logger::LEVEL_DEBUG) + .value("Info", Logger::LEVEL_INFO) + .value("Warn", Logger::LEVEL_WARN) + .value("Error", Logger::LEVEL_ERROR); } diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 8cc2c55..314f6f6 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -1238,6 +1238,30 @@ def test_json_schema_encode(self): second_encode = schema.encode(record) self.assertEqual(first_encode, second_encode) + def test_configure_log_level(self): + client = pulsar.Client( + service_url="pulsar://localhost:6650", + logger=pulsar.ConsoleLogger(pulsar.LoggerLevel.Debug) + ) + + producer = client.create_producer( + topic='test_log_level' + ) + + producer.send(b'hello') + + def test_configure_log_to_file(self): + client = pulsar.Client( + service_url="pulsar://localhost:6650", + logger=pulsar.FileLogger(pulsar.LoggerLevel.Debug, 'test.log') + ) + + producer = client.create_producer( + topic='test_log_to_file' + ) + + producer.send(b'hello') + def test_logger_thread_leaks(self): def _do_connect(close): logger = logging.getLogger(str(threading.current_thread().ident))