Skip to content

Commit 6b59115

Browse files
Improve feature #76
- Add support for filtering nested objects
1 parent a78c35c commit 6b59115

File tree

5 files changed

+191
-96
lines changed

5 files changed

+191
-96
lines changed

src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java

+13-12
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.IOException;
2626
import java.net.UnknownHostException;
2727
import java.util.ArrayList;
28+
import java.util.Arrays;
2829
import java.util.HashMap;
2930
import java.util.HashSet;
3031
import java.util.List;
@@ -63,7 +64,7 @@
6364
import org.elasticsearch.river.RiverIndexName;
6465
import org.elasticsearch.river.RiverName;
6566
import org.elasticsearch.river.RiverSettings;
66-
import org.elasticsearch.river.mongodb.util.GridFSHelper;
67+
import org.elasticsearch.river.mongodb.util.MongoDBHelper;
6768
import org.elasticsearch.script.ExecutableScript;
6869
import org.elasticsearch.script.ScriptService;
6970

@@ -556,10 +557,8 @@ private Mongo getMongoClient() {
556557
if (mongo == null) {
557558
// TODO: MongoClientOptions should be configurable
558559
MongoClientOptions mco = MongoClientOptions.builder()
559-
.autoConnectRetry(true)
560-
.connectTimeout(15000)
561-
.socketTimeout(60000)
562-
.build();
560+
.autoConnectRetry(true).connectTimeout(15000)
561+
.socketTimeout(60000).build();
563562
mongo = new MongoClient(mongoServers, mco);
564563
}
565564
return mongo;
@@ -821,20 +820,21 @@ private XContentBuilder build(final Map<String, Object> data,
821820
if (data.containsKey(IS_MONGODB_ATTACHMENT)) {
822821
logger.info("Add Attachment: {} to index {} / type {}",
823822
objectId, indexName, typeName);
824-
return GridFSHelper.serialize((GridFSDBFile) data
823+
return MongoDBHelper.serialize((GridFSDBFile) data
825824
.get(MONGODB_ATTACHMENT));
826825
} else {
827826
return XContentFactory.jsonBuilder().map(data);
828827
}
829828
}
830829

831830
private String extractObjectId(Map<String, Object> ctx, String objectId) {
832-
String id = (String) ctx.get("id");
831+
String id = (String) ctx.get("id");
833832
if (id == null) {
834833
id = objectId;
835834
}
836835
return id;
837836
}
837+
838838
private String extractParent(Map<String, Object> ctx) {
839839
return (String) ctx.get("_parent");
840840
}
@@ -1022,11 +1022,12 @@ private void processOplogEntry(final DBObject entry)
10221022
.get(OPLOG_TIMESTAMP);
10231023
DBObject object = (DBObject) entry.get(OPLOG_OBJECT);
10241024

1025-
if (excludeFields != null) {
1026-
for (String excludeField : excludeFields) {
1027-
object.removeField(excludeField);
1028-
}
1029-
}
1025+
object = MongoDBHelper.applyExcludeFields(object, excludeFields);
1026+
// if (excludeFields != null) {
1027+
// for (String excludeField : excludeFields) {
1028+
// object.removeField(excludeField);
1029+
// }
1030+
// }
10301031

10311032
// Initial support for sharded collection -
10321033
// https://jira.mongodb.org/browse/SERVER-4333

src/main/java/org/elasticsearch/river/mongodb/util/GridFSHelper.java

-84
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.river.mongodb.util;
21+
22+
import java.io.ByteArrayOutputStream;
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.util.Arrays;
26+
import java.util.HashSet;
27+
import java.util.Set;
28+
29+
import org.elasticsearch.common.Base64;
30+
import org.elasticsearch.common.xcontent.XContentBuilder;
31+
import org.elasticsearch.common.xcontent.XContentFactory;
32+
33+
import com.mongodb.DBObject;
34+
import com.mongodb.gridfs.GridFSDBFile;
35+
36+
/*
37+
* GridFS Helper class
38+
*/
39+
public abstract class MongoDBHelper {
40+
41+
public static XContentBuilder serialize(GridFSDBFile file)
42+
throws IOException {
43+
44+
XContentBuilder builder = XContentFactory.jsonBuilder();
45+
46+
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
47+
48+
int nRead;
49+
byte[] data = new byte[1024];
50+
51+
InputStream stream = file.getInputStream();
52+
while ((nRead = stream.read(data, 0, data.length)) != -1) {
53+
buffer.write(data, 0, nRead);
54+
}
55+
56+
buffer.flush();
57+
stream.close();
58+
59+
String encodedContent = Base64.encodeBytes(buffer.toByteArray());
60+
61+
// Probably not necessary...
62+
buffer.close();
63+
64+
builder.startObject();
65+
builder.startObject("content");
66+
builder.field("content_type", file.getContentType());
67+
builder.field("title", file.getFilename());
68+
builder.field("content", encodedContent);
69+
builder.endObject();
70+
builder.field("filename", file.getFilename());
71+
builder.field("contentType", file.getContentType());
72+
builder.field("md5", file.getMD5());
73+
builder.field("length", file.getLength());
74+
builder.field("chunkSize", file.getChunkSize());
75+
builder.field("uploadDate", file.getUploadDate());
76+
builder.startObject("metadata");
77+
DBObject metadata = file.getMetaData();
78+
if (metadata != null) {
79+
for (String key : metadata.keySet()) {
80+
builder.field(key, metadata.get(key));
81+
}
82+
}
83+
builder.endObject();
84+
builder.endObject();
85+
86+
return builder;
87+
}
88+
89+
public static DBObject applyExcludeFields(DBObject bsonObject,
90+
Set<String> excludeFields) {
91+
if (excludeFields == null) {
92+
return bsonObject;
93+
}
94+
95+
DBObject filteredObject = bsonObject;
96+
for (String field : excludeFields) {
97+
if (field.contains(".")) {
98+
String rootObject = field.substring(0, field.indexOf("."));
99+
String childObject = field.substring(field.indexOf(".") + 1);
100+
if (filteredObject.containsField(rootObject)) {
101+
Object object = filteredObject.get(rootObject);
102+
if (object instanceof DBObject) {
103+
DBObject object2 = (DBObject) object;
104+
object2 = applyExcludeFields(object2,
105+
new HashSet<String>(Arrays.asList(childObject)));
106+
}
107+
}
108+
} else {
109+
if (filteredObject.containsField(field)) {
110+
filteredObject.removeField(field);
111+
}
112+
}
113+
}
114+
return filteredObject;
115+
}
116+
117+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package test.elasticsearch.plugin.river.mongodb;
2+
3+
import static org.elasticsearch.common.io.Streams.copyToStringFromClasspath;
4+
5+
import java.util.Arrays;
6+
import java.util.HashSet;
7+
import java.util.Set;
8+
9+
import org.elasticsearch.common.logging.ESLogger;
10+
import org.elasticsearch.common.logging.Loggers;
11+
import org.elasticsearch.river.mongodb.util.MongoDBHelper;
12+
import org.testng.Assert;
13+
import org.testng.annotations.Test;
14+
15+
import com.mongodb.DBObject;
16+
import com.mongodb.util.JSON;
17+
18+
@Test
19+
public class ExcludeFieldsTest {
20+
21+
private final ESLogger logger = Loggers.getLogger(getClass());
22+
23+
@Test
24+
public void testExcludeFields() {
25+
try {
26+
Set<String> excludeFields = new HashSet<String>(Arrays.asList(
27+
"lastName", "hobbies", "address.apartment"));
28+
// test-exclude-fields-document.json
29+
String mongoDocument = copyToStringFromClasspath("/test/elasticsearch/plugin/river/mongodb/test-exclude-fields-document.json");
30+
DBObject dbObject = (DBObject) JSON.parse(mongoDocument);
31+
logger.debug("Initial BSON object: {}", dbObject);
32+
DBObject filteredObject = MongoDBHelper.applyExcludeFields(dbObject,
33+
excludeFields);
34+
logger.debug("Filtered BSON object: {}", filteredObject);
35+
Assert.assertNotNull(filteredObject);
36+
Assert.assertFalse(filteredObject.containsField("hobbies"));
37+
Assert.assertTrue(filteredObject.containsField("address"));
38+
Assert.assertFalse(((DBObject) filteredObject.get("address"))
39+
.containsField("apartment"));
40+
} catch (Throwable t) {
41+
logger.error("testExcludeFields failed", t);
42+
Assert.fail();
43+
}
44+
}
45+
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"firstName": "John",
3+
"lastName": "Doe",
4+
"address": {
5+
"street": "Main Street",
6+
"apartment": "1A",
7+
"state": "MA",
8+
"city": "Boston",
9+
"country": "USA"
10+
},
11+
"hobbies": [
12+
"movie",
13+
"soccer"
14+
]
15+
}

0 commit comments

Comments
 (0)