Skip to content

Commit b472b14

Browse files
authored
add oauthbearer failure func (#47)
1 parent c6f0317 commit b472b14

4 files changed

+59
-1
lines changed

simple_kafka_client.c

+27
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,33 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes)
261261
}
262262
/* }}} */
263263

264+
/* {{{ proto void SimpleKafkaClient\Kafka::setOAuthBearerTokenFailure(string $errorString)
265+
The token refresh callback or event handler should invoke this method upon failure. */
266+
ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure)
267+
{
268+
char *error_string;
269+
size_t error_string_len;
270+
kafka_object *intern;
271+
rd_kafka_resp_err_t err;
272+
273+
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
274+
Z_PARAM_STRING(error_string, error_string_len)
275+
ZEND_PARSE_PARAMETERS_END();
276+
277+
intern = get_kafka_object(getThis());
278+
if (!intern) {
279+
return;
280+
}
281+
282+
err = rd_kafka_oauthbearer_set_token_failure(intern->rk, error_string);
283+
284+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
285+
zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
286+
return;
287+
}
288+
}
289+
/* }}} */
290+
264291
#define COPY_CONSTANT(name) \
265292
REGISTER_LONG_CONSTANT(#name, name, CONST_CS | CONST_PERSISTENT)
266293

simple_kafka_client.stub.php

+2
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,6 @@ public function getOutQLen(): int {}
1313
public function queryWatermarkOffsets(string $topic, int $partition, int &$low, int &$high, int $timeoutMs): void {}
1414

1515
public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {}
16+
17+
public function setOAuthBearerTokenFailure(string $errorString): void {}
1618
}

simple_kafka_client_arginfo.h

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: e61ec0821ea47152b2ce6b7116ec791c0c712a73 */
2+
* Stub hash: 54f0c76165212e21416f46325d0a52b0b7fce4a8 */
33

44
ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0)
55
ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0)
@@ -23,17 +23,23 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_of
2323
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
2424
ZEND_END_ARG_INFO()
2525

26+
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, 0, 1, IS_VOID, 0)
27+
ZEND_ARG_TYPE_INFO(0, errorString, IS_STRING, 0)
28+
ZEND_END_ARG_INFO()
29+
2630

2731
ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata);
2832
ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen);
2933
ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets);
3034
ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes);
35+
ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure);
3136

3237

3338
static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = {
3439
ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_Kafka_getMetadata, ZEND_ACC_PUBLIC)
3540
ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC)
3641
ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC)
3742
ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC)
43+
ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, ZEND_ACC_PUBLIC)
3844
ZEND_FE_END
3945
};

tests/set_oauthbearer_failure.phpt

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
--TEST--
2+
Produce, consume
3+
--SKIPIF--
4+
<?php
5+
require __DIR__ . '/integration-tests-check.php';
6+
--FILE--
7+
<?php
8+
require __DIR__ . '/integration-tests-check.php';
9+
10+
$conf = new SimpleKafkaClient\Configuration();
11+
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
12+
$conf->set('security.protocol', 'SASL_PLAINTEXT');
13+
$conf->set('sasl.mechanisms', 'OAUTHBEARER');
14+
15+
$conf->setErrorCb(function($kafka, $errorCode, $errorString) {
16+
var_dump($errorString);
17+
});
18+
19+
$producer = new SimpleKafkaClient\Producer($conf);
20+
$producer->setOAuthBearerTokenFailure('something');
21+
$producer->poll(-1);
22+
--EXPECT--
23+
string(51) "Failed to acquire SASL OAUTHBEARER token: something"

0 commit comments

Comments
 (0)