Skip to content

DI-based core.NewNode #6162

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Apr 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
7c6a920
gomod: import dig
magik6k Mar 26, 2019
2379787
gomod: import uber/fx
magik6k Mar 28, 2019
0fd2f80
Initial DI node implementation
magik6k Mar 28, 2019
361ba69
Organize NewNode a bit
magik6k Mar 28, 2019
65d8fad
Properly set IsOnline in NewNode
magik6k Mar 28, 2019
ccc576b
More constructor fixes
magik6k Mar 28, 2019
cc2be2e
Fix goprocess / lifecycle / ctx relations
magik6k Mar 29, 2019
bfaffb2
Fix bootstrap
magik6k Apr 1, 2019
1acb450
Fix some blockstore type mixups
magik6k Apr 1, 2019
0ba7661
Fix context propagation sortof
magik6k Apr 1, 2019
d0670f2
Rewire teardown routines to lifecycles
magik6k Apr 1, 2019
5299299
Move ConnMgr construction into DI provider
magik6k Apr 1, 2019
c5f887d
fuse: fix govet warning
magik6k Apr 1, 2019
adbc85b
Remove old constructor code
magik6k Apr 1, 2019
d35dac7
Cleanup core package
magik6k Apr 3, 2019
7046626
Move pathresolve
magik6k Apr 3, 2019
fd0c06a
Remove DI module dependency on BuildCfg
magik6k Apr 3, 2019
3ac6057
Separate function to parse BuildCfg into Options
magik6k Apr 3, 2019
e4cf660
Move option parsing to BuildCfg; fix imports
magik6k Apr 3, 2019
0e6f8d4
bootstrap: cleanup randomSubsetOfPeers
magik6k Apr 8, 2019
803512e
Invoke Provider.Run in lc.OnStart
magik6k Apr 8, 2019
bb4b99e
Rename lcProcess.Run to Append, add docs
magik6k Apr 8, 2019
23f50ab
Minor cleanups in libp2p construction
magik6k Apr 8, 2019
bf380b8
Cleanup routing related units
magik6k Apr 15, 2019
d574658
Handle Provider closing in lifecycle
magik6k Apr 15, 2019
19cc7c9
Remove relay address filtering logic
magik6k Apr 15, 2019
c9ea728
chore: remove duplicate import
Stebalien Apr 17, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
coreapi "github.com/ipfs/go-ipfs/core/coreapi"
corehttp "github.com/ipfs/go-ipfs/core/corehttp"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
"github.com/ipfs/go-ipfs/core/node"
nodeMount "github.com/ipfs/go-ipfs/fuse/node"
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
migrate "github.com/ipfs/go-ipfs/repo/fsrepo/migrations"
Expand Down Expand Up @@ -323,11 +324,11 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
case routingOptionSupernodeKwd:
return errors.New("supernode routing was never fully implemented and has been removed")
case routingOptionDHTClientKwd:
ncfg.Routing = core.DHTClientOption
ncfg.Routing = node.DHTClientOption
case routingOptionDHTKwd:
ncfg.Routing = core.DHTOption
ncfg.Routing = node.DHTOption
case routingOptionNoneKwd:
ncfg.Routing = core.NilRouterOption
ncfg.Routing = node.NilRouterOption
default:
return fmt.Errorf("unrecognized routing option: %s", routingOption)
}
Expand Down Expand Up @@ -372,7 +373,7 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
if err != nil {
return err
}
node.Process().AddChild(goprocess.WithTeardown(cctx.Plugins.Close))
node.Process.AddChild(goprocess.WithTeardown(cctx.Plugins.Close))

// construct api endpoint - every time
apiErrc, err := serveHTTPApi(req, cctx)
Expand Down
93 changes: 48 additions & 45 deletions core/bootstrap.go → core/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package core
package bootstrap

