-
Notifications
You must be signed in to change notification settings - Fork 228
/
Copy pathmain.go
93 lines (77 loc) · 2.69 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package main
import (
"context"
"io"
"log"
"time"
"github.com/IBM/sarama"
"github.com/google/uuid"
"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
)
// In order to run this test, look at documentation in https://github.com/cloudevents/sdk-go/blob/main/samples/kafka/README.md
func main() {
saramaConfig := sarama.NewConfig()
saramaConfig.Version = sarama.V2_0_0_0
ctx := context.Background()
// With NewProtocol you can use the same client both to send and receive.
protocol, err := kafka_sarama.NewProtocol([]string{"127.0.0.1:9092"}, saramaConfig, "output-topic", "input-topic")
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
defer protocol.Close(context.Background())
// Pipe all incoming message, eventually transforming them
go func() {
for {
// Blocking call to wait for new messages from protocol
inputMessage, err := protocol.Receive(ctx)
if err != nil {
if err == io.EOF {
return // Context closed and/or receiver closed
}
log.Printf("Error while receiving a inputMessage: %s", err.Error())
continue
}
defer inputMessage.Finish(nil)
outputMessage := inputMessage
// If encoding is unknown, then the inputMessage is a non cloudevent
// and we need to convert it
if inputMessage.ReadEncoding() == binding.EncodingUnknown {
// We need to get the inputMessage internals
// Because the message could be wrapped by the protocol implementation
// we need to unwrap it and then cast to the message representation
// specific to the protocol
kafkaMessage := binding.UnwrapMessage(inputMessage).(*kafka_sarama.Message)
// Now let's create a new event
event := cloudevents.NewEvent()
event.SetID(uuid.New().String())
event.SetTime(time.Now())
event.SetType("generated.examples")
event.SetSource("https://github.com/cloudevents/sdk-go/samples/kafka/message-handle-non-cloudevents")
err = event.SetData(kafkaMessage.ContentType, kafkaMessage.Value)
if err != nil {
log.Printf("Error while setting the event data: %s", err.Error())
continue
}
outputMessage = binding.ToMessage(&event)
}
// Send outputMessage directly to output-topic
err = protocol.Send(ctx, outputMessage)
if err != nil {
log.Printf("Error while forwarding the inputMessage: %s", err.Error())
}
}
}()
// Start the Kafka Consumer Group invoking OpenInbound()
go func() {
if err := protocol.OpenInbound(ctx); err != nil {
log.Printf("failed to StartHTTPReceiver, %v", err)
}
}()
<-ctx.Done()
}