Skip to content

Feat/refactor transport protocol update part 2 #338

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion impl/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,3 @@ func (m *manager) cancelMessage(chid datatransfer.ChannelID) datatransfer.Messag
}
return message.CancelResponse(chid.ID)
}

4 changes: 2 additions & 2 deletions itest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1780,7 +1780,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
r := &receiver{
messageReceived: make(chan receivedMessage),
}
dtnet2.SetDelegate("graphsync", r)
dtnet2.SetDelegate(datatransfer.LegacyTransportID, []datatransfer.Version{datatransfer.LegacyTransportVersion}, r)

gsr := &fakeGraphSyncReceiver{
receivedMessages: make(chan receivedGraphSyncMessage),
Expand Down Expand Up @@ -1860,7 +1860,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) {
r := &receiver{
messageReceived: make(chan receivedMessage),
}
dtnet2.SetDelegate("graphsync", r)
dtnet2.SetDelegate(datatransfer.LegacyTransportID, []datatransfer.Version{datatransfer.LegacyTransportVersion}, r)

gsr := &fakeGraphSyncReceiver{
receivedMessages: make(chan receivedGraphSyncMessage),
Expand Down
30 changes: 19 additions & 11 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,41 @@ import (
"github.com/ipld/go-ipld-prime/datamodel"
)

type MessageVersion struct {
type Version struct {
Major uint64
Minor uint64
Patch uint64
}

func (mv MessageVersion) String() string {
func (mv Version) String() string {
return fmt.Sprintf("%d.%d.%d", mv.Major, mv.Minor, mv.Patch)
}

// MessageVersionFromString parses a string into a message version
func MessageVersionFromString(versionString string) (MessageVersion, error) {
func MessageVersionFromString(versionString string) (Version, error) {
versions := strings.Split(versionString, ".")
if len(versions) != 3 {
return MessageVersion{}, errors.New("not a version string")
return Version{}, errors.New("not a version string")
}
major, err := strconv.ParseUint(versions[0], 10, 0)
if err != nil {
return MessageVersion{}, errors.New("unable to parse major version")
return Version{}, errors.New("unable to parse major version")
}
minor, err := strconv.ParseUint(versions[1], 10, 0)
if err != nil {
return MessageVersion{}, errors.New("unable to parse major version")
return Version{}, errors.New("unable to parse major version")
}
patch, err := strconv.ParseUint(versions[2], 10, 0)
if err != nil {
return MessageVersion{}, errors.New("unable to parse major version")
return Version{}, errors.New("unable to parse major version")
}
return MessageVersion{Major: major, Minor: minor, Patch: patch}, nil
return Version{Major: major, Minor: minor, Patch: patch}, nil
}

var (
// DataTransfer1_2 is the identifier for the current
// supported version of data-transfer
DataTransfer1_2 MessageVersion = MessageVersion{1, 2, 0}
DataTransfer1_2 Version = Version{1, 2, 0}
)

// Message is a message for the data transfer protocol
Expand All @@ -60,8 +60,16 @@ type Message interface {
TransferID() TransferID
ToNet(w io.Writer) error
ToIPLD() (datamodel.Node, error)
MessageForVersion(targetProtocol MessageVersion) (newMsg Message, err error)
WrappedForTransport(transportID TransportID) Message
MessageForVersion(targetProtocol Version) (newMsg Message, err error)
Version() Version
WrappedForTransport(transportID TransportID, transportVersion Version) TransportedMessage
}

// TransportedMessage is a message that can also report how it was transported
type TransportedMessage interface {
Message
TransportID() TransportID
TransportVersion() Version
}

// Request is a response message for the data transfer protocol
Expand Down
27 changes: 23 additions & 4 deletions message/message1_1prime/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,34 @@ func fromMessage(tresp *TransferMessage1_1) (datatransfer.Message, error) {
return tresp.Response, nil
}

func fromWrappedMessage(wtresp *WrappedTransferMessage1_1) (datatransfer.TransportedMessage, error) {
tresp := wtresp.Message
if (tresp.IsRequest && tresp.Request == nil) || (!tresp.IsRequest && tresp.Response == nil) {
return nil, xerrors.Errorf("invalid/malformed message")
}

if tresp.IsRequest {
return &WrappedTransferRequest1_1{
tresp.Request,
wtresp.TransportVersion,
wtresp.TransportID,
}, nil
}
return &WrappedTransferResponse1_1{
tresp.Response,
wtresp.TransportID,
wtresp.TransportVersion,
}, nil
}

// FromNetWrraped can read a network stream to deserialize a message + transport ID
func FromNetWrapped(r io.Reader) (datatransfer.TransportID, datatransfer.Message, error) {
func FromNetWrapped(r io.Reader) (datatransfer.TransportedMessage, error) {
tm, err := ipldutils.FromReader(r, &WrappedTransferMessage1_1{})
if err != nil {
return "", nil, err
return nil, err
}
wtresp := tm.(*WrappedTransferMessage1_1)
msg, err := fromMessage(&wtresp.Message)
return datatransfer.TransportID(wtresp.TransportID), msg, err
return fromWrappedMessage(wtresp)
}

// FromNet can read a network stream to deserialize a GraphSyncMessage
Expand Down
22 changes: 13 additions & 9 deletions message/message1_1prime/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,7 @@ func TestToNetFromNetEquivalency(t *testing.T) {
})
t.Run("round-trip with wrapping", func(t *testing.T) {
transportID := datatransfer.TransportID("applesauce")
transportVersion := datatransfer.Version{Major: 1, Minor: 5, Patch: 0}
baseCid := testutil.GenerateCids(1)[0]
selector := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any).Matcher().Node()
isPull := false
Expand All @@ -519,15 +520,16 @@ func TestToNetFromNetEquivalency(t *testing.T) {
voucherResult := testutil.NewTestTypedVoucher()
request, err := message1_1.NewRequest(id, false, isPull, &voucher, baseCid, selector)
require.NoError(t, err)
wrequest := request.WrappedForTransport(transportID)
wrequest := request.WrappedForTransport(transportID, transportVersion)
buf := new(bytes.Buffer)
err = wrequest.ToNet(buf)
require.NoError(t, err)
require.Greater(t, buf.Len(), 0)
receivedTransportID, deserialized, err := message1_1.FromNetWrapped(buf)
deserialized, err := message1_1.FromNetWrapped(buf)
require.NoError(t, err)

require.Equal(t, transportID, receivedTransportID)
require.Equal(t, transportID, deserialized.TransportID())
require.Equal(t, transportVersion, deserialized.TransportVersion())
deserializedRequest, ok := deserialized.(datatransfer.Request)
require.True(t, ok)

Expand All @@ -541,12 +543,13 @@ func TestToNetFromNetEquivalency(t *testing.T) {

response, err := message1_1.NewResponse(id, accepted, false, &voucherResult)
require.NoError(t, err)
wresponse := response.WrappedForTransport(transportID)
wresponse := response.WrappedForTransport(transportID, transportVersion)
err = wresponse.ToNet(buf)
require.NoError(t, err)
receivedTransportID, deserialized, err = message1_1.FromNetWrapped(buf)
deserialized, err = message1_1.FromNetWrapped(buf)
require.NoError(t, err)
require.Equal(t, transportID, receivedTransportID)
require.Equal(t, transportID, deserialized.TransportID())
require.Equal(t, transportVersion, deserialized.TransportVersion())

deserializedResponse, ok := deserialized.(datatransfer.Response)
require.True(t, ok)
Expand All @@ -559,12 +562,13 @@ func TestToNetFromNetEquivalency(t *testing.T) {
testutil.AssertEqualTestVoucherResult(t, response, deserializedResponse)

request = message1_1.CancelRequest(id)
wrequest = request.WrappedForTransport(transportID)
wrequest = request.WrappedForTransport(transportID, transportVersion)
err = wrequest.ToNet(buf)
require.NoError(t, err)
receivedTransportID, deserialized, err = message1_1.FromNetWrapped(buf)
deserialized, err = message1_1.FromNetWrapped(buf)
require.NoError(t, err)
require.Equal(t, transportID, receivedTransportID)
require.Equal(t, transportID, deserialized.TransportID())
require.Equal(t, transportVersion, deserialized.TransportVersion())

deserializedRequest, ok = deserialized.(datatransfer.Request)
require.True(t, ok)
Expand Down
7 changes: 7 additions & 0 deletions message/message1_1prime/schema.ipldsch
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ type TransferMessage1_1 struct {
Response nullable TransferResponse
}

type Version struct {
Major Int
Minor Int
Patch Int
} representation tuple

type WrappedTransferMessage1_1 struct {
TransportID TransportID (rename "ID")
TransportVersion Version (rename "TV")
Message TransferMessage1_1 (rename "Msg")
}
5 changes: 3 additions & 2 deletions message/message1_1prime/transfer_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ func (tm *TransferMessage1_1) ToNet(w io.Writer) error {
}

type WrappedTransferMessage1_1 struct {
TransportID string
Message TransferMessage1_1
TransportID string
TransportVersion datatransfer.Version
Message TransferMessage1_1
}

func (wtm *WrappedTransferMessage1_1) BindnodeSchema() string {
Expand Down
32 changes: 25 additions & 7 deletions message/message1_1prime/transfer_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type TransferRequest1_1 struct {
RestartChannel datatransfer.ChannelID
}

func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.MessageVersion) (datatransfer.Message, error) {
func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.Version) (datatransfer.Message, error) {
switch version {
case datatransfer.DataTransfer1_2:
return trq, nil
Expand All @@ -37,8 +37,16 @@ func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.MessageVer
}
}

func (trq *TransferRequest1_1) WrappedForTransport(transportID datatransfer.TransportID) datatransfer.Message {
return &WrappedTransferRequest1_1{trq, string(transportID)}
func (trq *TransferRequest1_1) Version() datatransfer.Version {
return datatransfer.DataTransfer1_2
}

func (trq *TransferRequest1_1) WrappedForTransport(transportID datatransfer.TransportID, transportVersion datatransfer.Version) datatransfer.TransportedMessage {
return &WrappedTransferRequest1_1{
TransferRequest1_1: trq,
transportID: string(transportID),
transportVersion: transportVersion,
}
}

// IsRequest always returns true in this case because this is a transfer request
Expand Down Expand Up @@ -171,15 +179,25 @@ func (trq *TransferRequest1_1) ToNet(w io.Writer) error {
// transport id
type WrappedTransferRequest1_1 struct {
*TransferRequest1_1
TransportID string
transportVersion datatransfer.Version
transportID string
}

func (trq *WrappedTransferRequest1_1) TransportID() datatransfer.TransportID {
return datatransfer.TransportID(trq.transportID)
}

func (trq *WrappedTransferRequest1_1) TransportVersion() datatransfer.Version {
return trq.transportVersion
}

func (trsp *WrappedTransferRequest1_1) toIPLD() (schema.TypedNode, error) {
func (trq *WrappedTransferRequest1_1) toIPLD() (schema.TypedNode, error) {
msg := WrappedTransferMessage1_1{
TransportID: trsp.TransportID,
TransportID: trq.transportID,
TransportVersion: trq.transportVersion,
Message: TransferMessage1_1{
IsRequest: true,
Request: trsp.TransferRequest1_1,
Request: trq.TransferRequest1_1,
Response: nil,
},
}
Expand Down
10 changes: 4 additions & 6 deletions message/message1_1prime/transfer_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,12 @@ func TestRequestMessageForVersion(t *testing.T) {
require.Equal(t, selector, n)
require.Equal(t, testutil.TestVoucherType, req.VoucherType())

wrappedOut12 := out12.WrappedForTransport(datatransfer.LegacyTransportID)
require.Equal(t, &message1_1.WrappedTransferRequest1_1{
TransferRequest1_1: request.(*message1_1.TransferRequest1_1),
TransportID: string(datatransfer.LegacyTransportID),
}, wrappedOut12)
wrappedOut12 := out12.WrappedForTransport(datatransfer.LegacyTransportID, datatransfer.LegacyTransportVersion)
require.Equal(t, datatransfer.LegacyTransportID, wrappedOut12.TransportID())
require.Equal(t, datatransfer.LegacyTransportVersion, wrappedOut12.TransportVersion())

// random protocol should fail
_, err = request.MessageForVersion(datatransfer.MessageVersion{
_, err = request.MessageForVersion(datatransfer.Version{
Major: rand.Uint64(),
Minor: rand.Uint64(),
Patch: rand.Uint64(),
Expand Down
27 changes: 22 additions & 5 deletions message/message1_1prime/transfer_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (trsp *TransferResponse1_1) EmptyVoucherResult() bool {
return trsp.VoucherTypeIdentifier == datatransfer.EmptyTypeIdentifier
}

func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.MessageVersion) (datatransfer.Message, error) {
func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.Version) (datatransfer.Message, error) {
switch version {
case datatransfer.DataTransfer1_2:
return trsp, nil
Expand All @@ -95,8 +95,16 @@ func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.MessageV
}
}

func (trsp *TransferResponse1_1) WrappedForTransport(transportID datatransfer.TransportID) datatransfer.Message {
return &WrappedTransferResponse1_1{trsp, string(transportID)}
func (trsp *TransferResponse1_1) Version() datatransfer.Version {
return datatransfer.DataTransfer1_2
}

func (trsp *TransferResponse1_1) WrappedForTransport(transportID datatransfer.TransportID, transportVersion datatransfer.Version) datatransfer.TransportedMessage {
return &WrappedTransferResponse1_1{
TransferResponse1_1: trsp,
transportID: string(transportID),
transportVersion: transportVersion,
}
}
func (trsp *TransferResponse1_1) toIPLD() (schema.TypedNode, error) {
msg := TransferMessage1_1{
Expand Down Expand Up @@ -128,12 +136,21 @@ func (trsp *TransferResponse1_1) ToNet(w io.Writer) error {
// transport id
type WrappedTransferResponse1_1 struct {
*TransferResponse1_1
TransportID string
transportID string
transportVersion datatransfer.Version
}

func (trsp *WrappedTransferResponse1_1) TransportID() datatransfer.TransportID {
return datatransfer.TransportID(trsp.transportID)
}
func (trsp *WrappedTransferResponse1_1) TransportVersion() datatransfer.Version {
return trsp.transportVersion
}

func (trsp *WrappedTransferResponse1_1) toIPLD() (schema.TypedNode, error) {
msg := WrappedTransferMessage1_1{
TransportID: trsp.TransportID,
TransportID: trsp.transportID,
TransportVersion: trsp.transportVersion,
Message: TransferMessage1_1{
IsRequest: false,
Request: nil,
Expand Down
10 changes: 4 additions & 6 deletions message/message1_1prime/transfer_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ func TestResponseMessageForVersion(t *testing.T) {
require.Equal(t, testutil.TestVoucherType, resp.VoucherResultType())
require.True(t, resp.IsValidationResult())

wrappedOut := out.WrappedForTransport(datatransfer.LegacyTransportID)
require.Equal(t, &message1_1.WrappedTransferResponse1_1{
TransferResponse1_1: response.(*message1_1.TransferResponse1_1),
TransportID: string(datatransfer.LegacyTransportID),
}, wrappedOut)
wrappedOut := out.WrappedForTransport(datatransfer.LegacyTransportID, datatransfer.LegacyTransportVersion)
require.Equal(t, datatransfer.LegacyTransportID, wrappedOut.TransportID())
require.Equal(t, datatransfer.LegacyTransportVersion, wrappedOut.TransportVersion())

// random protocol should fail
_, err = response.MessageForVersion(datatransfer.MessageVersion{
_, err = response.MessageForVersion(datatransfer.Version{
Major: rand.Uint64(),
Minor: rand.Uint64(),
Patch: rand.Uint64(),
Expand Down
5 changes: 5 additions & 0 deletions testutil/faketransport.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func (ft *FakeTransport) ID() datatransfer.TransportID {
return "fake"
}

// Versions indicates what versions of this transport are supported
func (ft *FakeTransport) Versions() []datatransfer.Version {
return []datatransfer.Version{{Major: 1, Minor: 1, Patch: 0}}
}

// Capabilities tells datatransfer what kinds of capabilities this transport supports
func (ft *FakeTransport) Capabilities() datatransfer.TransportCapabilities {
return datatransfer.TransportCapabilities{
Expand Down
Loading