Skip to content

Commit 00538ea

Browse files
hannahhowardrvagg
andauthored
Update graphsync to 2.0 branch - absolute minimum (#300)
* feat(deps): update graphsync to 2.0 branch Provide the absolute minimum required to work with the latest version of graphsync with the 2.0 protocl. This does use a very unfortunate hack to deal with the change to extension type -- it now deserializes/serializes from basicnode.Any in order to get back to a []byte type * style(mod): tidy * refactor(message): create To/From IPLD funcs * fix(impl): add one more int conversion * feat(msg): ipld-prime messages, compat testing w/ cbor-gen * fix(msg): replace message 1_1 with 1_1prime form * chore(msg): rename struct fields, use renames for consistent encoding * fix(msg): remove through-bytes roundtrip for conversions * fix(msg): fixed upstream bindnode bug, added some temporary debugging * fix(graphsync): compare Nodes with ipld.DeepEqual * fix(deps): update ipld-prime with bindnode tuple fix * fix(msg): remove debug prints, fix imports * fix(cbor-gen): make go generate work again in old msgs * chore(deps): update go-ipld-prime to new master with tuple fix * feat(graphsync): remove peer id tracking no longer need to utilize peer id to tract graphsync requests since RequestIDs are unique * feat(deps): update to tagged go-graphsync Co-authored-by: Rod Vagg <[email protected]>
1 parent 240ff4a commit 00538ea

29 files changed

+2027
-674
lines changed

encoding/encoding.go

+53-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import (
77
cbor "github.com/ipfs/go-ipld-cbor"
88
"github.com/ipld/go-ipld-prime"
99
"github.com/ipld/go-ipld-prime/codec/dagcbor"
10+
"github.com/ipld/go-ipld-prime/datamodel"
11+
"github.com/ipld/go-ipld-prime/node/basicnode"
12+
"github.com/ipld/go-ipld-prime/schema"
1013
cborgen "github.com/whyrusleeping/cbor-gen"
1114
"golang.org/x/xerrors"
1215
)
@@ -25,8 +28,11 @@ func Encode(value Encodable) ([]byte, error) {
2528
}
2629
return buf.Bytes(), nil
2730
}
28-
if ipldEncodable, ok := value.(ipld.Node); ok {
29-
buf := new(bytes.Buffer)
31+
if ipldEncodable, ok := value.(datamodel.Node); ok {
32+
if tn, ok := ipldEncodable.(schema.TypedNode); ok {
33+
ipldEncodable = tn.Representation()
34+
}
35+
buf := &bytes.Buffer{}
3036
err := dagcbor.Encode(ipldEncodable, buf)
3137
if err != nil {
3238
return nil, err
@@ -36,17 +42,30 @@ func Encode(value Encodable) ([]byte, error) {
3642
return cbor.DumpObject(value)
3743
}
3844

45+
func EncodeToNode(encodable Encodable) (datamodel.Node, error) {
46+
byts, err := Encode(encodable)
47+
if err != nil {
48+
return nil, err
49+
}
50+
na := basicnode.Prototype.Any.NewBuilder()
51+
if err := dagcbor.Decode(na, bytes.NewReader(byts)); err != nil {
52+
return nil, err
53+
}
54+
return na.Build(), nil
55+
}
56+
3957
// Decoder is CBOR decoder for a given encodable type
4058
type Decoder interface {
4159
DecodeFromCbor([]byte) (Encodable, error)
60+
DecodeFromNode(datamodel.Node) (Encodable, error)
4261
}
4362

4463
// NewDecoder creates a new Decoder that will decode into new instances of the given
4564
// object type. It will use the decoding that is optimal for that type
4665
// It returns error if it's not possible to setup a decoder for this type
4766
func NewDecoder(decodeType Encodable) (Decoder, error) {
48-
// check if type is ipld.Node, if so, just use style
49-
if ipldDecodable, ok := decodeType.(ipld.Node); ok {
67+
// check if type is datamodel.Node, if so, just use style
68+
if ipldDecodable, ok := decodeType.(datamodel.Node); ok {
5069
return &ipldDecoder{ipldDecodable.Prototype()}, nil
5170
}
5271
// check if type is a pointer, as we need that to make new copies
@@ -86,6 +105,14 @@ func (decoder *ipldDecoder) DecodeFromCbor(encoded []byte) (Encodable, error) {
86105
return builder.Build(), nil
87106
}
88107

108+
func (decoder *ipldDecoder) DecodeFromNode(node datamodel.Node) (Encodable, error) {
109+
builder := decoder.style.NewBuilder()
110+
if err := builder.AssignNode(node); err != nil {
111+
return nil, err
112+
}
113+
return builder.Build(), nil
114+
}
115+
89116
type cbgDecoder struct {
90117
cbgType reflect.Type
91118
}
@@ -104,6 +131,17 @@ func (decoder *cbgDecoder) DecodeFromCbor(encoded []byte) (Encodable, error) {
104131
return decoded, nil
105132
}
106133

134+
func (decoder *cbgDecoder) DecodeFromNode(node datamodel.Node) (Encodable, error) {
135+
if tn, ok := node.(schema.TypedNode); ok {
136+
node = tn.Representation()
137+
}
138+
buf := &bytes.Buffer{}
139+
if err := dagcbor.Encode(node, buf); err != nil {
140+
return nil, err
141+
}
142+
return decoder.DecodeFromCbor(buf.Bytes())
143+
}
144+
107145
type defaultDecoder struct {
108146
ptrType reflect.Type
109147
}
@@ -120,3 +158,14 @@ func (decoder *defaultDecoder) DecodeFromCbor(encoded []byte) (Encodable, error)
120158
}
121159
return decoded, nil
122160
}
161+
162+
func (decoder *defaultDecoder) DecodeFromNode(node datamodel.Node) (Encodable, error) {
163+
if tn, ok := node.(schema.TypedNode); ok {
164+
node = tn.Representation()
165+
}
166+
buf := &bytes.Buffer{}
167+
if err := dagcbor.Encode(node, buf); err != nil {
168+
return nil, err
169+
}
170+
return decoder.DecodeFromCbor(buf.Bytes())
171+
}

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/ipfs/go-cid v0.1.0
1414
github.com/ipfs/go-datastore v0.5.1
1515
github.com/ipfs/go-ds-badger v0.3.0
16-
github.com/ipfs/go-graphsync v0.12.0
16+
github.com/ipfs/go-graphsync v0.13.0
1717
github.com/ipfs/go-ipfs-blockstore v1.1.2
1818
github.com/ipfs/go-ipfs-blocksutil v0.0.1
1919
github.com/ipfs/go-ipfs-chunker v0.0.5
@@ -25,7 +25,7 @@ require (
2525
github.com/ipfs/go-log/v2 v2.3.0
2626
github.com/ipfs/go-merkledag v0.5.1
2727
github.com/ipfs/go-unixfs v0.3.1
28-
github.com/ipld/go-ipld-prime v0.14.4
28+
github.com/ipld/go-ipld-prime v0.16.0
2929
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
3030
github.com/jpillora/backoff v1.0.0
3131
github.com/libp2p/go-libp2p v0.16.0

go.sum

+10-10
Original file line numberDiff line numberDiff line change
@@ -195,8 +195,9 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
195195
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
196196
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
197197
github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k=
198-
github.com/frankban/quicktest v1.14.0 h1:+cqqvzZV87b4adx/5ayVOaYZ2CrvM4ejQvUdBzPPUss=
199198
github.com/frankban/quicktest v1.14.0/go.mod h1:NeW+ay9A/U67EYXNFA1nPE8e/tnQv/09mUdL/ijj8og=
199+
github.com/frankban/quicktest v1.14.2 h1:SPb1KFFmM+ybpEjPUhCCkZOM5xlovT5UbrMvWnXyBns=
200+
github.com/frankban/quicktest v1.14.2/go.mod h1:mgiwOwqx65TmIk1wJ6Q7wvnVMocbUorkibMOrVTHZps=
200201
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
201202
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
202203
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
@@ -278,8 +279,9 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
278279
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
279280
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
280281
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
281-
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
282282
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
283+
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
284+
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
283285
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
284286
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
285287
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
@@ -307,7 +309,6 @@ github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE0
307309
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
308310
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
309311
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
310-
github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
311312
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f h1:KMlcu9X58lhTA/KrfX8Bi1LQSO4pzoVjTiL3h4Jk+Zk=
312313
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
313314
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
@@ -403,8 +404,8 @@ github.com/ipfs/go-ds-leveldb v0.0.1/go.mod h1:feO8V3kubwsEF22n0YRQCffeb79OOYIyk
403404
github.com/ipfs/go-ds-leveldb v0.4.1/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
404405
github.com/ipfs/go-ds-leveldb v0.4.2/go.mod h1:jpbku/YqBSsBc1qgME8BkWS4AxzF2cEu1Ii2r79Hh9s=
405406
github.com/ipfs/go-ds-leveldb v0.5.0/go.mod h1:d3XG9RUDzQ6V4SHi8+Xgj9j1XuEk1z82lquxrVbml/Q=
406-
github.com/ipfs/go-graphsync v0.12.0 h1:QCsVHVzb9FTkcm3NEa8GjXnUeGit1L9s08HcSVQ4m/g=
407-
github.com/ipfs/go-graphsync v0.12.0/go.mod h1:nASYWYETgsnMbQ3+DirNImOHQ8TY0a5AhAqyOY55tUg=
407+
github.com/ipfs/go-graphsync v0.13.0 h1:8reYjVKxKocJ9jD471xs9XNuegquPrnBFuGZmCqT8zU=
408+
github.com/ipfs/go-graphsync v0.13.0/go.mod h1:oPBU9JGNlyWHyH9lWYmyl19M++5yiXjBnNC4boh5nVU=
408409
github.com/ipfs/go-ipfs-blockstore v0.2.1/go.mod h1:jGesd8EtCM3/zPgx+qr0/feTXGUeRai6adgwC+Q+JvE=
409410
github.com/ipfs/go-ipfs-blockstore v1.1.2 h1:WCXoZcMYnvOTmlpX+RSSnhVN0uCmbWTeepTGX5lgiXw=
410411
github.com/ipfs/go-ipfs-blockstore v1.1.2/go.mod h1:w51tNR9y5+QXB0wkNcHt4O2aSZjTdqaEWaQdSxEyUOY=
@@ -469,8 +470,8 @@ github.com/ipfs/go-peertaskqueue v0.7.1 h1:7PLjon3RZwRQMgOTvYccZ+mjzkmds/7YzSWKF
469470
github.com/ipfs/go-peertaskqueue v0.7.1/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
470471
github.com/ipfs/go-unixfs v0.3.1 h1:LrfED0OGfG98ZEegO4/xiprx2O+yS+krCMQSp7zLVv8=
471472
github.com/ipfs/go-unixfs v0.3.1/go.mod h1:h4qfQYzghiIc8ZNFKiLMFWOTzrWIAtzYQ59W/pCFf1o=
472-
github.com/ipfs/go-unixfsnode v1.2.0 h1:tHHBJftsJyHGa8bS62PpkYNqHy/Sug3c/vxxC8NaGQY=
473-
github.com/ipfs/go-unixfsnode v1.2.0/go.mod h1:mQEgLjxkV/1mohkC4p7taRRBYPBeXu97SA3YaerT2q0=
473+
github.com/ipfs/go-unixfsnode v1.4.0 h1:9BUxHBXrbNi8mWHc6j+5C580WJqtVw9uoeEKn4tMhwA=
474+
github.com/ipfs/go-unixfsnode v1.4.0/go.mod h1:qc7YFFZ8tABc58p62HnIYbUMwj9chhUuFWmxSokfePo=
474475
github.com/ipfs/go-verifcid v0.0.1 h1:m2HI7zIuR5TFyQ1b79Da5N9dnnCP1vcu2QqawmWlK2E=
475476
github.com/ipfs/go-verifcid v0.0.1/go.mod h1:5Hrva5KBeIog4A+UpqlaIU+DEstipcJYQQZc0g37pY0=
476477
github.com/ipld/go-car/v2 v2.1.1 h1:saaKz4nC0AdfCGHLYKeXLGn8ivoPC54fyS55uyOLKwA=
@@ -480,8 +481,8 @@ github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9
480481
github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
481482
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
482483
github.com/ipld/go-ipld-prime v0.14.0/go.mod h1:9ASQLwUFLptCov6lIYc70GRB4V7UTyLD0IJtrDJe6ZM=
483-
github.com/ipld/go-ipld-prime v0.14.4 h1:bqhmume8+nbNsX4/+J6eohktfZHAI8GKrF3rQ0xgOyc=
484-
github.com/ipld/go-ipld-prime v0.14.4/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
484+
github.com/ipld/go-ipld-prime v0.16.0 h1:RS5hhjB/mcpeEPJvfyj0qbOj/QL+/j05heZ0qa97dVo=
485+
github.com/ipld/go-ipld-prime v0.16.0/go.mod h1:axSCuOCBPqrH+gvXr2w9uAOulJqBPhHPT2PjoiiU1qA=
485486
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20211210234204-ce2a1c70cd73/go.mod h1:2PJ0JgxyB08t0b2WKrcuqI3di0V+5n6RS/LTUJhkoxY=
486487
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
487488
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
@@ -1055,7 +1056,6 @@ github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6Mwd
10551056
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
10561057
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
10571058
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
1058-
github.com/smartystreets/assertions v1.0.0/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
10591059
github.com/smartystreets/assertions v1.0.1 h1:voD4ITNjPL5jjBfgR/r8fPIIBrliWrWHeiJApdr3r4w=
10601060
github.com/smartystreets/assertions v1.0.1/go.mod h1:kHHU4qYBaI3q23Pp3VPrmWhuIUrLW/7eUrw0BU5VaoM=
10611061
github.com/smartystreets/goconvey v0.0.0-20190222223459-a17d461953aa/go.mod h1:2RVY1rIf+2J2o/IM9+vPq9RzmHDSseB7FoXiSNIUsoU=

impl/integration_test.go

+22-36
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,6 @@ func TestAutoRestart(t *testing.T) {
920920
// 6. The connection is automatically re-established and the transfer completes
921921
func TestAutoRestartAfterBouncingInitiator(t *testing.T) {
922922
t.Skip("flaky test")
923-
SetDTLogLevelDebug()
924923

925924
runTest := func(t *testing.T, isPush bool) {
926925
ctx := context.Background()
@@ -1484,13 +1483,13 @@ func TestPauseAndResume(t *testing.T) {
14841483
resumeResponder := make(chan struct{}, 2)
14851484
var subscriber datatransfer.Subscriber = func(event datatransfer.Event, channelState datatransfer.ChannelState) {
14861485

1487-
if event.Code == datatransfer.DataQueued {
1486+
if event.Code == datatransfer.DataQueuedProgress {
14881487
if channelState.Queued() > 0 {
14891488
sent <- channelState.Queued()
14901489
}
14911490
}
14921491

1493-
if event.Code == datatransfer.DataReceived {
1492+
if event.Code == datatransfer.DataReceivedProgress {
14941493
if channelState.Received() > 0 {
14951494
received <- channelState.Received()
14961495
}
@@ -1784,7 +1783,7 @@ func (fgsr *fakeGraphSyncReceiver) consumeResponses(ctx context.Context, t *test
17841783
t.Fail()
17851784
case gsMessageReceived = <-fgsr.receivedMessages:
17861785
responses := gsMessageReceived.message.Responses()
1787-
if (len(responses) > 0) && gsmsg.IsTerminalResponseCode(responses[0].Status()) {
1786+
if (len(responses) > 0) && responses[0].Status().IsTerminal() {
17881787
return responses[0].Status()
17891788
}
17901789
}
@@ -1834,16 +1833,13 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
18341833
}
18351834
requestReceived := messageReceived.message.(datatransfer.Request)
18361835

1837-
var buf bytes.Buffer
18381836
response, err := message.NewResponse(requestReceived.TransferID(), true, false, voucherResult.Type(), voucherResult)
18391837
require.NoError(t, err)
1840-
err = response.ToNet(&buf)
1838+
nd, err := response.ToIPLD()
18411839
require.NoError(t, err)
1842-
extData := buf.Bytes()
1843-
1844-
request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
1840+
request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
18451841
Name: extension.ExtensionDataTransfer1_1,
1846-
Data: extData,
1842+
Data: nd,
18471843
})
18481844
builder := gsmsg.NewBuilder()
18491845
builder.AddRequest(request)
@@ -1852,20 +1848,17 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
18521848
require.NoError(t, gsData.GsNet2.SendMessage(ctx, host1.ID(), gsmessage))
18531849

18541850
status := gsr.consumeResponses(ctx, t)
1855-
require.False(t, gsmsg.IsTerminalFailureCode(status))
1851+
require.False(t, status.IsFailure())
18561852
})
18571853

18581854
t.Run("when no request is initiated", func(t *testing.T) {
1859-
var buf bytes.Buffer
1860-
response, err := message.NewResponse(datatransfer.TransferID(rand.Uint64()), true, false, voucher.Type(), voucher)
1855+
response, err := message.NewResponse(datatransfer.TransferID(rand.Uint32()), true, false, voucher.Type(), voucher)
18611856
require.NoError(t, err)
1862-
err = response.ToNet(&buf)
1857+
nd, err := response.ToIPLD()
18631858
require.NoError(t, err)
1864-
extData := buf.Bytes()
1865-
1866-
request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
1859+
request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
18671860
Name: extension.ExtensionDataTransfer1_1,
1868-
Data: extData,
1861+
Data: nd,
18691862
})
18701863
builder := gsmsg.NewBuilder()
18711864
builder.AddRequest(request)
@@ -1874,7 +1867,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
18741867
require.NoError(t, gsData.GsNet2.SendMessage(ctx, host1.ID(), gsmessage))
18751868

18761869
status := gsr.consumeResponses(ctx, t)
1877-
require.True(t, gsmsg.IsTerminalFailureCode(status))
1870+
require.True(t, status.IsFailure())
18781871
})
18791872
}
18801873

@@ -1923,15 +1916,15 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) {
19231916
case <-r.messageReceived:
19241917
}
19251918

1926-
request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()))
1919+
request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()))
19271920
builder := gsmsg.NewBuilder()
19281921
builder.AddRequest(request)
19291922
gsmessage, err := builder.Build()
19301923
require.NoError(t, err)
19311924
require.NoError(t, gsData.GsNet2.SendMessage(ctx, host1.ID(), gsmessage))
19321925

