Skip to content

Commit ae7d192

Browse files
authored
Merge pull request ipfs/kubo#6162 from ipfs/feat/dig
DI-based core.NewNode This commit was moved from ipfs/kubo@c3a7bc2
2 parents 0cc547b + 28adb70 commit ae7d192

File tree

2 files changed

+297
-0
lines changed

2 files changed

+297
-0
lines changed

core/bootstrap/bootstrap.go

+241
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,241 @@
1+
package bootstrap
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io"
8+
"math/rand"
9+
"sync"
10+
"time"
11+
12+
config "github.com/ipfs/go-ipfs-config"
13+
logging "github.com/ipfs/go-log"
14+
"github.com/jbenet/goprocess"
15+
"github.com/jbenet/goprocess/context"
16+
"github.com/jbenet/goprocess/periodic"
17+
"github.com/libp2p/go-libp2p-host"
18+
"github.com/libp2p/go-libp2p-loggables"
19+
"github.com/libp2p/go-libp2p-net"
20+
"github.com/libp2p/go-libp2p-peer"
21+
"github.com/libp2p/go-libp2p-peerstore"
22+
"github.com/libp2p/go-libp2p-routing"
23+
)
24+
25+
// log is the package-scoped event/debug logger for the bootstrap subsystem.
var log = logging.Logger("bootstrap")

// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap
// peers to bootstrap correctly.
var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap")
30+
31+
// BootstrapConfig specifies parameters used in an IpfsNode's network
// bootstrapping process.
type BootstrapConfig struct {
	// MinPeerThreshold governs whether to bootstrap more connections. If the
	// node has less open connections than this number, it will open connections
	// to the bootstrap nodes. From there, the routing system should be able
	// to use the connections to the bootstrap nodes to connect to even more
	// peers. Routing systems like the IpfsDHT do so in their own Bootstrap
	// process, which issues random queries to find more peers.
	MinPeerThreshold int

	// Period governs the periodic interval at which the node will
	// attempt to bootstrap. The bootstrap process is not very expensive, so
	// this threshold can afford to be small (<=30s).
	Period time.Duration

	// ConnectionTimeout determines how long to wait for a bootstrap
	// connection attempt before cancelling it.
	ConnectionTimeout time.Duration

	// BootstrapPeers is a function that returns a set of bootstrap peers
	// for the bootstrap process to use. This makes it possible for clients
	// to control the peers the process uses at any moment.
	// NOTE(review): it is called on every bootstrap round, so it must be
	// safe to call repeatedly and from the bootstrap goroutine.
	BootstrapPeers func() []peerstore.PeerInfo
}
56+
57+
// DefaultBootstrapConfig specifies default sane parameters for bootstrapping.
// Note it does not set BootstrapPeers; use BootstrapConfigWithPeers for that.
var DefaultBootstrapConfig = BootstrapConfig{
	MinPeerThreshold:  4,
	Period:            30 * time.Second,
	ConnectionTimeout: (30 * time.Second) / 3, // Period / 3
}
63+
64+
func BootstrapConfigWithPeers(pis []peerstore.PeerInfo) BootstrapConfig {
65+
cfg := DefaultBootstrapConfig
66+
cfg.BootstrapPeers = func() []peerstore.PeerInfo {
67+
return pis
68+
}
69+
return cfg
70+
}
71+
72+
// Bootstrap kicks off IpfsNode bootstrapping. This function will periodically
// check the number of open connections and -- if there are too few -- initiate
// connections to well-known bootstrap peers. It also kicks off subsystem
// bootstrapping (i.e. routing).
//
// The returned io.Closer stops the periodic bootstrap process when closed.
func Bootstrap(id peer.ID, host host.Host, rt routing.IpfsRouting, cfg BootstrapConfig) (io.Closer, error) {

	// make a signal to wait for one bootstrap round to complete.
	doneWithRound := make(chan struct{})

	if len(cfg.BootstrapPeers()) == 0 {
		// We *need* to bootstrap but we have no bootstrap peers
		// configured *at all*, inform the user.
		log.Warning("no bootstrap nodes configured: go-ipfs may have difficulty connecting to the network")
	}

	// the periodic bootstrap function -- the connection supervisor
	periodic := func(worker goprocess.Process) {
		ctx := goprocessctx.OnClosingContext(worker)
		defer log.EventBegin(ctx, "periodicBootstrap", id).Done()

		if err := bootstrapRound(ctx, host, cfg); err != nil {
			// bootstrap failures are logged, not fatal: the next tick retries.
			log.Event(ctx, "bootstrapError", id, loggables.Error(err))
			log.Debugf("%s bootstrap error: %s", id, err)
		}

		// block until the parent signals (or closes doneWithRound below);
		// this lets the caller synchronize with the first round.
		<-doneWithRound
	}

	// kick off the node's periodic bootstrapping
	proc := periodicproc.Tick(cfg.Period, periodic)
	proc.Go(periodic) // run one right now.

	// kick off Routing.Bootstrap
	if rt != nil {
		ctx := goprocessctx.OnClosingContext(proc)
		if err := rt.Bootstrap(ctx); err != nil {
			// routing bootstrap failed: tear down the periodic process too.
			proc.Close()
			return nil, err
		}
	}

	// release the first round, then close so later rounds never block on it.
	doneWithRound <- struct{}{}
	close(doneWithRound) // it no longer blocks periodic
	return proc, nil
}
117+
118+
func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) error {
119+
120+
ctx, cancel := context.WithTimeout(ctx, cfg.ConnectionTimeout)
121+
defer cancel()
122+
id := host.ID()
123+
124+
// get bootstrap peers from config. retrieving them here makes
125+
// sure we remain observant of changes to client configuration.
126+
peers := cfg.BootstrapPeers()
127+
// determine how many bootstrap connections to open
128+
connected := host.Network().Peers()
129+
if len(connected) >= cfg.MinPeerThreshold {
130+
log.Event(ctx, "bootstrapSkip", id)
131+
log.Debugf("%s core bootstrap skipped -- connected to %d (> %d) nodes",
132+
id, len(connected), cfg.MinPeerThreshold)
133+
return nil
134+
}
135+
numToDial := cfg.MinPeerThreshold - len(connected)
136+
137+
// filter out bootstrap nodes we are already connected to
138+
var notConnected []peerstore.PeerInfo
139+
for _, p := range peers {
140+
if host.Network().Connectedness(p.ID) != net.Connected {
141+
notConnected = append(notConnected, p)
142+
}
143+
}
144+
145+
// if connected to all bootstrap peer candidates, exit
146+
if len(notConnected) < 1 {
147+
log.Debugf("%s no more bootstrap peers to create %d connections", id, numToDial)
148+
return ErrNotEnoughBootstrapPeers
149+
}
150+
151+
// connect to a random susbset of bootstrap candidates
152+
randSubset := randomSubsetOfPeers(notConnected, numToDial)
153+
154+
defer log.EventBegin(ctx, "bootstrapStart", id).Done()
155+
log.Debugf("%s bootstrapping to %d nodes: %s", id, numToDial, randSubset)
156+
return bootstrapConnect(ctx, host, randSubset)
157+
}
158+
159+
// bootstrapConnect dials every peer in peers in parallel, recording their
// addresses permanently in the peerstore first. It succeeds (returns nil)
// when at least one connection attempt succeeds; it returns an error wrapping
// the last dial failure only when every attempt failed, and
// ErrNotEnoughBootstrapPeers when given no peers at all.
func bootstrapConnect(ctx context.Context, ph host.Host, peers []peerstore.PeerInfo) error {
	if len(peers) < 1 {
		return ErrNotEnoughBootstrapPeers
	}

	// buffered to len(peers) so every goroutine can report its error
	// without blocking, even after we stop receiving.
	errs := make(chan error, len(peers))
	var wg sync.WaitGroup
	for _, p := range peers {

		// performed asynchronously because when performed synchronously, if
		// one `Connect` call hangs, subsequent calls are more likely to
		// fail/abort due to an expiring context.
		// Also, performed asynchronously for dial speed.

		wg.Add(1)
		go func(p peerstore.PeerInfo) {
			defer wg.Done()
			defer log.EventBegin(ctx, "bootstrapDial", ph.ID(), p.ID).Done()
			log.Debugf("%s bootstrapping to %s", ph.ID(), p.ID)

			// keep the bootstrap addresses forever so future dials can reuse them
			ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
			if err := ph.Connect(ctx, p); err != nil {
				log.Event(ctx, "bootstrapDialFailed", p.ID)
				log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
				errs <- err
				return
			}
			log.Event(ctx, "bootstrapDialSuccess", p.ID)
			log.Infof("bootstrapped with %v", p.ID)
		}(p)
	}
	wg.Wait()

	// our failure condition is when no connection attempt succeeded.
	// So drain the errs channel, counting the results.
	close(errs)
	count := 0
	var err error
	for err = range errs {
		if err != nil {
			count++
		}
	}
	if count == len(peers) {
		// err holds the last error drained from the channel.
		return fmt.Errorf("failed to bootstrap. %s", err)
	}
	return nil
}
207+
208+
func randomSubsetOfPeers(in []peerstore.PeerInfo, max int) []peerstore.PeerInfo {
209+
if max > len(in) {
210+
max = len(in)
211+
}
212+
213+
out := make([]peerstore.PeerInfo, max)
214+
for i, val := range rand.Perm(len(in))[:max] {
215+
out[i] = in[val]
216+
}
217+
return out
218+
}
219+
220+
type Peers []config.BootstrapPeer
221+
222+
func (bpeers Peers) ToPeerInfos() []peerstore.PeerInfo {
223+
pinfos := make(map[peer.ID]*peerstore.PeerInfo)
224+
for _, bootstrap := range bpeers {
225+
pinfo, ok := pinfos[bootstrap.ID()]
226+
if !ok {
227+
pinfo = new(peerstore.PeerInfo)
228+
pinfos[bootstrap.ID()] = pinfo
229+
pinfo.ID = bootstrap.ID()
230+
}
231+
232+
pinfo.Addrs = append(pinfo.Addrs, bootstrap.Transport())
233+
}
234+
235+
var peers []peerstore.PeerInfo
236+
for _, pinfo := range pinfos {
237+
peers = append(peers, *pinfo)
238+
}
239+
240+
return peers
241+
}

