Skip to content
This repository was archived by the owner on Mar 23, 2023. It is now read-only.

Commit b8a13f9

Browse files
author
Alan Shaw
committed
refactor: async iterators
Uses async await and async iterators to implement the proposal here ipfs/interface-datastore#25 License: MIT Signed-off-by: Alan Shaw <[email protected]>
1 parent 0aa53d3 commit b8a13f9

File tree

5 files changed

+154
-190
lines changed

5 files changed

+154
-190
lines changed

package.json

+3-5
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,13 @@
4242
"dependencies": {
4343
"datastore-core": "~0.6.0",
4444
"encoding-down": "^5.0.4",
45-
"interface-datastore": "~0.6.0",
45+
"interface-datastore": "github:ipfs/interface-datastore#refactor/async-iterators",
4646
"level-js": "github:timkuijsten/level.js#idbunwrapper",
4747
"leveldown": "^3.0.2",
48-
"levelup": "^2.0.2",
49-
"pull-stream": "^3.6.9"
48+
"levelup": "^2.0.2"
5049
},
5150
"devDependencies": {
52-
"aegir": "^15.3.1",
53-
"async": "^2.6.1",
51+
"aegir": "^17.1.1",
5452
"chai": "^4.2.0",
5553
"cids": "~0.5.5",
5654
"dirty-chai": "^2.0.1",

src/index.js

+85-98
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,12 @@
33

44
/* :: import type {Callback, Batch, Query, QueryResult, QueryEntry} from 'interface-datastore' */
55

6-
const pull = require('pull-stream')
76
const levelup = require('levelup')
8-
9-
const asyncFilter = require('interface-datastore').utils.asyncFilter
10-
const asyncSort = require('interface-datastore').utils.asyncSort
11-
const Key = require('interface-datastore').Key
12-
const Errors = require('interface-datastore').Errors
7+
const { Key, Errors, utils } = require('interface-datastore')
138
const encode = require('encoding-down')
9+
const { promisify } = require('util')
10+
11+
const { filter, map, take, sortAll } = utils
1412

1513
/**
1614
* A datastore backed by leveldb.
@@ -50,59 +48,53 @@ class LevelDatastore {
5048
)
5149
}
5250

53-
open (callback /* : Callback<void> */) /* : void */ {
54-
this.db.open((err) => {
55-
if (err) {
56-
return callback(Errors.dbOpenFailedError(err))
57-
}
58-
callback()
59-
})
51+
async open () /* : Promise */ {
52+
try {
53+
await this.db.open()
54+
} catch (err) {
55+
throw Errors.dbOpenFailedError(err)
56+
}
6057
}
6158

62-
put (key /* : Key */, value /* : Buffer */, callback /* : Callback<void> */) /* : void */ {
63-
this.db.put(key.toString(), value, (err) => {
64-
if (err) {
65-
return callback(Errors.dbWriteFailedError(err))
66-
}
67-
callback()
68-
})
59+
async put (key /* : Key */, value /* : Buffer */) /* : Promise */ {
60+
try {
61+
await this.db.put(key.toString(), value)
62+
} catch (err) {
63+
throw Errors.dbWriteFailedError(err)
64+
}
6965
}
7066

71-
get (key /* : Key */, callback /* : Callback<Buffer> */) /* : void */ {
72-
this.db.get(key.toString(), (err, data) => {
73-
if (err) {
74-
return callback(Errors.notFoundError(err))
75-
}
76-
callback(null, data)
77-
})
67+
async get (key /* : Key */) /* : Promise */ {
68+
let data
69+
try {
70+
data = await this.db.get(key.toString())
71+
} catch (err) {
72+
if (err.notFound) throw Errors.notFoundError(err)
73+
throw Errors.dbWriteFailedError(err)
74+
}
75+
return data
7876
}
7977

80-
has (key /* : Key */, callback /* : Callback<bool> */) /* : void */ {
81-
this.db.get(key.toString(), (err, res) => {
82-
if (err) {
83-
if (err.notFound) {
84-
callback(null, false)
85-
return
86-
}
87-
callback(err)
88-
return
89-
}
90-
91-
callback(null, true)
92-
})
78+
async has (key /* : Key */) /* : Promise<Boolean> */ {
79+
try {
80+
await this.db.get(key.toString())
81+
} catch (err) {
82+
if (err.notFound) return false
83+
throw err
84+
}
85+
return true
9386
}
9487

95-
delete (key /* : Key */, callback /* : Callback<void> */) /* : void */ {
96-
this.db.del(key.toString(), (err) => {
97-
if (err) {
98-
return callback(Errors.dbDeleteFailedError(err))
99-
}
100-
callback()
101-
})
88+
async delete (key /* : Key */) /* : Promise */ {
89+
try {
90+
await this.db.del(key.toString())
91+
} catch (err) {
92+
throw Errors.dbDeleteFailedError(err)
93+
}
10294
}
10395

104-
close (callback /* : Callback<void> */) /* : void */ {
105-
this.db.close(callback)
96+
async close () /* : Promise */ {
97+
return this.db.close()
10698
}
10799

108100
batch () /* : Batch<Buffer> */ {
@@ -121,8 +113,8 @@ class LevelDatastore {
121113
key: key.toString()
122114
})
123115
},
124-
commit: (callback /* : Callback<void> */) /* : void */ => {
125-
this.db.batch(ops, callback)
116+
commit: async () /* : Promise */ => {
117+
return this.db.batch(ops)
126118
}
127119
}
128120
}
@@ -133,70 +125,65 @@ class LevelDatastore {
133125
values = !q.keysOnly
134126
}
135127

136-
const iter = this.db.db.iterator({
137-
keys: true,
138-
values: values,
139-
keyAsBuffer: true
140-
})
141-
142-
const rawStream = (end, cb) => {
143-
if (end) {
144-
return iter.end((err) => {
145-
cb(err || end)
146-
})
147-
}
148-
149-
iter.next((err, key, value) => {
150-
if (err) {
151-
return cb(err)
152-
}
153-
154-
if (err == null && key == null && value == null) {
155-
return iter.end((err) => {
156-
cb(err || true)
157-
})
158-
}
159-
160-
const res /* : QueryEntry<Buffer> */ = {
161-
key: new Key(key, false)
162-
}
163-
164-
if (values) {
165-
res.value = Buffer.from(value)
166-
}
167-
168-
cb(null, res)
128+
let it = levelIteratorToIterator(
129+
this.db.db.iterator({
130+
keys: true,
131+
values: values,
132+
keyAsBuffer: true
169133
})
170-
}
134+
)
171135

172-
let tasks = [rawStream]
173-
let filters = []
136+
it = map(it, ({ key, value }) => {
137+
const res /* : QueryEntry<Buffer> */ = { key: new Key(key, false) }
138+
if (values) {
139+
res.value = Buffer.from(value)
140+
}
141+
return res
142+
})
174143

175144
if (q.prefix != null) {
176-
const prefix = q.prefix
177-
filters.push((e, cb) => cb(null, e.key.toString().startsWith(prefix)))
145+
it = filter(it, e => e.key.toString().startsWith(q.prefix))
178146
}
179147

180-
if (q.filters != null) {
181-
filters = filters.concat(q.filters)
148+
if (Array.isArray(q.filters)) {
149+
it = q.filters.reduce((it, f) => filter(it, f), it)
182150
}
183151

184-
tasks = tasks.concat(filters.map(f => asyncFilter(f)))
185-
186-
if (q.orders != null) {
187-
tasks = tasks.concat(q.orders.map(o => asyncSort(o)))
152+
if (Array.isArray(q.orders)) {
153+
it = q.orders.reduce((it, f) => sortAll(it, f), it)
188154
}
189155

190156
if (q.offset != null) {
191157
let i = 0
192-
tasks.push(pull.filter(() => i++ >= q.offset))
158+
it = filter(it, () => i++ >= q.offset)
193159
}
194160

195161
if (q.limit != null) {
196-
tasks.push(pull.take(q.limit))
162+
it = take(it, q.limit)
197163
}
198164

199-
return pull.apply(null, tasks)
165+
return it
166+
}
167+
}
168+
169+
function levelIteratorToIterator (li) {
170+
return {
171+
next: () => new Promise((resolve, reject) => {
172+
li.next((err, key, value) => {
173+
if (err) return reject(err)
174+
if (key == null) return resolve({ done: true })
175+
resolve({ done: false, value: { key, value } })
176+
})
177+
}),
178+
return: () => new Promise((resolve, reject) => {
179+
li.end(err => {
180+
if (err) return reject(err)
181+
resolve({ done: true })
182+
})
183+
}),
184+
[Symbol.asyncIterator] () {
185+
return this
186+
}
200187
}
201188
}
202189

test/browser.js

+26-18
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
/* eslint-env mocha */
33
'use strict'
44

5-
const each = require('async/each')
6-
const MountStore = require('datastore-core').MountDatastore
7-
const Key = require('interface-datastore').Key
5+
const { MountDatastore } = require('datastore-core')
6+
const { Key } = require('interface-datastore')
87

98
// leveldown will be swapped for level-js
109
const leveljs = require('leveldown')
@@ -14,31 +13,40 @@ const LevelStore = require('../src')
1413
describe('LevelDatastore', () => {
1514
describe('interface-datastore (leveljs)', () => {
1615
require('interface-datastore/src/tests')({
17-
setup (callback) {
18-
callback(null, new LevelStore('hello', {db: leveljs}))
19-
},
20-
teardown (callback) {
21-
leveljs.destroy('hello', callback)
22-
}
16+
setup: () => new LevelStore('hello', { db: leveljs }),
17+
teardown: () => new Promise((resolve, reject) => {
18+
leveljs.destroy('hello', err => {
19+
if (err) return reject(err)
20+
resolve()
21+
})
22+
})
2323
})
2424
})
2525

26-
describe('interface-datastore (mount(leveljs, leveljs, leveljs))', () => {
26+
// TODO: unskip when datastore-core is converted to async/await/iterators
27+
describe.skip('interface-datastore (mount(leveljs, leveljs, leveljs))', () => {
2728
require('interface-datastore/src/tests')({
28-
setup (callback) {
29-
callback(null, new MountStore([{
29+
setup () {
30+
return new MountDatastore([{
3031
prefix: new Key('/a'),
31-
datastore: new LevelStore('one', {db: leveljs})
32+
datastore: new LevelStore('one', { db: leveljs })
3233
}, {
3334
prefix: new Key('/q'),
34-
datastore: new LevelStore('two', {db: leveljs})
35+
datastore: new LevelStore('two', { db: leveljs })
3536
}, {
3637
prefix: new Key('/z'),
37-
datastore: new LevelStore('three', {db: leveljs})
38-
}]))
38+
datastore: new LevelStore('three', { db: leveljs })
39+
}])
3940
},
40-
teardown (callback) {
41-
each(['one', 'two', 'three'], leveljs.destroy.bind(leveljs), callback)
41+
teardown () {
42+
return Promise.all(['one', 'two', 'three'].map(dir => {
43+
return new Promise((resolve, reject) => {
44+
leveljs.destroy(dir, err => {
45+
if (err) return reject(err)
46+
resolve()
47+
})
48+
})
49+
}))
4250
}
4351
})
4452
})

0 commit comments

Comments
 (0)