Skip to content

Commit d735b28

Browse files
committed
ipns(pubsub): utilize persistent pubsub value store
1 parent 6eb481c commit d735b28

File tree

6 files changed

+54
-15
lines changed

6 files changed

+54
-15
lines changed

core/node/groups.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,10 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
6767

6868
// parse PubSub config
6969

70-
ps := fx.Options()
70+
ps, disc := fx.Options(), fx.Options()
7171
if bcfg.getOpt("pubsub") || bcfg.getOpt("ipnsps") {
72+
disc = fx.Provide(libp2p.TopicDiscovery())
73+
7274
var pubsubOptions []pubsub.Option
7375
pubsubOptions = append(
7476
pubsubOptions,
@@ -113,6 +115,7 @@ func LibP2P(bcfg *BuildCfg, cfg *config.Config) fx.Option {
113115
maybeInvoke(libp2p.AutoNATService(cfg.Experimental.QUIC), cfg.Swarm.EnableAutoNATService),
114116
connmgr,
115117
ps,
118+
disc,
116119
)
117120

118121
return opts

core/node/libp2p/pubsub.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,22 @@
11
package libp2p
22

33
import (
4-
host "github.com/libp2p/go-libp2p-core/host"
4+
"github.com/libp2p/go-libp2p-core/discovery"
5+
"github.com/libp2p/go-libp2p-core/host"
56
pubsub "github.com/libp2p/go-libp2p-pubsub"
67
"go.uber.org/fx"
78

89
"github.com/ipfs/go-ipfs/core/node/helpers"
910
)
1011

1112
func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
12-
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
13-
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
13+
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
14+
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
1415
}
1516
}
1617

1718
func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
18-
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host) (service *pubsub.PubSub, err error) {
19-
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, pubsubOptions...)
19+
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
20+
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
2021
}
2122
}

