Skip to content

WIP: add the ipld plus dag-cbor protocol v1.1 #323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/ipfs/go-graphsync
go 1.16

require (
github.com/google/go-cmp v0.5.6
github.com/hannahhoward/cbor-gen-for v0.0.0-20200817222906-ea96cece81f1
github.com/hannahhoward/go-pubsub v0.0.0-20200423002714-8d62886cc36e
github.com/ipfs/go-block-format v0.0.3
Expand All @@ -25,7 +26,7 @@ require (
github.com/ipfs/go-peertaskqueue v0.7.1
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.14.3
github.com/ipld/go-ipld-prime v0.14.4
github.com/jbenet/go-random v0.0.0-20190219211222-123a90aedc0c
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.16.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ github.com/ipld/go-codec-dagpb v1.3.0 h1:czTcaoAuNNyIYWs6Qe01DJ+sEX7B+1Z0LcXjSat
github.com/ipld/go-codec-dagpb v1.3.0/go.mod h1:ga4JTU3abYApDC3pZ00BC2RSvC3qfBb9MSJkMLSwnhA=
github.com/ipld/go-ipld-prime v0.9.1-0.20210324083106-dc342a9917db/go.mod h1:KvBLMr4PX1gWptgkzRjVZCrLmSGcZCb/jioOQwCqZN8=
github.com/ipld/go-ipld-prime v0.11.0/go.mod h1:+WIAkokurHmZ/KwzDOMUuoeJgaRQktHtEaLglS3ZeV8=
github.com/ipld/go-ipld-prime v0.14.3 h1:cGUmxSws2IHurn00/iLMDapeXsnf9+FyAtYVy8G/JsQ=
github.com/ipld/go-ipld-prime v0.14.3/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/ipld/go-ipld-prime v0.14.4 h1:bqhmume8+nbNsX4/+J6eohktfZHAI8GKrF3rQ0xgOyc=
github.com/ipld/go-ipld-prime v0.14.4/go.mod h1:QcE4Y9n/ZZr8Ijg5bGPT0GqYWgZ1704nH0RDcQtgTP0=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down
97 changes: 97 additions & 0 deletions message/bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package message

import (
"bytes"
"math/rand"
"reflect"
"testing"

"github.com/google/go-cmp/cmp"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-graphsync"
"github.com/ipfs/go-graphsync/message/ipldbind"
"github.com/ipfs/go-graphsync/testutil"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/node/basicnode"
"github.com/ipld/go-ipld-prime/node/bindnode"
"github.com/ipld/go-ipld-prime/traversal/selector/builder"
"github.com/stretchr/testify/require"
)

func BenchmarkMessageEncodingRoundtrip(b *testing.B) {
root := testutil.GenerateCids(1)[0]
ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any)
selector := ssb.Matcher().Node()
extensionName := graphsync.ExtensionName("graphsync/awesome")
extension := graphsync.ExtensionData{
Name: extensionName,
Data: testutil.RandomBytes(100),
}
id := graphsync.RequestID(rand.Int31())
priority := graphsync.Priority(rand.Int31())
status := graphsync.RequestAcknowledged

builder := NewBuilder()
builder.AddRequest(NewRequest(id, root, selector, priority, extension))
builder.AddRequest(NewRequest(id, root, selector, priority))
builder.AddResponseCode(id, status)
builder.AddExtensionData(id, extension)
builder.AddBlock(blocks.NewBlock([]byte("W")))
builder.AddBlock(blocks.NewBlock([]byte("E")))
builder.AddBlock(blocks.NewBlock([]byte("F")))
builder.AddBlock(blocks.NewBlock([]byte("M")))

gsm, err := builder.Build()
require.NoError(b, err)

b.Run("Protobuf", func(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
buf := new(bytes.Buffer)
for pb.Next() {
buf.Reset()

err := gsm.ToNet(buf)
require.NoError(b, err)

gsm2, err := FromNet(buf)
require.NoError(b, err)

// Note that require.Equal doesn't seem to handle maps well.
// It says they are non-equal simply because their order isn't deterministic.
if diff := cmp.Diff(gsm, gsm2, cmp.Exporter(func(reflect.Type) bool { return true })); diff != "" {
b.Fatal(diff)
}
}
})
})

b.Run("DagCbor", func(b *testing.B) {
b.ReportAllocs()
b.RunParallel(func(pb *testing.PB) {
buf := new(bytes.Buffer)
for pb.Next() {
buf.Reset()

ipldGSM, err := gsm.ToIPLD()
require.NoError(b, err)
node := bindnode.Wrap(ipldGSM, ipldbind.Prototype.Message.Type())
err = dagcbor.Encode(node.Representation(), buf)
require.NoError(b, err)

builder := ipldbind.Prototype.Message.Representation().NewBuilder()
err = dagcbor.Decode(builder, buf)
require.NoError(b, err)
node2 := builder.Build()
ipldGSM2 := bindnode.Unwrap(node2).(*ipldbind.GraphSyncMessage)
gsm2, err := messageFromIPLD(ipldGSM2)
require.NoError(b, err)

// same as above.
if diff := cmp.Diff(gsm, gsm2, cmp.Exporter(func(reflect.Type) bool { return true })); diff != "" {
b.Fatal(diff)
}
}
})
})
}
123 changes: 123 additions & 0 deletions message/ipldbind/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package ipldbind

import (
"io"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"

"github.com/ipfs/go-graphsync"
pb "github.com/ipfs/go-graphsync/message/pb"
)

// IsTerminalSuccessCode returns true if the response code indicates the
// request terminated successfully.
// DEPRECATED: use status.IsSuccess()
func IsTerminalSuccessCode(status graphsync.ResponseStatusCode) bool {
return status.IsSuccess()
}

