Skip to content

Low Level REST Client: Support host selection #29211

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import javax.net.ssl.SSLHandshakeException;

import org.apache.http.ConnectionClosedException;
import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.conn.ConnectTimeoutException;

/**
* Abstract implementation of {@link RestClientActions} shared by
* {@link RestClient} and {@link RestClientView}.
*/
abstract class AbstractRestClientActions implements RestClientActions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my mind, I thought that we could have this new RestClient view that delegates to RestClient without having RestClient extend another base class. I guess that is done so RestClient and RestClientView are more interchangeable ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use this so I only have to implement all the performRequest flavors. Without it I'd basically copy and paste the guts of it into both RestClientView and RestClient. Or I could declare 8 more methods in RestClient that take the NodeSelector and then delegate directly to them. I don't think either of those is particularly clean though.

I get wanting to return the abstract class and node having an interface at all, but I feel like the interface makes mocking simpler and it will allow us to change more things later without them being breaking changes.

abstract SyncResponseListener syncResponseListener();

abstract void performRequestAsyncNoCatch(String method, String endpoint, Map<String, String> params,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add javadocs for this?

HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
ResponseListener responseListener, Header[] headers) throws IOException;

@Override
public final Response performRequest(String method, String endpoint, Header... headers) throws IOException {
return performRequest(method, endpoint, Collections.<String, String>emptyMap(), null, headers);
}

@Override
public final Response performRequest(String method, String endpoint, Map<String, String> params, Header... headers) throws IOException {
return performRequest(method, endpoint, params, (HttpEntity)null, headers);
}

@Override // TODO this method is not final so the tests for the High Level REST Client don't have to change. We'll fix this soon.
public Response performRequest(String method, String endpoint, Map<String, String> params,
HttpEntity entity, Header... headers) throws IOException {
return performRequest(method, endpoint, params, entity, HttpAsyncResponseConsumerFactory.DEFAULT, headers);
}

@Override
public final Response performRequest(String method, String endpoint, Map<String, String> params,
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
Header... headers) throws IOException {
SyncResponseListener listener = syncResponseListener();
performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
listener, headers);
return listener.get();
}

public final void performRequestAsync(String method, String endpoint, ResponseListener responseListener, Header... headers) {
performRequestAsync(method, endpoint, Collections.<String, String>emptyMap(), null, responseListener, headers);
}

@Override
public final void performRequestAsync(String method, String endpoint, Map<String, String> params,
ResponseListener responseListener, Header... headers) {
performRequestAsync(method, endpoint, params, null, responseListener, headers);
}

@Override // TODO this method is not final so the tests for the High Level REST Client don't have to change. We'll fix this soon.
public void performRequestAsync(String method, String endpoint, Map<String, String> params,
HttpEntity entity, ResponseListener responseListener, Header... headers) {
performRequestAsync(method, endpoint, params, entity, HttpAsyncResponseConsumerFactory.DEFAULT, responseListener, headers);
}

@Override
public final void performRequestAsync(String method, String endpoint, Map<String, String> params,
HttpEntity entity, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
ResponseListener responseListener, Header... headers) {
try {
performRequestAsyncNoCatch(method, endpoint, params, entity, httpAsyncResponseConsumerFactory,
responseListener, headers);
} catch (Exception e) {
responseListener.onFailure(e);
}
}

/**
* Listener used in any sync performRequest calls, it waits for a response or an exception back up to a timeout
*/
static class SyncResponseListener implements ResponseListener {
private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<Response> response = new AtomicReference<>();
private final AtomicReference<Exception> exception = new AtomicReference<>();

private final long timeout;

SyncResponseListener(long timeout) {
assert timeout > 0;
this.timeout = timeout;
}

@Override
public void onSuccess(Response response) {
Objects.requireNonNull(response, "response must not be null");
boolean wasResponseNull = this.response.compareAndSet(null, response);
if (wasResponseNull == false) {
throw new IllegalStateException("response is already set");
}

latch.countDown();
}

@Override
public void onFailure(Exception exception) {
Objects.requireNonNull(exception, "exception must not be null");
boolean wasExceptionNull = this.exception.compareAndSet(null, exception);
if (wasExceptionNull == false) {
throw new IllegalStateException("exception is already set");
}
latch.countDown();
}

/**
* Waits (up to a timeout) for some result of the request: either a response, or an exception.
*/
Response get() throws IOException {
try {
//providing timeout is just a safety measure to prevent everlasting waits
//the different client timeouts should already do their jobs
if (latch.await(timeout, TimeUnit.MILLISECONDS) == false) {
throw new IOException("listener timeout after waiting for [" + timeout + "] ms");
}
} catch (InterruptedException e) {
throw new RuntimeException("thread waiting for the response was interrupted", e);
}

Exception exception = this.exception.get();
Response response = this.response.get();
if (exception != null) {
if (response != null) {
IllegalStateException e = new IllegalStateException("response and exception are unexpectedly set at the same time");
e.addSuppressed(exception);
throw e;
}
/*
* Wrap and rethrow whatever exception we received, copying the type
* where possible so the synchronous API looks as much as possible
* like the asynchronous API. We wrap the exception so that the caller's
* signature shows up in any exception we throw.
*/
if (exception instanceof ResponseException) {
throw new ResponseException((ResponseException) exception);
}
if (exception instanceof ConnectException) {
ConnectException e = new ConnectException(exception.getMessage());
e.initCause(exception);
throw e;
}
if (exception instanceof ConnectTimeoutException) {
ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage());
e.initCause(exception);
throw e;
}
if (exception instanceof SocketTimeoutException) {
SocketTimeoutException e = new SocketTimeoutException(exception.getMessage());
e.initCause(exception);
throw e;
}
if (exception instanceof ConnectionClosedException) {
ConnectionClosedException e = new ConnectionClosedException(exception.getMessage());
e.initCause(exception);
throw e;
}
if (exception instanceof SSLHandshakeException) {
SSLHandshakeException e = new SSLHandshakeException(exception.getMessage());
e.initCause(exception);
throw e;
}
if (exception instanceof IOException) {
throw new IOException(exception.getMessage(), exception);
}
if (exception instanceof RuntimeException){
throw new RuntimeException(exception.getMessage(), exception);
}
throw new RuntimeException("error while performing request", exception);
}

if (response == null) {
throw new IllegalStateException("response not set and no exception caught either");
}
return response;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ private DeadHostState() {
this.failedAttempts = previousDeadHostState.failedAttempts + 1;
}

DeadHostState(int failedAttempts, long deadUntilNanos) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this - it helps with testing but I expect it conflicts with something @javanna is doing now.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not a big deal, I will split my work in smaller PRs. the one around max retry timeout will need rewriting at this point but that's ok.

this.failedAttempts = failedAttempts;
this.deadUntilNanos = deadUntilNanos;
}

