Skip to content

Commit f6252e7

Browse files
Merge pull request #152 from benmccann/admin-repl-info
Add replication info to admin page and check whether index exists before initial import
2 parents 81fdfbb + 256140a commit f6252e7

16 files changed

+361
-328
lines changed

src/main/java/org/elasticsearch/rest/action/RestMongoDBRiverAction.java

+23-10
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,25 @@
2020
import org.elasticsearch.rest.XContentRestResponse;
2121
import org.elasticsearch.rest.XContentThrowableRestResponse;
2222
import org.elasticsearch.rest.action.support.RestXContentBuilder;
23+
import org.elasticsearch.river.RiverIndexName;
24+
import org.elasticsearch.river.RiverSettings;
2325
import org.elasticsearch.river.mongodb.MongoDBRiver;
26+
import org.elasticsearch.river.mongodb.MongoDBRiverDefinition;
27+
import org.elasticsearch.river.mongodb.Status;
2428
import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper;
2529
import org.elasticsearch.search.SearchHit;
2630

2731
public class RestMongoDBRiverAction extends BaseRestHandler {
2832

29-
private static final String BASE_URL = "/_river/" + MongoDBRiver.TYPE;
33+
private final String riverIndexName;
3034

3135
@Inject
32-
public RestMongoDBRiverAction(Settings settings, Client client, RestController controller) {
36+
public RestMongoDBRiverAction(Settings settings, Client client, RestController controller, @RiverIndexName String riverIndexName) {
3337
super(settings, client);
34-
controller.registerHandler(RestRequest.Method.GET, BASE_URL + "/{action}", this);
35-
controller.registerHandler(RestRequest.Method.POST, BASE_URL + "/{river}/{action}", this);
38+
this.riverIndexName = riverIndexName;
39+
String baseUrl = "/" + riverIndexName + "/" + MongoDBRiver.TYPE;
40+
controller.registerHandler(RestRequest.Method.GET, baseUrl + "/{action}", this);
41+
controller.registerHandler(RestRequest.Method.POST, baseUrl + "/{river}/{action}", this);
3642
}
3743

3844
@Override
@@ -61,7 +67,7 @@ private void start(RestRequest request, RestChannel channel) {
6167
respondError(request, channel, "Parameter 'river' is required", RestStatus.BAD_REQUEST);
6268
return;
6369
}
64-
MongoDBRiverHelper.setRiverEnabled(client, river, true);
70+
MongoDBRiverHelper.setRiverStatus(client, river, Status.RUNNING);
6571
respondSuccess(request, channel, RestStatus.OK);
6672
}
6773

@@ -71,7 +77,7 @@ private void stop(RestRequest request, RestChannel channel) {
7177
respondError(request, channel, "Parameter 'river' is required", RestStatus.BAD_REQUEST);
7278
return;
7379
}
74-
MongoDBRiverHelper.setRiverEnabled(client, river, false);
80+
MongoDBRiverHelper.setRiverStatus(client, river, Status.STOPPED);
7581
respondSuccess(request, channel, RestStatus.OK);
7682
}
7783

@@ -120,16 +126,23 @@ private void errorResponse(RestRequest request, RestChannel channel, Throwable e
120126
}
121127

