Skip to content

Commit 80d1c6e

Browse files
Change oplog.rs filter - #123
- Implement suggested changes - https://groups.google.com/forum/#!topic/mongodb-user/E7BSv624nBg
1 parent d270e06 commit 80d1c6e

File tree

2 files changed

+12
-16
lines changed

2 files changed

+12
-16
lines changed

Diff for: src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
8383
public final static String MONGODB_ADMIN_DATABASE = "admin";
8484
public final static String MONGODB_CONFIG_DATABASE = "config";
8585
public final static String MONGODB_ID_FIELD = "_id";
86+
public final static String MONGODB_IN_OPERATOR = "$in";
8687
public final static String MONGODB_OR_OPERATOR = "$or";
8788
public final static String MONGODB_AND_OPERATOR = "$and";
8889
public final static String MONGODB_NATURAL_OPERATOR = "$natural";

Diff for: src/main/java/org/elasticsearch/river/mongodb/Slurper.java

+11-16
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.elasticsearch.river.mongodb;
22

33
import java.util.ArrayList;
4+
import java.util.Collections;
45
import java.util.List;
56
import java.util.NoSuchElementException;
67
import java.util.Set;
@@ -335,7 +336,6 @@ private String getObjectIdFromOplogEntry(DBObject entry) {
335336

336337
private DBObject getOplogFilter(final BSONTimestamp time) {
337338
BasicDBObject filter = new BasicDBObject();
338-
List<DBObject> values2 = new ArrayList<DBObject>();
339339

340340
if (time == null) {
341341
logger.info("No known previous slurping time for this collection");
@@ -346,10 +346,10 @@ private DBObject getOplogFilter(final BSONTimestamp time) {
346346
if (definition.isMongoGridFS()) {
347347
filter.put(MongoDBRiver.OPLOG_NAMESPACE, definition.getMongoOplogNamespace() + MongoDBRiver.GRIDFS_FILES_SUFFIX);
348348
} else {
349-
values2.add(new BasicDBObject(MongoDBRiver.OPLOG_NAMESPACE, definition.getMongoOplogNamespace()));
350-
values2.add(new BasicDBObject(MongoDBRiver.OPLOG_NAMESPACE, definition.getMongoDb() + "."
351-
+ MongoDBRiver.OPLOG_NAMESPACE_COMMAND));
352-
filter.put(MongoDBRiver.MONGODB_OR_OPERATOR, values2);
349+
List<String> namespaceFilter = new ArrayList<String>();
350+
namespaceFilter.add(definition.getMongoOplogNamespace());
351+
namespaceFilter.add(definition.getMongoDb() + "." + MongoDBRiver.OPLOG_NAMESPACE_COMMAND);
352+
filter.put(MongoDBRiver.OPLOG_NAMESPACE, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, namespaceFilter));
353353
}
354354
if (definition.getMongoOplogFilter().size() > 0) {
355355
filter.putAll(getMongoFilter());
@@ -363,20 +363,15 @@ private DBObject getOplogFilter(final BSONTimestamp time) {
363363
private DBObject getMongoFilter() {
364364
List<DBObject> filters = new ArrayList<DBObject>();
365365
List<DBObject> filters2 = new ArrayList<DBObject>();
366-
List<DBObject> filters3 = new ArrayList<DBObject>();
367-
// include delete operation
368-
filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, MongoDBRiver.OPLOG_DELETE_OPERATION));
369366

370-
// include update, insert in filters3
371-
filters3.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, MongoDBRiver.OPLOG_UPDATE_OPERATION));
372-
filters3.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, MongoDBRiver.OPLOG_INSERT_OPERATION));
373-
374-
// include or operation statement in filter2
375-
filters2.add(new BasicDBObject(MongoDBRiver.MONGODB_OR_OPERATOR, filters3));
367+
List<String> operationFilter = new ArrayList<String>();
368+
operationFilter.add(MongoDBRiver.OPLOG_DELETE_OPERATION);
369+
operationFilter.add(MongoDBRiver.OPLOG_UPDATE_OPERATION);
370+
operationFilter.add(MongoDBRiver.OPLOG_INSERT_OPERATION);
371+
filters.add(new BasicDBObject(MongoDBRiver.OPLOG_OPERATION, new BasicBSONObject(MongoDBRiver.MONGODB_IN_OPERATOR, operationFilter)));
376372

377373
// include custom filter in filters2
378374
filters2.add(definition.getMongoOplogFilter());
379-
380375
filters.add(new BasicDBObject(MongoDBRiver.MONGODB_AND_OPERATOR, filters2));
381376

382377
return new BasicDBObject(MongoDBRiver.MONGODB_OR_OPERATOR, filters);
@@ -396,7 +391,7 @@ private DBCursor oplogCursor(final BSONTimestamp timestampOverride) {
396391
if (indexFilter.containsField(MongoDBRiver.OPLOG_TIMESTAMP)) {
397392
options = options | Bytes.QUERYOPTION_OPLOGREPLAY;
398393
}
399-
return oplogCollection.find(indexFilter).sort(new BasicDBObject(MongoDBRiver.MONGODB_NATURAL_OPERATOR, 1)).setOptions(options);
394+
return oplogCollection.find(indexFilter).setOptions(options);
400395
}
401396

402397
private void addQueryToStream(final String operation, final BSONTimestamp currentTimestamp, final DBObject update)

0 commit comments

Comments
 (0)