Skip to content

Commit 67d8e74

Browse files
committed
implement batching for leveldb
1 parent 9b48c4e commit 67d8e74

File tree

3 files changed

+68
-9
lines changed

3 files changed

+68
-9
lines changed

flatfs/flatfs.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (fs *Datastore) makePrefixDirNoSync(dir string) error {
9999
return nil
100100
}
101101

102-
var putMaxRetries = 3
102+
var putMaxRetries = 6
103103

104104
func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
105105
val, ok := value.([]byte)
@@ -111,15 +111,15 @@ func (fs *Datastore) Put(key datastore.Key, value interface{}) error {
111111
for i := 0; i < putMaxRetries; i++ {
112112
err = fs.doPut(key, val)
113113
if err == nil {
114-
return nil
114+
break
115115
}
116116

117117
if !strings.Contains(err.Error(), "too many open files") {
118-
return err
118+
break
119119
}
120120

121121
log.Errorf("too many open files, retrying in %dms", 100*i)
122-
time.Sleep(time.Millisecond * 100 * time.Duration(i))
122+
time.Sleep(time.Millisecond * 100 * time.Duration(i+1))
123123
}
124124
return err
125125
}

leveldb/datastore.go

+31-5
Original file line numberDiff line numberDiff line change
@@ -147,14 +147,40 @@ func (d *datastore) runQuery(worker goprocess.Process, qrb *dsq.ResultBuilder) {
147147
}
148148
}
149149

150-
func (d *datastore) Batch() (ds.Batch, error) {
151-
// TODO: implement batch on leveldb
152-
return nil, ds.ErrBatchUnsupported
153-
}
154-
155150
// LevelDB needs to be closed.
156151
func (d *datastore) Close() (err error) {
157152
return d.DB.Close()
158153
}
159154

160155
func (d *datastore) IsThreadSafe() {}
156+
157+
type leveldbBatch struct {
158+
b *leveldb.Batch
159+
db *leveldb.DB
160+
}
161+
162+
func (d *datastore) Batch() (ds.Batch, error) {
163+
return &leveldbBatch{
164+
b: new(leveldb.Batch),
165+
db: d.DB,
166+
}, nil
167+
}
168+
169+
func (b *leveldbBatch) Put(key ds.Key, value interface{}) error {
170+
val, ok := value.([]byte)
171+
if !ok {
172+
return ds.ErrInvalidType
173+
}
174+
175+
b.b.Put(key.Bytes(), val)
176+
return nil
177+
}
178+
179+
func (b *leveldbBatch) Commit() error {
180+
return b.db.Write(b.b, nil)
181+
}
182+
183+
func (b *leveldbBatch) Delete(key ds.Key) error {
184+
b.b.Delete(key.Bytes())
185+
return nil
186+
}

leveldb/ds_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,36 @@ func expectMatches(t *testing.T, expect []string, actualR dsq.Results) {
122122
}
123123
}
124124
}
125+
126+
func TestBatching(t *testing.T) {
127+
d, done := newDS(t)
128+
defer done()
129+
130+
b, err := d.Batch()
131+
if err != nil {
132+
t.Fatal(err)
133+
}
134+
135+
for k, v := range testcases {
136+
err := b.Put(ds.NewKey(k), []byte(v))
137+
if err != nil {
138+
t.Fatal(err)
139+
}
140+
}
141+
142+
err = b.Commit()
143+
if err != nil {
144+
t.Fatal(err)
145+
}
146+
147+
for k, v := range testcases {
148+
val, err := d.Get(ds.NewKey(k))
149+
if err != nil {
150+
t.Fatal(err)
151+
}
152+
153+
if v != string(val.([]byte)) {
154+
t.Fatal("got wrong data!")
155+
}
156+
}
157+
}

0 commit comments

Comments
 (0)