19331926
status := gsr.consumeResponses(ctx, t)
1934-
assert.False(t, gsmsg.IsTerminalFailureCode(status))
1927+
assert.False(t, status.IsFailure())
19351928
})
19361929
}
19371930

@@ -1954,14 +1947,10 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
19541947
voucher := testutil.NewFakeDTType()
19551948
request, err := message.NewRequest(id, false, true, voucher.Type(), voucher, testutil.GenerateCids(1)[0], gsData.AllSelector)
19561949
require.NoError(t, err)
1957-
buf := new(bytes.Buffer)
1958-
err = request.ToNet(buf)
1959-
require.NoError(t, err)
1960-
extData := buf.Bytes()
1961-
1962-
gsRequest := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
1950+
nd, err := request.ToIPLD()
1951+
gsRequest := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
19631952
Name: extension.ExtensionDataTransfer1_1,
1964-
Data: extData,
1953+
Data: nd,
19651954
})
19661955

19671956
// initiator requests data over graphsync network
@@ -1971,7 +1960,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
19711960
require.NoError(t, err)
19721961
require.NoError(t, gsData.GsNet1.SendMessage(ctx, gsData.Host2.ID(), gsmessage))
19731962
status := gsr.consumeResponses(ctx, t)
1974-
require.False(t, gsmsg.IsTerminalFailureCode(status))
1963+
require.False(t, status.IsFailure())
19751964
},
19761965
},
19771966
"When request is initiated, but fails validation": {
@@ -1986,13 +1975,11 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
19861975
dtRequest, err := message.NewRequest(id, false, true, voucher.Type(), voucher, testutil.GenerateCids(1)[0], gsData.AllSelector)
19871976
require.NoError(t, err)
19881977

1989-
buf := new(bytes.Buffer)
1990-
err = dtRequest.ToNet(buf)
1978+
nd, err := dtRequest.ToIPLD()
19911979
require.NoError(t, err)
1992-
extData := buf.Bytes()
1993-
request := gsmsg.NewRequest(graphsync.RequestID(rand.Int31()), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
1980+
request := gsmsg.NewRequest(graphsync.NewRequestID(), link.(cidlink.Link).Cid, gsData.AllSelector, graphsync.Priority(rand.Int31()), graphsync.ExtensionData{
19941981
Name: extension.ExtensionDataTransfer1_1,
1995-
Data: extData,
1982+
Data: nd,
19961983
})
19971984
builder := gsmsg.NewBuilder()
19981985
builder.AddRequest(request)
@@ -2003,7 +1990,7 @@ func TestRespondingToPullGraphsyncRequests(t *testing.T) {
20031990
// because there was no previous request
20041991
require.NoError(t, gsData.GsNet1.SendMessage(ctx, gsData.Host2.ID(), gsmessage))
20051992
status := gsr.consumeResponses(ctx, t)
2006-
require.True(t, gsmsg.IsTerminalFailureCode(status))
1993+
require.True(t, status.IsFailure())
20071994
},
20081995
},
20091996
}
@@ -2189,7 +2176,6 @@ func (r *completeRevalidator) OnComplete(chid datatransfer.ChannelID) (bool, dat
21892176
}
21902177

