@@ -3,11 +3,11 @@ use rdkafka::{
3
3
config:: RDKafkaLogLevel ,
4
4
consumer:: { Consumer , StreamConsumer } ,
5
5
metadata:: Metadata ,
6
- ClientConfig ,
6
+ ClientConfig , TopicPartitionList ,
7
7
} ;
8
8
use tabwriter:: TabWriter ;
9
9
10
- pub fn get_metadata ( config : AppConfig ) {
10
+ pub fn get_metadata ( _config : AppConfig ) {
11
11
let stream = ClientConfig :: new ( )
12
12
// .set("group.id", &config.metadata_kafka.group_id)
13
13
// .set("bootstrap.servers", &config.metadata_kafka.brokers)
@@ -19,9 +19,21 @@ pub fn get_metadata(config: AppConfig) {
19
19
. create :: < StreamConsumer > ( )
20
20
. expect ( "Consumer creation failed" ) ;
21
21
let metadata = Consumer :: fetch_metadata ( & stream, None , rdkafka:: util:: Timeout :: Never ) . unwrap ( ) ;
22
+ let position = Consumer :: position ( & stream) . unwrap ( ) ;
23
+ print_positions ( & position) ;
22
24
print_topics ( & metadata) ;
23
25
}
24
26
27
+ fn print_positions ( position : & TopicPartitionList ) {
28
+ let inner = position
29
+ . elements ( )
30
+ . iter ( )
31
+ . map ( |x| x. topic ( ) . to_string ( ) )
32
+ . collect :: < Vec < _ > > ( )
33
+ . join ( "\n " ) ;
34
+ println ! ( "{}" , inner) ;
35
+ }
36
+
25
37
fn print_topics ( metadata : & Metadata ) {
26
38
// NOTE: Can be optimized by precalculating the required memory, I think.
27
39
let mut tw = TabWriter :: new ( vec ! [ ] ) ;
@@ -33,7 +45,7 @@ fn print_topics(metadata: &Metadata) {
33
45
. map ( |( idx, metadata_topic) | {
34
46
format ! (
35
47
"{}.\t {}\t {}\t {}" ,
36
- idx,
48
+ idx + 1 ,
37
49
metadata_topic. name( ) ,
38
50
metadata_topic. partitions( ) . len( ) ,
39
51
metadata_topic. error( ) . map( |_| "true" ) . unwrap_or( "false" )
0 commit comments