Skip to content

Commit 750420b

Browse files
committed
Merge remote-tracking branch 'elastic/master' into ack-as-publish-timeout
2 parents 4c9b6f8 + 74c6f18 commit 750420b

File tree

177 files changed

+2787
-2267
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

177 files changed

+2787
-2267
lines changed

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ task verifyVersions {
131131
new URL('https://repo1.maven.org/maven2/org/elasticsearch/elasticsearch/maven-metadata.xml').openStream().withStream { s ->
132132
xml = new XmlParser().parse(s)
133133
}
134-
Set<Version> knownVersions = new TreeSet<>(xml.versioning.versions.version.collect { it.text() }.findAll { it ==~ /\d\.\d\.\d/ }.collect { Version.fromString(it) })
134+
Set<Version> knownVersions = new TreeSet<>(xml.versioning.versions.version.collect { it.text() }.findAll { it ==~ /\d+\.\d+\.\d+/ }.collect { Version.fromString(it) })
135135

136136
// Limit the known versions to those that should be index compatible, and are not future versions
137137
knownVersions = knownVersions.findAll { it.major >= bwcVersions.currentVersion.major - 1 && it.before(VersionProperties.elasticsearch) }

buildSrc/src/main/groovy/org/elasticsearch/gradle/test/AntFixture.groovy

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,11 @@ public class AntFixture extends AntTask implements Fixture {
149149
}
150150

151151
// the process is started (has a pid) and is bound to a network interface
152-
// so now wait undil the waitCondition has been met
152+
// so now evaluates if the waitCondition is successful
153153
// TODO: change this to a loop?
154154
boolean success
155155
try {
156-
success = waitCondition(this, ant) == false
156+
success = waitCondition(this, ant)
157157
} catch (Exception e) {
158158
String msg = "Wait condition caught exception for ${name}"
159159
logger.error(msg, e)

client/rest-high-level/src/test/java/org/elasticsearch/client/ClusterClientIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,6 @@ public void testClusterHealthYellowClusterLevel() throws IOException {
129129
createIndex("index2", Settings.EMPTY);
130130
ClusterHealthRequest request = new ClusterHealthRequest();
131131
request.timeout("5s");
132-
request.level(ClusterHealthRequest.Level.CLUSTER);
133132
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
134133

135134
assertYellowShards(response);
@@ -170,6 +169,7 @@ public void testClusterHealthYellowSpecificIndex() throws IOException {
170169
createIndex("index", Settings.EMPTY);
171170
createIndex("index2", Settings.EMPTY);
172171
ClusterHealthRequest request = new ClusterHealthRequest("index");
172+
request.level(ClusterHealthRequest.Level.SHARDS);
173173
request.timeout("5s");
174174
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);
175175

client/rest-high-level/src/test/java/org/elasticsearch/client/RequestConvertersTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1568,7 +1568,7 @@ public void testClusterHealth() {
15681568
healthRequest.level(level);
15691569
expectedParams.put("level", level.name().toLowerCase(Locale.ROOT));
15701570
} else {
1571-
expectedParams.put("level", "shards");
1571+
expectedParams.put("level", "cluster");
15721572
}
15731573
if (randomBoolean()) {
15741574
Priority priority = randomFrom(Priority.values());

distribution/src/bin/elasticsearch-cli.bat

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ call "%~dp0elasticsearch-env.bat" || exit /b 1
22

33
if defined ES_ADDITIONAL_SOURCES (
44
for %%a in ("%ES_ADDITIONAL_SOURCES:;=","%") do (
5-
call %~dp0%%a
5+
call "%~dp0%%a"
66
)
77
)
88

docs/java-rest/high-level/cluster/health.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-wai
6767
include-tagged::{doc-tests}/ClusterClientDocumentationIT.java[health-request-level]
6868
--------------------------------------------------
6969
<1> The level of detail of the returned health information. Accepts a `ClusterHealthRequest.Level` value.
70+
Default value is `cluster`.
7071

7172
["source","java",subs="attributes,callouts,macros"]
7273
--------------------------------------------------

docs/reference/aggregations/metrics/scripted-metric-aggregation.asciidoc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
[[search-aggregations-metrics-scripted-metric-aggregation]]
22
=== Scripted Metric Aggregation
33

4-
experimental[]
5-
64
A metric aggregation that executes using scripts to provide a metric output.
75

86
Example:

docs/reference/migration/migrate_7_0/restclient.asciidoc

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,11 @@ header, e.g. `client.index(indexRequest)` becomes
1010
`client.index(indexRequest, RequestOptions.DEFAULT)`.
1111
In case you are specifying headers
1212
e.g. `client.index(indexRequest, new Header("name" "value"))` becomes
13-
`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());`
13+
`client.index(indexRequest, RequestOptions.DEFAULT.toBuilder().addHeader("name", "value").build());`
14+
15+
==== Cluster Health API default to `cluster` level
16+
17+
The Cluster Health API used to default to `shards` level to ease migration
18+
from transport client that doesn't support the `level` parameter and always
19+
returns information including indices and shards details. The level default
20+
value has been aligned with the Elasticsearch default level: `cluster`.

libs/nio/src/main/java/org/elasticsearch/nio/ChannelContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ protected void setSelectionKey(SelectionKey selectionKey) {
6666
* @throws IOException during channel / context close
6767
*/
6868
public void closeFromSelector() throws IOException {
69-
if (closeContext.isDone() == false) {
69+
if (isOpen()) {
7070
try {
7171
rawChannel.close();
7272
closeContext.complete(null);

libs/nio/src/main/java/org/elasticsearch/nio/EventHandler.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,7 @@ protected void listenerException(Exception exception) {
159159
}
160160

161161
/**
162-
* This method is called after ready events (READ, ACCEPT, WRITE, CONNECT) have been handled for a
163-
* channel.
162+
* This method is called after events (READ, WRITE, CONNECT) have been handled for a channel.
164163
*
165164
* @param context that was handled
166165
*/

libs/nio/src/main/java/org/elasticsearch/nio/NioSelector.java

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,6 @@
4343
* {@link #runLoop()}, the selector will run until {@link #close()} is called. This instance handles closing
4444
* of channels. Users should call {@link #queueChannelClose(NioChannel)} to schedule a channel for close by
4545
* this selector.
46-
* <p>
47-
* Children of this class should implement the specific {@link #processKey(SelectionKey)},
48-
* {@link #preSelect()}, and {@link #cleanup()} functionality.
4946
*/
5047
public class NioSelector implements Closeable {
5148

@@ -65,7 +62,7 @@ public NioSelector(EventHandler eventHandler) throws IOException {
6562
this(eventHandler, Selector.open());
6663
}
6764

68-
public NioSelector(EventHandler eventHandler, Selector selector) throws IOException {
65+
public NioSelector(EventHandler eventHandler, Selector selector) {
6966
this.selector = selector;
7067
this.eventHandler = eventHandler;
7168
}
@@ -165,7 +162,7 @@ void singleLoop() {
165162
}
166163

167164
void cleanupAndCloseChannels() {
168-
cleanup();
165+
cleanupPendingWrites();
169166
channelsToClose.addAll(channelsToRegister);
170167
channelsToRegister.clear();
171168
channelsToClose.addAll(selector.keys().stream().map(sk -> (ChannelContext<?>) sk.attachment()).collect(Collectors.toList()));
@@ -234,16 +231,6 @@ void preSelect() {
234231
handleQueuedWrites();
235232
}
236233

237-
/**
238-
* Called once as the selector is being closed.
239-
*/
240-
void cleanup() {
241-
WriteOperation op;
242-
while ((op = queuedWrites.poll()) != null) {
243-
executeFailedListener(op.getListener(), new ClosedSelectorException());
244-
}
245-
}
246-
247234
/**
248235
* Queues a write operation to be handled by the event loop. This can be called by any thread and is the
249236
* api available for non-selector threads to schedule writes.
@@ -284,20 +271,31 @@ public void scheduleForRegistration(NioChannel channel) {
284271
}
285272

286273
/**
287-
* Queues a write operation directly in a channel's buffer. Channel buffers are only safe to be accessed
288-
* by the selector thread. As a result, this method should only be called by the selector thread.
274+
* Queues a write operation directly in a channel's buffer. If this channel does not have pending writes
275+
* already, the channel will be flushed. Channel buffers are only safe to be accessed by the selector
276+
* thread. As a result, this method should only be called by the selector thread. If this channel does
277+
* not have pending writes already, the channel will be flushed.
289278
*
290279
* @param writeOperation to be queued in a channel's buffer
291280
*/
292-
public void queueWriteInChannelBuffer(WriteOperation writeOperation) {
281+
public void writeToChannel(WriteOperation writeOperation) {
293282
assertOnSelectorThread();
294283
SocketChannelContext context = writeOperation.getChannel();
284+
// If the channel does not currently have anything that is ready to flush, we should flush after
285+
// the write operation is queued.
286+
boolean shouldFlushAfterQueuing = context.readyForFlush() == false;
295287
try {
296288
SelectionKeyUtils.setWriteInterested(context.getSelectionKey());
297289
context.queueWriteOperation(writeOperation);
298290
} catch (Exception e) {
291+
shouldFlushAfterQueuing = false;
299292
executeFailedListener(writeOperation.getListener(), e);
300293
}
294+
295+
if (shouldFlushAfterQueuing) {
296+
handleWrite(context);
297+
eventHandler.postHandling(context);
298+
}
301299
}
302300

303301
/**
@@ -332,6 +330,13 @@ public <V> void executeFailedListener(BiConsumer<V, Exception> listener, Excepti
332330
}
333331
}
334332

333+
private void cleanupPendingWrites() {
334+
WriteOperation op;
335+
while ((op = queuedWrites.poll()) != null) {
336+
executeFailedListener(op.getListener(), new ClosedSelectorException());
337+
}
338+
}
339+
335340
private void wakeup() {
336341
// TODO: Do we need the wakeup optimizations that some other libraries use?
337342
selector.wakeup();
@@ -394,7 +399,7 @@ private void handleQueuedWrites() {
394399
WriteOperation writeOperation;
395400
while ((writeOperation = queuedWrites.poll()) != null) {
396401
if (writeOperation.getChannel().isOpen()) {
397-
queueWriteInChannelBuffer(writeOperation);
402+
writeToChannel(writeOperation);
398403
} else {
399404
executeFailedListener(writeOperation.getListener(), new ClosedChannelException());
400405
}

libs/nio/src/main/java/org/elasticsearch/nio/SocketChannelContext.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ public void sendMessage(Object message, BiConsumer<Void, Exception> listener) {
135135
return;
136136
}
137137

138-
selector.queueWriteInChannelBuffer(writeOperation);
138+
selector.writeToChannel(writeOperation);
139139
}
140140

141141
public void queueWriteOperation(WriteOperation writeOperation) {
@@ -164,7 +164,7 @@ protected FlushOperation getPendingFlush() {
164164
@Override
165165
public void closeFromSelector() throws IOException {
166166
getSelector().assertOnSelectorThread();
167-
if (channel.isOpen()) {
167+
if (isOpen()) {
168168
ArrayList<IOException> closingExceptions = new ArrayList<>(3);
169169
try {
170170
super.closeFromSelector();

libs/nio/src/test/java/org/elasticsearch/nio/NioSelectorTests.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,11 +262,28 @@ public void testQueueWriteSuccessful() throws Exception {
262262
public void testQueueDirectlyInChannelBufferSuccessful() throws Exception {
263263
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
264264

265-
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) == 0);
265+
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
266266

267-
selector.queueWriteInChannelBuffer(writeOperation);
267+
when(channelContext.readyForFlush()).thenReturn(true);
268+
selector.writeToChannel(writeOperation);
268269

269270
verify(channelContext).queueWriteOperation(writeOperation);
271+
verify(eventHandler, times(0)).handleWrite(channelContext);
272+
verify(eventHandler, times(0)).postHandling(channelContext);
273+
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
274+
}
275+
276+
public void testShouldFlushIfNoPendingFlushes() throws Exception {
277+
WriteOperation writeOperation = new FlushReadyWrite(channelContext, buffers, listener);
278+
279+
assertEquals(0, (selectionKey.interestOps() & SelectionKey.OP_WRITE));
280+
281+
when(channelContext.readyForFlush()).thenReturn(false);
282+
selector.writeToChannel(writeOperation);
283+
284+
verify(channelContext).queueWriteOperation(writeOperation);
285+
verify(eventHandler).handleWrite(channelContext);
286+
verify(eventHandler).postHandling(channelContext);
270287
assertTrue((selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0);
271288
}
272289

@@ -277,10 +294,13 @@ public void testQueueDirectlyInChannelBufferSelectionKeyThrowsException() throws
277294
CancelledKeyException cancelledKeyException = new CancelledKeyException();
278295

279296
when(channelContext.getSelectionKey()).thenReturn(selectionKey);
297+
when(channelContext.readyForFlush()).thenReturn(false);
280298
when(selectionKey.interestOps(anyInt())).thenThrow(cancelledKeyException);
281-
selector.queueWriteInChannelBuffer(writeOperation);
299+
selector.writeToChannel(writeOperation);
282300

283301
verify(channelContext, times(0)).queueWriteOperation(writeOperation);
302+
verify(eventHandler, times(0)).handleWrite(channelContext);
303+
verify(eventHandler, times(0)).postHandling(channelContext);
284304
verify(listener).accept(null, cancelledKeyException);
285305
}
286306

libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ public void testSendMessageFromSameThreadIsQueuedInChannel() {
170170
when(readWriteHandler.createWriteOperation(context, buffers, listener)).thenReturn(writeOperation);
171171
context.sendMessage(buffers, listener);
172172

173-
verify(selector).queueWriteInChannelBuffer(writeOpCaptor.capture());
173+
verify(selector).writeToChannel(writeOpCaptor.capture());
174174
WriteOperation writeOp = writeOpCaptor.getValue();
175175

176176
assertSame(writeOperation, writeOp);

modules/reindex/build.gradle

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,11 @@ if (Os.isFamily(Os.FAMILY_WINDOWS)) {
121121
baseDir,
122122
unzip.temporaryDir,
123123
version == '090'
124+
waitCondition = { fixture, ant ->
125+
// the fixture writes the ports file when Elasticsearch's HTTP service
126+
// is ready, so we can just wait for the file to exist
127+
return fixture.portsFile.exists()
128+
}
124129
}
125130
integTest.dependsOn fixture
126131
integTestRunner {

modules/repository-url/src/test/java/org/elasticsearch/repositories/url/URLFixture.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.Map;
4040
import java.util.Objects;
4141

42+
import static java.nio.charset.StandardCharsets.UTF_8;
4243
import static java.util.Collections.emptyMap;
4344
import static java.util.Collections.singleton;
4445
import static java.util.Collections.singletonMap;
@@ -67,7 +68,6 @@ public static void main(String[] args) throws Exception {
6768
writeFile(workingDirectory, "ports", addressAndPort);
6869

6970
// Exposes the repository over HTTP
70-
final String url = "http://" + addressAndPort;
7171
httpServer.createContext("/", new ResponseHandler(dir(args[1])));
7272
httpServer.start();
7373

@@ -110,7 +110,13 @@ static class ResponseHandler implements HttpHandler {
110110
@Override
111111
public void handle(HttpExchange exchange) throws IOException {
112112
Response response;
113-
if ("GET".equalsIgnoreCase(exchange.getRequestMethod())) {
113+
114+
final String userAgent = exchange.getRequestHeaders().getFirst("User-Agent");
115+
if (userAgent != null && userAgent.startsWith("Apache Ant")) {
116+
// This is a request made by the AntFixture, just reply "OK"
117+
response = new Response(RestStatus.OK, emptyMap(), "text/plain; charset=utf-8", "OK".getBytes(UTF_8));
118+
119+
} else if ("GET".equalsIgnoreCase(exchange.getRequestMethod())) {
114120
String path = exchange.getRequestURI().toString();
115121
if (path.length() > 0 && path.charAt(0) == '/') {
116122
path = path.substring(1);
@@ -125,13 +131,13 @@ public void handle(HttpExchange exchange) throws IOException {
125131
Map<String, String> headers = singletonMap("Content-Length", String.valueOf(content.length));
126132
response = new Response(RestStatus.OK, headers, "application/octet-stream", content);
127133
} else {
128-
response = new Response(RestStatus.NOT_FOUND, emptyMap(), "text/plain", new byte[0]);
134+
response = new Response(RestStatus.NOT_FOUND, emptyMap(), "text/plain; charset=utf-8", new byte[0]);
129135
}
130136
} else {
131-
response = new Response(RestStatus.FORBIDDEN, emptyMap(), "text/plain", new byte[0]);
137+
response = new Response(RestStatus.FORBIDDEN, emptyMap(), "text/plain; charset=utf-8", new byte[0]);
132138
}
133139
} else {
134-
response = new Response(RestStatus.INTERNAL_SERVER_ERROR, emptyMap(), "text/plain",
140+
response = new Response(RestStatus.INTERNAL_SERVER_ERROR, emptyMap(), "text/plain; charset=utf-8",
135141
"Unsupported HTTP method".getBytes(StandardCharsets.UTF_8));
136142
}
137143
exchange.sendResponseHeaders(response.status.getStatus(), response.body.length);

plugins/examples/rest-handler/src/test/java/org/elasticsearch/example/resthandler/ExampleFixtureIT.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,25 +23,41 @@
2323
import org.elasticsearch.test.ESTestCase;
2424

2525
import java.io.BufferedReader;
26+
import java.io.BufferedWriter;
2627
import java.io.InputStreamReader;
28+
import java.io.OutputStreamWriter;
2729
import java.net.InetAddress;
2830
import java.net.Socket;
2931
import java.net.URL;
3032
import java.nio.charset.StandardCharsets;
31-
import java.util.Objects;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
36+
import static org.hamcrest.Matchers.hasItems;
3237

3338
public class ExampleFixtureIT extends ESTestCase {
3439

3540
public void testExample() throws Exception {
36-
final String stringAddress = Objects.requireNonNull(System.getProperty("external.address"));
37-
final URL url = new URL("http://" + stringAddress);
41+
final String externalAddress = System.getProperty("external.address");
42+
assertNotNull("External address must not be null", externalAddress);
3843

44+
final URL url = new URL("http://" + externalAddress);
3945
final InetAddress address = InetAddress.getByName(url.getHost());
4046
try (
4147
Socket socket = new MockSocket(address, url.getPort());
48+
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
4249
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
4350
) {
44-
assertEquals("TEST", reader.readLine());
51+
writer.write("GET / HTTP/1.1\r\n");
52+
writer.write("Host: elastic.co\r\n\r\n");
53+
writer.flush();
54+
55+
final List<String> lines = new ArrayList<>();
56+
String line;
57+
while ((line = reader.readLine()) != null) {
58+
lines.add(line);
59+
}
60+
assertThat(lines, hasItems("HTTP/1.1 200 OK", "TEST"));
4561
}
4662
}
4763
}

0 commit comments

Comments
 (0)