36
36
import org .elasticsearch .indices .IndexAlreadyExistsException ;
37
37
import org .elasticsearch .river .*;
38
38
39
- import java .io .BufferedReader ;
40
- import java .io .IOException ;
41
- import java .io .InputStream ;
42
- import java .io .InputStreamReader ;
39
+ import java .io .*;
43
40
import java .net .HttpURLConnection ;
44
41
import java .net .URL ;
42
+ import java .net .URLEncoder ;
45
43
import java .util .Map ;
46
44
import java .util .concurrent .TimeUnit ;
47
45
@@ -61,6 +59,7 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
61
59
private final int couchPort ;
62
60
private final String couchDb ;
63
61
private final String couchFilter ;
62
+ private final String couchFilterParamsUrl ;
64
63
65
64
private final String indexName ;
66
65
private final String typeName ;
@@ -84,11 +83,26 @@ public class CouchdbRiver extends AbstractRiverComponent implements River {
84
83
couchPort = XContentMapValues .nodeIntegerValue (couchSettings .get ("port" ), 5984 );
85
84
couchDb = XContentMapValues .nodeStringValue (couchSettings .get ("db" ), riverName .name ());
86
85
couchFilter = XContentMapValues .nodeStringValue (couchSettings .get ("filter" ), null );
86
+ if (couchSettings .containsKey ("filter_params" )) {
87
+ Map <String , Object > filterParams = (Map <String , Object >) couchSettings .get ("filter_params" );
88
+ StringBuilder sb = new StringBuilder ();
89
+ for (Map .Entry <String , Object > entry : filterParams .entrySet ()) {
90
+ try {
91
+ sb .append ("&" ).append (URLEncoder .encode (entry .getKey (), "UTF-8" )).append (URLEncoder .encode (entry .getValue ().toString (), "UTF-8" ));
92
+ } catch (UnsupportedEncodingException e ) {
93
+ // should not happen...
94
+ }
95
+ }
96
+ couchFilterParamsUrl = sb .toString ();
97
+ } else {
98
+ couchFilterParamsUrl = null ;
99
+ }
87
100
} else {
88
101
couchHost = "localhost" ;
89
102
couchPort = 5984 ;
90
103
couchDb = "db" ;
91
104
couchFilter = null ;
105
+ couchFilterParamsUrl = null ;
92
106
}
93
107
94
108
if (settings .settings ().containsKey ("index" )) {
@@ -261,7 +275,14 @@ private class Slurper implements Runnable {
261
275
262
276
String file = "/" + couchDb + "/_changes?feed=continuous&include_docs=true&heartbeat=10000" ;
263
277
if (couchFilter != null ) {
264
- file = file + "&filter=" + couchFilter ;
278
+ try {
279
+ file = file + "&filter=" + URLEncoder .encode (couchFilter , "UTF-8" );
280
+ } catch (UnsupportedEncodingException e ) {
281
+ // should not happen!
282
+ }
283
+ if (couchFilterParamsUrl != null ) {
284
+ file = file + couchFilterParamsUrl ;
285
+ }
265
286
}
266
287
if (lastSeq != null ) {
267
288
file = file + "&since=" + lastSeq ;
0 commit comments