Skip to content

Commit 18a87a1

Browse files
authored
feat: support for gRPC stream APIs (#12)
1 parent c73b33f commit 18a87a1

File tree

6 files changed

+502
-442
lines changed

6 files changed

+502
-442
lines changed

go.mod

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ go 1.11
44

55
require (
66
github.com/golang/protobuf v1.5.0
7-
golang.org/x/net v0.0.0-20190311183353-d8887717615a
87
google.golang.org/grpc v1.32.0
9-
google.golang.org/protobuf v1.27.1
8+
google.golang.org/protobuf v1.27.1 // indirect
109
)

go.sum

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,15 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
88
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
99
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
1010
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
11-
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
1211
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
1312
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
1413
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
1514
github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4=
1615
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
1716
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
1817
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
18+
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
1919
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
20-
github.com/nic-chen/grpc_server_example v0.0.0-20190811083654-8dcb31121ca8 h1:niL8VDYTDycXeBTWoXmrzIlDHrTSXdgp4SA0vLoQGj8=
2120
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
2221
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
2322
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@@ -42,16 +41,14 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
4241
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
4342
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
4443
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
44+
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
4545
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
4646
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
4747
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
48-
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc=
4948
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
5049
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
5150
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
5251
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
53-
google.golang.org/grpc v1.22.1 h1:/7cs52RnTJmD43s3uxzlq2U7nqVTd/37viQwMrMNlOM=
54-
google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
5552
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
5653
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
5754
google.golang.org/grpc v1.32.0 h1:zWTV+LMdc3kaiJMSTOFz2UgSBgx8RNQoTGiZu3fR9S0=

main.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
"crypto/x509"
2828
"flag"
2929
"fmt"
30+
"io"
3031
"io/ioutil"
3132
"log"
3233
"net"
@@ -115,6 +116,59 @@ func (s *server) Plus(ctx context.Context, in *pb.PlusRequest) (*pb.PlusReply, e
115116
return &pb.PlusReply{Result: in.A + in.B}, nil
116117
}
117118

119+
// SayHelloServerStream streams HelloReply back to the client.
120+
func (s *server) SayHelloServerStream(req *pb.HelloRequest, stream pb.Greeter_SayHelloServerStreamServer) error {
121+
log.Printf("Received server side stream req: %v\n", req)
122+
123+
// Say Hello 5 times.
124+
for i := 0; i < 5; i++ {
125+
if err := stream.Send(&pb.HelloReply{
126+
Message: fmt.Sprintf("Hello %s", req.Name),
127+
}); err != nil {
128+
return status.Errorf(codes.Unavailable, "Unable to stream request back to client: %v", err)
129+
}
130+
}
131+
return nil
132+
}
133+
134+
// SayHelloClientStream receives a stream of HelloRequest from a client.
135+
func (s *server) SayHelloClientStream(stream pb.Greeter_SayHelloClientStreamServer) error {
136+
log.Println("SayHello client side streaming has been initiated.")
137+
cache := ""
138+
for {
139+
req, err := stream.Recv()
140+
if err == io.EOF {
141+
return stream.SendAndClose(&pb.HelloReply{Message: cache})
142+
}
143+
if err != nil {
144+
return status.Errorf(codes.Unavailable, "Failed to read client stream: %v", err)
145+
}
146+
cache = fmt.Sprintf("%sHello %s!", cache, req.Name)
147+
}
148+
}
149+
150+
// SayHelloBidirectionalStream establishes a bidirectional stream with the client.
151+
func (s *server) SayHelloBidirectionalStream(stream pb.Greeter_SayHelloBidirectionalStreamServer) error {
152+
log.Println("SayHello bidirectional streaming has been initiated.")
153+
154+
for {
155+
req, err := stream.Recv()
156+
if err == io.EOF {
157+
return stream.Send(&pb.HelloReply{Message: "stream ended"})
158+
}
159+
if err != nil {
160+
return status.Errorf(codes.Unavailable, "Failed to read client stream: %v", err)
161+
}
162+
163+
// A small 0.5 sec sleep
164+
time.Sleep(500 * time.Millisecond)
165+
166+
if err := stream.Send(&pb.HelloReply{Message: fmt.Sprintf("Hello %s", req.Name)}); err != nil {
167+
return status.Errorf(codes.Unknown, "Failed to stream response back to client: %v", err)
168+
}
169+
}
170+
}
171+
118172
func main() {
119173
flag.Parse()
120174

@@ -182,7 +236,7 @@ func main() {
182236
}()
183237
}
184238

185-
signals := make(chan os.Signal, 1)
239+
signals := make(chan os.Signal)
186240
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)
187241
sig := <-signals
188242
log.Printf("get signal %s, exit\n", sig.String())

0 commit comments

Comments
 (0)