import (
"context"
Expand All @@ -9,27 +9,28 @@ import (
"sync"
"time"

math2 "github.com/ipfs/go-ipfs/thirdparty/math2"
lgbl "github.com/libp2p/go-libp2p-loggables"

config "github.com/ipfs/go-ipfs-config"
goprocess "github.com/jbenet/goprocess"
procctx "github.com/jbenet/goprocess/context"
periodicproc "github.com/jbenet/goprocess/periodic"
host "github.com/libp2p/go-libp2p-host"
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pstore "github.com/libp2p/go-libp2p-peerstore"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
"github.com/jbenet/goprocess/context"
"github.com/jbenet/goprocess/periodic"
"github.com/libp2p/go-libp2p-host"
"github.com/libp2p/go-libp2p-loggables"
"github.com/libp2p/go-libp2p-net"
"github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore"
"github.com/libp2p/go-libp2p-routing"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave the explicit names for now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(meh, actually, I don't care much either way)

)

var log = logging.Logger("bootstrap")

// ErrNotEnoughBootstrapPeers signals that we do not have enough bootstrap
// peers to bootstrap correctly.
var ErrNotEnoughBootstrapPeers = errors.New("not enough bootstrap peers to bootstrap")

// BootstrapConfig specifies parameters used in an IpfsNode's network
// bootstrapping process.
type BootstrapConfig struct {

// MinPeerThreshold governs whether to bootstrap more connections. If the
// node has less open connections than this number, it will open connections
// to the bootstrap nodes. From there, the routing system should be able
Expand All @@ -50,7 +51,7 @@ type BootstrapConfig struct {
// BootstrapPeers is a function that returns a set of bootstrap peers
// for the bootstrap process to use. This makes it possible for clients
// to control the peers the process uses at any moment.
BootstrapPeers func() []pstore.PeerInfo
BootstrapPeers func() []peerstore.PeerInfo
}

// DefaultBootstrapConfig specifies default sane parameters for bootstrapping.
Expand All @@ -60,9 +61,9 @@ var DefaultBootstrapConfig = BootstrapConfig{
ConnectionTimeout: (30 * time.Second) / 3, // Period / 3
}

func BootstrapConfigWithPeers(pis []pstore.PeerInfo) BootstrapConfig {
func BootstrapConfigWithPeers(pis []peerstore.PeerInfo) BootstrapConfig {
cfg := DefaultBootstrapConfig
cfg.BootstrapPeers = func() []pstore.PeerInfo {
cfg.BootstrapPeers = func() []peerstore.PeerInfo {
return pis
}
return cfg
Expand All @@ -72,7 +73,7 @@ func BootstrapConfigWithPeers(pis []pstore.PeerInfo) BootstrapConfig {
// check the number of open connections and -- if there are too few -- initiate
// connections to well-known bootstrap peers. It also kicks off subsystem
// bootstrapping (i.e. routing).
func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {
func Bootstrap(id peer.ID, host host.Host, rt routing.IpfsRouting, cfg BootstrapConfig) (io.Closer, error) {

// make a signal to wait for one bootstrap round to complete.
doneWithRound := make(chan struct{})
Expand All @@ -85,12 +86,12 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {

// the periodic bootstrap function -- the connection supervisor
periodic := func(worker goprocess.Process) {
ctx := procctx.OnClosingContext(worker)
defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done()
ctx := goprocessctx.OnClosingContext(worker)
defer log.EventBegin(ctx, "periodicBootstrap", id).Done()

if err := bootstrapRound(ctx, n.PeerHost, cfg); err != nil {
log.Event(ctx, "bootstrapError", n.Identity, lgbl.Error(err))
log.Debugf("%s bootstrap error: %s", n.Identity, err)
if err := bootstrapRound(ctx, host, cfg); err != nil {
log.Event(ctx, "bootstrapError", id, loggables.Error(err))
log.Debugf("%s bootstrap error: %s", id, err)
}

<-doneWithRound
Expand All @@ -101,9 +102,9 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) {
proc.Go(periodic) // run one right now.

// kick off Routing.Bootstrap
if n.Routing != nil {
ctx := procctx.OnClosingContext(proc)
if err := n.Routing.Bootstrap(ctx); err != nil {
if rt != nil {
ctx := goprocessctx.OnClosingContext(proc)
if err := rt.Bootstrap(ctx); err != nil {
proc.Close()
return nil, err
}
Expand Down Expand Up @@ -134,9 +135,9 @@ func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) er
numToDial := cfg.MinPeerThreshold - len(connected)

// filter out bootstrap nodes we are already connected to
var notConnected []pstore.PeerInfo
var notConnected []peerstore.PeerInfo
for _, p := range peers {
if host.Network().Connectedness(p.ID) != inet.Connected {
if host.Network().Connectedness(p.ID) != net.Connected {
notConnected = append(notConnected, p)
}
}
Expand All @@ -155,7 +156,7 @@ func bootstrapRound(ctx context.Context, host host.Host, cfg BootstrapConfig) er
return bootstrapConnect(ctx, host, randSubset)
}

func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo) error {
func bootstrapConnect(ctx context.Context, ph host.Host, peers []peerstore.PeerInfo) error {
if len(peers) < 1 {
return ErrNotEnoughBootstrapPeers
}
Expand All @@ -170,12 +171,12 @@ func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo
// Also, performed asynchronously for dial speed.

wg.Add(1)
go func(p pstore.PeerInfo) {
go func(p peerstore.PeerInfo) {
defer wg.Done()
defer log.EventBegin(ctx, "bootstrapDial", ph.ID(), p.ID).Done()
log.Debugf("%s bootstrapping to %s", ph.ID(), p.ID)

ph.Peerstore().AddAddrs(p.ID, p.Addrs, pstore.PermanentAddrTTL)
ph.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
if err := ph.Connect(ctx, p); err != nil {
log.Event(ctx, "bootstrapDialFailed", p.ID)
log.Debugf("failed to bootstrap with %v: %s", p.ID, err)
Expand Down Expand Up @@ -204,35 +205,37 @@ func bootstrapConnect(ctx context.Context, ph host.Host, peers []pstore.PeerInfo
return nil
}

func toPeerInfos(bpeers []config.BootstrapPeer) []pstore.PeerInfo {
pinfos := make(map[peer.ID]*pstore.PeerInfo)
// randomSubsetOfPeers picks at most max peers from in, uniformly at random
// and without repetition. When max >= len(in) it returns a shuffled copy of
// the entire input.
func randomSubsetOfPeers(in []peerstore.PeerInfo, max int) []peerstore.PeerInfo {
	n := max
	if n > len(in) {
		n = len(in)
	}

	subset := make([]peerstore.PeerInfo, n)
	perm := rand.Perm(len(in))
	for i := range subset {
		subset[i] = in[perm[i]]
	}
	return subset
}

type Peers []config.BootstrapPeer

func (bpeers Peers) ToPeerInfos() []peerstore.PeerInfo {
pinfos := make(map[peer.ID]*peerstore.PeerInfo)
for _, bootstrap := range bpeers {
pinfo, ok := pinfos[bootstrap.ID()]
if !ok {
pinfo = new(pstore.PeerInfo)
pinfo = new(peerstore.PeerInfo)
pinfos[bootstrap.ID()] = pinfo
pinfo.ID = bootstrap.ID()
}

pinfo.Addrs = append(pinfo.Addrs, bootstrap.Transport())
}

var peers []pstore.PeerInfo
var peers []peerstore.PeerInfo
for _, pinfo := range pinfos {
peers = append(peers, *pinfo)
}

return peers
}

// randomSubsetOfPeers returns up to max peers drawn from in, chosen in random
// order and without repetition. NOTE(review): math2.IntMin is a project helper
// (thirdparty/math2); presumably it returns the smaller of its two int
// arguments — confirm against its definition.
func randomSubsetOfPeers(in []pstore.PeerInfo, max int) []pstore.PeerInfo {
n := math2.IntMin(max, len(in))
var out []pstore.PeerInfo
for _, val := range rand.Perm(len(in)) {
// Walk the full permutation but keep only the first n indices.
out = append(out, in[val])
if len(out) >= n {
break
}
}
return out
}
4 changes: 2 additions & 2 deletions core/bootstrap_test.go → core/bootstrap/bootstrap_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package core
package bootstrap

import (
"fmt"
Expand Down Expand Up @@ -49,7 +49,7 @@ func TestMultipleAddrsPerPeer(t *testing.T) {
bsps = append(bsps, bsp1, bsp2)
}

pinfos := toPeerInfos(bsps)
pinfos := Peers.ToPeerInfos(bsps)
if len(pinfos) != len(bsps)/2 {
t.Fatal("expected fewer peers")
}
Expand Down
Loading