-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathreader.go
114 lines (93 loc) · 2.62 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package siva
import (
"errors"
"io"
)
// Sentinel errors declared alongside the reader.
var (
// ErrPendingContent signals that an entry wasn't fully read.
// NOTE(review): nothing in this file returns it; confirm where it is used.
ErrPendingContent = errors.New("entry wasn't fully read")
// ErrInvalidCheckshum signals an invalid checksum. The spelling of the
// identifier is part of the exported API and is kept as is.
// NOTE(review): nothing in this file returns it; confirm where it is used.
ErrInvalidCheckshum = errors.New("invalid checksum")
// ErrInvalidReaderAt is returned by Get when the underlying reader does not
// implement io.ReaderAt.
ErrInvalidReaderAt = errors.New("reader provided dosen't implement ReaderAt interface")
)
// A Reader provides random access to the contents of a siva archive.
//
// The method comments below describe the behavior of the implementation in
// this file (the unexported reader type).
type Reader interface {
// Read returns the content of the entry selected by the last call to Seek
// and reports io.EOF once that entry has been fully consumed.
io.Reader
// Seek positions the underlying reader at the start of the content of e and
// returns the result of that seek.
Seek(e *IndexEntry) (int64, error)
// Index returns the index of the archive.
Index() (Index, error)
// Get returns an io.SectionReader over the content of e. It fails with
// ErrInvalidReaderAt when the underlying reader is not an io.ReaderAt.
Get(e *IndexEntry) (*io.SectionReader, error)
}
// reader is the Reader implementation returned by the constructors in this
// file.
type reader struct {
// r is the underlying source; Get additionally needs it to implement
// io.ReaderAt.
r io.ReadSeeker
// getIndexFunc, when non-nil, is what Index calls instead of reading the
// index from r.
getIndexFunc func() (Index, error)
// index holds the index loaded by Index so later calls can reuse it.
index Index
// current is the entry passed to the most recent Seek.
current *IndexEntry
// pending is the number of bytes of the current entry not yet returned by
// Read.
pending uint64
// offset is handed to readIndex when the index is loaded; it is set by
// NewReaderWithOffset and is zero otherwise.
offset uint64
}
// NewReader returns a Reader that reads a siva archive from r. The source must
// be seekable; it must also implement io.ReaderAt for the Get method to be
// usable.
func NewReader(r io.ReadSeeker) Reader {
	rd := new(reader)
	rd.r = r
	return rd
}
// NewReaderWithOffset returns a Reader over r that is told the position of the
// index through o. This is useful to open siva files that are still being
// written, or to read an older index.
func NewReaderWithOffset(r io.ReadSeeker, o uint64) Reader {
	rd := new(reader)
	rd.r = r
	rd.offset = o
	return rd
}
// newReaderWithIndex returns a reader over r whose Index method delegates to
// getIndexFunc instead of reading the index from r.
func newReaderWithIndex(r io.ReadSeeker, getIndexFunc func() (Index, error)) *reader {
	rd := new(reader)
	rd.r = r
	rd.getIndexFunc = getIndexFunc
	return rd
}
// Index returns the index of the siva file.
//
// When the reader was built with an index function, that function is called
// and its result returned as is. Otherwise the index is read from the
// underlying reader with readIndex, filtered, sorted and stored in r.index so
// later calls can reuse it. ErrEmptyIndex from readIndex is not treated as a
// failure; any other error is returned to the caller.
func (r *reader) Index() (Index, error) {
	if r.getIndexFunc != nil {
		return r.getIndexFunc()
	}

	// Reuse the index loaded by a previous call.
	if r.index != nil {
		return r.index, nil
	}

	raw, err := readIndex(r.r, r.offset)
	if err != nil && err != ErrEmptyIndex {
		return nil, err
	}

	sorted := OrderedIndex(raw.filter())
	sorted.Sort()
	r.index = Index(sorted)

	return r.index, nil
}
// Get returns a new io.SectionReader covering the content of e, which allows
// concurrent read access to that content. The underlying reader must implement
// io.ReaderAt; otherwise ErrInvalidReaderAt is returned.
func (r *reader) Get(e *IndexEntry) (*io.SectionReader, error) {
	src, isReaderAt := r.r.(io.ReaderAt)
	if !isReaderAt {
		return nil, ErrInvalidReaderAt
	}

	start, length := int64(e.absStart), int64(e.Size)
	return io.NewSectionReader(src, start, length), nil
}
// Seek moves the underlying reader to the starting position of the content of
// the given IndexEntry and returns the result of that seek. The entry is
// recorded as the current one and its size becomes the number of bytes Read
// will deliver; both are set before the underlying seek is attempted.
func (r *reader) Seek(e *IndexEntry) (int64, error) {
	r.current = e
	r.pending = e.Size

	start := int64(e.absStart)
	return r.r.Seek(start, io.SeekStart)
}
// Read reads up to len(p) bytes of the entry selected by Seek, never going
// past the end of that entry's content. It returns io.EOF once the whole
// entry has been consumed, and io.ErrUnexpectedEOF when the underlying reader
// reports io.EOF while bytes of the entry are still pending.
func (r *reader) Read(p []byte) (n int, err error) {
	if r.pending == 0 {
		return 0, io.EOF
	}

	// Clamp the buffer so the read cannot spill into the data that follows
	// the current entry.
	if remaining := r.pending; remaining < uint64(len(p)) {
		p = p[:remaining]
	}

	n, err = r.r.Read(p)
	r.pending -= uint64(n)

	if r.pending > 0 && err == io.EOF {
		return n, io.ErrUnexpectedEOF
	}
	return n, err
}