-
Notifications
You must be signed in to change notification settings - Fork 365
/
Copy pathcommand.go
311 lines (276 loc) · 8.4 KB
/
command.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
// Copyright (C) MongoDB, Inc. 2014-present.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
package db
import (
"context"
"fmt"
"github.com/mongodb/mongo-tools/common/bsonutil"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
mopt "go.mongodb.org/mongo-driver/mongo/options"
)
// Query flags.
const (
Snapshot = 1 << iota
LogReplay
Prefetch
)
type NodeType string
const (
Mongos NodeType = "mongos"
Standalone NodeType = "standalone"
ReplSet NodeType = "replset"
Unknown NodeType = "unknown"
)
// CommandRunner exposes functions that can be run against a server
// XXX Does anything rely on this?
type CommandRunner interface {
Run(command interface{}, out interface{}, database string) error
RunString(commandName string, out interface{}, database string) error
FindOne(
db, collection string,
skip int,
query interface{},
sort []string,
into interface{},
opts int,
) error
Remove(db, collection string, query interface{}) error
DatabaseNames() ([]string, error)
CollectionNames(db string) ([]string, error)
}
// // Remove removes all documents matched by query q in the db database and c collection.
// func (sp *SessionProvider) Remove(db, c string, q interface{}) error {
// session, err := sp.GetSession()
// if err != nil {
// return err
// }
// _, err = session.Database(db).Collection(c).RemoveAll(q)
// return err
// }
//
// Run issues the provided command on the db database and unmarshals its result
// into out.
func (sp *SessionProvider) Run(command interface{}, out interface{}, name string) error {
db := sp.DB(name)
result := db.RunCommand(context.Background(), command)
if result.Err() != nil {
return result.Err()
}
err := result.Decode(out)
if err != nil {
return err
}
return nil
}
func (sp *SessionProvider) RunString(commandName string, out interface{}, name string) error {
command := &bson.M{commandName: 1}
return sp.Run(command, out, name)
}
func (sp *SessionProvider) DropDatabase(dbName string) error {
return sp.DB(dbName).Drop(context.Background())
}
func (sp *SessionProvider) CreateCollection(dbName, collName string) error {
command := &bson.M{"create": collName}
out := &bson.Raw{}
err := sp.Run(command, out, dbName)
return err
}
func (sp *SessionProvider) ServerVersion() (string, error) {
out := struct{ Version string }{}
err := sp.RunString("buildInfo", &out, "admin")
if err != nil {
return "", err
}
return out.Version, nil
}
func (sp *SessionProvider) ServerVersionArray() (Version, error) {
var version Version
out := struct {
VersionArray []int `bson:"versionArray"`
}{}
err := sp.RunString("buildInfo", &out, "admin")
if err != nil {
return version, fmt.Errorf("error getting buildInfo: %v", err)
}
if len(out.VersionArray) < 3 {
return version, fmt.Errorf("buildInfo.versionArray had fewer than 3 elements")
}
copy(version[:], out.VersionArray[:len(version)])
// In development server builds `versionArray`’s 4th member is negative, and
// `versionArray`’s patch version exceeds `version`’s by 1. Since we have
// logic that compares this method’s output to `version` we need to subtract
// one from the patch value.
if len(out.VersionArray) > 3 && out.VersionArray[3] < 0 {
version[2]--
}
return version, nil
}
// DatabaseNames returns a slice containing the names of all the databases on the
// connected server.
func (sp *SessionProvider) DatabaseNames() ([]string, error) {
return sp.client.ListDatabaseNames(context.TODO(), bson.D{})
}
// CollectionNames returns the names of all the collections in the dbName database.
// func (sp *SessionProvider) CollectionNames(dbName string) ([]string, error) {
// session, err := sp.GetSession()
// if err != nil {
// return nil, err
// }
// return session.DB(dbName).CollectionNames()
// }
// IsAtlasProxy checks if the connected SessionProvider is an atlas proxy.
func (sp *SessionProvider) IsAtlasProxy() bool {
session, err := sp.GetSession()
if err != nil {
return false
}
// Only the atlas proxy will respond to this command without an error.
result := session.Database("admin").RunCommand(
context.Background(),
&bson.M{"atlasVersion": 1},
)
return result.Err() == nil
}
// GetNodeType checks if the connected SessionProvider is a mongos, standalone, or replset,
// by looking at the result of calling isMaster.
func (sp *SessionProvider) GetNodeType() (NodeType, error) {
session, err := sp.GetSession()
if err != nil {
return Unknown, err
}
masterDoc := struct {
SetName interface{} `bson:"setName"`
Hosts interface{} `bson:"hosts"`
Msg string `bson:"msg"`
}{}
result := session.Database("admin").RunCommand(
context.Background(),
&bson.M{"ismaster": 1},
)
if result.Err() != nil {
return Unknown, result.Err()
}
err = result.Decode(&masterDoc)
if err != nil {
return Unknown, err
}
if masterDoc.SetName != nil || masterDoc.Hosts != nil {
return ReplSet, nil
} else if masterDoc.Msg == "isdbgrid" {
// isdbgrid is always the msg value when calling isMaster on a mongos
// see http://docs.mongodb.org/manual/core/sharded-cluster-query-router/
return Mongos, nil
}
return Standalone, nil
}
// IsReplicaSet returns a boolean which is true if the connected server is part
// of a replica set.
func (sp *SessionProvider) IsReplicaSet() (bool, error) {
nodeType, err := sp.GetNodeType()
if err != nil {
return false, err
}
return nodeType == ReplSet, nil
}
// IsMongos returns true if the connected server is a mongos.
func (sp *SessionProvider) IsMongos() (bool, error) {
nodeType, err := sp.GetNodeType()
if err != nil {
return false, err
}
return nodeType == Mongos, nil
}
//
// // SupportsWriteCommands returns true if the connected server supports write
// // commands, returns false otherwise.
// func (sp *SessionProvider) SupportsWriteCommands() (bool, error) {
// session, err := sp.GetSession()
// if err != nil {
// return false, err
// }
// masterDoc := struct {
// Ok int `bson:"ok"`
// MaxWire int `bson:"maxWireVersion"`
// }{}
// err = session.Run("isMaster", &masterDoc)
// if err != nil {
// return false, err
// }
// // the connected server supports write commands if
// // the maxWriteVersion field is present
// return (masterDoc.Ok == 1 && masterDoc.MaxWire >= 2), nil
// }
// FindOne returns the first document in the collection and database that matches
// the query after skip, sort and query flags are applied.
func (sp *SessionProvider) FindOne(
db, collection string,
skip int,
query interface{},
sort interface{},
into interface{},
flags int,
) error {
session, err := sp.GetSession()
if err != nil {
return err
}
if query == nil {
query = bson.D{}
}
opts := mopt.FindOne().SetSort(sort).SetSkip(int64(skip))
ApplyFlags(opts, flags)
res := session.Database(db).Collection(collection).FindOne(context.TODO(), query, opts)
err = res.Decode(into)
return err
}
// ApplyFlags applies flags to the given query session.
func ApplyFlags(opts *mopt.FindOneOptions, flags int) {
if flags&Snapshot > 0 {
opts.SetHint(bson.D{{"_id", 1}})
}
if flags&LogReplay > 0 {
opts.SetOplogReplay(true)
}
}
// RunApplyOpsCreateIndex will create index using applyOps.
// For versions that support collection UUIDs (<3.6) it uses an insert to system indexes.
// Later versions use the createIndexes command.
func (sp *SessionProvider) RunApplyOpsCreateIndex(
C, DB string,
index bson.D,
UUID *primitive.Binary,
result *interface{},
) error {
var op Oplog
// Add an index version if it is missing. An index version could be missing because
// a tool stripped it out (e.g. mongorestore strips out versions before attempting createIndex)
// or because the index came from an oplog entry from versions <3.4.
_, err := bsonutil.FindValueByKey("v", &index)
if err != nil {
index = append(index, bson.E{Key: "v", Value: 1})
}
if UUID != nil {
o := append(bson.D{{Key: "createIndexes", Value: C}}, index...)
op = Oplog{
Operation: "c",
Namespace: fmt.Sprintf("%s.$cmd", DB),
Object: o,
UI: UUID,
}
} else {
op = Oplog{
Operation: "i",
Namespace: fmt.Sprintf("%s.system.indexes", DB),
Object: index,
}
}
err = sp.Run(bson.D{{Key: "applyOps", Value: []Oplog{op}}}, result, DB)
if err != nil {
return fmt.Errorf("error building index: %v", err)
}
return nil
}