Skip to content

Commit e931027

Browse files
authored
Remove Arc from base_consumer in StreamConsumer
1 parent afff390 commit e931027

File tree

2 files changed

+46
-16
lines changed

2 files changed

+46
-16
lines changed

src/consumer/base_consumer.rs

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
use std::cmp;
44
use std::ffi::CString;
5+
use std::marker::PhantomData;
56
use std::mem;
67
use std::os::raw::c_void;
78
use std::ptr;
@@ -73,7 +74,10 @@ unsafe extern "C" fn native_message_queue_nonempty_cb<C: ConsumerContext>(
7374
(*context).message_queue_nonempty_callback();
7475
}
7576

76-
unsafe fn enable_nonempty_callback<C: ConsumerContext>(queue: &NativeQueue, context: &Arc<C>) {
77+
pub(super) unsafe fn enable_nonempty_callback<C: ConsumerContext>(
78+
queue: &NativeQueue,
79+
context: &Arc<C>,
80+
) {
7781
rdsys::rd_kafka_queue_cb_event_enable(
7882
queue.ptr(),
7983
Some(native_message_queue_nonempty_cb::<C>),
@@ -610,20 +614,27 @@ where
610614
}
611615

612616
/// A message queue for a single partition.
613-
pub struct PartitionQueue<C>
617+
pub struct PartitionQueue<C, D = BaseConsumer<C>>
614618
where
615619
C: ConsumerContext,
620+
D: Consumer<C>,
616621
{
617-
consumer: Arc<BaseConsumer<C>>,
622+
consumer: Arc<D>,
618623
queue: NativeQueue,
624+
context: PhantomData<C>,
619625
}
620626

621-
impl<C> PartitionQueue<C>
627+
impl<C, D> PartitionQueue<C, D>
622628
where
623629
C: ConsumerContext,
630+
D: Consumer<C>,
624631
{
625-
fn new(consumer: Arc<BaseConsumer<C>>, queue: NativeQueue) -> Self {
626-
PartitionQueue { consumer, queue }
632+
pub(super) fn new(consumer: Arc<D>, queue: NativeQueue) -> Self {
633+
PartitionQueue {
634+
consumer,
635+
queue,
636+
context: Default::default(),
637+
}
627638
}
628639

629640
pub(super) fn native_queue(&self) -> &NativeQueue {

src/consumer/stream_consumer.rs

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
//! High-level consumers with a [`Stream`](futures::Stream) interface.
22
3+
use std::ffi::CString;
34
use std::marker::PhantomData;
45
use std::pin::Pin;
6+
use std::ptr;
57
use std::sync::{Arc, Mutex};
68
use std::task::{Context, Poll, Waker};
79
use std::time::Duration;
@@ -16,9 +18,9 @@ use slab::Slab;
1618
use rdkafka_sys as rdsys;
1719
use rdkafka_sys::types::*;
1820

19-
use crate::client::{Client, ClientContext, NativeClient};
21+
use crate::client::{Client, ClientContext, NativeClient, NativeQueue};
2022
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext, RDKafkaLogLevel};
21-
use crate::consumer::base_consumer::{BaseConsumer, PartitionQueue};
23+
use crate::consumer::base_consumer::{enable_nonempty_callback, BaseConsumer, PartitionQueue};
2224
use crate::consumer::{
2325
CommitMode, Consumer, ConsumerContext, ConsumerGroupMetadata, DefaultConsumerContext, Rebalance,
2426
};
@@ -126,7 +128,7 @@ where
126128
C: ConsumerContext + 'static,
127129
{
128130
consumer: &'a StreamConsumer<C, R>,
129-
partition: Option<&'a PartitionQueue<StreamConsumerContext<C>>>,
131+
partition: Option<&'a PartitionQueue<StreamConsumerContext<C>, StreamConsumer<C, R>>>,
130132
slot: usize,
131133
}
132134

@@ -149,7 +151,7 @@ where
149151

150152
fn new_partition(
151153
consumer: &'a StreamConsumer<C, R>,
152-
partition: &'a PartitionQueue<StreamConsumerContext<C>>,
154+
partition: &'a PartitionQueue<StreamConsumerContext<C>, StreamConsumer<C, R>>,
153155
) -> MessageStream<'a, C, R> {
154156
let slot = {
155157
let context = consumer.base.context();
@@ -250,7 +252,7 @@ pub struct StreamConsumer<C = DefaultConsumerContext, R = DefaultRuntime>
250252
where
251253
C: ConsumerContext,
252254
{
253-
base: Arc<BaseConsumer<StreamConsumerContext<C>>>,
255+
base: BaseConsumer<StreamConsumerContext<C>>,
254256
_shutdown_trigger: oneshot::Sender<()>,
255257
_runtime: PhantomData<R>,
256258
}
@@ -313,7 +315,7 @@ where
313315
});
314316

315317
Ok(StreamConsumer {
316-
base: Arc::new(base),
318+
base,
317319
_shutdown_trigger: shutdown_trigger,
318320
_runtime: PhantomData,
319321
})
@@ -385,8 +387,25 @@ where
385387
topic: &str,
386388
partition: i32,
387389
) -> Option<PartitionStream<C, R>> {
388-
self.base
389-
.split_partition_queue(topic, partition)
390+
let topic = match CString::new(topic) {
391+
Ok(topic) => topic,
392+
Err(_) => return None,
393+
};
394+
let queue = unsafe {
395+
NativeQueue::from_ptr(rdsys::rd_kafka_queue_get_partition(
396+
self.base.client().native_ptr(),
397+
topic.as_ptr(),
398+
partition,
399+
))
400+
};
401+
queue
402+
.map(|queue| {
403+
unsafe {
404+
enable_nonempty_callback(&queue, self.base.client().context());
405+
rdsys::rd_kafka_queue_forward(queue.ptr(), ptr::null_mut());
406+
}
407+
PartitionQueue::new(self.clone(), queue)
408+
})
390409
.map(|partition| PartitionStream::new(self, partition))
391410
}
392411
}
@@ -548,7 +567,7 @@ where
548567
C: ConsumerContext,
549568
{
550569
consumer: Arc<StreamConsumer<C, R>>,
551-
queue: PartitionQueue<StreamConsumerContext<C>>,
570+
queue: PartitionQueue<StreamConsumerContext<C>, StreamConsumer<C, R>>,
552571
}
553572

554573
impl<C, R> PartitionStream<C, R>
@@ -557,7 +576,7 @@ where
557576
{
558577
fn new(
559578
consumer: &Arc<StreamConsumer<C, R>>,
560-
queue: PartitionQueue<StreamConsumerContext<C>>,
579+
queue: PartitionQueue<StreamConsumerContext<C>, StreamConsumer<C, R>>,
561580
) -> Self {
562581
PartitionStream {
563582
consumer: consumer.clone(),

0 commit comments

Comments
 (0)