// IsTerminalFailureCode returns true if the response code indicates the
// request terminated in failure.
// DEPRECATED: use status.IsFailure()
func IsTerminalFailureCode(status graphsync.ResponseStatusCode) bool {
return status.IsFailure()
}

// IsTerminalResponseCode returns true if the response code signals
// the end of the request
// DEPRECATED: use status.IsTerminal()
func IsTerminalResponseCode(status graphsync.ResponseStatusCode) bool {
return status.IsTerminal()
}

// Exportable is an interface that can serialize to a protobuf
type Exportable interface {
ToProto() (*pb.Message, error)
ToNet(w io.Writer) error
}

type GraphSyncExtensions struct {
Keys []string
Values map[string]datamodel.Node
}

// GraphSyncRequest is a struct to capture data on a request contained in a
// GraphSyncMessage.
type GraphSyncRequest struct {
Id graphsync.RequestID

Root cid.Cid
Selector ipld.Node
Extensions GraphSyncExtensions
Priority graphsync.Priority
Cancel bool
Update bool
}

type GraphSyncMetadatum struct {
Link datamodel.Link
BlockPresent bool
}

// GraphSyncResponse is an struct to capture data on a response sent back
// in a GraphSyncMessage.
type GraphSyncResponse struct {
Id graphsync.RequestID

Status graphsync.ResponseStatusCode
Metadata []GraphSyncMetadatum
Extensions GraphSyncExtensions
}

type GraphSyncBlock struct {
Prefix []byte
Data []byte
}

func FromBlockFormat(block blocks.Block) GraphSyncBlock {
return GraphSyncBlock{
Prefix: block.Cid().Prefix().Bytes(),
Data: block.RawData(),
}
}

func (b GraphSyncBlock) BlockFormat() *blocks.BasicBlock {
pref, err := cid.PrefixFromBytes(b.Prefix)
if err != nil {
panic(err) // should never happen
}

c, err := pref.Sum(b.Data)
if err != nil {
panic(err) // should never happen
}

block, err := blocks.NewBlockWithCid(b.Data, c)
if err != nil {
panic(err) // should never happen
}
return block
}

func BlockFormatSlice(bs []GraphSyncBlock) []blocks.Block {
blks := make([]blocks.Block, len(bs))
for i, b := range bs {
blks[i] = b.BlockFormat()
}
return blks
}

type GraphSyncMessage struct {
Requests []GraphSyncRequest
Responses []GraphSyncResponse
Blocks []GraphSyncBlock
}

// NamedExtension exists just for the purpose of the constructors.
type NamedExtension struct {
Name graphsync.ExtensionName
Data ipld.Node
}
28 changes: 28 additions & 0 deletions message/ipldbind/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ipldbind

import (
_ "embed"

"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/node/bindnode"
"github.com/ipld/go-ipld-prime/schema"
)

//go:embed schema.ipldsch
var embedSchema []byte

var schemaTypeSystem *schema.TypeSystem

var Prototype struct {
Message schema.TypedPrototype
}

func init() {
ts, err := ipld.LoadSchemaBytes(embedSchema)
if err != nil {
panic(err)
}
schemaTypeSystem = ts

Prototype.Message = bindnode.Prototype((*GraphSyncMessage)(nil), ts.TypeByName("GraphSyncMessage"))
}
63 changes: 63 additions & 0 deletions message/ipldbind/schema.ipldsch
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
type GraphSyncExtensions {String:Any}
type GraphSyncRequestID int
type GraphSyncPriority int

type GraphSyncMetadatum struct {
link Link
blockPresent Bool
} representation tuple

type GraphSyncMetadata [GraphSyncMetadatum]

type GraphSyncResponseStatusCode enum {
# Informational Codes (request in progress)

| RequestAcknowledged ("10")
| AdditionalPeers ("11")
| NotEnoughGas ("12")
| OtherProtocol ("13")
| PartialResponse ("14")
| RequestPaused ("15")

# Success Response Codes (request terminated)

| RequestCompletedFull ("20")
| RequestCompletedPartial ("21")

# Error Response Codes (request terminated)

| RequestRejected ("30")
| RequestFailedBusy ("31")
| RequestFailedUnknown ("32")
| RequestFailedLegal ("33")
| RequestFailedContentNotFound ("34")
| RequestCancelled ("35")
} representation int

type GraphSyncRequest struct {
id GraphSyncRequestID (rename "ID") # unique id set on the requester side
root Link (rename "Root") # a CID for the root node in the query
selector Any (rename "Sel") # see https://github.com/ipld/specs/blob/master/selectors/selectors.md
extensions GraphSyncExtensions (rename "Ext") # side channel information
priority GraphSyncPriority (rename "Pri") # the priority (normalized). default to 1
cancel Bool (rename "Canc") # whether this cancels a request
update Bool (rename "Updt") # whether this is an update to an in progress request
} representation map

type GraphSyncResponse struct {
id GraphSyncRequestID (rename "ID") # the request id we are responding to
status GraphSyncResponseStatusCode (rename "Stat") # a status code.
metadata GraphSyncMetadata (rename "Meta") # metadata about response
extensions GraphSyncExtensions (rename "Ext") # side channel information
} representation map

type GraphSyncBlock struct {
prefix Bytes (rename "Pre") # CID prefix (cid version, multicodec and multihash prefix (type + length)
data Bytes (rename "Data")
} representation map

type GraphSyncMessage struct {
requests [GraphSyncRequest] (rename "Reqs")
responses [GraphSyncResponse] (rename "Rsps")
blocks [GraphSyncBlock] (rename "Blks")
} representation map
Loading