Skip to content

Commit 9df53ea

Browse files
Initial Commit
1 parent 76e3108 commit 9df53ea

File tree

2 files changed

+64
-0
lines changed

2 files changed

+64
-0
lines changed
+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import java.util.*;
2+
import org.apache.kafka.clients.producer.*;
3+
import org.apache.kafka.common.*;
4+
import org.apache.kafka.common.utils.*;
5+
import org.apache.kafka.common.record.*;
6+
7+
public class SensorPartitioner implements Partitioner {
8+
9+
private String speedSensorName;
10+
11+
public void configure(Map<String, ?> configs) {
12+
speedSensorName = configs.get("speed.sensor.name").toString();
13+
14+
}
15+
16+
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
17+
18+
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
19+
int numPartitions = partitions.size();
20+
int sp = (int)Math.abs(numPartitions*0.3);
21+
int p=0;
22+
23+
if ( (keyBytes == null) || (!(key instanceof String)) )
24+
throw new InvalidRecordException("All messages must have sensor name as key");
25+
26+
if ( ((String)key).equals(speedSensorName) )
27+
p = Utils.toPositive(Utils.murmur2(valueBytes)) % sp;
28+
else
29+
p = Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions-sp) + sp ;
30+
31+
System.out.println("Key = " + (String)key + " Partition = " + p );
32+
return p;
33+
}
34+
public void close() {}
35+
36+
}

ProducerExamples/SensorProducer.java

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import java.util.*;
2+
import org.apache.kafka.clients.producer.*;
3+
public class SensorProducer {
4+
5+
public static void main(String[] args) throws Exception{
6+
7+
String topicName = "SensorTopic";
8+
9+
Properties props = new Properties();
10+
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
11+
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
12+
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
13+
props.put("partitioner.class", "SensorPartitioner");
14+
props.put("speed.sensor.name", "TSS");
15+
16+
Producer<String, String> producer = new KafkaProducer <>(props);
17+
18+
for (int i=0 ; i<10 ; i++)
19+
producer.send(new ProducerRecord<>(topicName,"SSP"+i,"500"+i));
20+
21+
for (int i=0 ; i<10 ; i++)
22+
producer.send(new ProducerRecord<>(topicName,"TSS","500"+i));
23+
24+
producer.close();
25+
26+
System.out.println("SimpleProducer Completed.");
27+
}
28+
}

0 commit comments

Comments
 (0)