21912178
func TestMultipleParallelTransfers(t *testing.T) {
2192-
SetDTLogLevelDebug()
21932179

21942180
// Add more sizes here to trigger more transfers.
21952181
sizes := []int{300000, 256000, 200000, 256000}

message.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import (
55

66
"github.com/ipfs/go-cid"
77
"github.com/ipld/go-ipld-prime"
8+
"github.com/ipld/go-ipld-prime/datamodel"
89
"github.com/libp2p/go-libp2p-core/protocol"
9-
cborgen "github.com/whyrusleeping/cbor-gen"
1010

1111
"github.com/filecoin-project/go-data-transfer/encoding"
1212
)
@@ -27,9 +27,8 @@ type Message interface {
2727
IsPaused() bool
2828
IsCancel() bool
2929
TransferID() TransferID
30-
cborgen.CBORMarshaler
31-
cborgen.CBORUnmarshaler
3230
ToNet(w io.Writer) error
31+
ToIPLD() (datamodel.Node, error)
3332
MessageForProtocol(targetProtocol protocol.ID) (newMsg Message, err error)
3433
}
3534

message/message.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package message
22

33
import (
4-
"github.com/filecoin-project/go-data-transfer/message/message1_1"
4+
message1_1 "github.com/filecoin-project/go-data-transfer/message/message1_1prime"
55
)
66

77
var NewRequest = message1_1.NewRequest
@@ -14,5 +14,6 @@ var VoucherResultResponse = message1_1.VoucherResultResponse
1414
var CancelResponse = message1_1.CancelResponse
1515
var UpdateResponse = message1_1.UpdateResponse
1616
var FromNet = message1_1.FromNet
17+
var FromIPLD = message1_1.FromIPLD
1718
var CompleteResponse = message1_1.CompleteResponse
1819
var CancelRequest = message1_1.CancelRequest

0 commit comments

Comments
 (0)