diff --git a/go.mod b/go.mod index 641b1177..a23ef5b4 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/ipfs/go-graphsync go 1.16 require ( + github.com/google/go-cmp v0.5.6 github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1 github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e github.com/ipfs/go-block-format v0.0.3 @@ -25,7 +26,7 @@ require ( github.com/ipfs/go-peertaskqueue v0.7.1 github.com/ipfs/go-unixfs v0.2.4 github.com/ipld/go-codec-dagpb v1.3.0 - github.com/ipld/go-ipld-prime v0.14.3 + github.com/ipld/go-ipld-prime v0.14.4 github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c github.com/libp2p/go-buffer-pool v0.0.2 github.com/libp2p/go-libp2p v0.16.0 diff --git a/go.sum b/go.sum index 3ee093b9..3cc56de9 100644 --- a/go.sum +++ b/go.sum @@ -457,8 +457,8 @@ github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSat github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA= github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8= github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8= -github.com/ipld/go-ipld-prime v0.14.3 h1:cGUmxSws2IHurn00/iLMDapeXsnf9+FyAtYVy8G/JsQ= -github.com/ipld/go-ipld-prime v0.14.3/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0= +github.com/ipld/go-ipld-prime v0.14.4 h1:bqhmume8+nbNsX4/+J6eohktfZHAI8GKrF3rQ0xgOyc= +github.com/ipld/go-ipld-prime v0.14.4/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0= github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA= github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= diff --git a/message/bench_test.go b/message/bench_test.go new file mode 100644 index 00000000..1b346e67 --- /dev/null +++ b/message/bench_test.go 
@@ -0,0 +1,97 @@ +package message + +import ( + "bytes" + "math/rand" + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-graphsync" + "github.com/ipfs/go-graphsync/message/ipldbind" + "github.com/ipfs/go-graphsync/testutil" + "github.com/ipld/go-ipld-prime/codec/dagcbor" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" + "github.com/stretchr/testify/require" +) + +func BenchmarkMessageEncodingRoundtrip(b *testing.B) { + root := testutil.GenerateCids(1)[0] + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + selector := ssb.Matcher().Node() + extensionName := graphsync.ExtensionName("graphsync/awesome") + extension := graphsync.ExtensionData{ + Name: extensionName, + Data: testutil.RandomBytes(100), + } + id := graphsync.RequestID(rand.Int31()) + priority := graphsync.Priority(rand.Int31()) + status := graphsync.RequestAcknowledged + + builder := NewBuilder() + builder.AddRequest(NewRequest(id, root, selector, priority, extension)) + builder.AddRequest(NewRequest(id, root, selector, priority)) + builder.AddResponseCode(id, status) + builder.AddExtensionData(id, extension) + builder.AddBlock(blocks.NewBlock([]byte("W"))) + builder.AddBlock(blocks.NewBlock([]byte("E"))) + builder.AddBlock(blocks.NewBlock([]byte("F"))) + builder.AddBlock(blocks.NewBlock([]byte("M"))) + + gsm, err := builder.Build() + require.NoError(b, err) + + b.Run("Protobuf", func(b *testing.B) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + buf := new(bytes.Buffer) + for pb.Next() { + buf.Reset() + + err := gsm.ToNet(buf) + require.NoError(b, err) + + gsm2, err := FromNet(buf) + require.NoError(b, err) + + // Note that require.Equal doesn't seem to handle maps well. + // It says they are non-equal simply because their order isn't deterministic. 
+ if diff := cmp.Diff(gsm, gsm2, cmp.Exporter(func(reflect.Type) bool { return true })); diff != "" { + b.Fatal(diff) + } + } + }) + }) + + b.Run("DagCbor", func(b *testing.B) { + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + buf := new(bytes.Buffer) + for pb.Next() { + buf.Reset() + + ipldGSM, err := gsm.ToIPLD() + require.NoError(b, err) + node := bindnode.Wrap(ipldGSM, ipldbind.Prototype.Message.Type()) + err = dagcbor.Encode(node.Representation(), buf) + require.NoError(b, err) + + builder := ipldbind.Prototype.Message.Representation().NewBuilder() + err = dagcbor.Decode(builder, buf) + require.NoError(b, err) + node2 := builder.Build() + ipldGSM2 := bindnode.Unwrap(node2).(*ipldbind.GraphSyncMessage) + gsm2, err := messageFromIPLD(ipldGSM2) + require.NoError(b, err) + + // Same as above: compare with cmp.Diff instead of require.Equal. NOTE(review): b.Fatal and require.NoError end up in FailNow on a RunParallel worker goroutine, which the testing docs say must only be called from the benchmark's own goroutine - confirm and consider b.Error plus return. + if diff := cmp.Diff(gsm, gsm2, cmp.Exporter(func(reflect.Type) bool { return true })); diff != "" { + b.Fatal(diff) + } + } + }) + }) +} diff --git a/message/ipldbind/message.go b/message/ipldbind/message.go new file mode 100644 index 00000000..c78ef556 --- /dev/null +++ b/message/ipldbind/message.go @@ -0,0 +1,123 @@ +package ipldbind + +import ( + "io" + + blocks "github.com/ipfs/go-block-format" + cid "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + + "github.com/ipfs/go-graphsync" + pb "github.com/ipfs/go-graphsync/message/pb" +) + +// IsTerminalSuccessCode returns true if status indicates the request terminated successfully. +// +// Deprecated: use status.IsSuccess(). +func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool { + return status.IsSuccess() +} + +// IsTerminalFailureCode returns true if status indicates the request terminated in failure. +// 
+// Deprecated: use status.IsFailure(). +func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool { + return status.IsFailure() +} + +// IsTerminalResponseCode returns true if status signals the end of the request. +// +// Deprecated: use status.IsTerminal(). +func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool { + return status.IsTerminal() +} + +// Exportable is an interface that can serialize to a protobuf message (ToProto) and write to an io.Writer (ToNet). +type Exportable interface { + ToProto() (*pb.Message, error) + ToNet(w io.Writer) error +} + +type GraphSyncExtensions struct { + Keys []string // NOTE(review): presumably the key order bindnode needs for the schema map {String:Any} - confirm + Values map[string]datamodel.Node // extension payload per key +} + +// GraphSyncRequest is a struct to capture data on a request contained in a +// GraphSyncMessage. +type GraphSyncRequest struct { + Id graphsync.RequestID // unique id set on the requester side + + Root cid.Cid // CID of the root node in the query + Selector ipld.Node // selector for the query + Extensions GraphSyncExtensions // side channel information + Priority graphsync.Priority // request priority + Cancel bool // whether this cancels a request + Update bool // whether this is an update to an in-progress request +} + +type GraphSyncMetadatum struct { + Link datamodel.Link + BlockPresent bool +} + +// GraphSyncResponse is a struct to capture data on a response sent back +// in a GraphSyncMessage. 
+type GraphSyncResponse struct { + Id graphsync.RequestID + + Status graphsync.ResponseStatusCode + Metadata []GraphSyncMetadatum + Extensions GraphSyncExtensions +} + +type GraphSyncBlock struct { + Prefix []byte + Data []byte +} + +func FromBlockFormat(block blocks.Block) GraphSyncBlock { + return GraphSyncBlock{ + Prefix: block.Cid().Prefix().Bytes(), + Data: block.RawData(), + } +} + +func (b GraphSyncBlock) BlockFormat() *blocks.BasicBlock { + pref, err := cid.PrefixFromBytes(b.Prefix) + if err != nil { + panic(err) // NOTE(review): fires if b.Prefix is not a valid CID prefix; "should never happen" presumably assumes the block came from FromBlockFormat - confirm for decoded input + } + + c, err := pref.Sum(b.Data) + if err != nil { + panic(err) // NOTE(review): fires if computing the CID of b.Data from the prefix fails; presumably unreachable for supported hash functions - confirm + } + + block, err := blocks.NewBlockWithCid(b.Data, c) + if err != nil { + panic(err) // should never happen + } + return block +} + +func BlockFormatSlice(bs []GraphSyncBlock) []blocks.Block { + blks := make([]blocks.Block, len(bs)) + for i, b := range bs { + blks[i] = b.BlockFormat() // panics on a block that BlockFormat cannot rebuild + } + return blks +} + +type GraphSyncMessage struct { + Requests []GraphSyncRequest // serialized as "Reqs" per schema.ipldsch + Responses []GraphSyncResponse // serialized as "Rsps" per schema.ipldsch + Blocks []GraphSyncBlock // serialized as "Blks" per schema.ipldsch +} + +// NamedExtension exists just for the purpose of the constructors. 
+type NamedExtension struct { + Name graphsync.ExtensionName + Data ipld.Node +} diff --git a/message/ipldbind/schema.go b/message/ipldbind/schema.go new file mode 100644 index 00000000..ec7be58a --- /dev/null +++ b/message/ipldbind/schema.go @@ -0,0 +1,28 @@ +package ipldbind + +import ( + _ "embed" + + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/node/bindnode" + "github.com/ipld/go-ipld-prime/schema" +) + +//go:embed schema.ipldsch +var embedSchema []byte + +var schemaTypeSystem *schema.TypeSystem + +var Prototype struct { + Message schema.TypedPrototype +} + +func init() { + ts, err := ipld.LoadSchemaBytes(embedSchema) + if err != nil { + panic(err) + } + schemaTypeSystem = ts + + Prototype.Message = bindnode.Prototype((*GraphSyncMessage)(nil), ts.TypeByName("GraphSyncMessage")) +} diff --git a/message/ipldbind/schema.ipldsch b/message/ipldbind/schema.ipldsch new file mode 100644 index 00000000..89fc5a3d --- /dev/null +++ b/message/ipldbind/schema.ipldsch @@ -0,0 +1,63 @@ +type GraphSyncExtensions {String:Any} +type GraphSyncRequestID int +type GraphSyncPriority int + +type GraphSyncMetadatum struct { + link Link + blockPresent Bool +} representation tuple + +type GraphSyncMetadata [GraphSyncMetadatum] + +type GraphSyncResponseStatusCode enum { + # Informational Codes (request in progress) + + | RequestAcknowledged ("10") + | AdditionalPeers ("11") + | NotEnoughGas ("12") + | OtherProtocol ("13") + | PartialResponse ("14") + | RequestPaused ("15") + + # Success Response Codes (request terminated) + + | RequestCompletedFull ("20") + | RequestCompletedPartial ("21") + + # Error Response Codes (request terminated) + + | RequestRejected ("30") + | RequestFailedBusy ("31") + | RequestFailedUnknown ("32") + | RequestFailedLegal ("33") + | RequestFailedContentNotFound ("34") + | RequestCancelled ("35") +} representation int + +type GraphSyncRequest struct { + id GraphSyncRequestID (rename "ID") # unique id set on the requester side + root Link 
(rename "Root") # a CID for the root node in the query + selector Any (rename "Sel") # see https://github.com/ipld/specs/blob/master/selectors/selectors.md + extensions GraphSyncExtensions (rename "Ext") # side channel information + priority GraphSyncPriority (rename "Pri") # the priority (normalized). default to 1 + cancel Bool (rename "Canc") # whether this cancels a request + update Bool (rename "Updt") # whether this is an update to an in progress request +} representation map + +type GraphSyncResponse struct { + id GraphSyncRequestID (rename "ID") # the request id we are responding to + status GraphSyncResponseStatusCode (rename "Stat") # a status code. + metadata GraphSyncMetadata (rename "Meta") # metadata about response + extensions GraphSyncExtensions (rename "Ext") # side channel information +} representation map + +type GraphSyncBlock struct { + prefix Bytes (rename "Pre") # CID prefix (cid version, multicodec and multihash prefix (type + length)) + data Bytes (rename "Data") +} representation map + +type GraphSyncMessage struct { + requests [GraphSyncRequest] (rename "Reqs") + responses [GraphSyncResponse] (rename "Rsps") + blocks [GraphSyncBlock] (rename "Blks") +} representation map diff --git a/message/message.go b/message/message.go index 6e818847..3a08949e 100644 --- a/message/message.go +++ b/message/message.go @@ -16,6 +16,7 @@ import ( "github.com/ipfs/go-graphsync" "github.com/ipfs/go-graphsync/ipldutil" + "github.com/ipfs/go-graphsync/message/ipldbind" pb "github.com/ipfs/go-graphsync/message/pb" ) @@ -159,9 +160,6 @@ func newMessageFromProto(pbm *pb.Message) (GraphSyncMessage, error) { } } exts := req.GetExtensions() - if exts == nil { - exts = make(map[string][]byte) - } requests[graphsync.RequestID(req.Id)] = newRequest(graphsync.RequestID(req.Id), root, selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, exts) } @@ -171,9 +169,6 @@ func newMessageFromProto(pbm *pb.Message) (GraphSyncMessage, error) { return 
GraphSyncMessage{}, errors.New("response is nil") } exts := res.GetExtensions() - if exts == nil { - exts = make(map[string][]byte) - } responses[graphsync.RequestID(res.Id)] = newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), exts) } @@ -308,6 +303,109 @@ func (gsm GraphSyncMessage) ToProto() (*pb.Message, error) { return pbm, nil } +func (gsm GraphSyncMessage) ToIPLD() (*ipldbind.GraphSyncMessage, error) { + ibm := new(ipldbind.GraphSyncMessage) + ibm.Requests = make([]ipldbind.GraphSyncRequest, 0, len(gsm.requests)) + for _, request := range gsm.requests { + ibm.Requests = append(ibm.Requests, ipldbind.GraphSyncRequest{ + Id: request.id, + Root: request.root, + Selector: request.selector, + Priority: request.priority, + Cancel: request.isCancel, + Update: request.isUpdate, + // Extensions: request.extensions, (TODO: not populated yet, so request extensions are not carried into the IPLD message) + }) + } + + ibm.Responses = make([]ipldbind.GraphSyncResponse, 0, len(gsm.responses)) + for _, response := range gsm.responses { + ibm.Responses = append(ibm.Responses, ipldbind.GraphSyncResponse{ + Id: response.requestID, + Status: response.status, + // Extensions: response.extensions, (TODO: not populated yet, so response extensions are not carried into the IPLD message) + }) + } + + blocks := gsm.Blocks() + ibm.Blocks = make([]ipldbind.GraphSyncBlock, 0, len(blocks)) + for _, b := range blocks { + ibm.Blocks = append(ibm.Blocks, ipldbind.GraphSyncBlock{ + Data: b.RawData(), + Prefix: b.Cid().Prefix().Bytes(), + }) + } + return ibm, nil +} + +func messageFromIPLD(ibm *ipldbind.GraphSyncMessage) (GraphSyncMessage, error) { + requests := make(map[graphsync.RequestID]GraphSyncRequest, len(ibm.Requests)) + for _, req := range ibm.Requests { + // exts := req.Extensions (TODO: unused for now; nil extensions are passed to newRequest) + requests[graphsync.RequestID(req.Id)] = newRequest(graphsync.RequestID(req.Id), req.Root, req.Selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, nil) + } + + responses := make(map[graphsync.RequestID]GraphSyncResponse, len(ibm.Responses)) + for _, res := range ibm.Responses { + // exts := res.Extensions (TODO: unused for now; nil extensions are passed to newResponse) + 
responses[graphsync.RequestID(res.Id)] = newResponse(graphsync.RequestID(res.Id), graphsync.ResponseStatusCode(res.Status), nil) + } + + blks := make(map[cid.Cid]blocks.Block, len(ibm.Blocks)) + for _, b := range ibm.Blocks { + pref, err := cid.PrefixFromBytes(b.Prefix) + if err != nil { + return GraphSyncMessage{}, err + } + + c, err := pref.Sum(b.Data) + if err != nil { + return GraphSyncMessage{}, err + } + + blk, err := blocks.NewBlockWithCid(b.Data, c) + if err != nil { + return GraphSyncMessage{}, err + } + + blks[blk.Cid()] = blk + } + + return GraphSyncMessage{ + requests, responses, blks, + }, nil +} + +/* +func fromProtoExtensions(protoExts map[string][]byte) GraphSyncExtensions { + var exts []NamedExtension + for name, data := range protoExts { + exts = append(exts, NamedExtension{graphsync.ExtensionName(name), basicnode.NewBytes(data)}) + } + // Iterating over the map above is non-deterministic, + // so sort by the unique names to ensure determinism. + sort.Slice(exts, func(i, j int) bool { return exts[i].Name < exts[j].Name }) + return toExtensionsMap(exts) +} + +func toProtoExtensions(m GraphSyncExtensions) map[string][]byte { + protoExts := make(map[string][]byte, len(m.Values)) + for name, node := range m.Values { + // Only keep those which are plain bytes, + // as those are the only ones that the older protocol clients understand. + if node.Kind() != ipld.Kind_Bytes { + continue + } + raw, err := node.AsBytes() + if err != nil { + panic(err) // shouldn't happen + } + protoExts[name] = raw + } + return protoExts +} +*/ + func (gsm GraphSyncMessage) ToNet(w io.Writer) error { msg, err := gsm.ToProto() if err != nil {