Skip to content

Commit 216c157

Browse files
thep2pWondertanvgonkivs
authored
Adds RPC inspector (#8)
* perf: use msgio pooled buffers for received msgs (libp2p#500) * perf: use pooled buffers for message writes (libp2p#507) * improve handling of dead peers (libp2p#508) * chore: ignore signing keys during WithLocalPublication publishing (libp2p#497) * adds app specific rpc handler Co-authored-by: Hlib Kanunnikov <[email protected]> Co-authored-by: Viacheslav <[email protected]>
1 parent 1c99052 commit 216c157

File tree

3 files changed

+62
-27
lines changed

3 files changed

+62
-27
lines changed

comm.go

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
package pubsub
22

33
import (
4-
"bufio"
54
"context"
5+
"encoding/binary"
66
"io"
77
"time"
88

9+
"github.com/gogo/protobuf/proto"
10+
pool "github.com/libp2p/go-buffer-pool"
11+
"github.com/multiformats/go-varint"
12+
913
"github.com/libp2p/go-libp2p/core/network"
1014
"github.com/libp2p/go-libp2p/core/peer"
15+
"github.com/libp2p/go-msgio"
1116

1217
pb "github.com/libp2p/go-libp2p-pubsub/pb"
13-
14-
"github.com/libp2p/go-msgio/protoio"
15-
16-
"github.com/gogo/protobuf/proto"
1718
)
1819

1920
// get the initial RPC containing all of our subscriptions to send to new peers
@@ -60,11 +61,11 @@ func (p *PubSub) handleNewStream(s network.Stream) {
6061
p.inboundStreamsMx.Unlock()
6162
}()
6263

63-
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
64+
r := msgio.NewVarintReaderSize(s, p.maxMessageSize)
6465
for {
65-
rpc := new(RPC)
66-
err := r.ReadMsg(&rpc.RPC)
66+
msgbytes, err := r.ReadMsg()
6767
if err != nil {
68+
r.ReleaseMsg(msgbytes)
6869
if err != io.EOF {
6970
s.Reset()
7071
log.Debugf("error reading rpc from %s: %s", s.Conn().RemotePeer(), err)
@@ -77,6 +78,15 @@ func (p *PubSub) handleNewStream(s network.Stream) {
7778
return
7879
}
7980

81+
rpc := new(RPC)
82+
err = rpc.Unmarshal(msgbytes)
83+
r.ReleaseMsg(msgbytes)
84+
if err != nil {
85+
s.Reset()
86+
log.Warnf("bogus rpc from %s: %s", s.Conn().RemotePeer(), err)
87+
return
88+
}
89+
8090
rpc.from = peer
8191
select {
8292
case p.incoming <- rpc:
@@ -115,7 +125,7 @@ func (p *PubSub) handleNewPeer(ctx context.Context, pid peer.ID, outgoing <-chan
115125
}
116126

117127
go p.handleSendingMessages(ctx, s, outgoing)
118-
go p.handlePeerEOF(ctx, s)
128+
go p.handlePeerDead(s)
119129
select {
120130
case p.newPeerStream <- s:
121131
case <-ctx.Done():
@@ -131,32 +141,33 @@ func (p *PubSub) handleNewPeerWithBackoff(ctx context.Context, pid peer.ID, back
131141
}
132142
}
133143

134-
func (p *PubSub) handlePeerEOF(ctx context.Context, s network.Stream) {
144+
func (p *PubSub) handlePeerDead(s network.Stream) {
135145
pid := s.Conn().RemotePeer()
136-
r := protoio.NewDelimitedReader(s, p.maxMessageSize)
137-
rpc := new(RPC)
138-
for {
139-
err := r.ReadMsg(&rpc.RPC)
140-
if err != nil {
141-
p.notifyPeerDead(pid)
142-
return
143-
}
144146

147+
_, err := s.Read([]byte{0})
148+
if err == nil {
145149
log.Debugf("unexpected message from %s", pid)
146150
}
151+
152+
s.Reset()
153+
p.notifyPeerDead(pid)
147154
}
148155

149156
func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, outgoing <-chan *RPC) {
150-
bufw := bufio.NewWriter(s)
151-
wc := protoio.NewDelimitedWriter(bufw)
157+
writeRpc := func(rpc *RPC) error {
158+
size := uint64(rpc.Size())
159+
160+
buf := pool.Get(varint.UvarintSize(size) + int(size))
161+
defer pool.Put(buf)
152162

153-
writeMsg := func(msg proto.Message) error {
154-
err := wc.WriteMsg(msg)
163+
n := binary.PutUvarint(buf, size)
164+
_, err := rpc.MarshalTo(buf[n:])
155165
if err != nil {
156166
return err
157167
}
158168

159-
return bufw.Flush()
169+
_, err = s.Write(buf)
170+
return err
160171
}
161172

162173
defer s.Close()
@@ -167,7 +178,7 @@ func (p *PubSub) handleSendingMessages(ctx context.Context, s network.Stream, ou
167178
return
168179
}
169180

170-
err := writeMsg(&rpc.RPC)
181+
err := writeRpc(rpc)
171182
if err != nil {
172183
s.Reset()
173184
log.Debugf("writing message to %s: %s", s.Conn().RemotePeer(), err)

gossipsub.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,9 +217,9 @@ func NewGossipSubWithRouter(ctx context.Context, h host.Host, rt PubSubRouter, o
217217
}
218218

219219
// DefaultGossipSubRouter returns a new GossipSubRouter with default parameters.
220-
func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
220+
func DefaultGossipSubRouter(h host.Host, opts ...func(*GossipSubRouter)) *GossipSubRouter {
221221
params := DefaultGossipSubParams()
222-
return &GossipSubRouter{
222+
rt := &GossipSubRouter{
223223
peers: make(map[peer.ID]protocol.ID),
224224
mesh: make(map[string]map[peer.ID]struct{}),
225225
fanout: make(map[string]map[peer.ID]struct{}),
@@ -237,6 +237,18 @@ func DefaultGossipSubRouter(h host.Host) *GossipSubRouter {
237237
tagTracer: newTagTracer(h.ConnManager()),
238238
params: params,
239239
}
240+
241+
for _, opt := range opts {
242+
opt(rt)
243+
}
244+
245+
return rt
246+
}
247+
248+
func WithAppSpecificRpcInspector(inspector func(peer.ID, *RPC) bool) func(*GossipSubRouter) {
249+
return func(rt *GossipSubRouter) {
250+
rt.appSpecificRpcInspector = inspector
251+
}
240252
}
241253

242254
// DefaultGossipSubParams returns the default gossip sub parameters
@@ -474,6 +486,11 @@ type GossipSubRouter struct {
474486
// number of heartbeats since the beginning of time; this allows us to amortize some resource
475487
// clean up -- eg backoff clean up.
476488
heartbeatTicks uint64
489+
490+
// appSpecificRpcInspector is an auxiliary that may be set by the application to inspect incoming RPCs prior to
491+
// processing them. The inspector is invoked on an accepted RPC right prior to handling it.
492+
// The return value of the inspector function is a boolean indicating whether the RPC should be processed or not.
493+
appSpecificRpcInspector func(peer.ID, *RPC) bool
477494
}
478495

479496
type connectInfo struct {
@@ -614,6 +631,13 @@ func (gs *GossipSubRouter) HandleRPC(rpc *RPC) {
614631
return
615632
}
616633

634+
if gs.appSpecificRpcInspector != nil {
635+
// check if the RPC is allowed by the external inspector
636+
if accept := gs.appSpecificRpcInspector(rpc.from, rpc); !accept {
637+
return // reject the RPC
638+
}
639+
}
640+
617641
iwant := gs.handleIHave(rpc.from, ctl)
618642
ihave := gs.handleIWant(rpc.from, ctl)
619643
prune := gs.handleGraft(rpc.from, ctl)

topic.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error
239239
}
240240
}
241241

242-
if pub.customKey != nil {
242+
if pub.customKey != nil && !pub.local {
243243
key, pid = pub.customKey()
244244
if key == nil {
245245
return ErrNilSignKey

0 commit comments

Comments
 (0)