core/node/libp2p/routing.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package libp2p
33
import (
44
"context"
55
"sort"
6+
"time"
67

78
host "github.com/libp2p/go-libp2p-core/host"
89
routing "github.com/libp2p/go-libp2p-core/routing"
@@ -83,15 +84,19 @@ type p2pPSRoutingIn struct {
8384
PubSub *pubsub.PubSub `optional:"true"`
8485
}
8586

86-
func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore) {
87-
psRouter := namesys.NewPubsubValueStore(
87+
func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (p2pRouterOut, *namesys.PubsubValueStore, error) {
88+
psRouter, err := namesys.NewPubsubValueStore(
8889
helpers.LifecycleCtx(mctx, lc),
8990
in.Host,
90-
in.BaseIpfsRouting,
9191
in.PubSub,
9292
in.Validator,
93+
namesys.WithRebroadcastInterval(time.Minute),
9394
)
9495

96+
if err != nil {
97+
return p2pRouterOut{}, nil, err
98+
}
99+
95100
return p2pRouterOut{
96101
Router: Router{
97102
Routing: &routinghelpers.Compose{
@@ -102,5 +107,5 @@ func PubsubRouter(mctx helpers.MetricsCtx, lc fx.Lifecycle, in p2pPSRoutingIn) (
102107
},
103108
Priority: 100,
104109
},
105-
}, psRouter
110+
}, psRouter, nil
106111
}

core/node/libp2p/topicdiscovery.go

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package libp2p
2+
3+
import (
4+
"math/rand"
5+
"time"
6+
7+
"github.com/libp2p/go-libp2p-core/discovery"
8+
"github.com/libp2p/go-libp2p-core/host"
9+
disc "github.com/libp2p/go-libp2p-discovery"
10+
11+
"github.com/ipfs/go-ipfs/core/node/helpers"
12+
"go.uber.org/fx"
13+
)
14+
15+
func TopicDiscovery() interface{} {
16+
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, cr BaseIpfsRouting) (service discovery.Discovery, err error) {
17+
baseDisc := disc.NewRoutingDiscovery(cr)
18+
minBackoff, maxBackoff := time.Second*60, time.Hour
19+
rng := rand.New(rand.NewSource(rand.Int63()))
20+
d, err := disc.NewBackoffDiscovery(
21+
baseDisc,
22+
disc.NewExponentialBackoff(minBackoff, maxBackoff, disc.FullJitter, time.Second, 5.0, 0, rng),
23+
)
24+
25+
if err != nil {
26+
return nil, err
27+
}
28+
29+
return d, nil
30+
}
31+
}

go.mod

+2-1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ require (
6262
github.com/libp2p/go-libp2p-circuit v0.1.4
6363
github.com/libp2p/go-libp2p-connmgr v0.1.1
6464
github.com/libp2p/go-libp2p-core v0.2.5
65+
github.com/libp2p/go-libp2p-discovery v0.2.0
6566
github.com/libp2p/go-libp2p-http v0.1.4
6667
github.com/libp2p/go-libp2p-kad-dht v0.3.1
6768
github.com/libp2p/go-libp2p-kbucket v0.2.1
@@ -70,7 +71,7 @@ require (
7071
github.com/libp2p/go-libp2p-peerstore v0.1.4
7172
github.com/libp2p/go-libp2p-pnet v0.1.0
7273
github.com/libp2p/go-libp2p-pubsub v0.2.4
73-
github.com/libp2p/go-libp2p-pubsub-router v0.1.0
74+
github.com/libp2p/go-libp2p-pubsub-router v0.1.1-0.20191209162204-0edbae38a844
7475
github.com/libp2p/go-libp2p-quic-transport v0.2.2
7576
github.com/libp2p/go-libp2p-record v0.1.2
7677
github.com/libp2p/go-libp2p-routing-helpers v0.1.0

go.sum

+2-4
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ github.com/ipfs/go-datastore v0.1.0 h1:TOxI04l8CmO4zGtesENhzm4PwkFwJXY3rKiYaaMf9
168168
github.com/ipfs/go-datastore v0.1.0/go.mod h1:d4KVXhMt913cLBEI/PXAy6ko+W7e9AhyAKBGh803qeE=
169169
github.com/ipfs/go-datastore v0.1.1 h1:F4k0TkTAZGLFzBOrVKDAvch6JZtuN4NHkfdcEZL50aI=
170170
github.com/ipfs/go-datastore v0.1.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
171-
github.com/ipfs/go-datastore v0.3.0 h1:9au0tYi/+n7xeUnGHG6davnS8x9hWbOzP/388Vx3CMs=
172171
github.com/ipfs/go-datastore v0.3.0/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
173172
github.com/ipfs/go-datastore v0.3.1 h1:SS1t869a6cctoSYmZXUk8eL6AzVXgASmKIWFNQkQ1jU=
174173
github.com/ipfs/go-datastore v0.3.1/go.mod h1:w38XXW9kVFNp57Zj5knbKWM2T+KOZCGDRVNdgPHtbHw=
@@ -433,11 +432,10 @@ github.com/libp2p/go-libp2p-pnet v0.1.0 h1:kRUES28dktfnHNIRW4Ro78F7rKBHBiw5MJpl0
433432
github.com/libp2p/go-libp2p-pnet v0.1.0/go.mod h1:ZkyZw3d0ZFOex71halXRihWf9WH/j3OevcJdTmD0lyE=
434433
github.com/libp2p/go-libp2p-protocol v0.0.1/go.mod h1:Af9n4PiruirSDjHycM1QuiMi/1VZNHYcK8cLgFJLZ4s=
435434
github.com/libp2p/go-libp2p-protocol v0.1.0/go.mod h1:KQPHpAabB57XQxGrXCNvbL6UEXfQqUgC/1adR2Xtflk=
436-
github.com/libp2p/go-libp2p-pubsub v0.1.0/go.mod h1:ZwlKzRSe1eGvSIdU5bD7+8RZN/Uzw0t1Bp9R1znpR/Q=
437435
github.com/libp2p/go-libp2p-pubsub v0.2.4 h1:O4BcaKpPQ9p82yTBtzIzgDFoOXkqhrQpfcVac3FAywU=
438436
github.com/libp2p/go-libp2p-pubsub v0.2.4/go.mod h1:1tJwAfySvZQ49R9uTVlkwtSTMVLeQQdrnLTJrr91gVc=
439-
github.com/libp2p/go-libp2p-pubsub-router v0.1.0 h1:xA5B8Sdx64tNlSRIcay2QUngtlu8LpUJClaUk/dYYrg=
440-
github.com/libp2p/go-libp2p-pubsub-router v0.1.0/go.mod h1:PnHOshBr/2I2ZxVfEsqfgCQPsVg09zo+DhSlWkOhPFM=
437+
github.com/libp2p/go-libp2p-pubsub-router v0.1.1-0.20191209162204-0edbae38a844 h1:o5nKi3DP0mJ4NzespTkuv1KzBf4fgOPLf8fhY8TrvL4=
438+
github.com/libp2p/go-libp2p-pubsub-router v0.1.1-0.20191209162204-0edbae38a844/go.mod h1:ULJTrV0Y0pIs8CXKgB/tZ934pIETO894ldEB8x6LPcw=
441439
github.com/libp2p/go-libp2p-quic-transport v0.2.2 h1:XyGRqFHD1oHdI2k98P1tWWRb9s27fl1SfmCcaX8plso=
442440
github.com/libp2p/go-libp2p-quic-transport v0.2.2/go.mod h1:rVzcsiuOFBomAqvNOxeBUcP4vM4wE+NqqRZWvxjkbe0=
443441
github.com/libp2p/go-libp2p-record v0.0.1/go.mod h1:grzqg263Rug/sRex85QrDOLntdFAymLDLm7lxMgU79Q=

0 commit comments

Comments
 (0)