Skip to content

Commit fc58e23

Browse files
committed
fix(net): refactor message version format code to separate packages
1 parent 989cf66 commit fc58e23

19 files changed

+1074
-785
lines changed

benchmarks/testnet/virtual.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"google.golang.org/protobuf/proto"
1717

1818
gsmsg "github.com/ipfs/go-graphsync/message"
19+
gsmsgv1 "github.com/ipfs/go-graphsync/message/v1"
1920
gsnet "github.com/ipfs/go-graphsync/network"
2021
)
2122

@@ -137,7 +138,7 @@ func (n *network) SendMessage(
137138
rateLimiters[to] = rateLimiter
138139
}
139140

140-
pbMsg, err := gsmsg.NewMessageHandler().ToProtoV11(mes)
141+
pbMsg, err := gsmsgv1.NewMessageHandler().ToProto(peer.ID("foo"), mes)
141142
if err != nil {
142143
return err
143144
}

message/bench_test.go renamed to message/bench/bench_test.go

+23-20
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,16 @@ import (
88

99
"github.com/google/go-cmp/cmp"
1010
blocks "github.com/ipfs/go-block-format"
11-
"github.com/ipld/go-ipld-prime/codec/dagcbor"
11+
"github.com/ipld/go-ipld-prime"
1212
"github.com/ipld/go-ipld-prime/node/basicnode"
13-
"github.com/ipld/go-ipld-prime/node/bindnode"
1413
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
14+
"github.com/libp2p/go-libp2p-core/peer"
1515
"github.com/stretchr/testify/require"
1616

1717
"github.com/ipfs/go-graphsync"
18-
"github.com/ipfs/go-graphsync/message/ipldbind"
18+
"github.com/ipfs/go-graphsync/message"
19+
v1 "github.com/ipfs/go-graphsync/message/v1"
20+
v2 "github.com/ipfs/go-graphsync/message/v2"
1921
"github.com/ipfs/go-graphsync/testutil"
2022
)
2123

