Skip to content
This repository was archived by the owner on Oct 5, 2023. It is now read-only.

Commit 63087b9

Browse files
committed
feat: pubsub http rpc with multibase
This updates HTTP RPC wire format to one from ipfs/kubo#8183
1 parent 8569e83 commit 63087b9

File tree

3 files changed

+64
-20
lines changed

3 files changed

+64
-20
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# go-ipfs-http-api
22

3-
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
4-
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
5-
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
3+
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](https://protocol.ai)
4+
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](https://ipfs.io/)
5+
[![](https://img.shields.io/badge/matrix-%23ipfs-blue.svg?style=flat-square)](https://app.element.io/#/room/#ipfs:matrix.org)
66
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
77
[![GoDoc](https://godoc.org/github.com/ipfs/go-ipfs-http-api?status.svg)](https://godoc.org/github.com/ipfs/go-ipfs-http-api)
88

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515
github.com/libp2p/go-libp2p-core v0.8.6
1616
github.com/mitchellh/go-homedir v1.1.0
1717
github.com/multiformats/go-multiaddr v0.3.3
18+
github.com/multiformats/go-multibase v0.0.3
1819
github.com/multiformats/go-multihash v0.0.15
1920
github.com/pkg/errors v0.9.1
2021
)

pubsub.go

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
iface "github.com/ipfs/interface-go-ipfs-core"
1010
caopts "github.com/ipfs/interface-go-ipfs-core/options"
1111
"github.com/libp2p/go-libp2p-core/peer"
12+
mbase "github.com/multiformats/go-multibase"
1213
)
1314

1415
type PubsubAPI HttpApi
@@ -21,8 +22,15 @@ func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) {
2122
if err := api.core().Request("pubsub/ls").Exec(ctx, &out); err != nil {
2223
return nil, err
2324
}
24-
25-
return out.Strings, nil
25+
topics := make([]string, len(out.Strings))
26+
for n, mb := range out.Strings {
27+
_, topic, err := mbase.Decode(mb)
28+
if err != nil {
29+
return nil, err
30+
}
31+
topics[n] = string(topic)
32+
}
33+
return topics, nil
2634
}
2735

2836
func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
@@ -35,7 +43,11 @@ func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio
3543
Strings []string
3644
}
3745

38-
if err := api.core().Request("pubsub/peers", options.Topic).Exec(ctx, &out); err != nil {
46+
var optionalTopic string
47+
if len(options.Topic) > 0 {
48+
optionalTopic = toMultibase([]byte(options.Topic))
49+
}
50+
if err := api.core().Request("pubsub/peers", optionalTopic).Exec(ctx, &out); err != nil {
3951
return nil, err
4052
}
4153

@@ -51,7 +63,7 @@ func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio
5163
}
5264

5365
func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error {
54-
return api.core().Request("pubsub/pub", topic).
66+
return api.core().Request("pubsub/pub", toMultibase([]byte(topic))).
5567
FileBody(bytes.NewReader(message)).
5668
Exec(ctx, nil)
5769
}
@@ -64,29 +76,36 @@ type pubsubSub struct {
6476
}
6577

6678
type pubsubMessage struct {
67-
JFrom []byte `json:"from,omitempty"`
68-
JData []byte `json:"data,omitempty"`
69-
JSeqno []byte `json:"seqno,omitempty"`
79+
JFrom string `json:"from,omitempty"`
80+
JData string `json:"data,omitempty"`
81+
JSeqno string `json:"seqno,omitempty"`
7082
JTopicIDs []string `json:"topicIDs,omitempty"`
7183

72-
from peer.ID
73-
err error
84+
// real values after unpacking from text/multibase envelopes
85+
from peer.ID
86+
data []byte
87+
seqno []byte
88+
topics []string
89+
90+
err error
7491
}
7592

7693
func (msg *pubsubMessage) From() peer.ID {
7794
return msg.from
7895
}
7996

8097
func (msg *pubsubMessage) Data() []byte {
81-
return msg.JData
98+
return msg.data
8299
}
83100

84101
func (msg *pubsubMessage) Seq() []byte {
85-
return msg.JSeqno
102+
return msg.seqno
86103
}
87104

105+
// TODO: do we want to keep this interface as []string,
106+
// or change to more correct [][]byte?
88107
func (msg *pubsubMessage) Topics() []string {
89-
return msg.JTopicIDs
108+
return msg.topics
90109
}
91110

92111
func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
@@ -98,22 +117,41 @@ func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
98117
if msg.err != nil {
99118
return nil, msg.err
100119
}
120+
// unpack values from text/multibase envelopes
101121
var err error
102-
msg.from, err = peer.IDFromBytes(msg.JFrom)
103-
return &msg, err
122+
msg.from, err = peer.Decode(msg.JFrom)
123+
if err != nil {
124+
return nil, err
125+
}
126+
_, msg.data, err = mbase.Decode(msg.JData)
127+
if err != nil {
128+
return nil, err
129+
}
130+
_, msg.seqno, err = mbase.Decode(msg.JSeqno)
131+
if err != nil {
132+
return nil, err
133+
}
134+
for _, mbt := range msg.JTopicIDs {
135+
_, topic, err := mbase.Decode(mbt)
136+
if err != nil {
137+
return nil, err
138+
}
139+
msg.topics = append(msg.topics, string(topic))
140+
}
141+
return &msg, nil
104142
case <-ctx.Done():
105143
return nil, ctx.Err()
106144
}
107145
}
108146

109147
func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) {
148+
/* right now we have no options (discover got deprecated)
110149
options, err := caopts.PubSubSubscribeOptions(opts...)
111150
if err != nil {
112151
return nil, err
113152
}
114-
115-
resp, err := api.core().Request("pubsub/sub", topic).
116-
Option("discover", options.Discover).Send(ctx)
153+
*/
154+
resp, err := api.core().Request("pubsub/sub", toMultibase([]byte(topic))).Send(ctx)
117155

118156
if err != nil {
119157
return nil, err
@@ -168,3 +206,8 @@ func (s *pubsubSub) Close() error {
168206
func (api *PubsubAPI) core() *HttpApi {
169207
return (*HttpApi)(api)
170208
}
209+
210+
// Encodes bytes into URL-safe multibase that can be sent over HTTP RPC (URL or body)
211+
func toMultibase(data []byte) string {
212+
return mbase.Encode(mbase.Base64url, data)
213+
}

0 commit comments

Comments
 (0)