Skip to content

Commit e133058

Browse files
committed
constructor: break down libp2p logic
License: MIT Signed-off-by: Łukasz Magiera <[email protected]>
1 parent 3a12454 commit e133058

13 files changed

+613
-508
lines changed

core/node/groups.go

-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ var BaseLibP2P = fx.Options(
3838
)
3939

4040
func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
41-
4241
// parse ConnMgr config
4342

4443
grace := config.DefaultConnMgrGracePeriod

core/node/libp2p/addrs.go

+117
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package libp2p
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/libp2p/go-libp2p"
7+
host "github.com/libp2p/go-libp2p-host"
8+
p2pbhost "github.com/libp2p/go-libp2p/p2p/host/basic"
9+
mafilter "github.com/libp2p/go-maddr-filter"
10+
ma "github.com/multiformats/go-multiaddr"
11+
mamask "github.com/whyrusleeping/multiaddr-filter"
12+
)
13+
14+
func AddrFilters(filters []string) func() (opts Libp2pOpts, err error) {
15+
return func() (opts Libp2pOpts, err error) {
16+
for _, s := range filters {
17+
f, err := mamask.NewMask(s)
18+
if err != nil {
19+
return opts, fmt.Errorf("incorrectly formatted address filter in config: %s", s)
20+
}
21+
opts.Opts = append(opts.Opts, libp2p.FilterAddresses(f))
22+
}
23+
return opts, nil
24+
}
25+
}
26+
27+
func makeAddrsFactory(announce []string, noAnnounce []string) (p2pbhost.AddrsFactory, error) {
28+
var annAddrs []ma.Multiaddr
29+
for _, addr := range announce {
30+
maddr, err := ma.NewMultiaddr(addr)
31+
if err != nil {
32+
return nil, err
33+
}
34+
annAddrs = append(annAddrs, maddr)
35+
}
36+
37+
filters := mafilter.NewFilters()
38+
noAnnAddrs := map[string]bool{}
39+
for _, addr := range noAnnounce {
40+
f, err := mamask.NewMask(addr)
41+
if err == nil {
42+
filters.AddDialFilter(f)
43+
continue
44+
}
45+
maddr, err := ma.NewMultiaddr(addr)
46+
if err != nil {
47+
return nil, err
48+
}
49+
noAnnAddrs[string(maddr.Bytes())] = true
50+
}
51+
52+
return func(allAddrs []ma.Multiaddr) []ma.Multiaddr {
53+
var addrs []ma.Multiaddr
54+
if len(annAddrs) > 0 {
55+
addrs = annAddrs
56+
} else {
57+
addrs = allAddrs
58+
}
59+
60+
var out []ma.Multiaddr
61+
for _, maddr := range addrs {
62+
// check for exact matches
63+
ok := noAnnAddrs[string(maddr.Bytes())]
64+
// check for /ipcidr matches
65+
if !ok && !filters.AddrBlocked(maddr) {
66+
out = append(out, maddr)
67+
}
68+
}
69+
return out
70+
}, nil
71+
}
72+
73+
func AddrsFactory(announce []string, noAnnounce []string) func() (opts Libp2pOpts, err error) {
74+
return func() (opts Libp2pOpts, err error) {
75+
addrsFactory, err := makeAddrsFactory(announce, noAnnounce)
76+
if err != nil {
77+
return opts, err
78+
}
79+
opts.Opts = append(opts.Opts, libp2p.AddrsFactory(addrsFactory))
80+
return
81+
}
82+
}
83+
84+
func listenAddresses(addresses []string) ([]ma.Multiaddr, error) {
85+
var listen []ma.Multiaddr
86+
for _, addr := range addresses {
87+
maddr, err := ma.NewMultiaddr(addr)
88+
if err != nil {
89+
return nil, fmt.Errorf("failure to parse config.Addresses.Swarm: %s", addresses)
90+
}
91+
listen = append(listen, maddr)
92+
}
93+
94+
return listen, nil
95+
}
96+
97+
func StartListening(addresses []string) func(host host.Host) error {
98+
return func(host host.Host) error {
99+
listenAddrs, err := listenAddresses(addresses)
100+
if err != nil {
101+
return err
102+
}
103+
104+
// Actually start listening:
105+
if err := host.Network().Listen(listenAddrs...); err != nil {
106+
return err
107+
}
108+
109+
// list out our addresses
110+
addrs, err := host.Network().InterfaceListenAddresses()
111+
if err != nil {
112+
return err
113+
}
114+
log.Infof("Swarm listening at: %s", addrs)
115+
return nil
116+
}
117+
}

