From 3e555ad83439e07e50513c119be402c993b2e666 Mon Sep 17 00:00:00 2001 From: Will Scott <will.scott@protocol.ai> Date: Wed, 2 Mar 2022 14:32:53 +0100 Subject: [PATCH 1/8] add AsLargeBytes support to unixfs files --- file/file.go | 29 ++++++++- file/shard.go | 158 +++++++++++++++++++++++++++++++++++++++----------- 2 files changed, 151 insertions(+), 36 deletions(-) diff --git a/file/file.go b/file/file.go index bda6f9e..9f7af29 100644 --- a/file/file.go +++ b/file/file.go @@ -30,14 +30,15 @@ func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSyst ctx: ctx, lsys: lsys, substrate: substrate, - done: false, + offset: 0, rdr: nil}, nil } // A StreamableByteNode is an ipld.Node that can be streamed over. It is guaranteed to have a Bytes type. type StreamableByteNode interface { ipld.Node - io.Reader + io.ReadSeeker + AsLargeBytes() (io.ReadSeeker, error) } type singleNodeFile struct { @@ -45,6 +46,10 @@ type singleNodeFile struct { offset int } +func (f *singleNodeFile) AsLargeBytes() (io.ReadSeeker, error) { + return f, nil +} + func (f *singleNodeFile) Read(p []byte) (int, error) { buf, err := f.Node.AsBytes() if err != nil { @@ -57,3 +62,23 @@ func (f *singleNodeFile) Read(p []byte) (int, error) { f.offset += n return n, nil } + +func (f *singleNodeFile) Seek(offset int64, whence int) (int64, error) { + buf, err := f.Node.AsBytes() + if err != nil { + return 0, err + } + + switch whence { + case io.SeekStart: + f.offset = int(offset) + case io.SeekCurrent: + f.offset += int(offset) + case io.SeekEnd: + f.offset = len(buf) + int(offset) + } + if f.offset < 0 { + return 0, io.EOF + } + return int64(f.offset), nil +} diff --git a/file/shard.go b/file/shard.go index 303054f..ee25da7 100644 --- a/file/shard.go +++ b/file/shard.go @@ -4,6 +4,7 @@ import ( "context" "io" + "github.com/ipfs/go-unixfsnode/data" dagpb "github.com/ipld/go-codec-dagpb" "github.com/ipld/go-ipld-prime" cidlink "github.com/ipld/go-ipld-prime/linking/cid" @@ -15,56 +16,145 @@ type shardNodeFile struct { ctx context.Context lsys *ipld.LinkSystem substrate ipld.Node - done bool rdr io.Reader + offset int64 } var _ ipld.Node = (*shardNodeFile)(nil) -func (s *shardNodeFile) Read(p []byte) (int, error) { - if s.done { - return 0, io.EOF +func (s *shardNodeFile) makeReader() (io.Reader, error) { + links, err := s.substrate.LookupByString("Links") + if err != nil { + return nil, err } - // collect the sub-nodes on first use - if s.rdr == nil { - links, err := s.substrate.LookupByString("Links") + readers := make([]io.Reader, 0) + lnki := links.ListIterator() + at := int64(0) + for !lnki.Done() { + _, lnk, err := lnki.Next() if err != nil { - return 0, err + return nil, err + } + sz, err := lnk.LookupByString("Tsize") + if err != nil { + return nil, err + } + childSize, err := sz.AsInt() + if err != nil { + return nil, err + } + if s.offset > at+childSize { + continue + } + lnkhash, err := lnk.LookupByString("Hash") + if err != nil { + return nil, err + } + lnklnk, err := lnkhash.AsLink() + if err != nil { + return nil, err + } + target, err := s.lsys.Load(ipld.LinkContext{Ctx: s.ctx}, lnklnk, protoFor(lnklnk)) + if err != nil { + return nil, err } - readers := make([]io.Reader, 0) - lnki := links.ListIterator() - for !lnki.Done() { - _, lnk, err := lnki.Next() - if err != nil { - return 0, err - } - lnkhash, err := lnk.LookupByString("Hash") - if err != nil { - return 0, err - } - lnklnk, err := lnkhash.AsLink() - if err != nil { - return 0, err - } - target, err := s.lsys.Load(ipld.LinkContext{Ctx: s.ctx}, lnklnk, protoFor(lnklnk)) - if err != nil { - return 0, err - } - asFSNode, err := NewUnixFSFile(s.ctx, target, s.lsys) + asFSNode, err := NewUnixFSFile(s.ctx, target, s.lsys) + if err != nil { + return nil, err + } + // fastforward the first one if needed. + if at < s.offset { + _, err := asFSNode.Seek(s.offset-at, io.SeekStart) if err != nil { - return 0, err + return nil, err } - readers = append(readers, asFSNode) } - s.rdr = io.MultiReader(readers...) + at += childSize + readers = append(readers, asFSNode) + } + if len(readers) == 0 { + return nil, io.EOF + } + return io.MultiReader(readers...), nil +} + +func (s *shardNodeFile) Read(p []byte) (int, error) { + // build reader + if s.rdr == nil { + rdr, err := s.makeReader() + if err != nil { + return 0, err + } + s.rdr = rdr } n, err := s.rdr.Read(p) - if err == io.EOF { + return n, err +} + +func (s *shardNodeFile) Seek(offset int64, whence int) (int64, error) { + if s.rdr != nil { s.rdr = nil - s.done = true } - return n, err + switch whence { + case io.SeekStart: + s.offset = offset + case io.SeekCurrent: + s.offset += offset + case io.SeekEnd: + s.offset = s.length() + offset + } + return s.offset, nil +} + +func (s *shardNodeFile) length() int64 { + // see if we have size specified in the unixfs data. errors fall back to length from links + nodeData, err := s.substrate.LookupByString("Data") + if err != nil { + return s.lengthFromLinks() + } + nodeDataBytes, err := nodeData.AsBytes() + ud, err := data.DecodeUnixFSData(nodeDataBytes) + if err != nil { + return s.lengthFromLinks() + } + if ud.FileSize.Exists() { + fs, err := ud.FileSize.Must().AsInt() + if err != nil { + return s.lengthFromLinks() + } + return fs + } + return s.lengthFromLinks() +} + +func (s *shardNodeFile) lengthFromLinks() int64 { + links, err := s.substrate.LookupByString("Links") + if err != nil { + return 0 + } + size := int64(0) + li := links.ListIterator() + for !li.Done() { + _, l, err := li.Next() + if err != nil { + return 0 + } + sn, err := l.LookupByString("Tsize") + if err != nil { + return 0 + } + ll, err := sn.AsInt() + if err != nil { + return 0 + } + size += ll + } + return size +} + +func (s *shardNodeFile) AsLargeBytes() (io.ReadSeeker, error) { + return s, nil } func protoFor(link ipld.Link) ipld.NodePrototype { From 2999af918dc3e54a5c4da899ecec99770c652c67 Mon Sep 17 00:00:00 2001 From: Will Scott <will.scott@protocol.ai> Date: Wed, 2 Mar 2022 14:39:05 +0100 Subject: [PATCH 2/8] error check --- file/shard.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/file/shard.go b/file/shard.go index ee25da7..9740f0a 100644 --- a/file/shard.go +++ b/file/shard.go @@ -54,6 +54,7 @@ func (s *shardNodeFile) makeReader() (io.Reader, error) { if err != nil { return nil, err } + // todo: defer load until first read. target, err := s.lsys.Load(ipld.LinkContext{Ctx: s.ctx}, lnklnk, protoFor(lnklnk)) if err != nil { return nil, err @@ -114,6 +115,9 @@ func (s *shardNodeFile) length() int64 { return s.lengthFromLinks() } nodeDataBytes, err := nodeData.AsBytes() + if err != nil { + return s.lengthFromLinks() + } ud, err := data.DecodeUnixFSData(nodeDataBytes) if err != nil { return s.lengthFromLinks() From 1022bdb148855d3ecd1f95e3ed6ff79a53eae45a Mon Sep 17 00:00:00 2001 From: Will Scott <will.scott@protocol.ai> Date: Wed, 2 Mar 2022 15:09:01 +0100 Subject: [PATCH 3/8] properly defer loads until needed --- file/deferred.go | 154 +++++++++++++++++++++++++++++++++++++++++++++++ file/shard.go | 16 ++--- 2 files changed, 158 insertions(+), 12 deletions(-) create mode 100644 file/deferred.go diff --git a/file/deferred.go b/file/deferred.go new file mode 100644 index 0000000..02bf5a6 --- /dev/null +++ b/file/deferred.go @@ -0,0 +1,154 @@ +package file + +import ( + "context" + "io" + + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime" +) + +func newDeferredFileNode(ctx context.Context, lsys *ipld.LinkSystem, root ipld.Link) StreamableByteNode { + dfn := deferredFileNode{ + StreamableByteNode: nil, + root: root, + l: lsys, + ctx: ctx, + } + dfn.StreamableByteNode = deferred{&dfn} + return &dfn +} + +type deferredFileNode struct { + StreamableByteNode + + root ipld.Link + l *ipld.LinkSystem + ctx context.Context +} + +func (d *deferredFileNode) resolve() error { + target, err := d.l.Load(ipld.LinkContext{Ctx: d.ctx}, d.root, protoFor(d.root)) + if err != nil { + return err + } + + asFSNode, err := NewUnixFSFile(d.ctx, target, d.l) + if err != nil { + return err + } + d.StreamableByteNode = asFSNode + d.root = nil + d.l = nil + d.ctx = nil + return nil +} + +type deferred struct { + *deferredFileNode +} + +func (d deferred) AsLargeBytes() (io.ReadSeeker, error) { + if err := d.deferredFileNode.resolve(); err != nil { + return nil, err + } + return d.deferredFileNode.AsLargeBytes() +} + +func (d deferred) Read(p []byte) (int, error) { + if err := d.deferredFileNode.resolve(); err != nil { + return 0, err + } + return d.deferredFileNode.Read(p) +} + +func (d deferred) Seek(offset int64, whence int) (int64, error) { + if err := d.deferredFileNode.resolve(); err != nil { + return 0, err + } + return d.deferredFileNode.Seek(offset, whence) +} + +func (d deferred) Kind() ipld.Kind { + return ipld.Kind_Bytes +} + +func (d deferred) AsBytes() ([]byte, error) { + if err := d.deferredFileNode.resolve(); err != nil { + return []byte{}, err + } + + return d.deferredFileNode.AsBytes() +} + +func (d deferred) AsBool() (bool, error) { + return false, ipld.ErrWrongKind{TypeName: "bool", MethodName: "AsBool", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d deferred) AsInt() (int64, error) { + return 0, ipld.ErrWrongKind{TypeName: "int", MethodName: "AsInt", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d deferred) AsFloat() (float64, error) { + return 0, ipld.ErrWrongKind{TypeName: "float", MethodName: "AsFloat", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d deferred) AsString() (string, error) { + return "", ipld.ErrWrongKind{TypeName: "string", MethodName: "AsString", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d deferred) AsLink() (ipld.Link, error) { + return nil, ipld.ErrWrongKind{TypeName: "link", MethodName: "AsLink", AppropriateKind: ipld.KindSet_JustBytes} +} + +func (d deferred) AsNode() (ipld.Node, error) { + return nil, nil +} + +func (d deferred) Size() int { + return 0 +} + +func (d deferred) IsAbsent() bool { + return false +} + +func (d deferred) IsNull() bool { + if err := d.deferredFileNode.resolve(); err != nil { + return true + } + return d.deferredFileNode.IsNull() +} + +func (d deferred) Length() int64 { + return 0 +} + +func (d deferred) ListIterator() ipld.ListIterator { + return nil +} + +func (d deferred) MapIterator() ipld.MapIterator { + return nil +} + +func (d deferred) LookupByIndex(idx int64) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{} +} + +func (d deferred) LookupByString(key string) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{} +} + +func (d deferred) LookupByNode(key ipld.Node) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{} +} + +func (d deferred) LookupBySegment(seg ipld.PathSegment) (ipld.Node, error) { + return nil, ipld.ErrWrongKind{} +} + +// shardded files / nodes look like dagpb nodes. +func (d deferred) Prototype() ipld.NodePrototype { + return dagpb.Type.PBNode +} diff --git a/file/shard.go b/file/shard.go index 9740f0a..fb4a16b 100644 --- a/file/shard.go +++ b/file/shard.go @@ -44,6 +44,7 @@ func (s *shardNodeFile) makeReader() (io.Reader, error) { return nil, err } if s.offset > at+childSize { + at += childSize continue } lnkhash, err := lnk.LookupByString("Hash") @@ -54,25 +55,16 @@ func (s *shardNodeFile) makeReader() (io.Reader, error) { if err != nil { return nil, err } - // todo: defer load until first read. - target, err := s.lsys.Load(ipld.LinkContext{Ctx: s.ctx}, lnklnk, protoFor(lnklnk)) - if err != nil { - return nil, err - } - - asFSNode, err := NewUnixFSFile(s.ctx, target, s.lsys) - if err != nil { - return nil, err - } + target := newDeferredFileNode(s.ctx, s.lsys, lnklnk) // fastforward the first one if needed. if at < s.offset { - _, err := asFSNode.Seek(s.offset-at, io.SeekStart) + _, err := target.Seek(s.offset-at, io.SeekStart) if err != nil { return nil, err } } at += childSize - readers = append(readers, asFSNode) + readers = append(readers, target) } if len(readers) == 0 { return nil, io.EOF From 9554498d585371754c8d7c99f38fe6b2d4e71f38 Mon Sep 17 00:00:00 2001 From: Will Scott <will.scott@protocol.ai> Date: Wed, 2 Mar 2022 16:36:56 +0100 Subject: [PATCH 4/8] split AsLargeBytes from core object --- data/builder/file_test.go | 3 +- file/deferred.go | 62 ++++++++++++++++++++++++--------------- file/file.go | 24 ++++++++------- file/shard.go | 32 +++++++++++++------- 4 files changed, 75 insertions(+), 46 deletions(-) diff --git a/data/builder/file_test.go b/data/builder/file_test.go index 5861979..de3803e 100644 --- a/data/builder/file_test.go +++ b/data/builder/file_test.go @@ -3,7 +3,6 @@ package builder import ( "bytes" "context" - "io" "testing" "github.com/ipfs/go-cid" @@ -70,7 +69,7 @@ func TestUnixFSFileRoundtrip(t *testing.T) { t.Fatal(err) } // read back out the file. - out, err := io.ReadAll(ufn) + out, err := ufn.AsBytes() if err != nil { t.Fatal(err) } diff --git a/file/deferred.go b/file/deferred.go index 02bf5a6..0ae365e 100644 --- a/file/deferred.go +++ b/file/deferred.go @@ -8,19 +8,19 @@ import ( "github.com/ipld/go-ipld-prime" ) -func newDeferredFileNode(ctx context.Context, lsys *ipld.LinkSystem, root ipld.Link) StreamableByteNode { +func newDeferredFileNode(ctx context.Context, lsys *ipld.LinkSystem, root ipld.Link) LargeBytesNode { dfn := deferredFileNode{ - StreamableByteNode: nil, - root: root, - l: lsys, - ctx: ctx, + LargeBytesNode: nil, + root: root, + l: lsys, + ctx: ctx, } - dfn.StreamableByteNode = deferred{&dfn} + dfn.LargeBytesNode = deferred{&dfn} return &dfn } type deferredFileNode struct { - StreamableByteNode + LargeBytesNode root ipld.Link l *ipld.LinkSystem @@ -37,7 +37,7 @@ func (d *deferredFileNode) resolve() error { if err != nil { return err } - d.StreamableByteNode = asFSNode + d.LargeBytesNode = asFSNode d.root = nil d.l = nil d.ctx = nil @@ -48,25 +48,41 @@ type deferred struct { *deferredFileNode } -func (d deferred) AsLargeBytes() (io.ReadSeeker, error) { - if err := d.deferredFileNode.resolve(); err != nil { - return nil, err - } - return d.deferredFileNode.AsLargeBytes() +type deferredReader struct { + io.ReadSeeker + *deferredFileNode } -func (d deferred) Read(p []byte) (int, error) { - if err := d.deferredFileNode.resolve(); err != nil { - return 0, err +func (d deferred) AsLargeBytes() (io.ReadSeeker, error) { + return deferredReader{nil, d.deferredFileNode}, nil +} + +func (d deferredReader) Read(p []byte) (int, error) { + if d.ReadSeeker == nil { + if err := d.deferredFileNode.resolve(); err != nil { + return 0, err + } + rs, err := d.deferredFileNode.AsLargeBytes() + if err != nil { + return 0, err + } + d.ReadSeeker = rs } - return d.deferredFileNode.Read(p) -} - -func (d deferred) Seek(offset int64, whence int) (int64, error) { - if err := d.deferredFileNode.resolve(); err != nil { - return 0, err + return d.ReadSeeker.Read(p) +} + +func (d deferredReader) Seek(offset int64, whence int) (int64, error) { + if d.ReadSeeker == nil { + if err := d.deferredFileNode.resolve(); err != nil { + return 0, err + } + rs, err := d.deferredFileNode.AsLargeBytes() + if err != nil { + return 0, err + } + d.ReadSeeker = rs } - return d.deferredFileNode.Seek(offset, whence) + return d.ReadSeeker.Seek(offset, whence) } func (d deferred) Kind() ipld.Kind { diff --git a/file/file.go b/file/file.go index 9f7af29..17a004a 100644 --- a/file/file.go +++ b/file/file.go @@ -11,10 +11,10 @@ import ( // root of a unixfs File. // It provides a `bytes` view over the file, along with access to io.Reader streaming access // to file data. -func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSystem) (StreamableByteNode, error) { +func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSystem) (LargeBytesNode, error) { if substrate.Kind() == ipld.Kind_Bytes { // A raw / single-node file. - return &singleNodeFile{substrate, 0}, nil + return &singleNodeFile{substrate}, nil } // see if it's got children. links, err := substrate.LookupByString("Links") @@ -30,27 +30,29 @@ func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSyst ctx: ctx, lsys: lsys, substrate: substrate, - offset: 0, - rdr: nil}, nil + }, nil } -// A StreamableByteNode is an ipld.Node that can be streamed over. It is guaranteed to have a Bytes type. -type StreamableByteNode interface { +// A LargeBytesNode is an ipld.Node that can be streamed over. It is guaranteed to have a Bytes type. +type LargeBytesNode interface { ipld.Node - io.ReadSeeker AsLargeBytes() (io.ReadSeeker, error) } type singleNodeFile struct { ipld.Node - offset int } func (f *singleNodeFile) AsLargeBytes() (io.ReadSeeker, error) { - return f, nil + return &singleNodeReader{f, 0}, nil +} + +type singleNodeReader struct { + ipld.Node + offset int } -func (f *singleNodeFile) Read(p []byte) (int, error) { +func (f *singleNodeReader) Read(p []byte) (int, error) { buf, err := f.Node.AsBytes() if err != nil { return 0, err @@ -63,7 +65,7 @@ func (f *singleNodeFile) Read(p []byte) (int, error) { return n, nil } -func (f *singleNodeFile) Seek(offset int64, whence int) (int64, error) { +func (f *singleNodeReader) Seek(offset int64, whence int) (int64, error) { buf, err := f.Node.AsBytes() if err != nil { return 0, err diff --git a/file/shard.go b/file/shard.go index fb4a16b..fadfa55 100644 --- a/file/shard.go +++ b/file/shard.go @@ -16,14 +16,18 @@ type shardNodeFile struct { ctx context.Context lsys *ipld.LinkSystem substrate ipld.Node - rdr io.Reader - offset int64 } var _ ipld.Node = (*shardNodeFile)(nil) -func (s *shardNodeFile) makeReader() (io.Reader, error) { - links, err := s.substrate.LookupByString("Links") +type shardNodeReader struct { + *shardNodeFile + rdr io.Reader + offset int64 +} + +func (s *shardNodeReader) makeReader() (io.Reader, error) { + links, err := s.shardNodeFile.substrate.LookupByString("Links") if err != nil { return nil, err } @@ -56,15 +60,19 @@ func (s *shardNodeFile) makeReader() (io.Reader, error) { return nil, err } target := newDeferredFileNode(s.ctx, s.lsys, lnklnk) + tr, err := target.AsLargeBytes() + if err != nil { + return nil, err + } // fastforward the first one if needed. if at < s.offset { - _, err := target.Seek(s.offset-at, io.SeekStart) + _, err := tr.Seek(s.offset-at, io.SeekStart) if err != nil { return nil, err } } at += childSize - readers = append(readers, target) + readers = append(readers, tr) } if len(readers) == 0 { return nil, io.EOF @@ -72,7 +80,7 @@ func (s *shardNodeFile) makeReader() (io.Reader, error) { return io.MultiReader(readers...), nil } -func (s *shardNodeFile) Read(p []byte) (int, error) { +func (s *shardNodeReader) Read(p []byte) (int, error) { // build reader if s.rdr == nil { rdr, err := s.makeReader() @@ -85,7 +93,7 @@ func (s *shardNodeFile) Read(p []byte) (int, error) { return n, err } -func (s *shardNodeFile) Seek(offset int64, whence int) (int64, error) { +func (s *shardNodeReader) Seek(offset int64, whence int) (int64, error) { if s.rdr != nil { s.rdr = nil } @@ -150,7 +158,7 @@ func (s *shardNodeFile) lengthFromLinks() int64 { } func (s *shardNodeFile) AsLargeBytes() (io.ReadSeeker, error) { - return s, nil + return &shardNodeReader{s, nil, 0}, nil } func protoFor(link ipld.Link) ipld.NodePrototype { @@ -167,7 +175,11 @@ func (s *shardNodeFile) Kind() ipld.Kind { } func (s *shardNodeFile) AsBytes() ([]byte, error) { - return io.ReadAll(s) + rdr, err := s.AsLargeBytes() + if err != nil { + return nil, err + } + return io.ReadAll(rdr) } func (s *shardNodeFile) AsBool() (bool, error) { From 9db1d8a6e87a73a920403be05b4be9200bfd25bf Mon Sep 17 00:00:00 2001 From: Will Scott <will.scott@protocol.ai> Date: Wed, 2 Mar 2022 16:38:46 +0100 Subject: [PATCH 5/8] compress --- file/shard.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/file/shard.go b/file/shard.go index fadfa55..2646f91 100644 --- a/file/shard.go +++ b/file/shard.go @@ -123,11 +123,10 @@ func (s *shardNodeFile) length() int64 { return s.lengthFromLinks() } if ud.FileSize.Exists() { - fs, err := ud.FileSize.Must().AsInt() - if err != nil { - return s.lengthFromLinks() + if fs, err := ud.FileSize.Must().AsInt(); err == nil { + return int64(fs) } - return fs + return s.lengthFromLinks() } return s.lengthFromLinks() } From 5ff41d505566c4bdca441d3aed66f420ee5b4346 Mon Sep 17 00:00:00 2001 From: Will Scott <will.scott@protocol.ai> Date: Wed, 2 Mar 2022 16:39:32 +0100 Subject: [PATCH 6/8] add uncommitted file --- file/wrapped.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/file/wrapped.go b/file/wrapped.go index 56b2c6c..b2c2210 100644 --- a/file/wrapped.go +++ b/file/wrapped.go @@ -6,7 +6,7 @@ import ( "github.com/ipld/go-ipld-prime/node/basicnode" ) -func newWrappedNode(substrate ipld.Node) (StreamableByteNode, error) { +func newWrappedNode(substrate ipld.Node) (LargeBytesNode, error) { dataField, err := substrate.LookupByString("Data") if err != nil { return nil, err From dd5ec50bd93762df03fa0ed70a09d6284a1c92e2 Mon Sep 17 00:00:00 2001 From: Will Scott <will.scott@protocol.ai> Date: Wed, 2 Mar 2022 18:30:36 +0100 Subject: [PATCH 7/8] split reader from core node --- file/deferred.go | 51 +++++++++++++++++++++++++----------------------- file/shard.go | 3 ++- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/file/deferred.go b/file/deferred.go index 0ae365e..9d26501 100644 --- a/file/deferred.go +++ b/file/deferred.go @@ -15,7 +15,7 @@ func newDeferredFileNode(ctx context.Context, lsys *ipld.LinkSystem, root ipld.L l: lsys, ctx: ctx, } - dfn.LargeBytesNode = deferred{&dfn} + dfn.LargeBytesNode = &deferred{&dfn} return &dfn } @@ -28,6 +28,9 @@ type deferredFileNode struct { } func (d *deferredFileNode) resolve() error { + if d.l == nil { + return nil + } target, err := d.l.Load(ipld.LinkContext{Ctx: d.ctx}, d.root, protoFor(d.root)) if err != nil { return err @@ -53,11 +56,11 @@ type deferredReader struct { *deferredFileNode } -func (d deferred) AsLargeBytes() (io.ReadSeeker, error) { - return deferredReader{nil, d.deferredFileNode}, nil +func (d *deferred) AsLargeBytes() (io.ReadSeeker, error) { + return &deferredReader{nil, d.deferredFileNode}, nil } -func (d deferredReader) Read(p []byte) (int, error) { +func (d *deferredReader) Read(p []byte) (int, error) { if d.ReadSeeker == nil { if err := d.deferredFileNode.resolve(); err != nil { return 0, err @@ -71,7 +74,7 @@ func (d deferredReader) Read(p []byte) (int, error) { return d.ReadSeeker.Read(p) } -func (d deferredReader) Seek(offset int64, whence int) (int64, error) { +func (d *deferredReader) Seek(offset int64, whence int) (int64, error) { if d.ReadSeeker == nil { if err := d.deferredFileNode.resolve(); err != nil { return 0, err @@ -85,11 +88,11 @@ func (d deferredReader) Seek(offset int64, whence int) (int64, error) { return d.ReadSeeker.Seek(offset, whence) } -func (d deferred) Kind() ipld.Kind { +func (d *deferred) Kind() ipld.Kind { return ipld.Kind_Bytes } -func (d deferred) AsBytes() ([]byte, error) { +func (d *deferred) AsBytes() ([]byte, error) { if err := d.deferredFileNode.resolve(); err != nil { return []byte{}, err } @@ -97,74 +100,74 @@ func (d deferred) AsBytes() ([]byte, error) { return d.deferredFileNode.AsBytes() } -func (d deferred) AsBool() (bool, error) { +func (d *deferred) AsBool() (bool, error) { return false, ipld.ErrWrongKind{TypeName: "bool", MethodName: "AsBool", AppropriateKind: ipld.KindSet_JustBytes} } -func (d deferred) AsInt() (int64, error) { +func (d *deferred) AsInt() (int64, error) { return 0, ipld.ErrWrongKind{TypeName: "int", MethodName: "AsInt", AppropriateKind: ipld.KindSet_JustBytes} } -func (d deferred) AsFloat() (float64, error) { +func (d *deferred) AsFloat() (float64, error) { return 0, ipld.ErrWrongKind{TypeName: "float", MethodName: "AsFloat", AppropriateKind: ipld.KindSet_JustBytes} } -func (d deferred) AsString() (string, error) { +func (d *deferred) AsString() (string, error) { return "", ipld.ErrWrongKind{TypeName: "string", MethodName: "AsString", AppropriateKind: ipld.KindSet_JustBytes} } -func (d deferred) AsLink() (ipld.Link, error) { +func (d *deferred) AsLink() (ipld.Link, error) { return nil, ipld.ErrWrongKind{TypeName: "link", MethodName: "AsLink", AppropriateKind: ipld.KindSet_JustBytes} } -func (d deferred) AsNode() (ipld.Node, error) { +func (d *deferred) AsNode() (ipld.Node, error) { return nil, nil } -func (d deferred) Size() int { +func (d *deferred) Size() int { return 0 } -func (d deferred) IsAbsent() bool { +func (d *deferred) IsAbsent() bool { return false } -func (d deferred) IsNull() bool { +func (d *deferred) IsNull() bool { if err := d.deferredFileNode.resolve(); err != nil { return true } return d.deferredFileNode.IsNull() } -func (d deferred) Length() int64 { +func (d *deferred) Length() int64 { return 0 } -func (d deferred) ListIterator() ipld.ListIterator { +func (d *deferred) ListIterator() ipld.ListIterator { return nil } -func (d deferred) MapIterator() ipld.MapIterator { +func (d *deferred) MapIterator() ipld.MapIterator { return nil } -func (d deferred) LookupByIndex(idx int64) (ipld.Node, error) { +func (d *deferred) LookupByIndex(idx int64) (ipld.Node, error) { return nil, ipld.ErrWrongKind{} } -func (d deferred) LookupByString(key string) (ipld.Node, error) { +func (d *deferred) LookupByString(key string) (ipld.Node, error) { return nil, ipld.ErrWrongKind{} } -func (d deferred) LookupByNode(key ipld.Node) (ipld.Node, error) { +func (d *deferred) LookupByNode(key ipld.Node) (ipld.Node, error) { return nil, ipld.ErrWrongKind{} } -func (d deferred) LookupBySegment(seg ipld.PathSegment) (ipld.Node, error) { +func (d *deferred) LookupBySegment(seg ipld.PathSegment) (ipld.Node, error) { return nil, ipld.ErrWrongKind{} } // shardded files / nodes look like dagpb nodes. -func (d deferred) Prototype() ipld.NodePrototype { +func (d *deferred) Prototype() ipld.NodePrototype { return dagpb.Type.PBNode } diff --git a/file/shard.go b/file/shard.go index 2646f91..5dd84ac 100644 --- a/file/shard.go +++ b/file/shard.go @@ -178,7 +178,8 @@ func (s *shardNodeFile) AsBytes() ([]byte, error) { if err != nil { return nil, err } - return io.ReadAll(rdr) + buf, err := io.ReadAll(rdr) + return buf, err } func (s *shardNodeFile) AsBool() (bool, error) { From 7e4eac5d09969342654acce47612a48a1a5ff986 Mon Sep 17 00:00:00 2001 From: Will Scott <will.scott@protocol.ai> Date: Thu, 3 Mar 2022 12:01:23 +0100 Subject: [PATCH 8/8] code review --- file/deferred.go | 12 ++++++------ file/shard.go | 4 +--- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/file/deferred.go b/file/deferred.go index 9d26501..44ce8ca 100644 --- a/file/deferred.go +++ b/file/deferred.go @@ -12,7 +12,7 @@ func newDeferredFileNode(ctx context.Context, lsys *ipld.LinkSystem, root ipld.L dfn := deferredFileNode{ LargeBytesNode: nil, root: root, - l: lsys, + lsys: lsys, ctx: ctx, } dfn.LargeBytesNode = &deferred{&dfn} @@ -23,26 +23,26 @@ type deferredFileNode struct { LargeBytesNode root ipld.Link - l *ipld.LinkSystem + lsys *ipld.LinkSystem ctx context.Context } func (d *deferredFileNode) resolve() error { - if d.l == nil { + if d.lsys == nil { return nil } - target, err := d.l.Load(ipld.LinkContext{Ctx: d.ctx}, d.root, protoFor(d.root)) + target, err := d.lsys.Load(ipld.LinkContext{Ctx: d.ctx}, d.root, protoFor(d.root)) if err != nil { return err } - asFSNode, err := NewUnixFSFile(d.ctx, target, d.l) + asFSNode, err := NewUnixFSFile(d.ctx, target, d.lsys) if err != nil { return err } d.LargeBytesNode = asFSNode d.root = nil - d.l = nil + d.lsys = nil d.ctx = nil return nil } diff --git a/file/shard.go b/file/shard.go index 5dd84ac..e3dc79b 100644 --- a/file/shard.go +++ b/file/shard.go @@ -126,7 +126,6 @@ func (s *shardNodeFile) length() int64 { if fs, err := ud.FileSize.Must().AsInt(); err == nil { return int64(fs) } - return s.lengthFromLinks() } return s.lengthFromLinks() } @@ -178,8 +177,7 @@ func (s *shardNodeFile) AsBytes() ([]byte, error) { if err != nil { return nil, err } - buf, err := io.ReadAll(rdr) - return buf, err + return io.ReadAll(rdr) } func (s *shardNodeFile) AsBool() (bool, error) {