Skip to content

Commit 71c4a53

Browse files
committed
fix: remove pubsub discovery hack
Pubsub handles this internally now.
1 parent d39b58e commit 71c4a53

File tree

2 files changed

+8
-28
lines changed

2 files changed

+8
-28
lines changed

core/commands/pubsub.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ This command outputs data in the following encodings:
7474
cmds.StringArg("topic", true, false, "String name of topic to subscribe to."),
7575
},
7676
Options: []cmds.Option{
77-
cmds.BoolOption(pubsubDiscoverOptionName, "try to discover other peers subscribed to the same topic"),
77+
cmds.BoolOption(pubsubDiscoverOptionName, "Deprecated option to instruct pubsub to discovery peers for the topic. Discovery is now built into pubsub."),
7878
},
7979
Run: func(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
8080
api, err := cmdenv.GetApi(env, req)
@@ -83,9 +83,7 @@ This command outputs data in the following encodings:
8383
}
8484

8585
topic := req.Arguments[0]
86-
discover, _ := req.Options[pubsubDiscoverOptionName].(bool)
87-
88-
sub, err := api.PubSub().Subscribe(req.Context, topic, options.PubSub.Discover(discover))
86+
sub, err := api.PubSub().Subscribe(req.Context, topic)
8987
if err != nil {
9088
return err
9189
}

core/coreapi/pubsub.go

Lines changed: 6 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package coreapi
33
import (
44
"context"
55
"errors"
6-
"strings"
76
"sync"
87
"time"
98

@@ -19,7 +18,6 @@ import (
1918
type PubSubAPI CoreAPI
2019

2120
type pubSubSubscription struct {
22-
cancel context.CancelFunc
2321
subscription *pubsub.Subscription
2422
}
2523

@@ -61,12 +59,11 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
6159
}
6260

6361
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
64-
options, err := caopts.PubSubSubscribeOptions(opts...)
65-
if err != nil {
66-
return nil, err
67-
}
68-
69-
r, err := api.checkNode()
62+
// Parse the options to avoid introducing silent failures for invalid
63+
// options. However, we don't currently have any use for them. The only
64+
// subscription option, discovery, is now a no-op as it's handled by
65+
// pubsub itself.
66+
_, err := caopts.PubSubSubscribeOptions(opts...)
7067
if err != nil {
7168
return nil, err
7269
}
@@ -77,21 +74,7 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
7774
return nil, err
7875
}
7976

80-
pubctx, cancel := context.WithCancel(api.nctx)
81-
82-
if options.Discover {
83-
go func() {
84-
blk, err := api.core().Block().Put(pubctx, strings.NewReader("floodsub:"+topic))
85-
if err != nil {
86-
log.Error("pubsub discovery: ", err)
87-
return
88-
}
89-
90-
connectToPubSubPeers(pubctx, r, api.peerHost, blk.Path().Cid())
91-
}()
92-
}
93-
94-
return &pubSubSubscription{cancel, sub}, nil
77+
return &pubSubSubscription{sub}, nil
9578
}
9679

9780
func connectToPubSubPeers(ctx context.Context, r routing.Routing, ph p2phost.Host, cid cid.Cid) {
@@ -132,7 +115,6 @@ func (api *PubSubAPI) checkNode() (routing.Routing, error) {
132115
}
133116

134117
func (sub *pubSubSubscription) Close() error {
135-
sub.cancel()
136118
sub.subscription.Cancel()
137119
return nil
138120
}

0 commit comments

Comments
 (0)