core/bootstrap/bootstrap_test.go

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package bootstrap
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
config "github.com/ipfs/go-ipfs-config"
8+
pstore "github.com/libp2p/go-libp2p-peerstore"
9+
testutil "github.com/libp2p/go-testutil"
10+
)
11+
12+
func TestSubsetWhenMaxIsGreaterThanLengthOfSlice(t *testing.T) {
13+
var ps []pstore.PeerInfo
14+
sizeofSlice := 100
15+
for i := 0; i < sizeofSlice; i++ {
16+
pid, err := testutil.RandPeerID()
17+
if err != nil {
18+
t.Fatal(err)
19+
}
20+
21+
ps = append(ps, pstore.PeerInfo{ID: pid})
22+
}
23+
out := randomSubsetOfPeers(ps, 2*sizeofSlice)
24+
if len(out) != len(ps) {
25+
t.Fail()
26+
}
27+
}
28+
29+
func TestMultipleAddrsPerPeer(t *testing.T) {
30+
var bsps []config.BootstrapPeer
31+
for i := 0; i < 10; i++ {
32+
pid, err := testutil.RandPeerID()
33+
if err != nil {
34+
t.Fatal(err)
35+
}
36+
37+
addr := fmt.Sprintf("/ip4/127.0.0.1/tcp/5001/ipfs/%s", pid.Pretty())
38+
bsp1, err := config.ParseBootstrapPeer(addr)
39+
if err != nil {
40+
t.Fatal(err)
41+
}
42+
43+
addr = fmt.Sprintf("/ip4/127.0.0.1/udp/5002/utp/ipfs/%s", pid.Pretty())
44+
bsp2, err := config.ParseBootstrapPeer(addr)
45+
if err != nil {
46+
t.Fatal(err)
47+
}
48+
49+
bsps = append(bsps, bsp1, bsp2)
50+
}
51+
52+
pinfos := Peers.ToPeerInfos(bsps)
53+
if len(pinfos) != len(bsps)/2 {
54+
t.Fatal("expected fewer peers")
55+
}
56+
}

0 commit comments

Comments
 (0)