From 988c4c498f57f75a53418b102126743a3ee9e34c Mon Sep 17 00:00:00 2001
From: nick <nickjobszh@gmail.com>
Date: Sat, 24 Apr 2021 23:36:23 +0200
Subject: [PATCH] add oauthbearer failure func

---
 simple_kafka_client.c              | 27 +++++++++++++++++++++++++++
 simple_kafka_client.stub.php       |  2 ++
 simple_kafka_client_arginfo.h      |  8 +++++++-
 tests/set_oauthbearer_failure.phpt | 23 +++++++++++++++++++++++
 4 files changed, 59 insertions(+), 1 deletion(-)
 create mode 100644 tests/set_oauthbearer_failure.phpt

diff --git a/simple_kafka_client.c b/simple_kafka_client.c
index eaf7df2..997b2cc 100644
--- a/simple_kafka_client.c
+++ b/simple_kafka_client.c
@@ -261,6 +261,33 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes)
 }
 /* }}} */
 
+/* {{{ proto void SimpleKafkaClient\Kafka::setOAuthBearerTokenFailure(string $errorString)
+   The token refresh callback or event handler should invoke this method upon failure. */
+ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure)
+{
+    char *error_string;
+    size_t error_string_len;
+    kafka_object *intern;
+    rd_kafka_resp_err_t err;
+
+    ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
+        Z_PARAM_STRING(error_string, error_string_len)
+    ZEND_PARSE_PARAMETERS_END();
+
+    intern = get_kafka_object(getThis());
+    if (!intern) {
+        return;
+    }
+
+    err = rd_kafka_oauthbearer_set_token_failure(intern->rk, error_string);
+
+    if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
+        zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
+        return;
+    }
+}
+/* }}} */
+
 #define COPY_CONSTANT(name) \
     REGISTER_LONG_CONSTANT(#name, name, CONST_CS | CONST_PERSISTENT)
 
diff --git a/simple_kafka_client.stub.php b/simple_kafka_client.stub.php
index de742ab..0f9e3c4 100644
--- a/simple_kafka_client.stub.php
+++ b/simple_kafka_client.stub.php
@@ -13,4 +13,6 @@ public function getOutQLen(): int {}
     public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeoutMs): void {}
 
     public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {}
+
+    public function setOAuthBearerTokenFailure(string $errorString): void {}
 }
diff --git a/simple_kafka_client_arginfo.h b/simple_kafka_client_arginfo.h
index cc393b6..85c57b8 100644
--- a/simple_kafka_client_arginfo.h
+++ b/simple_kafka_client_arginfo.h
@@ -1,5 +1,5 @@
 /* This is a generated file, edit the .stub.php file instead.
- * Stub hash: e61ec0821ea47152b2ce6b7116ec791c0c712a73 */
+ * Stub hash: 54f0c76165212e21416f46325d0a52b0b7fce4a8 */
 
 ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0)
 	ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0)
@@ -23,11 +23,16 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_of
 	ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
 ZEND_END_ARG_INFO()
 
+ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, 0, 1, IS_VOID, 0)
+	ZEND_ARG_TYPE_INFO(0, errorString, IS_STRING, 0)
+ZEND_END_ARG_INFO()
+
 
 ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata);
 ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen);
 ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets);
 ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes);
+ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure);
 
 
 static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = {
@@ -35,5 +40,6 @@ static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = {
 	ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC)
 	ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC)
 	ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC)
+	ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, ZEND_ACC_PUBLIC)
 	ZEND_FE_END
 };
diff --git a/tests/set_oauthbearer_failure.phpt b/tests/set_oauthbearer_failure.phpt
new file mode 100644
index 0000000..fe8f6cc
--- /dev/null
+++ b/tests/set_oauthbearer_failure.phpt
@@ -0,0 +1,23 @@
+--TEST--
+Produce, consume
+--SKIPIF--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+--FILE--
+<?php
+require __DIR__ . '/integration-tests-check.php';
+
+$conf = new SimpleKafkaClient\Configuration();
+$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
+$conf->set('security.protocol', 'SASL_PLAINTEXT');
+$conf->set('sasl.mechanisms', 'OAUTHBEARER');
+
+$conf->setErrorCb(function($kafka, $errorCode, $errorString) {
+    var_dump($errorString);
+});
+
+$producer = new SimpleKafkaClient\Producer($conf);
+$producer->setOAuthBearerTokenFailure('something');
+$producer->poll(-1);
+--EXPECT--
+string(51) "Failed to acquire SASL OAUTHBEARER token: something"