
Commit afff390

Add a new PartitionStream type for polling a single partition
1 parent 1f6bf53 commit afff390

3 files changed (+134, -277 lines)

examples/partition_consumer.rs

Lines changed: 15 additions & 7 deletions
@@ -86,20 +86,28 @@ async fn consume_and_print(brokers: &str, group_id: &str, topic: &str) {
     let mut streams = partitions
         .into_iter()
         .map(|p| {
-            consumer
-                .split_partition_queue(topic, p)
-                .expect("Failed to get partition")
-                .map(move |msg| (p, msg))
+            (
+                p,
+                consumer
+                    .split_partition_queue(topic, p)
+                    .expect("Failed to get partition"),
+            )
         })
         .collect::<Vec<_>>();

-    let mut mine = streams.pop().unwrap();
+    let mine = streams.pop().unwrap();
     tokio::spawn(async move {
-        while let Some((p, _msg)) = mine.next().await {
+        let (p, mine) = mine;
+        let mut mine = mine.stream();
+        while let Some(_msg) = mine.next().await {
             info!("hello from the other side of partition: {}", p);
         }
     });

+    let streams = streams
+        .iter()
+        .map(|(p, s)| s.stream().map(move |m| (p, m)))
+        .collect::<Vec<_>>();
     let mut streams = futures::stream::select_all(streams);
     loop {
         while let Some((p, msg)) = streams.next().await {
@@ -128,7 +136,7 @@ async fn consume_and_print(brokers: &str, group_id: &str, topic: &str) {
                             info!(" Header {:#?}: {:?}", header.0, header.1);
                         }
                     }
-                    // consumer.commit_message(&m, CommitMode::Async).unwrap();
+                    consumer.commit_message(&m, CommitMode::Async).unwrap();
                 }
             }
         }
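
Note: the updated example above drives each split-off partition through `PartitionStream::stream()`. For comparison, here is a minimal sketch (not part of this commit) of the same per-partition loop written against the new `PartitionStream::recv()` helper added in `stream_consumer.rs` below. It assumes a `StreamConsumer` that is already subscribed and assigned the partition; the helper name, topic, and partition arguments are illustrative placeholders.

use std::sync::Arc;

use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer};
use rdkafka::error::KafkaError;

// Hypothetical helper: drain a single partition via PartitionStream::recv.
// `consumer` is assumed to already be subscribed and assigned `partition`.
async fn drain_partition(
    consumer: Arc<StreamConsumer>,
    topic: &str,
    partition: i32,
) -> Result<(), KafkaError> {
    // Split the partition into its own asynchronous queue; `None` means the
    // topic or partition is not known to the consumer.
    let queue = consumer
        .split_partition_queue(topic, partition)
        .expect("invalid topic or partition");

    loop {
        // Resolves once the next message for this partition arrives; the
        // StreamConsumer itself does not need a separate poll loop.
        let msg = queue.recv().await?;
        // Commit through the owning consumer, as the example above does.
        consumer.commit_message(&msg, CommitMode::Async)?;
    }
}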

src/consumer/stream_consumer.rs

Lines changed: 119 additions & 121 deletions
@@ -24,7 +24,7 @@ use crate::consumer::{
 };
 use crate::error::{KafkaError, KafkaResult};
 use crate::groups::GroupList;
-use crate::message::{ArcMessage, BorrowedMessage, Message};
+use crate::message::BorrowedMessage;
 use crate::metadata::Metadata;
 use crate::statistics::Statistics;
 use crate::topic_partition_list::{Offset, TopicPartitionList};
@@ -118,151 +118,84 @@ where
     }
 }

-enum ConsumerRef<'a, C: ConsumerContext + 'static, R = DefaultRuntime> {
-    Borrowed(&'a StreamConsumer<C, R>),
-    Arc(Arc<BaseConsumer<StreamConsumerContext<C>>>),
-}
-
 /// A stream of messages from a [`StreamConsumer`].
 ///
 /// See the documentation of [`StreamConsumer::stream`] for details.
-pub struct MessageStream<'a, C, R = DefaultRuntime, M = BorrowedMessage<'a>>
+pub struct MessageStream<'a, C, R = DefaultRuntime>
 where
     C: ConsumerContext + 'static,
-    M: Message,
 {
-    consumer: ConsumerRef<'a, C, R>,
-    partition: Option<PartitionQueue<StreamConsumerContext<C>>>,
+    consumer: &'a StreamConsumer<C, R>,
+    partition: Option<&'a PartitionQueue<StreamConsumerContext<C>>>,
     slot: usize,
-    message: PhantomData<M>,
 }

-impl<'a, C, R, M> MessageStream<'a, C, R, M>
+impl<'a, C, R> MessageStream<'a, C, R>
 where
     C: ConsumerContext + 'static,
-    M: Message,
 {
-    fn context(&self) -> &StreamConsumerContext<C> {
-        match &self.consumer {
-            ConsumerRef::Borrowed(s) => s.base.context(),
-            ConsumerRef::Arc(s) => s.context().as_ref(),
-        }
-    }
-
-    fn client_ptr(&self) -> *mut RDKafka {
-        match &self.consumer {
-            ConsumerRef::Borrowed(s) => s.client().native_ptr(),
-            ConsumerRef::Arc(s) => s.client().native_ptr(),
-        }
-    }
-
-    fn set_waker(&self, waker: Waker) {
-        let mut wakers = self.context().wakers.lock().expect("lock poisoned");
-        wakers[self.slot].replace(waker);
-    }
-}
-
-impl<'a, C, R> MessageStream<'a, C, R, BorrowedMessage<'a>>
-where
-    C: ConsumerContext + 'static,
-{
-    fn new(consumer: &'a StreamConsumer<C, R>) -> MessageStream<'a, C, R, BorrowedMessage<'a>> {
+    fn new(consumer: &'a StreamConsumer<C, R>) -> MessageStream<'a, C, R> {
         let slot = {
             let context = consumer.base.context();
             let mut wakers = context.wakers.lock().expect("lock poisoned");
             wakers.insert(None)
         };
         MessageStream {
-            consumer: ConsumerRef::Borrowed(consumer),
+            consumer,
+            partition: None,
             slot,
-            message: Default::default(),
-            partition: Default::default(),
-        }
-    }
-
-    fn poll(&self) -> Option<KafkaResult<BorrowedMessage<'a>>> {
-        match self.consumer {
-            ConsumerRef::Borrowed(s) => unsafe {
-                NativePtr::from_ptr(rdsys::rd_kafka_consumer_poll(self.client_ptr(), 0))
-                    .map(|p| BorrowedMessage::from_consumer(p, s))
-            },
-            ConsumerRef::Arc(_) => unreachable!(),
         }
     }
-}

-impl<'a, C, R> MessageStream<'a, C, R, ArcMessage<C>>
-where
-    C: ConsumerContext + 'static,
-{
-    fn new(
-        consumer: &StreamConsumer<C, R>,
-        topic: &str,
-        partition: i32,
-    ) -> Option<MessageStream<'a, C, R, ArcMessage<C>>> {
+    fn new_partition(
+        consumer: &'a StreamConsumer<C, R>,
+        partition: &'a PartitionQueue<StreamConsumerContext<C>>,
+    ) -> MessageStream<'a, C, R> {
         let slot = {
-            let context = consumer.context();
+            let context = consumer.base.context();
             let mut wakers = context.wakers.lock().expect("lock poisoned");
             wakers.insert(None)
         };
-        consumer
-            .base
-            .split_partition_queue(topic, partition)
-            .map(|partition| MessageStream {
-                consumer: ConsumerRef::Arc(consumer.base.clone()),
-                slot,
-                message: Default::default(),
-                partition: Some(partition),
-            })
-    }
-
-    fn poll(&self) -> Option<KafkaResult<ArcMessage<StreamConsumerContext<C>>>> {
-        match (&self.consumer, &self.partition) {
-            (ConsumerRef::Arc(s), Some(p)) => unsafe {
-                rdsys::rd_kafka_poll(s.client().native_ptr(), 0);
-                NativePtr::from_ptr(rdsys::rd_kafka_consume_queue(p.native_queue().ptr(), 0))
-                    .map(|ptr| ArcMessage::from_consumer(ptr, &s))
-            },
-            _ => unreachable!(),
+        MessageStream {
+            consumer,
+            partition: Some(partition),
+            slot,
         }
     }
-}

-impl<'a, C, R> Stream for MessageStream<'a, C, R, BorrowedMessage<'a>>
-where
-    C: ConsumerContext + 'a,
-{
-    type Item = KafkaResult<BorrowedMessage<'a>>;
+    fn context(&self) -> &StreamConsumerContext<C> {
+        self.consumer.base.context()
+    }

-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        // If there is a message ready, yield it immediately to avoid the
-        // taking the lock in `self.set_waker`.
-        if let Some(message) = self.poll() {
-            return Poll::Ready(Some(message));
-        }
+    fn client_ptr(&self) -> *mut RDKafka {
+        self.consumer.client().native_ptr()
+    }

-        // Otherwise, we need to wait for a message to become available. Store
-        // the waker so that we are woken up if the queue flips from non-empty
-        // to empty. We have to store the waker repatedly in case this future
-        // migrates between tasks.
-        self.set_waker(cx.waker().clone());
+    fn set_waker(&self, waker: Waker) {
+        let mut wakers = self.context().wakers.lock().expect("lock poisoned");
+        wakers[self.slot].replace(waker);
+    }

-        // Check whether a new message became available after we installed the
-        // waker. This avoids a race where `poll` returns None to indicate that
-        // the queue is empty, but the queue becomes non-empty before we've
-        // installed the waker.
-        match self.poll() {
-            None => Poll::Pending,
-            Some(message) => Poll::Ready(Some(message)),
+    fn poll(&self) -> Option<KafkaResult<BorrowedMessage<'a>>> {
+        match self.partition {
+            Some(p) => unsafe {
+                rdsys::rd_kafka_poll(self.client_ptr(), 0);
+                NativePtr::from_ptr(rdsys::rd_kafka_consume_queue(p.native_queue().ptr(), 0))
+                    .map(|ptr| BorrowedMessage::from_consumer(ptr, p))
+            },
+            None => unsafe {
+                NativePtr::from_ptr(rdsys::rd_kafka_consumer_poll(self.client_ptr(), 0))
+                    .map(|p| BorrowedMessage::from_consumer(p, self.consumer))
+            },
         }
     }
 }

-impl<'a, C, R> Stream for MessageStream<'a, C, R, ArcMessage<C>>
+impl<'a, C, R> Stream for MessageStream<'a, C, R>
 where
     C: ConsumerContext + 'a,
 {
-    type Item = KafkaResult<ArcMessage<StreamConsumerContext<C>>>;
+    type Item = KafkaResult<BorrowedMessage<'a>>;

     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         // If there is a message ready, yield it immediately to avoid the
@@ -288,10 +221,9 @@ where
     }
 }

-impl<'a, C, R, M> Drop for MessageStream<'a, C, R, M>
+impl<'a, C, R> Drop for MessageStream<'a, C, R>
 where
     C: ConsumerContext + 'static,
-    M: Message,
 {
     fn drop(&mut self) {
         let mut wakers = self.context().wakers.lock().expect("lock poisoned");
@@ -404,22 +336,13 @@ where
     ///
     /// If you want multiple independent views of a Kafka topic, create multiple
     /// consumers, not multiple message streams.
-    pub fn stream<'a>(&'a self) -> MessageStream<'_, C, R, BorrowedMessage<'a>> {
-        MessageStream::<_, _, BorrowedMessage<'a>>::new(self)
-    }
-
-    /// TODO(nemo_supremo)
-    pub fn split_partition_queue(
-        &self,
-        topic: &str,
-        partition: i32,
-    ) -> Option<MessageStream<'static, C, R, ArcMessage<C>>> {
-        MessageStream::<_, _, ArcMessage<C>>::new(self, topic, partition)
+    pub fn stream(&self) -> MessageStream<'_, C, R> {
+        MessageStream::new(self)
     }

     /// Constructs a stream that yields messages from this consumer.
     #[deprecated = "use the more clearly named \"StreamConsumer::stream\" method instead"]
-    pub fn start<'a>(&'a self) -> MessageStream<'_, C, R, BorrowedMessage<'a>> {
+    pub fn start(&self) -> MessageStream<'_, C, R> {
         self.stream()
     }

@@ -445,6 +368,27 @@ where
             .await
             .expect("kafka streams never terminate")
     }
+
+    /// Splits messages for the specified partition into their own queue and
+    /// returns an asynchronous [`PartitionStream`].
+    ///
+    /// If the `topic` or `partition` is invalid, returns `None`.
+    ///
+    /// Unlike [`PartitionQueue`], [`PartitionStream`] does not
+    /// require the [`StreamConsumer`] to be polled separately.
+    ///
+    /// Note that calling [`Consumer::assign`] will deactivate any existing
+    /// partition queues. You will need to call this method for every partition
+    /// that should be split after every call to `assign`.
+    pub fn split_partition_queue(
+        self: &Arc<Self>,
+        topic: &str,
+        partition: i32,
+    ) -> Option<PartitionStream<C, R>> {
+        self.base
+            .split_partition_queue(topic, partition)
+            .map(|partition| PartitionStream::new(self, partition))
+    }
 }

 impl<C, R> Consumer<StreamConsumerContext<C>> for StreamConsumer<C, R>
@@ -597,3 +541,57 @@ where
         self.base.resume(partitions)
     }
 }
+
+/// An asynchronous message queue for a single partition.
+pub struct PartitionStream<C = DefaultConsumerContext, R = DefaultRuntime>
+where
+    C: ConsumerContext,
+{
+    consumer: Arc<StreamConsumer<C, R>>,
+    queue: PartitionQueue<StreamConsumerContext<C>>,
+}
+
+impl<C, R> PartitionStream<C, R>
+where
+    C: ConsumerContext + 'static,
+{
+    fn new(
+        consumer: &Arc<StreamConsumer<C, R>>,
+        queue: PartitionQueue<StreamConsumerContext<C>>,
+    ) -> Self {
+        PartitionStream {
+            consumer: consumer.clone(),
+            queue,
+        }
+    }
+
+    /// Constructs a stream that yields messages from this consumer.
+    ///
+    /// It is legal to have multiple live message streams for the same consumer,
+    /// and to move those message streams across threads. Note, however, that
+    /// the message streams share the same underlying state. A message received
+    /// by the consumer will be delivered to only one of the live message
+    /// streams. If you seek the underlying consumer, all message streams
+    /// created from the consumer will begin to draw messages from the new
+    /// position of the consumer.
+    ///
+    /// If you want multiple independent views of a Kafka topic, create multiple
+    /// consumers, not multiple message streams.
+    pub fn stream(&self) -> MessageStream<'_, C, R> {
+        MessageStream::new_partition(self.consumer.as_ref(), &self.queue)
+    }
+
+    /// Receives the next message from the stream.
+    ///
+    /// This method will block until the next message is available or an error
+    /// occurs. It is legal to call `recv` from multiple threads simultaneously.
+    ///
+    /// Note that this method is exactly as efficient as constructing a
+    /// single-use message stream and extracting one message from it.
+    pub async fn recv(&self) -> Result<BorrowedMessage<'_>, KafkaError> {
+        self.stream()
+            .next()
+            .await
+            .expect("kafka streams never terminate")
+    }
+}
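
Usage note for the new API above: because `PartitionStream` holds an `Arc` of its `StreamConsumer`, each partition can be handed to its own task and polled independently, without a separate poll loop on the consumer. Below is a minimal sketch under that assumption (the helper name, topic, and partition count are placeholders, not part of this commit).

use std::sync::Arc;

use futures::StreamExt;
use rdkafka::consumer::StreamConsumer;

// Hypothetical helper: spawn one task per partition, each driving its own
// PartitionStream. Assumes `consumer` is already subscribed and has been
// assigned partitions 0..num_partitions of `topic`.
fn spawn_partition_workers(consumer: &Arc<StreamConsumer>, topic: &str, num_partitions: i32) {
    for p in 0..num_partitions {
        let partition_stream = consumer
            .split_partition_queue(topic, p)
            .expect("invalid topic or partition");
        tokio::spawn(async move {
            // `stream()` borrows the PartitionStream, which keeps the owning
            // consumer alive through its internal Arc.
            let mut messages = partition_stream.stream();
            while let Some(result) = messages.next().await {
                match result {
                    Ok(_msg) => println!("partition {}: got a message", p),
                    Err(e) => eprintln!("partition {}: kafka error: {}", p, e),
                }
            }
        });
    }
}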