core/node/libp2p/host.go

+76
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package libp2p
2+
3+
import (
4+
"context"
5+
6+
"github.com/libp2p/go-libp2p"
7+
host "github.com/libp2p/go-libp2p-host"
8+
peer "github.com/libp2p/go-libp2p-peer"
9+
peerstore "github.com/libp2p/go-libp2p-peerstore"
10+
record "github.com/libp2p/go-libp2p-record"
11+
routing "github.com/libp2p/go-libp2p-routing"
12+
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
13+
"go.uber.org/fx"
14+
15+
"github.com/ipfs/go-ipfs/core/node/helpers"
16+
"github.com/ipfs/go-ipfs/repo"
17+
)
18+
19+
type P2PHostIn struct {
20+
fx.In
21+
22+
Repo repo.Repo
23+
Validator record.Validator
24+
HostOption HostOption
25+
RoutingOption RoutingOption
26+
ID peer.ID
27+
Peerstore peerstore.Peerstore
28+
29+
Opts [][]libp2p.Option `group:"libp2p"`
30+
}
31+
32+
type P2PHostOut struct {
33+
fx.Out
34+
35+
Host host.Host
36+
Routing BaseIpfsRouting
37+
}
38+
39+
func Host(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PHostIn) (out P2PHostOut, err error) {
40+
opts := []libp2p.Option{libp2p.NoListenAddrs}
41+
for _, o := range params.Opts {
42+
opts = append(opts, o...)
43+
}
44+
45+
ctx := helpers.LifecycleCtx(mctx, lc)
46+
47+
opts = append(opts, libp2p.Routing(func(h host.Host) (routing.PeerRouting, error) {
48+
r, err := params.RoutingOption(ctx, h, params.Repo.Datastore(), params.Validator)
49+
out.Routing = r
50+
return r, err
51+
}))
52+
53+
out.Host, err = params.HostOption(ctx, params.ID, params.Peerstore, opts...)
54+
if err != nil {
55+
return P2PHostOut{}, err
56+
}
57+
58+
// this code is necessary just for tests: mock network constructions
59+
// ignore the libp2p constructor options that actually construct the routing!
60+
if out.Routing == nil {
61+
r, err := params.RoutingOption(ctx, out.Host, params.Repo.Datastore(), params.Validator)
62+
if err != nil {
63+
return P2PHostOut{}, err
64+
}
65+
out.Routing = r
66+
out.Host = routedhost.Wrap(out.Host, out.Routing)
67+
}
68+
69+
lc.Append(fx.Hook{
70+
OnStop: func(ctx context.Context) error {
71+
return out.Host.Close()
72+
},
73+
})
74+
75+
return out, err
76+
}

core/node/libp2p/hostopt.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package libp2p
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/libp2p/go-libp2p"
8+
host "github.com/libp2p/go-libp2p-host"
9+
peer "github.com/libp2p/go-libp2p-peer"
10+
peerstore "github.com/libp2p/go-libp2p-peerstore"
11+
)
12+
13+
type HostOption func(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error)
14+
15+
var DefaultHostOption HostOption = constructPeerHost
16+
17+
// isolates the complex initialization steps
18+
func constructPeerHost(ctx context.Context, id peer.ID, ps peerstore.Peerstore, options ...libp2p.Option) (host.Host, error) {
19+
pkey := ps.PrivKey(id)
20+
if pkey == nil {
21+
return nil, fmt.Errorf("missing private key for node ID: %s", id.Pretty())
22+
}
23+
options = append([]libp2p.Option{libp2p.Identity(pkey), libp2p.Peerstore(ps)}, options...)
24+
return libp2p.New(ctx, options...)
25+
}

0 commit comments

Comments
 (0)