Skip to content

Commit cf68619

Browse files
authored
Merge pull request #7394 from ipfs/fix/pubsub
fix two pubsub issues.
2 parents 91b1db5 + c58e3e4 commit cf68619

File tree

4 files changed

+14
-60
lines changed

4 files changed

+14
-60
lines changed

core/commands/pubsub.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ This command outputs data in the following encodings:
7474
cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
7575
},
7676
Options: []cmds.Option{
77-
cmds.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
77+
cmds.BoolOption(pubsubDiscoverOptionName, "Deprecated option to instruct pubsub to discovery peers for the topic. Discovery is now built into pubsub."),
7878
},
7979
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
8080
api, err := cmdenv.GetApi(env, req)
@@ -83,9 +83,7 @@ This command outputs data in the following encodings:
8383
}
8484

8585
topic := req.Arguments[0]
86-
discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
87-
88-
sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
86+
sub, err := api.PubSub().Subscribe(req.Context, topic)
8987
if err != nil {
9088
return err
9189
}

core/coreapi/coreapi.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ import (
2626
"github.com/ipfs/go-ipfs-provider"
2727
offlineroute "github.com/ipfs/go-ipfs-routing/offline"
2828
ipld "github.com/ipfs/go-ipld-format"
29-
logging "github.com/ipfs/go-log"
3029
dag "github.com/ipfs/go-merkledag"
3130
coreiface "github.com/ipfs/interface-go-ipfs-core"
3231
"github.com/ipfs/interface-go-ipfs-core/options"
@@ -44,8 +43,6 @@ import (
4443
"github.com/ipfs/go-ipfs/repo"
4544
)
4645

47-
var log = logging.Logger("core/coreapi")
48-
4946
type CoreAPI struct {
5047
nctx context.Context
5148

core/coreapi/pubsub.go

Lines changed: 7 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,9 @@ package coreapi
33
import (
44
"context"
55
"errors"
6-
"strings"
7-
"sync"
8-
"time"
96

10-
cid "github.com/ipfs/go-cid"
117
coreiface "github.com/ipfs/interface-go-ipfs-core"
128
caopts "github.com/ipfs/interface-go-ipfs-core/options"
13-
p2phost "github.com/libp2p/go-libp2p-core/host"
149
peer "github.com/libp2p/go-libp2p-core/peer"
1510
routing "github.com/libp2p/go-libp2p-core/routing"
1611
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -19,7 +14,6 @@ import (
1914
type PubSubAPI CoreAPI
2015

2116
type pubSubSubscription struct {
22-
cancel context.CancelFunc
2317
subscription *pubsub.Subscription
2418
}
2519

@@ -61,12 +55,16 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
6155
}
6256

6357
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
64-
options, err := caopts.PubSubSubscribeOptions(opts...)
58+
// Parse the options to avoid introducing silent failures for invalid
59+
// options. However, we don't currently have any use for them. The only
60+
// subscription option, discovery, is now a no-op as it's handled by
61+
// pubsub itself.
62+
_, err := caopts.PubSubSubscribeOptions(opts...)
6563
if err != nil {
6664
return nil, err
6765
}
6866

69-
r, err := api.checkNode()
67+
_, err = api.checkNode()
7068
if err != nil {
7169
return nil, err
7270
}
@@ -77,45 +75,7 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
7775
return nil, err
7876
}
7977

80-
pubctx, cancel := context.WithCancel(api.nctx)
81-
82-
if options.Discover {
83-
go func() {
84-
blk, err := api.core().Block().Put(pubctx, strings.NewReader("floodsub:"+topic))
85-
if err != nil {
86-
log.Error("pubsub discovery: ", err)
87-
return
88-
}
89-
90-
connectToPubSubPeers(pubctx, r, api.peerHost, blk.Path().Cid())
91-
}()
92-
}
93-
94-
return &pubSubSubscription{cancel, sub}, nil
95-
}
96-
97-
func connectToPubSubPeers(ctx context.Context, r routing.Routing, ph p2phost.Host, cid cid.Cid) {
98-
ctx, cancel := context.WithCancel(ctx)
99-
defer cancel()
100-
101-
provs := r.FindProvidersAsync(ctx, cid, 10)
102-
var wg sync.WaitGroup
103-
for p := range provs {
104-
wg.Add(1)
105-
go func(pi peer.AddrInfo) {
106-
defer wg.Done()
107-
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
108-
defer cancel()
109-
err := ph.Connect(ctx, pi)
110-
if err != nil {
111-
log.Info("pubsub discover: ", err)
112-
return
113-
}
114-
log.Info("connected to pubsub peer:", pi.ID)
115-
}(p)
116-
}
117-
118-
wg.Wait()
78+
return &pubSubSubscription{sub}, nil
11979
}
12080

12181
func (api *PubSubAPI) checkNode() (routing.Routing, error) {
@@ -132,7 +92,6 @@ func (api *PubSubAPI) checkNode() (routing.Routing, error) {
13292
}
13393

13494
func (sub *pubSubSubscription) Close() error {
135-
sub.cancel()
13695
sub.subscription.Cancel()
13796
return nil
13897
}
@@ -161,7 +120,3 @@ func (msg *pubSubMessage) Seq() []byte {
161120
func (msg *pubSubMessage) Topics() []string {
162121
return msg.msg.TopicIDs
163122
}
164-
165-
func (api *PubSubAPI) core() coreiface.CoreAPI {
166-
return (*CoreAPI)(api)
167-
}

core/node/libp2p/pubsub.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
1717

1818
func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
1919
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
20-
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
20+
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(
21+
pubsubOptions,
22+
pubsub.WithDiscovery(disc),
23+
pubsub.WithFloodPublish(true))...,
24+
)
2125
}
2226
}

0 commit comments

Comments
 (0)