@@ -34,35 +36,40 @@ func BenchmarkMessageEncodingRoundtrip(b *testing.B) {
3436
priority := graphsync.Priority(rand.Int31())
3537
status := graphsync.RequestAcknowledged
3638

37-
builder := NewBuilder()
38-
builder.AddRequest(NewRequest(id, root, selector, priority, extension))
39-
builder.AddRequest(NewRequest(id, root, selector, priority))
39+
builder := message.NewBuilder()
40+
builder.AddRequest(message.NewRequest(id, root, selector, priority, extension))
41+
builder.AddRequest(message.NewRequest(id, root, selector, priority))
4042
builder.AddResponseCode(id, status)
4143
builder.AddExtensionData(id, extension)
4244
builder.AddBlock(blocks.NewBlock([]byte("W")))
4345
builder.AddBlock(blocks.NewBlock([]byte("E")))
4446
builder.AddBlock(blocks.NewBlock([]byte("F")))
4547
builder.AddBlock(blocks.NewBlock([]byte("M")))
4648

49+
p := peer.ID("test peer")
4750
gsm, err := builder.Build()
4851
require.NoError(b, err)
4952

5053
b.Run("Protobuf", func(b *testing.B) {
5154
b.ReportAllocs()
5255
b.RunParallel(func(pb *testing.PB) {
5356
buf := new(bytes.Buffer)
57+
mh := v1.NewMessageHandler()
5458
for pb.Next() {
5559
buf.Reset()
5660

57-
err := NewMessageHandler().ToNet(gsm, buf)
61+
err := mh.ToNet(p, gsm, buf)
5862
require.NoError(b, err)
5963

60-
gsm2, err := NewMessageHandler().FromNet(buf)
64+
gsm2, err := mh.FromNet(p, buf)
6165
require.NoError(b, err)
6266

6367
// Note that require.Equal doesn't seem to handle maps well.
6468
// It says they are non-equal simply because their order isn't deterministic.
65-
if diff := cmp.Diff(gsm, gsm2, cmp.Exporter(func(reflect.Type) bool { return true })); diff != "" {
69+
if diff := cmp.Diff(gsm, gsm2,
70+
cmp.Exporter(func(reflect.Type) bool { return true }),
71+
cmp.Comparer(ipld.DeepEqual),
72+
); diff != "" {
6673
b.Fatal(diff)
6774
}
6875
}
@@ -73,25 +80,21 @@ func BenchmarkMessageEncodingRoundtrip(b *testing.B) {
7380
b.ReportAllocs()
7481
b.RunParallel(func(pb *testing.PB) {
7582
buf := new(bytes.Buffer)
83+
mh := v2.NewMessageHandler()
7684
for pb.Next() {
7785
buf.Reset()
7886

79-
ipldGSM, err := NewMessageHandler().toIPLD(gsm)
80-
require.NoError(b, err)
81-
node := bindnode.Wrap(ipldGSM, ipldbind.Prototype.Message.Type())
82-
err = dagcbor.Encode(node.Representation(), buf)
87+
err := mh.ToNet(p, gsm, buf)
8388
require.NoError(b, err)
8489

85-
builder := ipldbind.Prototype.Message.Representation().NewBuilder()
86-
err = dagcbor.Decode(builder, buf)
87-
require.NoError(b, err)
88-
node2 := builder.Build()
89-
ipldGSM2 := bindnode.Unwrap(node2).(*ipldbind.GraphSyncMessage)
90-
gsm2, err := NewMessageHandler().fromIPLD(ipldGSM2)
90+
gsm2, err := mh.FromNet(p, buf)
9191
require.NoError(b, err)
9292

9393
// same as above.
94-
if diff := cmp.Diff(gsm, gsm2, cmp.Exporter(func(reflect.Type) bool { return true })); diff != "" {
94+
if diff := cmp.Diff(gsm, gsm2,
95+
cmp.Exporter(func(reflect.Type) bool { return true }),
96+
cmp.Comparer(ipld.DeepEqual),
97+
); diff != "" {
9598
b.Fatal(diff)
9699
}
97100
}

message/ipldbind/message.go

+19-66
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,42 @@
11
package ipldbind
22

33
import (
4-
"io"
5-
6-
blocks "github.com/ipfs/go-block-format"
74
cid "github.com/ipfs/go-cid"
85
"github.com/ipld/go-ipld-prime/datamodel"
96

107
"github.com/ipfs/go-graphsync"
11-
pb "github.com/ipfs/go-graphsync/message/pb"
128
)
139

14-
// IsTerminalSuccessCode returns true if the response code indicates the
15-
// request terminated successfully.
16-
// DEPRECATED: use status.IsSuccess()
17-
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
18-
return status.IsSuccess()
19-
}
20-
21-
// IsTerminalFailureCode returns true if the response code indicates the
22-
// request terminated in failure.
23-
// DEPRECATED: use status.IsFailure()
24-
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
25-
return status.IsFailure()
26-
}
27-
28-
// IsTerminalResponseCode returns true if the response code signals
29-
// the end of the request
30-
// DEPRECATED: use status.IsTerminal()
31-
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
32-
return status.IsTerminal()
33-
}
34-
35-
// Exportable is an interface that can serialize to a protobuf
36-
type Exportable interface {
37-
ToProto() (*pb.Message, error)
38-
ToNet(w io.Writer) error
10+
type MessagePartWithExtensions interface {
11+
ExtensionNames() []graphsync.ExtensionName
12+
Extension(name graphsync.ExtensionName) (datamodel.Node, bool)
3913
}
4014

4115
type GraphSyncExtensions struct {
4216
Keys []string
4317
Values map[string]datamodel.Node
4418
}
4519

46-
func NewGraphSyncExtensions(values map[string]datamodel.Node) GraphSyncExtensions {
47-
keys := make([]string, 0, len(values))
48-
for k := range values {
49-
keys = append(keys, k)
20+
func NewGraphSyncExtensions(part MessagePartWithExtensions) GraphSyncExtensions {
21+
names := part.ExtensionNames()
22+
keys := make([]string, 0, len(names))
23+
values := make(map[string]datamodel.Node, len(names))
24+
for _, name := range names {
25+
keys = append(keys, string(name))
26+
data, _ := part.Extension(graphsync.ExtensionName(name))
27+
values[string(name)] = data
5028
}
5129
return GraphSyncExtensions{keys, values}
5230
}
5331

32+
func (gse GraphSyncExtensions) ToExtensionsList() []graphsync.ExtensionData {
33+
exts := make([]graphsync.ExtensionData, 0, len(gse.Values))
34+
for name, data := range gse.Values {
35+
exts = append(exts, graphsync.ExtensionData{Name: graphsync.ExtensionName(name), Data: data})
36+
}
37+
return exts
38+
}
39+
5440
// GraphSyncRequest is a struct to capture data on a request contained in a
5541
// GraphSyncMessage.
5642
type GraphSyncRequest struct {
@@ -84,39 +70,6 @@ type GraphSyncBlock struct {
8470
Data []byte
8571
}
8672

87-
func FromBlockFormat(block blocks.Block) GraphSyncBlock {
88-
return GraphSyncBlock{
89-
Prefix: block.Cid().Prefix().Bytes(),
90-
Data: block.RawData(),
91-
}
92-
}
93-
94-
func (b GraphSyncBlock) BlockFormat() *blocks.BasicBlock {
95-
pref, err := cid.PrefixFromBytes(b.Prefix)
96-
if err != nil {
97-
panic(err) // should never happen
98-
}
99-
100-
c, err := pref.Sum(b.Data)
101-
if err != nil {
102-
panic(err) // should never happen
103-
}
104-
105-
block, err := blocks.NewBlockWithCid(b.Data, c)
106-
if err != nil {
107-
panic(err) // should never happen
108-
}
109-
return block
110-
}
111-
112-
func BlockFormatSlice(bs []GraphSyncBlock) []blocks.Block {
113-
blks := make([]blocks.Block, len(bs))
114-
for i, b := range bs {
115-
blks[i] = b.BlockFormat()
116-
}
117-
return blks
118-
}
119-
12073
type GraphSyncMessage struct {
12174
Requests []GraphSyncRequest
12275
Responses []GraphSyncResponse

message/ipldbind/schema.go

-3
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@ import (
1111
//go:embed schema.ipldsch
1212
var embedSchema []byte
1313

14-
var schemaTypeSystem *schema.TypeSystem
15-
1614
var Prototype struct {
1715
Message schema.TypedPrototype
1816
}
@@ -22,7 +20,6 @@ func init() {
2220
if err != nil {
2321
panic(err)
2422
}
25-
schemaTypeSystem = ts
2623

2724
Prototype.Message = bindnode.Prototype((*GraphSyncMessage)(nil), ts.TypeByName("GraphSyncMessage"))
2825
}

message/message.go

+40-13
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,26 @@ package message
33
import (
44
"bytes"
55
"fmt"
6+
"io"
67
"strings"
78

89
blocks "github.com/ipfs/go-block-format"
910
cid "github.com/ipfs/go-cid"
1011
"github.com/ipld/go-ipld-prime"
1112
"github.com/ipld/go-ipld-prime/codec/dagjson"
1213
"github.com/ipld/go-ipld-prime/datamodel"
14+
"github.com/libp2p/go-libp2p-core/peer"
15+
"github.com/libp2p/go-msgio"
1316

1417
"github.com/ipfs/go-graphsync"
1518
)
1619

20+
type MessageHandler interface {
21+
FromNet(peer.ID, io.Reader) (GraphSyncMessage, error)
22+
FromMsgReader(peer.ID, msgio.Reader) (GraphSyncMessage, error)
23+
ToNet(peer.ID, GraphSyncMessage, io.Writer) error
24+
}
25+
1726
// GraphSyncRequest is a struct to capture data on a request contained in a
1827
// GraphSyncMessage.
1928
type GraphSyncRequest struct {
@@ -34,14 +43,19 @@ func (gsr GraphSyncRequest) String() string {
3443
dagjson.Encode(gsr.selector, &buf)
3544
sel = buf.String()
3645
}
46+
extStr := strings.Builder{}
47+
for _, name := range gsr.ExtensionNames() {
48+
extStr.WriteString(string(name))
49+
extStr.WriteString("|")
50+
}
3751
return fmt.Sprintf("GraphSyncRequest<root=%s, selector=%s, priority=%d, id=%s, cancel=%v, update=%v, exts=%s>",
3852
gsr.root.String(),
3953
sel,
4054
gsr.priority,
4155
gsr.id.String(),
4256
gsr.isCancel,
4357
gsr.isUpdate,
44-
strings.Join(gsr.ExtensionNames(), "|"),
58+
extStr.String(),
4559
)
4660
}
4761

@@ -55,10 +69,15 @@ type GraphSyncResponse struct {
5569

5670
// String returns a human-readable form of a GraphSyncResponse
5771
func (gsr GraphSyncResponse) String() string {
72+
extStr := strings.Builder{}
73+
for _, name := range gsr.ExtensionNames() {
74+
extStr.WriteString(string(name))
75+
extStr.WriteString("|")
76+
}
5877
return fmt.Sprintf("GraphSyncResponse<id=%s, status=%d, exts=%s>",
5978
gsr.requestID.String(),
6079
gsr.status,
61-
strings.Join(gsr.ExtensionNames(), "|"),
80+
extStr.String(),
6281
)
6382
}
6483

@@ -70,6 +89,14 @@ type GraphSyncMessage struct {
7089
blocks map[cid.Cid]blocks.Block
7190
}
7291

92+
func NewMessage(
93+
requests map[graphsync.RequestID]GraphSyncRequest,
94+
responses map[graphsync.RequestID]GraphSyncResponse,
95+
blocks map[cid.Cid]blocks.Block,
96+
) GraphSyncMessage {
97+
return GraphSyncMessage{requests, responses, blocks}
98+
}
99+
73100
// String returns a human-readable (multi-line) form of a GraphSyncMessage and
74101
// its contents
75102
func (gsm GraphSyncMessage) String() string {
@@ -87,7 +114,7 @@ func (gsm GraphSyncMessage) String() string {
87114
return fmt.Sprintf("GraphSyncMessage<\n\t%s\n>", strings.Join(cts, ",\n\t"))
88115
}
89116

90-
// NewRequest builds a new Graphsync request
117+
// NewRequest builds a new GraphSyncRequest
91118
func NewRequest(id graphsync.RequestID,
92119
root cid.Cid,
93120
selector ipld.Node,
@@ -97,13 +124,13 @@ func NewRequest(id graphsync.RequestID,
97124
return newRequest(id, root, selector, priority, false, false, toExtensionsMap(extensions))
98125
}
99126

100-
// CancelRequest request generates a request to cancel an in progress request
101-
func CancelRequest(id graphsync.RequestID) GraphSyncRequest {
127+
// NewCancelRequest request generates a request to cancel an in progress request
128+
func NewCancelRequest(id graphsync.RequestID) GraphSyncRequest {
102129
return newRequest(id, cid.Cid{}, nil, 0, true, false, nil)
103130
}
104131

105-
// UpdateRequest generates a new request to update an in progress request with the given extensions
106-
func UpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionData) GraphSyncRequest {
132+
// NewUpdateRequest generates a new request to update an in progress request with the given extensions
133+
func NewUpdateRequest(id graphsync.RequestID, extensions ...graphsync.ExtensionData) GraphSyncRequest {
107134
return newRequest(id, cid.Cid{}, nil, 0, false, true, toExtensionsMap(extensions))
108135
}
109136

@@ -247,10 +274,10 @@ func (gsr GraphSyncRequest) Extension(name graphsync.ExtensionName) (datamodel.N
247274
}
248275

249276
// ExtensionNames returns the names of the extensions included in this request
250-
func (gsr GraphSyncRequest) ExtensionNames() []string {
251-
var extNames []string
277+
func (gsr GraphSyncRequest) ExtensionNames() []graphsync.ExtensionName {
278+
var extNames []graphsync.ExtensionName
252279
for ext := range gsr.extensions {
253-
extNames = append(extNames, ext)
280+
extNames = append(extNames, graphsync.ExtensionName(ext))
254281
}
255282
return extNames
256283
}
@@ -281,10 +308,10 @@ func (gsr GraphSyncResponse) Extension(name graphsync.ExtensionName) (datamodel.
281308
}
282309

283310
// ExtensionNames returns the names of the extensions included in this request
284-
func (gsr GraphSyncResponse) ExtensionNames() []string {
285-
var extNames []string
311+
func (gsr GraphSyncResponse) ExtensionNames() []graphsync.ExtensionName {
312+
var extNames []graphsync.ExtensionName
286313
for ext := range gsr.extensions {
287-
extNames = append(extNames, ext)
314+
extNames = append(extNames, graphsync.ExtensionName(ext))
288315
}
289316
return extNames
290317
}

0 commit comments

Comments
 (0)