forked from alanshaw/ipfs-ds-postgres
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdatastore.go
213 lines (185 loc) · 5.3 KB
/
datastore.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package pgds
import (
	"context"
	"fmt"
	"strings"

	ds "github.com/ipfs/go-datastore"
	dsq "github.com/ipfs/go-datastore/query"
	"github.com/jackc/pgx/v4"
	"github.com/jackc/pgx/v4/pgxpool"
)
// Datastore is a PostgreSQL backed datastore.
// Every method runs its SQL against a single table through a pgx connection pool.
type Datastore struct {
// table is the table name formatted into every SQL statement this type builds.
table string
// pool is the connection pool all statements are executed on; Close shuts it down.
pool *pgxpool.Pool
}
// NewDatastore builds a Datastore backed by a pgx connection pool opened
// against connString.
//
// OptionDefaults is applied before the caller-supplied options (presumably so
// that later options can override the defaults; confirm against Options.Apply).
// The resulting Table setting selects the table every statement targets.
// An error from pgxpool.Connect is returned unchanged with a nil Datastore.
func NewDatastore(ctx context.Context, connString string, options ...Option) (*Datastore, error) {
	opts := append([]Option{OptionDefaults}, options...)
	cfg := Options{}
	cfg.Apply(opts...)

	pool, connErr := pgxpool.Connect(ctx, connString)
	if connErr != nil {
		return nil, connErr
	}

	store := &Datastore{
		table: cfg.Table,
		pool:  pool,
	}
	return store, nil
}
// PgxPool exposes the underlying pool of connections to Postgres.
// The returned pointer is the same pool the datastore's own methods execute
// their statements on, not a copy.
func (d *Datastore) PgxPool() *pgxpool.Pool {
return d.pool
}
// Close shuts down the underlying PostgreSQL connection pool.
// A Datastore without a pool is left untouched. The returned error is always nil.
func (d *Datastore) Close() error {
	if d.pool == nil {
		return nil
	}
	d.pool.Close()
	return nil
}
// Delete removes the row stored under key from the table.
// The key is bound as a statement parameter; any error from executing the
// DELETE is returned as-is.
func (d *Datastore) Delete(ctx context.Context, key ds.Key) error {
	stmt := fmt.Sprintf("DELETE FROM %s WHERE key = $1", d.table)
	_, err := d.pool.Exec(ctx, stmt, key.String())
	return err
}
// Get fetches the data column of the row stored under key.
// It returns ds.ErrNotFound when no row matches, and any other scan error
// unchanged; in both failure cases the returned value is nil.
func (d *Datastore) Get(ctx context.Context, key ds.Key) (value []byte, err error) {
	query := fmt.Sprintf("SELECT data FROM %s WHERE key = $1", d.table)

	var data []byte
	scanErr := d.pool.QueryRow(ctx, query, key.String()).Scan(&data)
	if scanErr == nil {
		return data, nil
	}
	if scanErr == pgx.ErrNoRows {
		// pgx reports a missing row through Scan; translate it to the datastore sentinel.
		return nil, ds.ErrNotFound
	}
	return nil, scanErr
}
// Has reports whether a row with the given key exists in the table.
//
// The lookup is wrapped in SELECT exists(...), which always produces exactly
// one boolean row, so a missing key is reported as (false, nil) rather than
// as ds.ErrNotFound. If the query or scan fails, Has returns false together
// with that error.
func (d *Datastore) Has(ctx context.Context, key ds.Key) (bool, error) {
	sql := fmt.Sprintf("SELECT exists(SELECT 1 FROM %s WHERE key = $1)", d.table)

	var exists bool
	if err := d.pool.QueryRow(ctx, sql, key.String()).Scan(&exists); err != nil {
		return false, err
	}
	return exists, nil
}
// Put stores value under key, inserting a new row or, when the key already
// exists, overwriting that row's data ("upsert").
// Key and value are bound as statement parameters; any error from executing
// the statement is returned as-is.
func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) error {
	upsert := fmt.Sprintf("INSERT INTO %s (key, data) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET data = $2", d.table)
	_, err := d.pool.Exec(ctx, upsert, key.String(), value)
	return err
}
// escapeLikePattern backslash-escapes the LIKE metacharacters (\, % and _) in s
// so that s matches only itself when used inside a LIKE pattern.
func escapeLikePattern(s string) string {
	return strings.NewReplacer(`\`, `\\`, `%`, `\%`, `_`, `\_`).Replace(s)
}

// Query returns multiple rows from the SQL database based on the passed query parameters.
//
// Column selection follows q.KeysOnly and q.ReturnsSizes: keys only, keys plus
// octet_length(data), or keys plus data. A non-root q.Prefix restricts the
// result to keys below that prefix, ordered by key. The prefix is passed as a
// bound parameter with its LIKE metacharacters escaped, so quotes, '%' or '_'
// in a key can neither alter the statement nor widen the match.
//
// LIMIT and OFFSET are pushed into SQL only when the query has no Filters and
// no Orders; otherwise filtering, ordering, offset and limit are applied
// in-process, in that order, on top of the row iterator.
//
// Errors from running the statement are returned directly. Errors that occur
// while iterating (row scan failures, errors reported by rows.Err) are
// delivered once as a dsq.Result with Error set, after which the iterator ends.
func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
	var sql string
	switch {
	case q.KeysOnly && q.ReturnsSizes:
		sql = fmt.Sprintf("SELECT key, octet_length(data) FROM %s", d.table)
	case q.KeysOnly:
		sql = fmt.Sprintf("SELECT key FROM %s", d.table)
	default:
		sql = fmt.Sprintf("SELECT key, data FROM %s", d.table)
	}

	var args []interface{}
	if q.Prefix != "" {
		// normalize
		prefix := ds.NewKey(q.Prefix).String()
		if prefix != "/" {
			// Bind the pattern instead of splicing it into the SQL text. Backslash
			// is PostgreSQL's default LIKE escape character, so no ESCAPE clause
			// is required.
			sql += " WHERE key LIKE $1 ORDER BY key"
			args = append(args, escapeLikePattern(prefix+"/")+"%")
		}
	}

	// only apply limit and offset if we do not have to naive filter/order the results
	if len(q.Filters) == 0 && len(q.Orders) == 0 {
		if q.Limit != 0 {
			sql += fmt.Sprintf(" LIMIT %d", q.Limit)
		}
		if q.Offset != 0 {
			sql += fmt.Sprintf(" OFFSET %d", q.Offset)
		}
	}

	rows, err := d.pool.Query(ctx, sql, args...)
	if err != nil {
		return nil, err
	}

	// failed is set once an error has been handed to the consumer. Consumers
	// discard any Result returned together with ok == false, so an error must
	// be reported with ok == true and the stream terminated on the next call.
	failed := false
	fail := func(err error) (dsq.Result, bool) {
		failed = true
		return dsq.Result{Error: err}, true
	}

	it := dsq.Iterator{
		Next: func() (dsq.Result, bool) {
			if failed {
				return dsq.Result{}, false
			}
			if !rows.Next() {
				if err := rows.Err(); err != nil {
					return fail(err)
				}
				return dsq.Result{}, false
			}

			var entry dsq.Entry
			switch {
			case q.KeysOnly && q.ReturnsSizes:
				if err := rows.Scan(&entry.Key, &entry.Size); err != nil {
					return fail(err)
				}
			case q.KeysOnly:
				if err := rows.Scan(&entry.Key); err != nil {
					return fail(err)
				}
			default:
				if err := rows.Scan(&entry.Key, &entry.Value); err != nil {
					return fail(err)
				}
				if q.ReturnsSizes {
					entry.Size = len(entry.Value)
				}
			}
			return dsq.Result{Entry: entry}, true
		},
		Close: func() error {
			rows.Close()
			return nil
		},
	}

	res := dsq.ResultsFromIterator(q, it)
	for _, f := range q.Filters {
		res = dsq.NaiveFilter(res, f)
	}
	res = dsq.NaiveOrder(res, q.Orders...)

	// if we have filters or orders, offset and limit won't have been applied in the query
	if len(q.Filters) > 0 || len(q.Orders) > 0 {
		if q.Offset != 0 {
			res = dsq.NaiveOffset(res, q.Offset)
		}
		if q.Limit != 0 {
			res = dsq.NaiveLimit(res, q.Limit)
		}
	}

	return res, nil
}
// Sync is a no-op for PostgreSQL databases: it ignores ctx and key and always
// returns nil. Put and Delete in this file execute their statements directly
// against the pool, so nothing is buffered here that would need flushing.
func (d *Datastore) Sync(ctx context.Context, key ds.Key) error {
return nil
}
// GetSize reports the size in bytes of the value stored under key, computed
// by the database with octet_length so the value itself is never transferred.
// It returns (-1, ds.ErrNotFound) when no row matches, and -1 with the
// underlying error for any other scan failure.
func (d *Datastore) GetSize(ctx context.Context, key ds.Key) (int, error) {
	query := fmt.Sprintf("SELECT octet_length(data) FROM %s WHERE key = $1", d.table)

	var n int
	scanErr := d.pool.QueryRow(ctx, query, key.String()).Scan(&n)
	if scanErr == nil {
		return n, nil
	}
	if scanErr == pgx.ErrNoRows {
		return -1, ds.ErrNotFound
	}
	return -1, scanErr
}
// Compile-time assertion that *Datastore satisfies the ds.Datastore interface.
var _ ds.Datastore = (*Datastore)(nil)