Skip to content

Commit 76e3108

Browse files
Initial Commit
1 parent a7aa4a9 commit 76e3108

File tree

2 files changed

+68
-0
lines changed

2 files changed

+68
-0
lines changed
+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import java.util.*;
2+
import org.apache.kafka.clients.producer.*;
3+
4+
public class AsynchronousProducer {
5+
6+
public static void main(String[] args) throws Exception{
7+
String topicName = "AsynchronousProducerTopic";
8+
String key = "Key1";
9+
String value = "Value-1";
10+
11+
Properties props = new Properties();
12+
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
13+
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
14+
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
15+
16+
Producer<String, String> producer = new KafkaProducer <>(props);
17+
18+
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
19+
20+
producer.send(record, new MyProducerCallback());
21+
System.out.println("AsynchronousProducer call completed");
22+
producer.close();
23+
24+
}
25+
26+
}
27+
28+
class MyProducerCallback implements Callback{
29+
30+
@Override
31+
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
32+
if (e != null)
33+
System.out.println("AsynchronousProducer failed with an exception");
34+
else
35+
System.out.println("AsynchronousProducer call Success:");
36+
}
37+
}
+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import java.util.*;
2+
import org.apache.kafka.clients.producer.*;
3+
public class SynchronousProducer {
4+
5+
public static void main(String[] args) throws Exception{
6+
7+
String topicName = "SynchronousProducerTopic";
8+
String key = "Key1";
9+
String value = "Value-1";
10+
11+
Properties props = new Properties();
12+
props.put("bootstrap.servers", "localhost:9092,localhost:9093");
13+
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
14+
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
15+
16+
Producer<String, String> producer = new KafkaProducer <>(props);
17+
18+
ProducerRecord<String, String> record = new ProducerRecord<>(topicName,key,value);
19+
20+
try{
21+
RecordMetadata metadata = producer.send(record).get();
22+
System.out.println("Message is sent to Partition no " + metadata.partition() + " and offset " + metadata.offset());
23+
System.out.println("SynchronousProducer Completed with success.");
24+
}catch (Exception e) {
25+
e.printStackTrace();
26+
System.out.println("SynchronousProducer failed with an exception");
27+
}finally{
28+
producer.close();
29+
}
30+
}
31+
}

0 commit comments

Comments
 (0)