122128
private List<Map<String, Object>> getRivers() {
123-
SearchResponse searchResponse = client.prepareSearch("_river").setQuery(new FieldQueryBuilder("type", "mongodb")).execute()
124-
.actionGet();
129+
SearchResponse searchResponse = client.prepareSearch("_river")
130+
.setQuery(new FieldQueryBuilder("type", "mongodb"))
131+
.execute().actionGet();
125132
long totalHits = searchResponse.getHits().totalHits();
126133
logger.trace("totalHits: {}", totalHits);
127134
List<Map<String, Object>> rivers = new ArrayList<Map<String, Object>>();
128135
for (SearchHit hit : searchResponse.getHits().hits()) {
129136
Map<String, Object> source = new HashMap<String, Object>();
130-
source.put("name", hit.getType());
131-
source.put("enabled", MongoDBRiverHelper.isRiverEnabled(client, hit.getType()));
137+
String riverName = hit.getType();
138+
RiverSettings riverSettings = new RiverSettings(null, hit.getSource());
139+
MongoDBRiverDefinition definition = MongoDBRiverDefinition.parseSettings(riverName, riverIndexName, riverSettings, null);
140+
141+
source.put("name", riverName);
142+
source.put("status", MongoDBRiverHelper.getRiverStatus(client, hit.getType()));
132143
source.put("settings", hit.getSource());
144+
source.put("lastTimestamp", MongoDBRiver.getLastTimestamp(client, definition));
145+
source.put("indexCount", MongoDBRiver.getIndexCount(client, definition));
133146
logger.trace("source: {}", hit.getSourceAsString());
134147
rivers.add(source);
135148
}

src/main/java/org/elasticsearch/river/mongodb/Indexer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public Indexer(MongoDBRiverDefinition definition, SharedContext context, Client
6060

6161
@Override
6262
public void run() {
63-
while (context.isActive()) {
63+
while (context.getStatus() == Status.RUNNING) {
6464
sw = new StopWatch().start();
6565
deletedDocuments = 0;
6666
insertedDocuments = 0;

src/main/java/org/elasticsearch/river/mongodb/MongoDBRiver.java

+18-9
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.bson.types.BSONTimestamp;
3333
import org.elasticsearch.ExceptionsHelper;
3434
import org.elasticsearch.action.bulk.BulkRequestBuilder;
35+
import org.elasticsearch.action.count.CountResponse;
3536
import org.elasticsearch.action.get.GetResponse;
3637
import org.elasticsearch.client.Client;
3738
import org.elasticsearch.cluster.block.ClusterBlockException;
@@ -74,8 +75,8 @@ public class MongoDBRiver extends AbstractRiverComponent implements River {
7475

7576
public final static String TYPE = "mongodb";
7677
public final static String NAME = "mongodb-river";
77-
public final static String STATUS = "_mongodbstatus";
78-
public final static String ENABLED = "enabled";
78+
public final static String STATUS_ID = "_riverstatus";
79+
public final static String STATUS_FIELD = "status";
7980
public final static String DESCRIPTION = "MongoDB River Plugin";
8081
public final static String LAST_TIMESTAMP_FIELD = "_last_ts";
8182
public final static String MONGODB_LOCAL_DATABASE = "local";
@@ -130,21 +131,21 @@ public MongoDBRiver(RiverName riverName, RiverSettings settings, @RiverIndexName
130131
BlockingQueue<QueueEntry> stream = definition.getThrottleSize() == -1 ? new LinkedTransferQueue<QueueEntry>()
131132
: new ArrayBlockingQueue<QueueEntry>(definition.getThrottleSize());
132133

133-
this.context = new SharedContext(stream, false);
134+
this.context = new SharedContext(stream, Status.STOPPED);
134135

135136
this.statusThread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "mongodb_river_status").newThread(
136-
new Status(this, definition, context));
137+
new StatusChecker(this, definition, context));
137138
this.statusThread.start();
138139
}
139140

140141
@Override
141142
public void start() {
142-
if (!MongoDBRiverHelper.isRiverEnabled(client, riverName.getName())) {
143+
if (MongoDBRiverHelper.getRiverStatus(client, riverName.getName()) == Status.STOPPED) {
143144
logger.debug("Cannot start river {}. It is currently disabled", riverName.getName());
144145
startInvoked = true;
145146
return;
146147
}
147-
this.context.setActive(true);
148+
this.context.setStatus(Status.RUNNING);
148149
for (ServerAddress server : definition.getMongoServers()) {
149150
logger.info("Using mongodb server(s): host [{}], port [{}]", server.getHost(), server.getPort());
150151
}
@@ -330,7 +331,7 @@ public void close() {
330331
} catch (Throwable t) {
331332
logger.error("Fail to close river {}", t, riverName.getName());
332333
} finally {
333-
this.context.setActive(false);
334+
this.context.setStatus(Status.STOPPED);
334335
}
335336
}
336337

@@ -351,8 +352,8 @@ private XContentBuilder getGridFSMapping() throws IOException {
351352
public static BSONTimestamp getLastTimestamp(Client client, MongoDBRiverDefinition definition) {
352353

353354
GetResponse lastTimestampResponse = client
354-
.prepareGet(definition.getRiverIndexName(), definition.getRiverName(), definition.getMongoOplogNamespace()).execute()
355-
.actionGet();
355+
.prepareGet(definition.getRiverIndexName(), definition.getRiverName(), definition.getMongoOplogNamespace())
356+
.execute().actionGet();
356357

357358
// API changes since 0.90.0 lastTimestampResponse.exists() replaced by
358359
// lastTimestampResponse.isExists()
@@ -396,6 +397,14 @@ static void updateLastTimestamp(final MongoDBRiverDefinition definition, final B
396397
}
397398
}
398399

400+
public static long getIndexCount(Client client, MongoDBRiverDefinition definition) {
401+
CountResponse countResponse = client
402+
.prepareCount(definition.getIndexName())
403+
.execute().actionGet();
404+
return countResponse.getCount();
405+
}
406+
407+
399408
protected static class QueueEntry {
400409

401410
private final DBObject data;

src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverDefinition.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.elasticsearch.common.logging.Loggers;
2424
import org.elasticsearch.common.unit.TimeValue;
2525
import org.elasticsearch.common.xcontent.support.XContentMapValues;
26-
import org.elasticsearch.river.RiverName;
2726
import org.elasticsearch.river.RiverSettings;
2827
import org.elasticsearch.script.ExecutableScript;
2928
import org.elasticsearch.script.ScriptService;
@@ -319,15 +318,15 @@ public MongoDBRiverDefinition build() {
319318
}
320319

321320
@SuppressWarnings("unchecked")
322-
public synchronized static MongoDBRiverDefinition parseSettings(RiverName riverName, String riverIndexName, RiverSettings settings,
321+
public synchronized static MongoDBRiverDefinition parseSettings(String riverName, String riverIndexName, RiverSettings settings,
323322
ScriptService scriptService) {
324323

325324
Preconditions.checkNotNull(riverName, "No riverName specified");
326325
Preconditions.checkNotNull(riverIndexName, "Not riverIndexName specified");
327326
Preconditions.checkNotNull(settings, "No settings specified");
328327

329328
Builder builder = new Builder();
330-
builder.riverName(riverName.name());
329+
builder.riverName(riverName);
331330
builder.riverIndexName(riverIndexName);
332331

333332
List<ServerAddress> mongoServers = new ArrayList<ServerAddress>();
@@ -513,8 +512,8 @@ public synchronized static MongoDBRiverDefinition parseSettings(RiverName riverN
513512
// mongoDbPassword = mdp;
514513
}
515514

516-
builder.mongoDb(XContentMapValues.nodeStringValue(mongoSettings.get(DB_FIELD), riverName.name()));
517-
builder.mongoCollection(XContentMapValues.nodeStringValue(mongoSettings.get(COLLECTION_FIELD), riverName.name()));
515+
builder.mongoDb(XContentMapValues.nodeStringValue(mongoSettings.get(DB_FIELD), riverName));
516+
builder.mongoCollection(XContentMapValues.nodeStringValue(mongoSettings.get(COLLECTION_FIELD), riverName));
518517
builder.mongoGridFS(XContentMapValues.nodeBooleanValue(mongoSettings.get(GRIDFS_FIELD), false));
519518
if (mongoSettings.containsKey(FILTER_FIELD)) {
520519
builder.mongoFilter(XContentMapValues.nodeStringValue(mongoSettings.get(FILTER_FIELD), ""));
@@ -541,8 +540,8 @@ public synchronized static MongoDBRiverDefinition parseSettings(RiverName riverN
541540
} catch (UnknownHostException e) {
542541
e.printStackTrace();
543542
}
544-
builder.mongoDb(riverName.name());
545-
builder.mongoCollection(riverName.name());
543+
builder.mongoDb(riverName);
544+
builder.mongoCollection(riverName);
546545
}
547546

548547
if (settings.settings().containsKey(INDEX_OBJECT)) {

src/main/java/org/elasticsearch/river/mongodb/MongoDBRiverModule.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ protected void configure() {
4040
}
4141

4242
@Provides
43-
protected MongoDBRiverDefinition provideDefinition(RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName,
44-
ScriptService scriptService) {
45-
return MongoDBRiverDefinition.parseSettings(riverName, riverIndexName, settings, scriptService);
43+
protected MongoDBRiverDefinition provideDefinition(
44+
RiverName riverName, RiverSettings settings, @RiverIndexName String riverIndexName, ScriptService scriptService) {
45+
return MongoDBRiverDefinition.parseSettings(riverName.name(), riverIndexName, settings, scriptService);
4646
}
4747

4848
}

src/main/java/org/elasticsearch/river/mongodb/SharedContext.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010
public class SharedContext {
1111

1212
private BlockingQueue<QueueEntry> stream;
13-
private boolean active;
13+
private Status status;
1414

15-
public SharedContext(BlockingQueue<QueueEntry> stream, boolean active) {
15+
public SharedContext(BlockingQueue<QueueEntry> stream, Status status) {
1616
this.stream = stream;
17-
this.active = active;
17+
this.status = status;
1818
}
1919

2020
public BlockingQueue<QueueEntry> getStream() {
@@ -25,12 +25,12 @@ public void setStream(BlockingQueue<QueueEntry> stream) {
2525
this.stream = stream;
2626
}
2727

28-
public boolean isActive() {
29-
return active;
28+
public Status getStatus() {
29+
return status;
3030
}
3131

32-
public void setActive(boolean active) {
33-
this.active = active;
32+
public void setStatus(Status status) {
33+
this.status = status;
3434
}
3535

3636
}

src/main/java/org/elasticsearch/river/mongodb/Slurper.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.logging.ESLogger;
1313
import org.elasticsearch.common.logging.ESLoggerFactory;
1414
import org.elasticsearch.river.mongodb.util.MongoDBHelper;
15+
import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper;
1516

1617
import com.mongodb.BasicDBObject;
1718
import com.mongodb.Bytes;
@@ -64,15 +65,19 @@ public Slurper(List<ServerAddress> mongoServers, MongoDBRiverDefinition definiti
6465

6566
@Override
6667
public void run() {
67-
while (context.isActive()) {
68+
while (context.getStatus() == Status.RUNNING) {
6869
try {
6970
if (!assignCollections()) {
7071
break; // failed to assign oplogCollection or
7172
// slurpedCollection
7273
}
7374

7475
BSONTimestamp startTimestamp = null;
75-
if (!riverHasIndexedSomething()) {
76+
if (!riverHasIndexedFromOplog()) {
77+
if (!isIndexEmpty()) {
78+
MongoDBRiverHelper.setRiverStatus(client, definition.getRiverName(), Status.INITIAL_IMPORT_FAILED);
79+
break;
80+
}
7681
startTimestamp = doInitialImport();
7782
}
7883

@@ -113,15 +118,19 @@ public void run() {
113118
}
114119
}
115120

116-
protected boolean riverHasIndexedSomething() {
121+
protected boolean riverHasIndexedFromOplog() {
117122
return MongoDBRiver.getLastTimestamp(client, definition) != null;
118123
}
119124

125+
protected boolean isIndexEmpty() {
126+
return MongoDBRiver.getIndexCount(client, definition) == 0;
127+
}
128+
120129
/**
121130
* Does an initial sync the same way MongoDB does.
122131
* https://groups.google.com/
123132
* forum/?fromgroups=#!topic/mongodb-user/sOKlhD_E2ns
124-
*
133+
*
125134
* @return the last oplog timestamp before the import began
126135
* @throws InterruptedException
127136
* if the blocking queue stream is interrupted while waiting
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,9 @@
11
package org.elasticsearch.river.mongodb;
22

3-
import org.elasticsearch.river.mongodb.util.MongoDBRiverHelper;
3+
public enum Status {
44

5-
class Status implements Runnable {
6-
7-
private final MongoDBRiver mongoDBRiver;
8-
private final MongoDBRiverDefinition definition;
9-
private final SharedContext context;
10-
11-
public Status(MongoDBRiver mongoDBRiver, MongoDBRiverDefinition definition, SharedContext context) {
12-
this.mongoDBRiver = mongoDBRiver;
13-
this.definition = definition;
14-
this.context = context;
15-
}
16-
17-
@Override
18-
public void run() {
19-
while (true) {
20-
try {
21-
if (this.mongoDBRiver.startInvoked) {
22-
boolean enabled = MongoDBRiverHelper.isRiverEnabled(this.mongoDBRiver.client, this.definition.getRiverName());
23-
24-
if (this.context.isActive() && !enabled) {
25-
MongoDBRiver.logger.info("About to stop river: {}", this.definition.getRiverName());
26-
this.mongoDBRiver.close();
27-
}
28-
29-
if (!this.context.isActive() && enabled) {
30-
MongoDBRiver.logger.trace("About to start river: {}", this.definition.getRiverName());
31-
this.mongoDBRiver.start();
32-
}
33-
}
34-
Thread.sleep(1000L);
35-
} catch (InterruptedException e) {
36-
MongoDBRiver.logger.info("Status thread interrupted", e, (Object) null);
37-
Thread.currentThread().interrupt();
38-
break;
39-
}
40-
41-
}
42-
}
43-
}
5+
RUNNING,
6+
STOPPED,
7+
INITIAL_IMPORT_FAILED;
8+
9+
}

0 commit comments

Comments
 (0)