File tree 1 file changed +35
-0
lines changed
1 file changed +35
-0
lines changed Original file line number Diff line number Diff line change @@ -548,3 +548,38 @@ w := kafka.NewWriter(kafka.WriterConfig{
548
548
Dialer : dialer,
549
549
})
550
550
```
551
+
552
+ #### Reading all messages within a time range
553
+
554
+ ``` go
555
+ startTime := time.Now ().Add (-time.Hour )
556
+ endTime := time.Now ()
557
+ batchSize := int (10e6 ) // 10MB
558
+
559
+ r := kafka.NewReader (kafka.ReaderConfig {
560
+ Brokers : []string {" localhost:9092" },
561
+ Topic : " my-topic1" ,
562
+ Partition : 0 ,
563
+ MinBytes : batchSize,
564
+ MaxBytes : batchSize,
565
+ })
566
+
567
+ r.SetOffsetAt (context.Background (), startTime)
568
+
569
+ for {
570
+ m , err := r.ReadMessage (context.Background ())
571
+
572
+ if err != nil {
573
+ break
574
+ }
575
+ if m.Time .After (endTime) {
576
+ break
577
+ }
578
+ // TODO: process message
579
+ fmt.Printf (" message at offset %d : %s = %s \n " , m.Offset , string (m.Key ), string (m.Value ))
580
+ }
581
+
582
+ if err := r.Close (); err != nil {
583
+ log.Fatal (" failed to close reader:" , err)
584
+ }
585
+ ```
You can’t perform that action at this time.
0 commit comments