Swift Kafka Client

The Swift Kafka Client library provides a convenient way to interact with Apache Kafka by leveraging Swift's new concurrency features. This package wraps the native librdkafka library.

Adding Kafka as a Dependency

To use the Kafka library in a SwiftPM project, add the following line to the dependencies in your Package.swift file:

.package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")

Include "Kafka" as a dependency for your executable target:

.target(name: "<target>", dependencies: [
    .product(name: "Kafka", package: "swift-kafka-client"),
]),

Finally, add import Kafka to your source code.
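
Putting these pieces together, a minimal Package.swift might look like the following. This is a sketch: the tools version, platform, package name, and target name are placeholders for your own project.

// swift-tools-version:5.9
import PackageDescription

let package = Package(
    name: "my-kafka-app", // placeholder package name
    platforms: [
        .macOS(.v13) // assumption: a platform recent enough for Swift concurrency
    ],
    dependencies: [
        .package(url: "https://github.com/swift-server/swift-kafka-client", branch: "main")
    ],
    targets: [
        .executableTarget(
            name: "MyKafkaApp", // placeholder target name
            dependencies: [
                .product(name: "Kafka", package: "swift-kafka-client")
            ]
        )
    ]
)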

Usage

Kafka should be used within a Swift Service Lifecycle ServiceGroup for proper startup and shutdown handling. Both the KafkaProducer and the KafkaConsumer implement the Service protocol.
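
For example, a producer and a consumer can run side by side in a single ServiceGroup, which drives their run loops and shuts them down gracefully. The sketch below assumes producer and consumer have been created as shown in the following sections; handling SIGTERM is an illustrative choice, not a requirement.

import Kafka
import Logging
import ServiceLifecycle

let logger = Logger(label: "kafka-example")

// `producer` and `consumer` are created as shown in the sections below.
let serviceGroup = ServiceGroup(
    services: [producer, consumer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)

// Runs until the services finish or a graceful shutdown signal arrives.
try await serviceGroup.run()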

Producer API

The send(_:) method of KafkaProducer returns a message ID that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the events AsyncSequence. Each acknowledgement indicates whether the message was produced successfully or reports an error.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress])

let (producer, events) = try KafkaProducer.makeProducerWithEvents(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [producer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task sending message and receiving events
    group.addTask {
        let messageID = try producer.send(
            KafkaProducerMessage(
                topic: "topic-name",
                value: "Hello, World!"
            )
        )

        for await event in events {
            switch event {
            case .deliveryReports(let deliveryReports):
                // Check what messages the delivery reports belong to
                _ = deliveryReports
            default:
                break // Ignore any other events
            }
        }
    }
}
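
The message ID returned by send(_:) can be used to correlate a delivery report with the message it belongs to. The following is a sketch that assumes KafkaDeliveryReport exposes an id and a status with acknowledged/failure cases; consult the package's API documentation for the exact member names.

// Correlate delivery reports with the ID returned by `send(_:)`.
// Assumption: `KafkaDeliveryReport` has `id` and `status` members.
for await event in events {
    guard case .deliveryReports(let deliveryReports) = event else {
        continue // Ignore any other events
    }
    for report in deliveryReports where report.id == messageID {
        switch report.status {
        case .acknowledged(let message):
            logger.info("Message delivered at offset \(message.offset)")
        case .failure(let error):
            logger.error("Message delivery failed: \(error)")
        }
    }
}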

Consumer API

After initializing the KafkaConsumer with a topic-partition pair to read from, messages can be consumed using the messages AsyncSequence.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .partition(
        KafkaPartition(rawValue: 0),
        topic: "topic-name"
    ),
    bootstrapBrokerAddresses: [brokerAddress]
)

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
        }
    }
}
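
Each consumed message carries its metadata alongside the payload. Below is a sketch of reading those fields, assuming KafkaConsumerMessage exposes topic, partition, offset, and a ByteBuffer-backed value (String(buffer:) comes from NIOCore).

// Read metadata and decode the payload of each message.
// Assumption: `KafkaConsumerMessage` exposes `topic`, `partition`, `offset` and `value`.
for try await message in consumer.messages {
    let value = String(buffer: message.value)
    logger.info("[\(message.topic)/\(message.partition) @ \(message.offset)] \(value)")
}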

Consumer Groups

Kafka also allows users to subscribe to an array of topics as part of a consumer group.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
let configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]
)

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
        }
    }
}

Manual commits

By default, the KafkaConsumer automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.

let brokerAddress = KafkaConfiguration.BrokerAddress(host: "localhost", port: 9092)
var configuration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: [brokerAddress]
)
configuration.isAutoCommitEnabled = false

let consumer = try KafkaConsumer(
    configuration: configuration,
    logger: logger
)

await withThrowingTaskGroup(of: Void.self) { group in

    // Run Task
    group.addTask {
        let serviceGroup = ServiceGroup(
            services: [consumer],
            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
            logger: logger
        )
        try await serviceGroup.run()
    }

    // Task receiving messages
    group.addTask {
        for try await message in consumer.messages {
            // Do something with message
            // ...
            try await consumer.commitSync(message)
        }
    }
}
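
When throughput matters, offsets are usually not committed for every single message. The sketch below reuses commitSync(_:) from the example above and commits once per batch; the batch size of 100 is an arbitrary choice.

// Commit once per batch instead of once per message.
var uncommittedCount = 0
for try await message in consumer.messages {
    // Do something with message
    uncommittedCount += 1
    if uncommittedCount >= 100 {
        // Committing the latest message also covers the earlier offsets of this partition.
        try await consumer.commitSync(message)
        uncommittedCount = 0
    }
}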

Security Mechanisms

Both the KafkaProducer and the KafkaConsumer can be configured to use different security mechanisms.

Plaintext

var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .plaintext

TLS

var configuration = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
configuration.securityProtocol = .tls()

SASL

let kerberosConfiguration = KafkaConfiguration.SASLMechanism.KerberosConfiguration(
    keytab: "KEYTAB_FILE"
)

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslPlaintext(
    mechanism: .gssapi(kerberosConfiguration: kerberosConfiguration)
)

SASL + TLS

let saslMechanism = KafkaConfiguration.SASLMechanism.scramSHA256(
    username: "USERNAME",
    password: "PASSWORD"
)

var config = KafkaProducerConfiguration(bootstrapBrokerAddresses: [])
config.securityProtocol = .saslTLS(
    saslMechanism: saslMechanism
)
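
The same security settings can be applied to a consumer. The sketch below attaches the SASL/SCRAM + TLS mechanism from above to a KafkaConsumerConfiguration, assuming it exposes the same securityProtocol property as the producer configuration.

var consumerConfiguration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(id: "example-group", topics: ["topic-name"]),
    bootstrapBrokerAddresses: []
)
consumerConfiguration.securityProtocol = .saslTLS(
    saslMechanism: saslMechanism
)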

librdkafka

The package depends on the librdkafka library, which is included as a git submodule. Some of librdkafka's source files are excluded in Package.swift.

Development Setup

We provide a Docker environment for this package. Running the command below automatically starts a local Kafka server and runs the package tests.

docker-compose -f docker/docker-compose.yaml run test
