Skip to content

Commit 01c4514

Browse files
chore(minor): [sc-2373] allow to get metadata from broker (#23)
* chore(minor): [sc-2373] allow to get metadata from broker * use lazy vars * use map instead of for-loop * rename classes
1 parent 18569c6 commit 01c4514

File tree

4 files changed

+120
-2
lines changed

4 files changed

+120
-2
lines changed

Sources/Kafka/KafkaConsumer.swift

+5-1
Original file line numberDiff line numberDiff line change
@@ -454,6 +454,10 @@ public final class KafkaConsumer: Sendable, Service {
454454
throw KafkaError.client(reason: err)
455455
}
456456
}
457+
458+
public func metadata() async throws -> KafkaMetadata {
459+
try await client().metadata()
460+
}
457461

458462
/// Start the ``KafkaConsumer``.
459463
///
@@ -1058,7 +1062,7 @@ extension KafkaConsumer {
10581062
case .uninitialized:
10591063
fatalError("\(#function) invoked while still in state \(self.state)")
10601064
case .initializing:
1061-
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
1065+
self.state = .finished
10621066
case .running(let client, _, let eventSource):
10631067
self.state = .running(client: client, messagePollLoopState: .finished, eventSource: eventSource)
10641068
case .finishing, .finished:

Sources/Kafka/KafkaMetadata.swift

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import Crdkafka
2+
3+
public final class KafkaMetadata {
4+
private let metadata: UnsafePointer<rd_kafka_metadata>
5+
6+
init(metadata: UnsafePointer<rd_kafka_metadata>) {
7+
self.metadata = metadata
8+
}
9+
10+
deinit {
11+
rd_kafka_metadata_destroy(metadata)
12+
}
13+
14+
public private(set) lazy var topics = {
15+
(0..<Int(self.metadata.pointee.topic_cnt)).map { KafkaTopicMetadata(metadata: self, topic: self.metadata.pointee.topics[$0]) }
16+
}()
17+
}
18+
19+
// must be a class to allow mutating lazy vars, otherwise require struct copies
20+
public final class KafkaTopicMetadata {
21+
private let metadata: KafkaMetadata // retain metadata
22+
private let topic: rd_kafka_metadata_topic
23+
24+
init(metadata: KafkaMetadata, topic: rd_kafka_metadata_topic) {
25+
self.metadata = metadata
26+
self.topic = topic
27+
}
28+
29+
public private(set) lazy var name = {
30+
String(cString: self.topic.topic)
31+
}()
32+
33+
public private(set) lazy var partitions = {
34+
(0..<Int(self.topic.partition_cnt)).map { KafkaPartitionMetadata(metadata: self.metadata, partition: topic.partitions[$0]) }
35+
}()
36+
}
37+
38+
public struct KafkaPartitionMetadata {
39+
private let metadata: KafkaMetadata // retain metadata
40+
private let partition: rd_kafka_metadata_partition
41+
42+
init(metadata: KafkaMetadata, partition: rd_kafka_metadata_partition) {
43+
self.metadata = metadata
44+
self.partition = partition
45+
}
46+
47+
var id: Int {
48+
Int(partition.id)
49+
}
50+
51+
var replicasCount: Int {
52+
Int(partition.replica_cnt)
53+
}
54+
}

Sources/Kafka/RDKafka/RDKafkaClient.swift

+16
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,22 @@ final class RDKafkaClient: Sendable {
806806
func withKafkaHandlePointer<T>(_ body: (OpaquePointer) async throws -> T) async rethrows -> T {
807807
return try await body(self.kafkaHandle)
808808
}
809+
810+
func metadata() async throws -> KafkaMetadata {
811+
let queue = DispatchQueue(label: "com.swift-server.swift-kafka.metadata")
812+
return try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<KafkaMetadata, Error>) in
813+
queue.async {
814+
var metadata: UnsafePointer<rd_kafka_metadata>?
815+
let error = rd_kafka_metadata(self.kafkaHandle, 1, nil, &metadata, -1)
816+
guard error == RD_KAFKA_RESP_ERR_NO_ERROR,
817+
let metadata else {
818+
continuation.resume(throwing: KafkaError.rdKafkaError(wrapping: error))
819+
return
820+
}
821+
continuation.resume(returning: KafkaMetadata(metadata: metadata))
822+
}
823+
}
824+
}
809825

810826
func initTransactions(timeout: Duration) async throws {
811827
let result = await performBlockingCall(queue: gcdQueue) {

Tests/IntegrationTests/KafkaTests.swift

+45-1
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,6 @@ final class KafkaTests: XCTestCase {
758758
}
759759

760760
group.addTask {
761-
let logger = Logger.kafkaTest
762761
var messages = [KafkaConsumerMessage]()
763762
for try await record in consumer.messages {
764763
guard !record.eof else {
@@ -774,7 +773,52 @@ final class KafkaTests: XCTestCase {
774773
await consumerServiceGroup.triggerGracefulShutdown()
775774
}
776775
}
776+
777+
func testMetadata() async throws {
778+
let uniqueTopic = try createUniqueTopic(partitions: 7)
779+
defer {
780+
// delete topic
781+
var basicConfig = KafkaConsumerConfiguration(
782+
consumptionStrategy: .group(id: "no-group", topics: []),
783+
bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
784+
)
785+
basicConfig.broker.addressFamily = .v4
777786

787+
let client = try? RDKafkaClient.makeClient(
788+
type: .consumer,
789+
configDictionary: basicConfig.dictionary,
790+
events: [],
791+
logger: .kafkaTest
792+
)
793+
try? client?._deleteTopic(uniqueTopic, timeout: 10 * 1000)
794+
}
795+
796+
var consumerConfig = KafkaConsumerConfiguration(
797+
consumptionStrategy: .group(
798+
id: "test",
799+
topics: []
800+
),
801+
bootstrapBrokerAddresses: [self.bootstrapBrokerAddress]
802+
)
803+
consumerConfig.autoOffsetReset = .beginning // Read topic from beginning
804+
consumerConfig.broker.addressFamily = .v4
805+
consumerConfig.enablePartitionEof = true
806+
807+
let consumer = try KafkaConsumer(
808+
configuration: consumerConfig,
809+
logger: .kafkaTest
810+
)
811+
let metadata = try await consumer.metadata()
812+
let topic = metadata.topics.first { topic in
813+
topic.name == uniqueTopic
814+
}
815+
guard let topic else {
816+
XCTFail("Topic was not found")
817+
return
818+
}
819+
let partitions = topic.partitions
820+
XCTAssertEqual(partitions.count, 7)
821+
}
778822
// MARK: - Helpers
779823

780824
func createUniqueTopic(partitions: Int32 = -1 /* default num for cluster */) throws -> String {

0 commit comments

Comments
 (0)