add AsLargeBytes support to unixfs files #24

Merged · 8 commits · Mar 3, 2022
Changes from 2 commits
29 changes: 27 additions & 2 deletions file/file.go
@@ -30,21 +30,26 @@ func NewUnixFSFile(ctx context.Context, substrate ipld.Node, lsys *ipld.LinkSystem
		ctx:       ctx,
		lsys:      lsys,
		substrate: substrate,
		done:      false,
		offset:    0,
		rdr:       nil}, nil
}

// A StreamableByteNode is an ipld.Node that can be streamed over. It is guaranteed to have a Bytes type.
type StreamableByteNode interface {
	ipld.Node
	io.ReadSeeker
	AsLargeBytes() (io.ReadSeeker, error)
}

type singleNodeFile struct {
	ipld.Node
	offset int
}

func (f *singleNodeFile) AsLargeBytes() (io.ReadSeeker, error) {
	return f, nil
}

func (f *singleNodeFile) Read(p []byte) (int, error) {
	buf, err := f.Node.AsBytes()
	if err != nil {
@@ -57,3 +62,23 @@ func (f *singleNodeFile) Read(p []byte) (int, error) {
	f.offset += n
	return n, nil
}

func (f *singleNodeFile) Seek(offset int64, whence int) (int64, error) {
	buf, err := f.Node.AsBytes()
	if err != nil {
		return 0, err
	}

	switch whence {
	case io.SeekStart:
		f.offset = int(offset)
	case io.SeekCurrent:
		f.offset += int(offset)
	case io.SeekEnd:
		f.offset = len(buf) + int(offset)
	}
	if f.offset < 0 {
		return 0, io.EOF
	}
	return int64(f.offset), nil
}
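
For orientation, here is a consumer-side sketch of the surface this PR adds. It is hypothetical and not part of the diff: it assumes a ctx, a populated ipld.LinkSystem ls, a loaded dag-pb root node, and that NewUnixFSFile returns a StreamableByteNode as declared above.

	// Hypothetical usage sketch (assumes ctx, ls ipld.LinkSystem, root ipld.Node,
	// and that io and fmt are imported); not part of this PR.
	f, err := NewUnixFSFile(ctx, root, &ls)
	if err != nil {
		return err
	}
	rs, err := f.AsLargeBytes()
	if err != nil {
		return err
	}
	// seek to the last 16 bytes of the file and read them
	if _, err := rs.Seek(-16, io.SeekEnd); err != nil {
		return err
	}
	tail := make([]byte, 16)
	n, err := io.ReadFull(rs, tail)
	if err != nil && err != io.ErrUnexpectedEOF {
		return err
	}
	fmt.Printf("tail: %x\n", tail[:n])
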
162 changes: 128 additions & 34 deletions file/shard.go
@@ -4,6 +4,7 @@ import (
	"context"
	"io"

	"github.com/ipfs/go-unixfsnode/data"
	dagpb "github.com/ipld/go-codec-dagpb"
	"github.com/ipld/go-ipld-prime"
	cidlink "github.com/ipld/go-ipld-prime/linking/cid"
@@ -15,56 +16,149 @@ type shardNodeFile struct {
	ctx       context.Context
	lsys      *ipld.LinkSystem
	substrate ipld.Node
	done      bool
	rdr       io.Reader
	offset    int64
}

var _ ipld.Node = (*shardNodeFile)(nil)

func (s *shardNodeFile) makeReader() (io.Reader, error) {
	links, err := s.substrate.LookupByString("Links")
	if err != nil {
		return nil, err
	}
	readers := make([]io.Reader, 0)
	lnki := links.ListIterator()
	at := int64(0)
	for !lnki.Done() {
		_, lnk, err := lnki.Next()
		if err != nil {
			return nil, err
		}
		sz, err := lnk.LookupByString("Tsize")
		if err != nil {
			return nil, err
		}
		childSize, err := sz.AsInt()
		if err != nil {
			return nil, err
		}
		// this child ends before the seek offset: skip it entirely.
		if s.offset > at+childSize {
			at += childSize
			continue
		}
		lnkhash, err := lnk.LookupByString("Hash")
		if err != nil {
			return nil, err
		}
		lnklnk, err := lnkhash.AsLink()
		if err != nil {
			return nil, err
		}
		// todo: defer load until first read.
		target, err := s.lsys.Load(ipld.LinkContext{Ctx: s.ctx}, lnklnk, protoFor(lnklnk))
		if err != nil {
			return nil, err
		}

		asFSNode, err := NewUnixFSFile(s.ctx, target, s.lsys)
		if err != nil {
			return nil, err
		}
		// fastforward the first one if needed.
		if at < s.offset {
			_, err := asFSNode.Seek(s.offset-at, io.SeekStart)
			if err != nil {
				return nil, err
			}
		}
		at += childSize
		readers = append(readers, asFSNode)
	}
	if len(readers) == 0 {
		return nil, io.EOF
	}
	return io.MultiReader(readers...), nil
}
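
The skip-then-fast-forward pattern above can be seen in isolation in the following standalone sketch, where plain strings stand in for loaded UnixFS child nodes and their lengths play the role of Tsize; the function name readerAt is invented for illustration.

	package main

	import (
		"fmt"
		"io"
		"strings"
	)

	// Sketch of the technique makeReader uses: skip whole children that end
	// before the target offset, fast-forward into the first overlapping
	// child, then stitch the rest together with io.MultiReader.
	func readerAt(children []string, offset int64) io.Reader {
		readers := make([]io.Reader, 0)
		at := int64(0)
		for _, c := range children {
			size := int64(len(c))
			if offset > at+size { // child ends before the offset: skip it
				at += size
				continue
			}
			r := strings.NewReader(c)
			if at < offset { // first overlapping child: fast-forward into it
				r.Seek(offset-at, io.SeekStart)
			}
			at += size
			readers = append(readers, r)
		}
		return io.MultiReader(readers...)
	}

	func main() {
		b, _ := io.ReadAll(readerAt([]string{"hello ", "world", "!"}, 7))
		fmt.Printf("%s\n", b) // prints "orld!"
	}
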

func (s *shardNodeFile) Read(p []byte) (int, error) {
	// build reader
	if s.rdr == nil {
		rdr, err := s.makeReader()
		if err != nil {
			return 0, err
		}
		s.rdr = rdr
	}
	n, err := s.rdr.Read(p)
	if err == io.EOF {
		s.rdr = nil
		s.done = true
	}
	return n, err
}

func (s *shardNodeFile) Seek(offset int64, whence int) (int64, error) {
	// drop any in-progress reader; the next Read rebuilds it at the new offset.
	if s.rdr != nil {
		s.rdr = nil
	}
	switch whence {
	case io.SeekStart:
		s.offset = offset
	case io.SeekCurrent:
		s.offset += offset
	case io.SeekEnd:
		s.offset = s.length() + offset
	}
	return s.offset, nil
}
Review thread on the io.SeekEnd case above:

Contributor: note that this will recompute the length for every Seek call; consider memoizing the computed length, or even computing it upfront in AsLargeBytes.

Collaborator (author): it should be a fairly cheap single field access in the map, and I don't know if lots of end seeks is a usage pattern that it makes sense to optimize for?

Contributor: hmm, if you're only using it for end seeks, you may be right that it's likely premature optimization.
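
For reference, the memoization the reviewer suggests could look like the following standalone sketch: compute once, cache, and return the cached value on later SeekEnd calls. The lazyInt64 helper is invented for illustration and is not part of this PR.

	package main

	import "fmt"

	// lazyInt64 memoizes an expensive computation, as suggested above for
	// shardNodeFile.length(); generic sketch only.
	type lazyInt64 struct {
		compute  func() int64
		value    int64
		computed bool
	}

	func (l *lazyInt64) get() int64 {
		if !l.computed {
			l.value = l.compute()
			l.computed = true
		}
		return l.value
	}

	func main() {
		calls := 0
		length := lazyInt64{compute: func() int64 { calls++; return 42 }}
		fmt.Println(length.get(), length.get(), calls) // 42 42 1
	}
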

func (s *shardNodeFile) length() int64 {
	// see if we have size specified in the unixfs data. errors fall back to length from links
	nodeData, err := s.substrate.LookupByString("Data")
	if err != nil {
		return s.lengthFromLinks()
	}
	nodeDataBytes, err := nodeData.AsBytes()
	if err != nil {
		return s.lengthFromLinks()
	}
	ud, err := data.DecodeUnixFSData(nodeDataBytes)
	if err != nil {
		return s.lengthFromLinks()
	}
	if ud.FileSize.Exists() {
		fs, err := ud.FileSize.Must().AsInt()
		if err != nil {
			return s.lengthFromLinks()
		}
		return fs
	}
	return s.lengthFromLinks()
}

Review comment on the fallback behavior above:

Contributor: I'm slightly worried that, if a node is neither a "data" node nor a "links" node, you'll happily fall back to return 0 - you won't return any error at all, and you might swallow genuinely useful errors such as those from DecodeUnixFSData.
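
If those errors were to be surfaced rather than swallowed, a hypothetical variant might return them to the caller; this is a sketch only, the name lengthChecked is invented, it assumes fmt is imported, and it is not what this PR does.

	// Hypothetical variant of length() that propagates errors instead of
	// silently falling back; not part of this PR.
	func (s *shardNodeFile) lengthChecked() (int64, error) {
		nodeData, err := s.substrate.LookupByString("Data")
		if err != nil {
			return 0, err
		}
		nodeDataBytes, err := nodeData.AsBytes()
		if err != nil {
			return 0, err
		}
		ud, err := data.DecodeUnixFSData(nodeDataBytes)
		if err != nil {
			return 0, err
		}
		if !ud.FileSize.Exists() {
			return 0, fmt.Errorf("unixfs data has no FileSize")
		}
		return ud.FileSize.Must().AsInt()
	}
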

func (s *shardNodeFile) lengthFromLinks() int64 {
	links, err := s.substrate.LookupByString("Links")
	if err != nil {
		return 0
	}
	size := int64(0)
	li := links.ListIterator()
	for !li.Done() {
		_, l, err := li.Next()
		if err != nil {
			return 0
		}
		sn, err := l.LookupByString("Tsize")
		if err != nil {
			return 0
		}
		ll, err := sn.AsInt()
		if err != nil {
			return 0
		}
		size += ll
	}
	return size
}

func (s *shardNodeFile) AsLargeBytes() (io.ReadSeeker, error) {
	return s, nil
}

func protoFor(link ipld.Link) ipld.NodePrototype {