diff --git a/mongoexport/mongoexport.go b/mongoexport/mongoexport.go index a10156f4e..b53804830 100644 --- a/mongoexport/mongoexport.go +++ b/mongoexport/mongoexport.go @@ -184,8 +184,8 @@ func (exp *MongoExport) validateSettings() error { return fmt.Errorf("cannot use --forceTableScan when specifying --query") } - if exp.InputOpts.Query != "" && exp.InputOpts.QueryFile != "" { - return fmt.Errorf("either --query or --queryFile can be specified as a query option") + if (exp.InputOpts.Query != "" && exp.InputOpts.QueryFile != "") || (exp.InputOpts.Pipeline != "" && (exp.InputOpts.Query != "" || exp.InputOpts.QueryFile != "")) { + return fmt.Errorf("only one of --query, --queryFile, or --pipeline can be specified as a query option") } if exp.InputOpts != nil && exp.InputOpts.HasQuery() { @@ -193,9 +193,17 @@ func (exp *MongoExport) validateSettings() error { if err != nil { return err } - _, err2 := getObjectFromByteArg(content) - if err2 != nil { - return err2 + + if exp.InputOpts.Pipeline != "" { + _, err2 := getArrayFromByteArg(content) + if err2 != nil { + return err2 + } + } else { + _, err2 := getObjectFromByteArg(content) + if err2 != nil { + return err2 + } } } @@ -256,6 +264,7 @@ func makeFieldSelector(fields string) bson.M { // It always returns Limit if there is a limit, assuming that in general // limits will less then the total possible. // If there is a query and no limit then it returns 0, because it's too expensive to count the query. +// If there is an aggregation pipeline and no limit then it returns 0, because it's too expensive to count the results. // If the collection is a view then it returns 0, because it is too expensive to count the view. 
// Otherwise it returns the count minus the skip func (exp *MongoExport) getCount() (int64, error) { @@ -269,6 +278,9 @@ func (exp *MongoExport) getCount() (int64, error) { if exp.InputOpts != nil && exp.InputOpts.Query != "" { return 0, nil } + if exp.InputOpts != nil && exp.InputOpts.Pipeline != "" { + return 0, nil + } coll := session.Database(exp.ToolOptions.Namespace.DB).Collection(exp.ToolOptions.Namespace.Collection) if exp.collInfo.IsView() { @@ -298,6 +310,7 @@ func (exp *MongoExport) getCount() (int64, error) { // associated session, so that it can be closed once the cursor is used up. func (exp *MongoExport) getCursor() (*mongo.Cursor, error) { findOpts := mopt.Find() + aggregationOpts := mopt.Aggregate() if exp.InputOpts != nil && exp.InputOpts.Sort != "" { sortD, err := getSortFromArg(exp.InputOpts.Sort) @@ -309,15 +322,23 @@ func (exp *MongoExport) getCursor() (*mongo.Cursor, error) { } query := bson.D{} + pipeline := bson.A{} if exp.InputOpts != nil && exp.InputOpts.HasQuery() { var err error content, err := exp.InputOpts.GetQuery() if err != nil { return nil, err } - err = bson.UnmarshalExtJSON(content, false, &query) - if err != nil { - return nil, fmt.Errorf("error parsing query as Extended JSON: %v", err) + if exp.InputOpts.Pipeline != "" { + err = bson.UnmarshalExtJSON(content, false, &pipeline) + if err != nil { + return nil, fmt.Errorf("error parsing pipeline as Extended JSON: %v", err) + } + } else { + err = bson.UnmarshalExtJSON(content, false, &query) + if err != nil { + return nil, fmt.Errorf("error parsing query as Extended JSON: %v", err) + } } } @@ -366,11 +387,20 @@ func (exp *MongoExport) getCursor() (*mongo.Cursor, error) { findOpts.SetLimit(exp.InputOpts.Limit) } + if exp.InputOpts != nil && exp.InputOpts.AllowDiskUse { + findOpts.SetAllowDiskUse(true) + aggregationOpts.SetAllowDiskUse(true) + } + if len(exp.OutputOpts.Fields) > 0 { findOpts.SetProjection(makeFieldSelector(exp.OutputOpts.Fields)) } - return coll.Find(nil, query, 
findOpts) + if exp.InputOpts != nil && exp.InputOpts.Pipeline != "" { + return coll.Aggregate(nil, pipeline, aggregationOpts) + } else { + return coll.Find(nil, query, findOpts) + } } // verifyCollectionExists checks if the collection exists. If it does, a copy of the collection info will be cached @@ -529,6 +559,26 @@ func getObjectFromByteArg(queryRaw []byte) (map[string]interface{}, error) { return parsedJSON, nil } +// getArrayFromByteArg parses an aggregation pipeline given as a JSON array of stage documents and +// converts the legacy extended JSON in each stage via bsonutil.ConvertLegacyExtJSONDocumentToBSON. +// It returns an error if the input is not a valid JSON array of objects or if a stage fails conversion. +func getArrayFromByteArg(queryRaw []byte) ([]map[string]interface{}, error) { + parsedJSON := []map[string]interface{}{} + err := json.Unmarshal(queryRaw, &parsedJSON) + if err != nil { + return nil, fmt.Errorf("pipeline '%s' is not valid JSON: %v", queryRaw, err) + } + + for _, stage := range parsedJSON { + err = bsonutil.ConvertLegacyExtJSONDocumentToBSON(stage) + if err != nil { + return nil, err + } + } + + return parsedJSON, nil +} + // getSortFromArg takes a sort specification in JSON and returns it as a bson.D // object which preserves the ordering of the keys as they appear in the input. 
func getSortFromArg(queryRaw string) (bson.D, error) { diff --git a/mongoexport/options.go b/mongoexport/options.go index e3cb842cc..57d0205c6 100644 --- a/mongoexport/options.go +++ b/mongoexport/options.go @@ -62,6 +62,7 @@ func (*OutputFormatOptions) Name() string { type InputOptions struct { Query string `long:"query" value-name:"" short:"q" description:"query filter, as a JSON string, e.g., '{x:{$gt:1}}'"` QueryFile string `long:"queryFile" value-name:"" description:"path to a file containing a query filter (JSON)"` + Pipeline string `long:"pipeline" value-name:"" description:"aggregation pipeline, as a JSON array, e.g., '[{$match: {x:{$gt:1}}}]'"` SlaveOk bool `long:"slaveOk" short:"k" hidden:"true" description:"allow secondary reads if available" default-mask:"-"` ReadPreference string `long:"readPreference" value-name:"|" description:"specify either a preference mode (e.g. 'nearest') or a preference json object (e.g. '{mode: \"nearest\", tagSets: [{a: \"b\"}], maxStalenessSeconds: 123}')"` ForceTableScan bool `long:"forceTableScan" description:"force a table scan (do not use $snapshot or hint _id). Deprecated since this is default behavior on WiredTiger"` @@ -69,6 +70,7 @@ type InputOptions struct { Limit int64 `long:"limit" value-name:"" description:"limit the number of documents to export"` Sort string `long:"sort" value-name:"" description:"sort order, as a JSON string, e.g. '{x:1}'"` AssertExists bool `long:"assertExists" description:"if specified, export fails if the collection does not exist"` + AllowDiskUse bool `long:"allowDiskUse" description:"if specified, allows server to use temporary files on disk to store data exceeding the 100MB system memory limit"` } // Name returns a human-readable group name for input options. 
@@ -77,7 +79,7 @@ func (*InputOptions) Name() string { } func (inputOptions *InputOptions) HasQuery() bool { - return inputOptions.Query != "" || inputOptions.QueryFile != "" + return inputOptions.Query != "" || inputOptions.QueryFile != "" || inputOptions.Pipeline != "" } func (inputOptions *InputOptions) GetQuery() ([]byte, error) { @@ -89,8 +91,10 @@ func (inputOptions *InputOptions) GetQuery() ([]byte, error) { err = fmt.Errorf("error reading queryFile: %s", err) } return content, err + } else if inputOptions.Pipeline != "" { + return []byte(inputOptions.Pipeline), nil } - panic("GetQuery can return valid values only for query or queryFile input") + panic("GetQuery can return valid values only for query, queryFile, or pipeline input") } // Options represents all possible options that can be used to configure mongoexport.