/**
* Returns the timestamp (nanos) till the host is supposed to stay dead without being retried.
* After that the host should be retried.
Expand Down
159 changes: 159 additions & 0 deletions client/rest/src/main/java/org/elasticsearch/client/HostMetadata.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import java.util.Objects;

import org.apache.http.HttpHost;

/**
* Metadata about an {@link HttpHost} running Elasticsearch.
*/
public class HostMetadata {
/**
* Look up metadata about the provided host. Implementers should not
* make network calls, instead, they should look up previously fetched
* data. See Elasticsearch's Sniffer for an example implementation.
*/
public interface HostMetadataResolver {
/**
* @return {@link HostMetadata} about the provided host if we have any
* metadata, {@code null} otherwise
*/
HostMetadata resolveMetadata(HttpHost host);
}
/**
* Resolves metadata for all hosts to {@code null}, meaning "I have
* no metadata for this host".
*/
public static final HostMetadataResolver EMPTY_RESOLVER = new HostMetadataResolver() {
@Override
public HostMetadata resolveMetadata(HttpHost host) {
return null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we rather have an EMPTY placeholder here and avoid null values? Or use Optional ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional is a Java 8 thing.

I'm not sure what the right semantics for EMPTY would be. Passing null requires the implementer to think about what they want to do if they don't have any metadata which I kind of like. I think Optional would be more clear, but we don't have it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh I thought it was introduced much earlier. Maybe in this case null is ok here

}
};

/**
* Version of Elasticsearch that the host is running.
*/
private final String version;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wondering if strings are enough for version comparisons etc. but surely using our own Version class is not a good idea here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the high level rest client will have access to our Version class and can compare. That is kind of what I do in the yaml tests further down. I don't think it is worth building our own Version class, yeah.

I think a string is the right thing, at least for now. It all depends on how much we can break this in the future as things get better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unfortunately the Version class is the one that currently brings lucene in :) but yea I tend to agree that for now we are good. I was contemplating making it a number for better comparisons, but that may prove even more confusing to users?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think it'd be more confusing....

/**
* Roles that the Elasticsearch process on the host has.
*/
private final Roles roles;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would definitely add here node attributes too, think of a rack aware selector for instance.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I was trying to keep this PR from getting huge. I didn't accomplish that, but I tried.


public HostMetadata(String version, Roles roles) {
this.version = Objects.requireNonNull(version, "version is required");
this.roles = Objects.requireNonNull(roles, "roles is required");
}

/**
* Version of the node.
*/
public String version() {
return version;
}

/**
* Roles implemented by the Elasticsearch process.
*/
public Roles roles() {
return roles;
}

@Override
public String toString() {
return "[version=" + version + ", roles=" + roles + "]";
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
HostMetadata other = (HostMetadata) obj;
return version.equals(other.version)
&& roles.equals(other.roles);
}

@Override
public int hashCode() {
return Objects.hash(version, roles);
}

/**
* Role information about and Elasticsearch process.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/and/an ?

*/
public static final class Roles {
private final boolean master;
private final boolean data;
private final boolean ingest;

public Roles(boolean master, boolean data, boolean ingest) {
this.master = master;
this.data = data;
this.ingest = ingest;
}

/**
* The node <strong>could</strong> be elected master.
*/
public boolean master() {
return master;
}
/**
* The node stores data.
*/
public boolean data() {
return data;
}
/**
* The node runs ingest pipelines.
*/
public boolean ingest() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: maybe use the is prefix?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

has prefix instead ok?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind. maybe also call master masterEligible instead just to prevent misunderstandings?

return ingest;
}

@Override
public String toString() {
StringBuilder result = new StringBuilder(3);
if (master) result.append('m');
if (data) result.append('d');
if (ingest) result.append('i');
return result.toString();
}

@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
Roles other = (Roles) obj;
return master == other.master
&& data == other.data
&& ingest == other.ingest;
}

@Override
public int hashCode() {
return Objects.hash(master, data, ingest);
}
}
}
Loading