diff --git a/pom.xml b/pom.xml index 76d7974e..621c2e60 100644 --- a/pom.xml +++ b/pom.xml @@ -5,10 +5,15 @@ connector 1.9.1-SNAPSHOT jar + UTF-8 5.3.1 + + 1.21 + 2.6 + Tarantool Connector for Java https://github.com/tarantool/tarantool-java Tarantool client for java @@ -40,11 +45,28 @@ org.apache.maven.plugins maven-compiler-plugin - 3.2 + 3.6.1 1.8 1.8 + + + + testCompile + + + + + + org.openjdk.jmh + jmh-generator-annprocess + ${jmh.version} + + + + + @@ -124,6 +158,31 @@ + + + maven-assembly-plugin + ${maven-assembly-plugin.version} + + src/main/assembly/perf-tests.xml + + + + make-assembly + package + + single + + + true + + + org.openjdk.jmh.Main + + + + + + @@ -134,6 +193,12 @@ ${junit.jupiter.version} test + + org.junit.jupiter + junit-jupiter-params + ${junit.jupiter.version} + test + org.mockito mockito-all @@ -146,6 +211,12 @@ 1.23 test + + org.openjdk.jmh + jmh-core + ${jmh.version} + test + diff --git a/src/it/java/org/tarantool/TestTarantoolClient.java b/src/it/java/org/tarantool/TestTarantoolClient.java index 24ec0e39..c45fac92 100644 --- a/src/it/java/org/tarantool/TestTarantoolClient.java +++ b/src/it/java/org/tarantool/TestTarantoolClient.java @@ -1,9 +1,9 @@ package org.tarantool; +import org.tarantool.server.TarantoolBinaryPacket; + import java.io.IOException; -import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; import java.sql.SQLException; import java.util.Arrays; import java.util.concurrent.CompletableFuture; @@ -33,8 +33,8 @@ public static class TarantoolClientTestImpl extends TarantoolClientImpl { final Semaphore s = new Semaphore(0); long latency = 1L; - public TarantoolClientTestImpl(SocketChannelProvider socketProvider, TarantoolClientConfig options) { - super(socketProvider, options); + public TarantoolClientTestImpl(InstanceConnectionProvider nodeComm, TarantoolClientConfig options) { + super(nodeComm, options); Thread t = new Thread(new Runnable() { @Override public void run() { @@ -55,16 +55,6 @@ public void run() { t.start(); } - @Override - protected void writeFully(SocketChannel channel, ByteBuffer buffer) throws IOException { - try { - Thread.sleep(1L); - } catch (InterruptedException e) { - e.printStackTrace(); - } - super.writeFully(channel, buffer); - } - @Override protected void configureThreads(String threadName) { super.configureThreads(threadName); @@ -81,14 +71,14 @@ protected void reconnect(int retry, Throwable lastError) { } @Override - protected void complete(long code, CompletableFuture q) { - super.complete(code, q); + protected void complete(TarantoolBinaryPacket pack, CompletableFuture q) { + super.complete(pack, q); + Long code = pack.getCode(); if (code != 0) { System.out.println(code); } s.release(); } - } public static void main(String[] args) throws IOException, InterruptedException, ExecutionException, SQLException { @@ -102,21 +92,12 @@ public static void main(String[] args) throws IOException, InterruptedException, config.sharedBufferSize = 128; //config.sharedBufferSize = 0; - SocketChannelProvider socketChannelProvider = new SocketChannelProvider() { - @Override - public SocketChannel get(int retryNumber, Throwable lastError) { - if (lastError != null) { - lastError.printStackTrace(System.out); - } - System.out.println("reconnect"); - try { - return SocketChannel.open(new InetSocketAddress("localhost", 3301)); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - }; - final TarantoolClientTestImpl client = new TarantoolClientTestImpl(socketChannelProvider, config); + + InstanceConnectionProvider nodeComm = + new SingleInstanceConnectionProvider("localhost:3301", config.username, config.password); + + + final TarantoolClientTestImpl client = new TarantoolClientTestImpl(nodeComm, config); config.writeTimeoutMillis = 2; client.latency = 1; client.syncOps.ping(); diff --git a/src/main/assembly/perf-tests.xml b/src/main/assembly/perf-tests.xml new file mode 100644 index 00000000..57d9eba6 --- /dev/null +++ b/src/main/assembly/perf-tests.xml @@ -0,0 +1,28 @@ + + perf-tests + + jar + + false + + + / + true + true + test + + + + + ${project.build.directory}/test-classes + / + + **/* + + true + + + \ No newline at end of file diff --git a/src/main/java-templates/org/tarantool/Version.java b/src/main/java-templates/org/tarantool/Version.java index 82dcfc6c..cdfa14ac 100644 --- a/src/main/java-templates/org/tarantool/Version.java +++ b/src/main/java-templates/org/tarantool/Version.java @@ -2,6 +2,6 @@ public final class Version { public static final String version = "${project.version}"; - public static final int majorVersion = ${parsedVersion.majorVersion}; - public static final int minorVersion = ${parsedVersion.minorVersion}; + public static final int majorVersion = Integer.parseInt("${parsedVersion.majorVersion}"); + public static final int minorVersion = Integer.parseInt("${parsedVersion.minorVersion}"); } diff --git a/src/main/java/org/tarantool/CountInputStream.java b/src/main/java/org/tarantool/CountInputStream.java index afef4f29..b0f627c1 100644 --- a/src/main/java/org/tarantool/CountInputStream.java +++ b/src/main/java/org/tarantool/CountInputStream.java @@ -3,5 +3,5 @@ import java.io.InputStream; public abstract class CountInputStream extends InputStream { - abstract long getBytesRead(); + public abstract long getBytesRead(); } diff --git a/src/main/java/org/tarantool/InstanceConnectionProvider.java b/src/main/java/org/tarantool/InstanceConnectionProvider.java new file mode 100644 index 00000000..e6c40109 --- /dev/null +++ b/src/main/java/org/tarantool/InstanceConnectionProvider.java @@ -0,0 +1,13 @@ +package org.tarantool; + +import org.tarantool.server.*; + +import java.io.*; +import java.nio.*; + +public interface InstanceConnectionProvider { + + TarantoolInstanceConnection connect() throws IOException; + + String getDescription(); +} diff --git a/src/main/java/org/tarantool/JDBCBridge.java b/src/main/java/org/tarantool/JDBCBridge.java index b31af64d..c190310e 100644 --- a/src/main/java/org/tarantool/JDBCBridge.java +++ b/src/main/java/org/tarantool/JDBCBridge.java @@ -1,5 +1,8 @@ package org.tarantool; +import org.tarantool.jdbc.SQLResultSet; +import org.tarantool.server.TarantoolBinaryPacket; + import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; @@ -7,8 +10,6 @@ import java.util.ListIterator; import java.util.Map; -import org.tarantool.jdbc.SQLResultSet; - public class JDBCBridge { public static final JDBCBridge EMPTY = new JDBCBridge(Collections.emptyList(), Collections.>emptyList()); @@ -16,8 +17,8 @@ public class JDBCBridge { final Map columnsByName; final List> rows; - protected JDBCBridge(TarantoolConnection connection) { - this(connection.getSQLMetadata(),connection.getSQLData()); + protected JDBCBridge(TarantoolBinaryPacket pack) { + this(SqlProtoUtils.getSQLMetadata(pack), SqlProtoUtils.getSQLData(pack)); } protected JDBCBridge(List sqlMetadata, List> rows) { @@ -30,8 +31,8 @@ protected JDBCBridge(List sqlMetadata, List fields, List> values) { } public static Object execute(TarantoolConnection connection, String sql, Object ... params) { - connection.sql(sql, params); - Long rowCount = connection.getSqlRowCount(); + TarantoolBinaryPacket pack = connection.sql(sql, params); + Long rowCount = SqlProtoUtils.getSqlRowCount(pack); if(rowCount == null) { - return new SQLResultSet(new JDBCBridge(connection)); + return new SQLResultSet(new JDBCBridge(pack)); } return rowCount.intValue(); } diff --git a/src/main/java/org/tarantool/RoundRobinInstanceConnectionProvider.java b/src/main/java/org/tarantool/RoundRobinInstanceConnectionProvider.java new file mode 100644 index 00000000..a492de25 --- /dev/null +++ b/src/main/java/org/tarantool/RoundRobinInstanceConnectionProvider.java @@ -0,0 +1,137 @@ +package org.tarantool; + +import org.tarantool.server.*; + +import java.io.*; +import java.util.*; +import java.util.stream.*; + +public class RoundRobinInstanceConnectionProvider implements InstanceConnectionProvider { + + /** Timeout to establish socket connection with an individual server. */ + private final int timeout; // 0 is infinite. + + private final String clusterUsername; + private final String clusterPassword; + + private TarantoolInstanceConnection currentConnection; + + private TarantoolInstanceInfo[] nodes; + private int pos = 0; + + public RoundRobinInstanceConnectionProvider(String[] slaveHosts, String username, String password, int timeout) { + this.timeout = timeout; + if (slaveHosts == null || slaveHosts.length < 1) { + throw new IllegalArgumentException("slave hosts is null ot empty"); + } + + clusterUsername = username; + clusterPassword = password; + + setNodes(slaveHosts); + } + + private void setNodes(String[] instanceAddresses) { + nodes = new TarantoolInstanceInfo[instanceAddresses.length]; + for (int i = 0; i < instanceAddresses.length; i++) { + String slaveHostAddress = instanceAddresses[i]; + nodes[i] = TarantoolInstanceInfo.create(slaveHostAddress, clusterUsername, clusterPassword); + } + + pos = 0; + } + + public void updateNodes(List instanceAddresses) { + if (instanceAddresses == null) { + throw new IllegalArgumentException("instanceAddresses can not be null"); + } + + this.nodes = (TarantoolInstanceInfo[]) instanceAddresses.toArray(); + pos = 0; + } + + + /** + * @return Non-empty list of round-robined nodes + */ + public TarantoolInstanceInfo[] getNodes() { + return nodes; + } + + /** + * Tries to connect amid nodes in {@code nodes} in round-robin manner. + * + * @return A request-ready connection to an instance + * @throws CommunicationException if it's failed to connect and authorize to a node in given deadline + * described in {@code timeout} field. + */ + public TarantoolInstanceConnection connectNextNode() { + int attempts = getAddressCount(); + long deadline = System.currentTimeMillis() + timeout * attempts; + while (!Thread.currentThread().isInterrupted()) { + TarantoolInstanceConnection connection = null; + try { + TarantoolInstanceInfo tarantoolInstanceInfo = getNextNode(); + connection = TarantoolInstanceConnection.connect(tarantoolInstanceInfo); + return connection; + } catch (IOException e) { + if (connection != null) { + try { + connection.close(); + } catch (IOException ignored) { + // No-op. + } + } + long now = System.currentTimeMillis(); + if (deadline <= now) { + throw new CommunicationException("Connection time out.", e); + } + if (--attempts == 0) { + // Tried all addresses without any lack, but still have time. + attempts = getAddressCount(); + try { + Thread.sleep((deadline - now) / attempts); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } + } + } + throw new CommunicationException("Thread interrupted.", new InterruptedException()); + } + + /** + * @return Socket address to use for the next reconnection attempt. + */ + protected TarantoolInstanceInfo getNextNode() { + TarantoolInstanceInfo res = nodes[pos]; + pos = (pos + 1) % nodes.length; + return res; + } + + + /** + * @return Number of configured addresses. + */ + protected int getAddressCount() { + return nodes.length; + } + + + @Override + public TarantoolInstanceConnection connect() { + currentConnection = connectNextNode(); + return currentConnection; + } + + @Override + public String getDescription() { + if (currentConnection != null) { + return currentConnection.getNodeInfo().getSocketAddress().toString(); + } else { + return "Unconnected. Available nodes [" + Arrays.stream(nodes) + .map(instanceInfo -> instanceInfo.getSocketAddress().toString()) + .collect(Collectors.joining(", ")); + } + } +} diff --git a/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java index d16c6bf4..e2d77787 100644 --- a/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java +++ b/src/main/java/org/tarantool/RoundRobinSocketProviderImpl.java @@ -1,49 +1,71 @@ package org.tarantool; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SocketChannel; -import java.util.Arrays; +import org.tarantool.server.*; + +import java.io.*; +import java.net.*; +import java.nio.channels.*; +import java.util.List; /** * Basic reconnection strategy that changes addresses in a round-robin fashion. * To be used with {@link TarantoolClientImpl}. */ public class RoundRobinSocketProviderImpl implements SocketChannelProvider { + /** Timeout to establish socket connection with an individual server. */ private int timeout; // 0 is infinite. + /** Limit of retries. */ private int retriesLimit = -1; // No-limit. - /** Server addresses as configured. */ - private final String[] addrs; - /** Socket addresses. */ - private final InetSocketAddress[] sockAddrs; - /** Current position within {@link #sockAddrs} array. */ + + private TarantoolInstanceInfo[] nodes; + + /** Current position within {@link #nodes} array. */ private int pos; /** * Constructs an instance. * - * @param addrs Array of addresses in a form of [host]:[port]. + * @param slaveHosts Array of addresses in a form of [host]:[port]. */ - public RoundRobinSocketProviderImpl(String... addrs) { - if (addrs == null || addrs.length == 0) - throw new IllegalArgumentException("addrs is null or empty."); + public RoundRobinSocketProviderImpl(String[] slaveHosts, String username, String password) { + if (slaveHosts == null || slaveHosts.length < 1) { + throw new IllegalArgumentException("slave hosts is null ot empty"); + } - this.addrs = Arrays.copyOf(addrs, addrs.length); + updateNodes(slaveHosts, username, password); + } - sockAddrs = new InetSocketAddress[this.addrs.length]; + private void updateNodes(String[] slaveHosts, String username, String password) { + //todo add read-write lock + nodes = new TarantoolInstanceInfo[slaveHosts.length]; + for (int i = 0; i < slaveHosts.length; i++) { + String slaveHostAddress = slaveHosts[i]; + nodes[i] = TarantoolInstanceInfo.create(slaveHostAddress, username, password); + } + + pos = 0; + } - for (int i = 0; i < this.addrs.length; i++) { - sockAddrs[i] = parseAddress(this.addrs[i]); + + public void updateNodes(List slaveHosts) { + if (slaveHosts == null) { + throw new IllegalArgumentException("slaveHosts can not be null"); } + //todo add read-write lock + + this.nodes = (TarantoolInstanceInfo[]) slaveHosts.toArray(); + + pos = 0; } + /** - * @return Configured addresses in a form of [host]:[port]. + * @return Non-empty list of round-robined nodes */ - public String[] getAddresses() { - return this.addrs; + public TarantoolInstanceInfo[] getNodes() { + return nodes; } /** @@ -75,7 +97,7 @@ public int getTimeout() { /** * Sets maximum amount of reconnect attempts to be made before an exception is raised. - * The retry count is maintained by a {@link #get(int, Throwable)} caller + * The retry count is maintained by a {@link #getNext(int, Throwable)} caller * when a socket level connection was established. * * Negative value means unlimited. @@ -98,17 +120,15 @@ public int getRetriesLimit() { /** {@inheritDoc} */ @Override - public SocketChannel get(int retryNumber, Throwable lastError) { - if (areRetriesExhausted(retryNumber)) { - throw new CommunicationException("Connection retries exceeded.", lastError); - } + public SocketChannel get() { int attempts = getAddressCount(); long deadline = System.currentTimeMillis() + timeout * attempts; while (!Thread.currentThread().isInterrupted()) { SocketChannel channel = null; try { - channel = SocketChannel.open(); InetSocketAddress addr = getNextSocketAddress(); + + channel = SocketChannel.open(); channel.socket().connect(addr, timeout); return channel; } catch (IOException e) { @@ -141,42 +161,20 @@ public SocketChannel get(int retryNumber, Throwable lastError) { * @return Number of configured addresses. */ protected int getAddressCount() { - return sockAddrs.length; + return nodes.length; } /** * @return Socket address to use for the next reconnection attempt. */ protected InetSocketAddress getNextSocketAddress() { - InetSocketAddress res = sockAddrs[pos]; - pos = (pos + 1) % sockAddrs.length; + InetSocketAddress res = nodes[pos].getSocketAddress(); + pos = (pos + 1) % nodes.length; return res; } - /** - * Parse a string address in the form of [host]:[port] - * and builds a socket address. - * - * @param addr Server address. - * @return Socket address. - */ - protected InetSocketAddress parseAddress(String addr) { - int idx = addr.indexOf(':'); - String host = (idx < 0) ? addr : addr.substring(0, idx); - int port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1)); - return new InetSocketAddress(host, port); + protected TarantoolInstanceInfo getCurrentNode() { + return nodes[pos]; } - /** - * Provides a decision on whether retries limit is hit. - * - * @param retries Current count of retries. - * @return {@code true} if retries are exhausted. - */ - private boolean areRetriesExhausted(int retries) { - int limit = getRetriesLimit(); - if (limit < 0) - return false; - return retries >= limit; - } } diff --git a/src/main/java/org/tarantool/SimpleSocketChannelProvider.java b/src/main/java/org/tarantool/SimpleSocketChannelProvider.java new file mode 100644 index 00000000..119def39 --- /dev/null +++ b/src/main/java/org/tarantool/SimpleSocketChannelProvider.java @@ -0,0 +1,30 @@ +package org.tarantool; + +import org.tarantool.server.*; + +import java.io.*; +import java.net.*; +import java.nio.channels.*; + +public class SimpleSocketChannelProvider implements SocketChannelProvider{ + + private final TarantoolInstanceInfo tarantoolInstanceInfo; + + public SimpleSocketChannelProvider(InetSocketAddress socketAddress, String username, String password) { + this.tarantoolInstanceInfo = TarantoolInstanceInfo.create(socketAddress, username, password); + } + + public SimpleSocketChannelProvider(String address, String username, String password) { + this.tarantoolInstanceInfo = TarantoolInstanceInfo.create(address, username, password); + } + + @Override + public SocketChannel get() { + try { + return SocketChannel.open(tarantoolInstanceInfo.getSocketAddress()); + } catch (IOException e) { + String msg = "Exception occurred while connecting to instance " + tarantoolInstanceInfo; + throw new CommunicationException(msg, e); + } + } +} diff --git a/src/main/java/org/tarantool/SingleInstanceConnectionProvider.java b/src/main/java/org/tarantool/SingleInstanceConnectionProvider.java new file mode 100644 index 00000000..e6a9742c --- /dev/null +++ b/src/main/java/org/tarantool/SingleInstanceConnectionProvider.java @@ -0,0 +1,37 @@ +package org.tarantool; + +import org.tarantool.server.TarantoolInstanceConnection; +import org.tarantool.server.TarantoolInstanceInfo; + +import java.io.IOException; +import java.net.InetSocketAddress; + +public class SingleInstanceConnectionProvider implements InstanceConnectionProvider { + + private final TarantoolInstanceInfo tarantoolInstanceInfo; + + private TarantoolInstanceConnection nodeConnection; + + public SingleInstanceConnectionProvider(InetSocketAddress socketAddress, String username, String password) { + this.tarantoolInstanceInfo = TarantoolInstanceInfo.create(socketAddress, username, password); + } + + public SingleInstanceConnectionProvider(String address, String username, String password) { + this.tarantoolInstanceInfo = TarantoolInstanceInfo.create(address, username, password); + } + + @Override + public TarantoolInstanceConnection connect() throws IOException { + nodeConnection = TarantoolInstanceConnection.connect(tarantoolInstanceInfo); + return nodeConnection; + } + + @Override + public String getDescription() { + if (nodeConnection != null) { + return nodeConnection.getNodeInfo().getSocketAddress().toString(); + } else { + return "Unconnected. Node " + tarantoolInstanceInfo.getSocketAddress().toString(); + } + } +} diff --git a/src/main/java/org/tarantool/SocketChannelProvider.java b/src/main/java/org/tarantool/SocketChannelProvider.java index 09112dec..49224667 100644 --- a/src/main/java/org/tarantool/SocketChannelProvider.java +++ b/src/main/java/org/tarantool/SocketChannelProvider.java @@ -7,9 +7,7 @@ public interface SocketChannelProvider { /** * Provides socket channel to init restore connection. * You could change hosts on fail and sleep between retries in this method - * @param retryNumber number of current retry. Reset after successful connect. - * @param lastError the last error occurs when reconnecting * @return the result of SocketChannel open(SocketAddress remote) call */ - SocketChannel get(int retryNumber, Throwable lastError); + SocketChannel get(); } diff --git a/src/main/java/org/tarantool/SqlProtoUtils.java b/src/main/java/org/tarantool/SqlProtoUtils.java new file mode 100644 index 00000000..ccd0694c --- /dev/null +++ b/src/main/java/org/tarantool/SqlProtoUtils.java @@ -0,0 +1,50 @@ +package org.tarantool; + +import org.tarantool.server.TarantoolBinaryPacket; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public abstract class SqlProtoUtils { + + + public static List> readSqlResult(TarantoolBinaryPacket pack) { + List> data = (List>) pack.getBody().get(Key.DATA.getId()); + + List> values = new ArrayList>(data.size()); + List metaData = getSQLMetadata(pack); + LinkedHashMap value = new LinkedHashMap(); + for (List row : data) { + for (int i = 0; i < row.size(); i++) { + value.put(metaData.get(i).getName(), row.get(i)); + } + values.add(value); + } + return values; + } + + public static List> getSQLData(TarantoolBinaryPacket pack) { + return (List>) pack.getBody().get(Key.DATA.getId()); + } + + + public static List getSQLMetadata(TarantoolBinaryPacket pack) { + List> meta = (List>) pack.getBody().get(Key.SQL_METADATA.getId()); + List values = new ArrayList(meta.size()); + for (Map c : meta) { + values.add(new TarantoolBase.SQLMetaData((String) c.get(Key.SQL_FIELD_NAME.getId()))); + } + return values; + } + + public static Long getSqlRowCount(TarantoolBinaryPacket pack) { + Map info = (Map) pack.getBody().get(Key.SQL_INFO.getId()); + Number rowCount; + if (info != null && (rowCount = ((Number) info.get(Key.SQL_ROW_COUNT.getId()))) != null) { + return rowCount.longValue(); + } + return null; + } +} diff --git a/src/main/java/org/tarantool/TarantoolBase.java b/src/main/java/org/tarantool/TarantoolBase.java index 3fb1ce40..fc77f7d2 100644 --- a/src/main/java/org/tarantool/TarantoolBase.java +++ b/src/main/java/org/tarantool/TarantoolBase.java @@ -1,38 +1,25 @@ package org.tarantool; -import java.io.DataInputStream; -import java.io.DataOutputStream; +import org.tarantool.server.BinaryProtoUtils; +import org.tarantool.server.TarantoolInstanceConnection; +import org.tarantool.server.TarantoolInstanceConnectionMeta; + import java.io.IOException; -import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.EnumMap; -import java.util.LinkedHashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.atomic.AtomicLong; public abstract class TarantoolBase extends AbstractTarantoolOps, Object, Result> { - protected static final String WELCOME = "Tarantool "; - protected String serverVersion; + /** * Connection state */ - protected String salt; + TarantoolInstanceConnectionMeta currentNodeInfo; protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE; protected AtomicLong syncId = new AtomicLong(); protected int initialRequestSize = 4096; - /** - * Read properties - */ - protected DataInputStream is; - protected CountInputStream cis; - protected Map headers; - protected Map body; public TarantoolBase() { } @@ -40,106 +27,15 @@ public TarantoolBase() { public TarantoolBase(String username, String password, Socket socket) { super(); try { - this.is = new DataInputStream(cis = new CountInputStreamImpl(socket.getInputStream())); - byte[] bytes = new byte[64]; - is.readFully(bytes); - String firstLine = new String(bytes); - if (!firstLine.startsWith(WELCOME)) { - close(); - throw new CommunicationException("Welcome message should starts with tarantool but starts with '" + firstLine + "'", new IllegalStateException("Invalid welcome packet")); - } - serverVersion = firstLine.substring(WELCOME.length()); - is.readFully(bytes); - this.salt = new String(bytes); - if (username != null && password != null) { - ByteBuffer authPacket = createAuthPacket(username, password); - OutputStream os = socket.getOutputStream(); - os.write(authPacket.array(), 0, authPacket.remaining()); - os.flush(); - readPacket(is); - Long code = (Long) headers.get(Key.CODE.getId()); - if (code != 0) { - throw serverError(code, body.get(Key.ERROR.getId())); - } - } + this.currentNodeInfo = BinaryProtoUtils.connect(socket, username, password); + } catch (CommunicationException e) { + close(); + throw e; } catch (IOException e) { - try { - is.close(); - } catch (IOException ignored) { - - } - try { - cis.close(); - } catch (IOException ignored) { - - } throw new CommunicationException("Couldn't connect to tarantool", e); } } - - protected ByteBuffer createAuthPacket(String username, final String password) throws IOException { - final MessageDigest sha1; - try { - sha1 = MessageDigest.getInstance("SHA-1"); - } catch (NoSuchAlgorithmException e) { - throw new IllegalStateException(e); - } - List auth = new ArrayList(2); - auth.add("chap-sha1"); - - byte[] p = sha1.digest(password.getBytes()); - - sha1.reset(); - byte[] p2 = sha1.digest(p); - - sha1.reset(); - sha1.update(Base64.decode(salt), 0, 20); - sha1.update(p2); - byte[] scramble = sha1.digest(); - for (int i = 0, e = 20; i < e; i++) { - p[i] ^= scramble[i]; - } - auth.add(p); - return createPacket(Code.AUTH, 0L, null, Key.USER_NAME, username, Key.TUPLE, auth); - } - - protected ByteBuffer createPacket(Code code, Long syncId, Long schemaId, Object... args) throws IOException { - TarantoolClientImpl.ByteArrayOutputStream bos = new TarantoolClientImpl.ByteArrayOutputStream(initialRequestSize); - bos.write(new byte[5]); - DataOutputStream ds = new DataOutputStream(bos); - Map header = new EnumMap(Key.class); - Map body = new EnumMap(Key.class); - header.put(Key.CODE, code); - header.put(Key.SYNC, syncId); - if (schemaId != null) { - header.put(Key.SCHEMA_ID, schemaId); - } - if (args != null) { - for (int i = 0, e = args.length; i < e; i += 2) { - Object value = args[i + 1]; - body.put((Key) args[i], value); - } - } - msgPackLite.pack(header, ds); - msgPackLite.pack(body, ds); - ds.flush(); - ByteBuffer buffer = bos.toByteBuffer(); - buffer.put(0, (byte) 0xce); - buffer.putInt(1, bos.size() - 5); - return buffer; - } - - protected void readPacket(DataInputStream is) throws IOException { - int size = ((Number) msgPackLite.unpack(is)).intValue(); - long mark = cis.getBytesRead(); - headers = (Map) msgPackLite.unpack(is); - if (cis.getBytesRead() - mark < size) { - body = (Map) msgPackLite.unpack(is); - } - is.skipBytes((int) (cis.getBytesRead() - mark - size)); - } - protected static class SQLMetaData { protected String name; @@ -159,43 +55,6 @@ public String toString() { } } - protected List getSQLMetadata() { - List> meta = (List>) body.get(Key.SQL_METADATA.getId()); - List values = new ArrayList(meta.size()); - for(Map c:meta ) { - values.add(new SQLMetaData((String) c.get(Key.SQL_FIELD_NAME.getId()))); - } - return values; - } - - protected List> getSQLData() { - return (List>) body.get(Key.DATA.getId()); - } - - protected List> readSqlResult(List> data) { - List> values = new ArrayList>(data.size()); - List metaData = getSQLMetadata(); - LinkedHashMap value = new LinkedHashMap(); - for (List row : data) { - for (int i = 0; i < row.size(); i++) { - value.put(metaData.get(i).getName(), row.get(i)); - } - values.add(value); - } - return values; - } - - - protected Long getSqlRowCount() { - Map info = (Map) body.get(Key.SQL_INFO.getId()); - Number rowCount; - if (info != null && (rowCount = ((Number) info.get(Key.SQL_ROW_COUNT.getId()))) != null) { - return rowCount.longValue(); - } - return null; - } - - protected TarantoolException serverError(long code, Object error) { return new TarantoolException(code, error instanceof String ? (String) error : new String((byte[]) error)); } @@ -210,10 +69,10 @@ ByteBuffer toByteBuffer() { } } - protected void closeChannel(SocketChannel channel) { - if (channel != null) { + protected void closeChannel(TarantoolInstanceConnection connection) { + if (connection != null) { try { - channel.close(); + connection.close(); } catch (IOException ignored) { } @@ -235,6 +94,9 @@ public void setInitialRequestSize(int initialRequestSize) { } public String getServerVersion() { - return serverVersion; + if (currentNodeInfo == null) { + throw new IllegalStateException("Tarantool base is not initialized"); + } + return currentNodeInfo.getServerVersion(); } } diff --git a/src/main/java/org/tarantool/TarantoolClientImpl.java b/src/main/java/org/tarantool/TarantoolClientImpl.java index 0c510287..1e1eff10 100644 --- a/src/main/java/org/tarantool/TarantoolClientImpl.java +++ b/src/main/java/org/tarantool/TarantoolClientImpl.java @@ -1,8 +1,11 @@ package org.tarantool; -import java.io.DataInputStream; +import org.tarantool.server.BinaryProtoUtils; +import org.tarantool.server.TarantoolBinaryPacket; +import org.tarantool.server.TarantoolInstanceConnection; + import java.io.IOException; -import java.net.SocketException; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Iterator; @@ -30,7 +33,12 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar /** * External */ - protected SocketChannelProvider socketProvider; + protected InstanceConnectionProvider communicationProvider; + + /** + * Max amount of one by one reconnect attempts + */ + protected int reconnectRetriesLimit = -1; // No-limit. protected volatile Exception thumbstone; protected Map> futures; @@ -39,11 +47,13 @@ public class TarantoolClientImpl extends TarantoolBase> implements Tar * Write properties */ protected SocketChannel channel; + protected TarantoolInstanceConnection currConnection; + protected ByteBuffer sharedBuffer; protected ByteBuffer writerBuffer; - protected ReentrantLock bufferLock = new ReentrantLock(false); - protected Condition bufferNotEmpty = bufferLock.newCondition(); - protected Condition bufferEmpty = bufferLock.newCondition(); + protected ReentrantLock writerBufferLock = new ReentrantLock(false); + protected Condition writerBufferNotEmpty = writerBufferLock.newCondition(); + protected Condition writerBufferEmpty = writerBufferLock.newCondition(); protected ReentrantLock writeLock = new ReentrantLock(true); /** @@ -73,18 +83,32 @@ public void run() { } }); - public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClientConfig config) { + public TarantoolClientImpl(InetSocketAddress socketAddress, TarantoolClientConfig config) { + this(new SingleInstanceConnectionProvider(socketAddress, config.username, config.password), config); + } + + public TarantoolClientImpl(String address, TarantoolClientConfig config) { + this(new SingleInstanceConnectionProvider(address, config.username, config.password), config); + } + + protected TarantoolClientImpl() { +// delegate init to a descendant + } + + public TarantoolClientImpl(InstanceConnectionProvider communicationProvider, TarantoolClientConfig config) { super(); + init(communicationProvider, config); + } + + protected void init(InstanceConnectionProvider communicationProvider, TarantoolClientConfig config) { this.thumbstone = NOT_INIT_EXCEPTION; this.config = config; this.initialRequestSize = config.defaultRequestSize; - this.socketProvider = socketProvider; + this.communicationProvider = communicationProvider; this.stats = new TarantoolClientStats(); this.futures = new ConcurrentHashMap<>(config.predictedFutures); this.sharedBuffer = ByteBuffer.allocateDirect(config.sharedBufferSize); this.writerBuffer = ByteBuffer.allocateDirect(sharedBuffer.capacity()); - this.connector.setDaemon(true); - this.connector.setName("Tarantool connector"); this.syncOps = new SyncOps(); this.composableAsyncOps = new ComposableAsyncOps(); this.fireAndForgetOps = new FireAndForgetOps(); @@ -94,6 +118,9 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient this.fireAndForgetOps.setCallCode(Code.CALL); this.composableAsyncOps.setCallCode(Code.CALL); } + + this.connector.setDaemon(true); + this.connector.setName("Tarantool connector"); connector.start(); try { if (!waitAlive(config.initTimeoutMillis, TimeUnit.MILLISECONDS)) { @@ -111,73 +138,57 @@ public TarantoolClientImpl(SocketChannelProvider socketProvider, TarantoolClient } protected void reconnect(int retry, Throwable lastError) { - SocketChannel channel; while (!Thread.currentThread().isInterrupted()) { - try { - channel = socketProvider.get(retry++, lastError == NOT_INIT_EXCEPTION ? null : lastError); - } catch (Exception e) { - close(e); - return; + if (areRetriesExhausted(retry)) { + Throwable cause = lastError == NOT_INIT_EXCEPTION ? null : lastError; + close(new CommunicationException("Connection retries exceeded.", cause)); } try { - connect(channel); + connectAndStartThreads(); return; } catch (Exception e) { - closeChannel(channel); - lastError = e; - if (e instanceof InterruptedException) - Thread.currentThread().interrupt(); + close(e); +// closeChannel(currConnection); +// lastError = e; +// if (e instanceof InterruptedException) +// Thread.currentThread().interrupt(); } } } - protected void connect(final SocketChannel channel) throws Exception { - try { - DataInputStream is = new DataInputStream(cis = new ByteBufferInputStream(channel)); - byte[] bytes = new byte[64]; - is.readFully(bytes); - String firstLine = new String(bytes); - if (!firstLine.startsWith("Tarantool")) { - CommunicationException e = new CommunicationException("Welcome message should starts with tarantool " + - "but starts with '" + firstLine + "'", new IllegalStateException("Invalid welcome packet")); + protected void connectAndStartThreads() throws Exception { + connect(communicationProvider); - close(e); - throw e; - } - is.readFully(bytes); - this.salt = new String(bytes); - if (config.username != null && config.password != null) { - writeFully(channel, createAuthPacket(config.username, config.password)); - readPacket(is); - Long code = (Long) headers.get(Key.CODE.getId()); - if (code != 0) { - throw serverError(code, body.get(Key.ERROR.getId())); - } - } - this.is = is; - } catch (IOException e) { - try { - is.close(); - } catch (IOException ignored) { - - } - try { - cis.close(); - } catch (IOException ignored) { - - } - throw new CommunicationException("Couldn't connect to tarantool", e); - } - channel.configureBlocking(false); - this.channel = channel; - bufferLock.lock(); + writerBufferLock.lock(); try { sharedBuffer.clear(); } finally { - bufferLock.unlock(); + writerBufferLock.unlock(); } + this.thumbstone = null; - startThreads(channel.socket().getRemoteSocketAddress().toString()); + startThreads(communicationProvider.getDescription()); + } + + /** + * Provides a decision on whether retries limit is hit. + * + * @param retries Current count of retries. + * @return {@code true} if retries are exhausted. + */ + private boolean areRetriesExhausted(int retries) { + int limit = reconnectRetriesLimit; + if (limit < 0) + return false; + return retries >= limit; + } + + protected void connect(final InstanceConnectionProvider communicationProvider) throws Exception { + try { + currConnection = communicationProvider.connect(); + } catch (IOException e) { + throw new CommunicationException("Couldn't connect to tarantool", e); + } } protected void startThreads(String threadName) throws InterruptedException { @@ -270,12 +281,12 @@ protected synchronized void die(String message, Exception cause) { } } - bufferLock.lock(); + writerBufferLock.lock(); try { sharedBuffer.clear(); - bufferEmpty.signalAll(); + writerBufferEmpty.signalAll(); } finally { - bufferLock.unlock(); + writerBufferLock.unlock(); } stopIO(); } @@ -286,18 +297,18 @@ public void ping() { protected void write(Code code, Long syncId, Long schemaId, Object... args) throws Exception { - ByteBuffer buffer = createPacket(code, syncId, schemaId, args); + ByteBuffer msgBytes = BinaryProtoUtils.createPacket(code, syncId, schemaId, args); - if (directWrite(buffer)) { + if (directWrite(msgBytes)) { return; } - sharedWrite(buffer); + sharedWrite(msgBytes); } protected void sharedWrite(ByteBuffer buffer) throws InterruptedException, TimeoutException { long start = System.currentTimeMillis(); - if (bufferLock.tryLock(config.writeTimeoutMillis, TimeUnit.MILLISECONDS)) { + if (writerBufferLock.tryLock(config.writeTimeoutMillis, TimeUnit.MILLISECONDS)) { try { int rem = buffer.remaining(); stats.sharedMaxPacketSize = Math.max(stats.sharedMaxPacketSize, rem); @@ -308,7 +319,7 @@ protected void sharedWrite(ByteBuffer buffer) throws InterruptedException, Timeo stats.sharedEmptyAwait++; long remaining = config.writeTimeoutMillis - (System.currentTimeMillis() - start); try { - if (remaining < 1 || !bufferEmpty.await(remaining, TimeUnit.MILLISECONDS)) { + if (remaining < 1 || !writerBufferEmpty.await(remaining, TimeUnit.MILLISECONDS)) { stats.sharedEmptyAwaitTimeouts++; throw new TimeoutException(config.writeTimeoutMillis + "ms is exceeded while waiting for empty buffer you could configure write timeout it in TarantoolConfig"); } @@ -318,10 +329,10 @@ protected void sharedWrite(ByteBuffer buffer) throws InterruptedException, Timeo } sharedBuffer.put(buffer); wait.incrementAndGet(); - bufferNotEmpty.signalAll(); + writerBufferNotEmpty.signalAll(); stats.buffered++; } finally { - bufferLock.unlock(); + writerBufferLock.unlock(); } } else { stats.sharedWriteLockTimeouts++; @@ -338,7 +349,7 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx if (rem > initialRequestSize) { stats.directPacketSizeGrowth++; } - writeFully(channel, buffer); + sendToInstance(buffer); stats.directWrite++; wait.incrementAndGet(); } finally { @@ -353,18 +364,25 @@ private boolean directWrite(ByteBuffer buffer) throws InterruptedException, IOEx return false; } + private SocketChannel getWriteChannel() { + return channel; + } + + private SocketChannel getReadChannel() { + return channel; + } + protected void readThread() { try { while (!Thread.currentThread().isInterrupted()) { try { - long code; - readPacket(is); - code = (Long) headers.get(Key.CODE.getId()); - Long syncId = (Long) headers.get(Key.SYNC.getId()); - CompletableFuture future = futures.remove(syncId); + TarantoolBinaryPacket pack = readFromInstance(); + + CompletableFuture future = getFuture(pack); + stats.received++; wait.decrementAndGet(); - complete(code, future); + complete(pack, future); } catch (Exception e) { die("Cant read answer", e); return; @@ -375,26 +393,36 @@ protected void readThread() { } } + protected CompletableFuture getFuture(TarantoolBinaryPacket pack) { + return futures.remove(pack.getSync()); + } + + protected TarantoolBinaryPacket readFromInstance() throws IOException, InterruptedException { +// return BinaryProtoUtils.readPacket(currConnection.getReadChannel()); + return currConnection.readPacket(); + } + protected void writeThread() { writerBuffer.clear(); while (!Thread.currentThread().isInterrupted()) { try { - bufferLock.lock(); + writerBufferLock.lock(); try { while (sharedBuffer.position() == 0) { - bufferNotEmpty.await(); + writerBufferNotEmpty.await(); } sharedBuffer.flip(); writerBuffer.put(sharedBuffer); sharedBuffer.clear(); - bufferEmpty.signalAll(); + writerBufferEmpty.signalAll(); } finally { - bufferLock.unlock(); + writerBufferLock.unlock(); } writerBuffer.flip(); writeLock.lock(); try { - writeFully(channel, writerBuffer); + ByteBuffer writerBuffer = this.writerBuffer; + sendToInstance(writerBuffer); } finally { writeLock.unlock(); } @@ -407,33 +435,41 @@ protected void writeThread() { } } + + protected void sendToInstance(ByteBuffer writerBuffer) throws IOException { +// BinaryProtoUtils.writeFully(currConnection.getChannel(), writerBuffer); + currConnection.writeBuffer(writerBuffer); + } + + protected void fail(CompletableFuture q, Exception e) { q.completeExceptionally(e); } - protected void complete(long code, CompletableFuture q) { + protected void complete(TarantoolBinaryPacket pack, CompletableFuture q) { if (q != null) { + long code = pack.getCode(); if (code == 0) { - List data = (List) body.get(Key.DATA.getId()); if (code == Code.EXECUTE.getId()) { - completeSql(q, (List>) data); + completeSql(q, pack); } else { + List data = (List) pack.getBody().get(Key.DATA.getId()); ((CompletableFuture) q).complete(data); } } else { - Object error = body.get(Key.ERROR.getId()); + Object error = pack.getBody().get(Key.ERROR.getId()); fail(q, serverError(code, error)); } } } - protected void completeSql(CompletableFuture q, List> data) { - Long rowCount = getSqlRowCount(); + protected void completeSql(CompletableFuture q, TarantoolBinaryPacket pack) { + Long rowCount = SqlProtoUtils.getSqlRowCount(pack); if (rowCount!=null) { ((CompletableFuture) q).complete(rowCount); } else { - List> values = readSqlResult(data); + List> values = SqlProtoUtils.readSqlResult(pack); ((CompletableFuture) q).complete(values); } } @@ -454,15 +490,6 @@ protected T syncGet(Future r) { } } - protected void writeFully(SocketChannel channel, ByteBuffer buffer) throws IOException { - long code = 0; - while (buffer.remaining() > 0 && (code = channel.write(buffer)) > -1) { - } - if (code < 0) { - throw new SocketException("write failed code: " + code); - } - } - @Override public void close() { close(new Exception("Connection is closed.")); @@ -488,21 +515,7 @@ protected void stopIO() { if (writer != null) { writer.interrupt(); } - if (is != null) { - try { - is.close(); - } catch (IOException ignored) { - - } - } - if (cis != null) { - try { - cis.close(); - } catch (IOException ignored) { - - } - } - closeChannel(channel); + closeChannel(currConnection); } @Override @@ -668,6 +681,10 @@ protected int getState() { return state.get(); } + protected boolean isAtState(int state) { + return getState() == state; + } + protected boolean close() { for (;;) { int st = getState(); diff --git a/src/main/java/org/tarantool/TarantoolClusterClient.java b/src/main/java/org/tarantool/TarantoolClusterClient.java index 3a6c243a..de119af3 100644 --- a/src/main/java/org/tarantool/TarantoolClusterClient.java +++ b/src/main/java/org/tarantool/TarantoolClusterClient.java @@ -1,11 +1,26 @@ package org.tarantool; +import org.tarantool.cluster.ClusterTopologyDiscoverer; +import org.tarantool.cluster.ClusterTopologyFromShardDiscovererImpl; +import org.tarantool.server.BinaryProtoUtils; +import org.tarantool.server.TarantoolBinaryPacket; +import org.tarantool.server.TarantoolInstanceConnection; +import org.tarantool.server.TarantoolInstanceInfo; + +import java.io.IOException; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.locks.LockSupport; +import java.util.concurrent.locks.ReentrantLock; import static org.tarantool.TarantoolClientImpl.StateHelper.CLOSED; @@ -23,23 +38,172 @@ public class TarantoolClusterClient extends TarantoolClientImpl { /* Collection of operations to be retried. */ private ConcurrentHashMap> retries = new ConcurrentHashMap>(); + private TarantoolInstanceInfo infoHost;//todo will be used (remove this comment later) + private Integer infoHostConnectionTimeout; + private ClusterTopologyDiscoverer topologyDiscoverer; + + + private volatile TarantoolInstanceConnection oldConnection; + private ConcurrentHashMap> futuresSentToOldConnection = new ConcurrentHashMap<>(); + private ReentrantLock initLock = new ReentrantLock(); + + private Selector readSelector; + /** * @param config Configuration. - * @param addrs Array of addresses in the form of [host]:[port]. */ - public TarantoolClusterClient(TarantoolClusterClientConfig config, String... addrs) { - this(config, new RoundRobinSocketProviderImpl(addrs).setTimeout(config.operationExpiryTimeMillis)); + public TarantoolClusterClient(TarantoolClusterClientConfig config) { +// this(config, new RoundRobinSocketProviderImpl(config.slaveHosts).setTimeout(config.operationExpiryTimeMillis)); + this(config, new RoundRobinInstanceConnectionProvider(config.slaveHosts, + config.username, config.password, config.operationExpiryTimeMillis)); } /** * @param provider Socket channel provider. * @param config Configuration. */ - public TarantoolClusterClient(TarantoolClusterClientConfig config, SocketChannelProvider provider) { - super(provider, config); + public TarantoolClusterClient(TarantoolClusterClientConfig config, InstanceConnectionProvider provider) { + init(provider, config); this.executor = config.executor == null ? Executors.newSingleThreadExecutor() : config.executor; + + if (config.infoHost != null) { + this.infoHost = TarantoolInstanceInfo.create(config.infoHost, config.username, config.password); + + this.infoHostConnectionTimeout = config.infoHostConnectionTimeout; + this.topologyDiscoverer = new ClusterTopologyFromShardDiscovererImpl(config); + } else { + if (config.slaveHosts == null || config.slaveHosts.length == 0) { + throw new IllegalArgumentException("Either slaveHosts or infoHost must be specified."); + } + } + + } + + /** + * @throws CommunicationException in case of communication with {@code infoNode} exception + * @throws IllegalArgumentException in case when the info node returned invalid address + */ + protected Collection refreshServerList() { + List newServerList = topologyDiscoverer + .discoverTarantoolInstances(infoHostConnectionTimeout); + + initLock.lock(); + try { + + RoundRobinInstanceConnectionProvider cp = (RoundRobinInstanceConnectionProvider) this.communicationProvider; + + int sameNodeIndex = newServerList.indexOf(currConnection.getNodeInfo()); + if (sameNodeIndex != -1) { + //current node is in the list + Collections.swap(newServerList, 0, sameNodeIndex); + cp.updateNodes(newServerList); + } else { + cp.updateNodes(newServerList); + + if (state.isAtState(StateHelper.ALIVE)) { + stopIO(); + //todo add wait for reconnect here + + futuresSentToOldConnection.values() + .forEach(f -> { + f.completeExceptionally(new CommunicationException("Connection is dead")); + }); + + futuresSentToOldConnection.putAll(futures); + futures.clear(); + + LockSupport.unpark(connector); + } + + // if not alive, then do nothing because we are closed or on RECONNECT state. + // in the last case reconnect thread will wait untill the initLock will be unlocked + + oldConnection = currConnection; + } + } finally { + initLock.unlock(); + } + + + return newServerList; + } + + @Override + protected void connect(InstanceConnectionProvider communicationProvider) throws Exception { + //region drop all registered selectors from channel +// if (currConnection != null) { +// SelectionKey registeredKey = currConnection.getChannel().keyFor(readSelector); +// if (registeredKey != null) { +// registeredKey.cancel(); +// } +// } + + if (readSelector != null) { + try { + readSelector.close();//todo must be closed at stopIO operation + } catch (IOException ignored) { + } + } + //endregion + + //set a new value to currConnection variable + super.connect(communicationProvider); + + //region reregister selector + + readSelector = Selector.open(); + + if (oldConnection != null) { + oldConnection.getChannel().register(readSelector, SelectionKey.OP_READ, oldConnection); + } + + currConnection.getChannel().register(readSelector, SelectionKey.OP_READ, currConnection); + //endregion + } + + @Override + protected TarantoolBinaryPacket readFromInstance() throws IOException, InterruptedException { + + readSelector.select(); + + SelectionKey selectedKey = readSelector.selectedKeys().iterator().next(); + + TarantoolInstanceConnection connection = (TarantoolInstanceConnection) selectedKey.attachment(); + + return connection.readPacket(); + } + + @Override + protected CompletableFuture getFuture(TarantoolBinaryPacket pack) { + Long sync = pack.getSync(); + if (!futuresSentToOldConnection.isEmpty()) { + CompletableFuture oldConnectionFuture = futuresSentToOldConnection.remove(sync); + if (oldConnectionFuture != null) { + long now = System.currentTimeMillis(); + futuresSentToOldConnection.entrySet() + .removeIf(entry -> { + ExpirableOp expirableOp = (ExpirableOp) entry.getValue(); + if (expirableOp.hasExpired(now)) { + expirableOp.completeExceptionally( + new CommunicationException("Operation timeout is expired")); + return true; + } else { + return false; + } + }); + if (futuresSentToOldConnection.isEmpty()) { + try { + oldConnection.close(); + } catch (IOException e) { + } + oldConnection = null; + } + } + + } + return futures.remove(sync); } @Override @@ -89,7 +253,7 @@ protected boolean checkFail(CompletableFuture q, Exception e) { q.completeExceptionally(e); return true; } else { - assert retries != null; + retries.put(((ExpirableOp) q).getId(), (ExpirableOp)q); return false; } @@ -109,6 +273,17 @@ protected void close(Exception e) { } } + @Override + protected void stopIO() { + super.stopIO(); + closeChannel(oldConnection); + try { + readSelector.close(); + } catch (IOException e) { + //ignored + } + } + protected boolean isTransientError(Exception e) { if (e instanceof CommunicationException) { return true; @@ -143,6 +318,7 @@ protected void onReconnect() { public void run() { futures.put(fut.getId(), fut); try { + //todo invoke sendToInstance method? (maybe not) write(fut.getCode(), fut.getId(), null, fut.getArgs()); } catch (Exception e) { futures.remove(fut.getId()); @@ -154,6 +330,16 @@ public void run() { } } + @Override + protected void connectAndStartThreads() throws Exception { + initLock.lock(); + try { + super.connectAndStartThreads(); + } finally { + initLock.unlock(); + } + } + /** * Holds operation code and arguments for retry. */ diff --git a/src/main/java/org/tarantool/TarantoolClusterClientConfig.java b/src/main/java/org/tarantool/TarantoolClusterClientConfig.java index 423896b3..8f3d4b54 100644 --- a/src/main/java/org/tarantool/TarantoolClusterClientConfig.java +++ b/src/main/java/org/tarantool/TarantoolClusterClientConfig.java @@ -11,4 +11,26 @@ public class TarantoolClusterClientConfig extends TarantoolClientConfig { /* Executor service that will be used as a thread of execution to retry writes. */ public Executor executor = null; + + /** + * Array of addresses in the form of [host]:[port]. + */ + public String[] slaveHosts; + + /** + * Address of a tarantool instance form which a cluster host list can be discovered. + */ + public String infoHost; + + /** + * Name of a function that called on info host instance to fetch the list of + * tarantool cluster instances + */ + public String infoFunctionName; + + /** + * timeout of connecting to a info host + */ + public int infoHostConnectionTimeout = 500; + } diff --git a/src/main/java/org/tarantool/TarantoolConnection.java b/src/main/java/org/tarantool/TarantoolConnection.java index b817988f..9bf08943 100644 --- a/src/main/java/org/tarantool/TarantoolConnection.java +++ b/src/main/java/org/tarantool/TarantoolConnection.java @@ -1,5 +1,8 @@ package org.tarantool; +import org.tarantool.server.BinaryProtoUtils; +import org.tarantool.server.TarantoolBinaryPacket; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -24,17 +27,35 @@ public TarantoolConnection(String username, String password, Socket socket) thro @Override protected List exec(Code code, Object... args) { + TarantoolBinaryPacket responsePackage = writeAndRead(code, args); + return (List) responsePackage.getBody().get(Key.DATA.getId()); + } + + protected TarantoolBinaryPacket writeAndRead(Code code, Object... args) { try { - ByteBuffer packet = createPacket(code, syncId.incrementAndGet(), null, args); + ByteBuffer packet = BinaryProtoUtils.createPacket( + initialRequestSize, + msgPackLite, + code, + syncId.incrementAndGet(), + null, + args + ); + out.write(packet.array(), 0, packet.remaining()); out.flush(); - readPacket(is); + + TarantoolBinaryPacket responsePackage = BinaryProtoUtils.readPacket(in); + + Map headers = responsePackage.getHeaders(); + Map body = responsePackage.getBody(); Long c = (Long) headers.get(Key.CODE.getId()); - if (c == 0) { - return (List) body.get(Key.DATA.getId()); - } else { + + if (c != 0) { throw serverError(c, body.get(Key.ERROR.getId())); } + + return responsePackage; } catch (IOException e) { close(); throw new CommunicationException("Couldn't execute query", e); @@ -61,21 +82,20 @@ public void close() { } } - @Override public Long update(String sql, Object... bind) { - sql(sql, bind); - return getSqlRowCount(); + TarantoolBinaryPacket pack = sql(sql, bind); + return SqlProtoUtils.getSqlRowCount(pack); } @Override public List> query(String sql, Object... bind) { - sql(sql, bind); - return readSqlResult((List>) body.get(Key.DATA)); + TarantoolBinaryPacket pack = sql(sql, bind); + return SqlProtoUtils.readSqlResult(pack); } - protected void sql(String sql, Object[] bind) { - exec(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind); + protected TarantoolBinaryPacket sql(String sql, Object[] bind) { + return writeAndRead(Code.EXECUTE, Key.SQL_TEXT, sql, Key.SQL_BIND, bind); } public boolean isClosed() { diff --git a/src/main/java/org/tarantool/cluster/ClusterTopologyDiscoverer.java b/src/main/java/org/tarantool/cluster/ClusterTopologyDiscoverer.java new file mode 100644 index 00000000..8843b90d --- /dev/null +++ b/src/main/java/org/tarantool/cluster/ClusterTopologyDiscoverer.java @@ -0,0 +1,9 @@ +package org.tarantool.cluster; + +import org.tarantool.server.TarantoolInstanceInfo; + +import java.util.List; + +public interface ClusterTopologyDiscoverer { + List discoverTarantoolInstances(Integer infoHostConnectionTimeout); +} diff --git a/src/main/java/org/tarantool/cluster/ClusterTopologyFromFunctionDiscovererImpl.java b/src/main/java/org/tarantool/cluster/ClusterTopologyFromFunctionDiscovererImpl.java new file mode 100644 index 00000000..038b1c72 --- /dev/null +++ b/src/main/java/org/tarantool/cluster/ClusterTopologyFromFunctionDiscovererImpl.java @@ -0,0 +1,57 @@ +package org.tarantool.cluster; + +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolClientOps; +import org.tarantool.TarantoolClusterClientConfig; +import org.tarantool.server.TarantoolInstanceInfo; + +import java.util.List; +import java.util.stream.Collectors; + +public class ClusterTopologyFromFunctionDiscovererImpl implements ClusterTopologyDiscoverer { + + private final TarantoolClusterClientConfig clientConfig; + + private final TarantoolInstanceInfo infoNode; + private final String functionName; + + public ClusterTopologyFromFunctionDiscovererImpl(TarantoolClusterClientConfig clientConfig) { + this.clientConfig = clientConfig; + if (clientConfig.infoFunctionName == null) { + throw new IllegalArgumentException("infoFuntionName in the config cannot be null"); + } + if (clientConfig.infoHost == null) { + throw new IllegalArgumentException("infoHost in the config cannot be null"); + } + + functionName = clientConfig.infoFunctionName; + this.infoNode = TarantoolInstanceInfo.create( + clientConfig.infoHost, clientConfig.username, clientConfig.password); + } + + @Override + public List discoverTarantoolInstances(Integer infoHostConnectionTimeout) { + + TarantoolClientOps, Object, List> syncOps = + new TarantoolClientImpl(infoNode.getSocketAddress(), clientConfig) + .syncOps(); + + List list = syncOps + .call(functionName); + + List funcResult = (List) list.get(0); + return funcResult.stream() + .map(addr -> TarantoolInstanceInfo.create( + parseReplicaUri(addr.toString()), clientConfig.username, clientConfig.password)) + .collect(Collectors.toList()); + } + + private String parseReplicaUri(String uri) { + String[] split = uri.split("@"); + if (split.length == 2) { + return split[1]; + } else { + return split[0]; + } + } +} diff --git a/src/main/java/org/tarantool/cluster/ClusterTopologyFromShardDiscovererImpl.java b/src/main/java/org/tarantool/cluster/ClusterTopologyFromShardDiscovererImpl.java new file mode 100644 index 00000000..722c7b8a --- /dev/null +++ b/src/main/java/org/tarantool/cluster/ClusterTopologyFromShardDiscovererImpl.java @@ -0,0 +1,69 @@ +package org.tarantool.cluster; + +import org.tarantool.TarantoolClientImpl; +import org.tarantool.TarantoolClusterClientConfig; +import org.tarantool.server.TarantoolInstanceInfo; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class ClusterTopologyFromShardDiscovererImpl implements ClusterTopologyDiscoverer { + + private final TarantoolClusterClientConfig clientConfig; + private final TarantoolInstanceInfo infoNode; + + private static final String DEFAULT_TOPOLOGY_DISCOVER_CALL = "require('vshard').storage.internal.current_cfg"; + + public ClusterTopologyFromShardDiscovererImpl(TarantoolClusterClientConfig clientConfig) { + this.clientConfig = clientConfig; + this.infoNode = TarantoolInstanceInfo.create( + clientConfig.infoHost, clientConfig.username, clientConfig.password); + } + + @Override + public List discoverTarantoolInstances(Integer infoHostConnectionTimeout) { + + List list = new TarantoolClientImpl(infoNode.getSocketAddress(), clientConfig) + .syncOps() + .call(DEFAULT_TOPOLOGY_DISCOVER_CALL); + + Map funcResult = (Map) ((List) list.get(0)).get(0); + + Map shardHash2DescriptionMap = (Map) getValue(funcResult, "sharding"); + + List result = new ArrayList<>(); + + + for (Object shardHash2Description : shardHash2DescriptionMap.entrySet()) { + + Map replicas = (Map) getValue(((Map.Entry) shardHash2Description).getValue(), "replicas"); + + for (Object replica : replicas.entrySet()) { + Object replicaUri = getValue(((Map.Entry) replica).getValue(), "uri"); + + result.add(TarantoolInstanceInfo.create( + parseReplicaUri(replicaUri.toString()), clientConfig.username, clientConfig.password)); + } + } + + return result; + } + + private String parseReplicaUri(String uri) { + String[] split = uri.split("@"); + if (split.length == 2) { + return split[1]; + } else { + return split[0]; + } + } + + private Object getValue(Object map, String key) { + if (!(map instanceof Map)) { + throw new IllegalArgumentException("Argument 'map' is not instance of Map but " + map.getClass()); + } + + return ((Map) map).get(key); + } +} diff --git a/src/main/java/org/tarantool/server/BinaryProtoUtils.java b/src/main/java/org/tarantool/server/BinaryProtoUtils.java new file mode 100644 index 00000000..a7022e08 --- /dev/null +++ b/src/main/java/org/tarantool/server/BinaryProtoUtils.java @@ -0,0 +1,300 @@ +package org.tarantool.server; + +import org.tarantool.Base64; +import org.tarantool.Code; +import org.tarantool.CommunicationException; +import org.tarantool.CountInputStreamImpl; +import org.tarantool.Key; +import org.tarantool.MsgPackLite; +import org.tarantool.TarantoolException; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SocketChannel; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.EnumMap; +import java.util.List; +import java.util.Map; + +public abstract class BinaryProtoUtils { + + private final static int DEFAULT_INITIAL_REQUEST_SIZE = 4096; + private static final String WELCOME = "Tarantool "; + + /** + * Reads tarantool binary protocol's packet from {@code inputStream} + * + * @param inputStream ready to use input stream + * @return Nonnull instance of packet. + * @throws IOException in case of any io-error. + */ + public static TarantoolBinaryPacket readPacket(InputStream inputStream) throws IOException { + CountInputStreamImpl msgStream = new CountInputStreamImpl(inputStream); + + int size = ((Number) getMsgPackLite().unpack(msgStream)).intValue(); + long mark = msgStream.getBytesRead(); + + Map headers = (Map) getMsgPackLite().unpack(msgStream); + + Map body = null; + if (msgStream.getBytesRead() - mark < size) { + body = (Map) getMsgPackLite().unpack(msgStream); + } + + return new TarantoolBinaryPacket(headers, body); + } + + /** + * Connects to a tarantool node described by {@code socket}. Performs an authentication if required + * + * @param socket a socket channel to a tarantool node + * @param username auth username + * @param password auth password + * @return object with information about a connection/ + * @throws IOException in case of any IO fails + * @throws CommunicationException when welcome string is invalid + * @throws TarantoolException in case of failed authentication + */ + public static TarantoolInstanceConnectionMeta connect(Socket socket, + String username, + String password) throws IOException { + byte[] inputBytes = new byte[64]; + + InputStream inputStream = socket.getInputStream(); + inputStream.read(inputBytes); + + String firstLine = new String(inputBytes); + assertCorrectWelcome(firstLine, socket.getRemoteSocketAddress()); + String serverVersion = firstLine.substring(WELCOME.length()); + + inputStream.read(inputBytes); + String salt = new String(inputBytes); + if (username != null && password != null) { + ByteBuffer authPacket = createAuthPacket(username, password, salt); + + OutputStream os = socket.getOutputStream(); + os.write(authPacket.array(), 0, authPacket.remaining()); + os.flush(); + + TarantoolBinaryPacket responsePackage = readPacket(socket.getInputStream()); + assertNoErrCode(responsePackage); + } + + return new TarantoolInstanceConnectionMeta(salt, serverVersion); + } + + /** + * Connects to a tarantool node described by {@code socketChannel}. Performs an authentication if required. + * + * @param channel a socket channel to tarantool node. The channel have to be in blocking mode + * @param username auth username + * @param password auth password + * @return object with information about a connection/ + * @throws IOException in case of any IO fails + * @throws CommunicationException when welcome string is invalid + * @throws TarantoolException in case of failed authentication + */ + public static TarantoolInstanceConnectionMeta connect(SocketChannel channel, + String username, + String password) throws IOException { + ByteBuffer welcomeBytes = ByteBuffer.wrap(new byte[64]); + channel.read(welcomeBytes); + + String firstLine = new String(welcomeBytes.array()); + assertCorrectWelcome(firstLine, channel.getRemoteAddress()); + String serverVersion = firstLine.substring(WELCOME.length()); + + welcomeBytes.clear(); + channel.read(welcomeBytes); + String salt = new String(welcomeBytes.array()); + + if (username != null && password != null) { + ByteBuffer authPacket = createAuthPacket(username, password, salt); + writeFully(channel, authPacket); + + TarantoolBinaryPacket authResponse = readPacket(channel); + assertNoErrCode(authResponse); + } + + return new TarantoolInstanceConnectionMeta(salt, serverVersion); + } + + private static void assertCorrectWelcome(String firstLine, SocketAddress remoteAddress) { + if (!firstLine.startsWith(WELCOME)) { + String errMsg = "Failed to connect to node " + remoteAddress.toString() + ":" + + " Welcome message should starts with tarantool but starts with '" + firstLine + "'"; + throw new CommunicationException(errMsg, new IllegalStateException("Invalid welcome packet")); + } + } + + private static void assertNoErrCode(TarantoolBinaryPacket authResponse) { + Long code = (Long) authResponse.getHeaders().get(Key.CODE.getId()); + if (code != 0) { + Object error = authResponse.getBody().get(Key.ERROR.getId()); + String errorMsg = error instanceof String ? (String) error : new String((byte[]) error); + throw new TarantoolException(code, errorMsg); + } + } + + public static void writeFully(OutputStream stream, ByteBuffer buffer) throws IOException { + stream.write(buffer.array()); + stream.flush(); + } + + public static void writeFully(SocketChannel channel, ByteBuffer buffer) throws IOException { + long code = 0; + while (buffer.remaining() > 0 && (code = channel.write(buffer)) > -1) { + } + if (code < 0) { + throw new SocketException("write failed code: " + code); + } + } + + public static final int LENGTH_OF_SIZE_MESSAGE = 5; + + /** + * Reads a tarantool's binary protocol package from the reader + * + * @param bufferReader readable channel that have to be in blocking mode + * or instance of {@link ReadableViaSelectorChannel} + * @return tarantool binary protocol message wrapped by instance of {@link TarantoolBinaryPacket}. + * @throws IOException if any IO-error occurred during read from the channel + * @throws CommunicationException input stream bytes consitute msg pack message in wrong format. + * @throws java.nio.channels.NonReadableChannelException – If this channel was not opened for reading + */ + public static TarantoolBinaryPacket readPacket(ReadableByteChannel bufferReader) + throws CommunicationException, IOException { + + ByteBuffer buffer = ByteBuffer.allocate(LENGTH_OF_SIZE_MESSAGE); + bufferReader.read(buffer); + + buffer.flip(); + int size = ((Number) getMsgPackLite().unpack(new ByteBufferBackedInputStream(buffer))).intValue(); + + buffer = ByteBuffer.allocate(size); + bufferReader.read(buffer); + + buffer.flip(); + ByteBufferBackedInputStream msgBytesStream = new ByteBufferBackedInputStream(buffer); + Object unpackedHeaders = getMsgPackLite().unpack(msgBytesStream); + if (!(unpackedHeaders instanceof Map)) { + //noinspection ConstantConditions + throw new CommunicationException("Error while unpacking headers of tarantool response: " + + "expected type Map but was " + + unpackedHeaders != null ? unpackedHeaders.getClass().toString() : "null"); + } + //noinspection unchecked (checked above) + Map headers = (Map) unpackedHeaders; + + Map body = null; + if (msgBytesStream.hasAvailable()) { + Object unpackedBody = getMsgPackLite().unpack(msgBytesStream); + if (!(unpackedBody instanceof Map)) { + //noinspection ConstantConditions + throw new CommunicationException("Error while unpacking body of tarantool response: " + + "expected type Map but was " + + unpackedBody != null ? unpackedBody.getClass().toString() : "null"); + } + //noinspection unchecked (checked above) + body = (Map) unpackedBody; + } + + return new TarantoolBinaryPacket(headers, body); + } + + public static ByteBuffer createAuthPacket(String username, + final String password, + String salt) throws IOException { + final MessageDigest sha1; + try { + sha1 = MessageDigest.getInstance("SHA-1"); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException(e); + } + List auth = new ArrayList(2); + auth.add("chap-sha1"); + + byte[] p = sha1.digest(password.getBytes()); + + sha1.reset(); + byte[] p2 = sha1.digest(p); + + sha1.reset(); + sha1.update(Base64.decode(salt), 0, 20); + sha1.update(p2); + byte[] scramble = sha1.digest(); + for (int i = 0, e = 20; i < e; i++) { + p[i] ^= scramble[i]; + } + auth.add(p); + + return createPacket(DEFAULT_INITIAL_REQUEST_SIZE, Code.AUTH, 0L, null, + Key.USER_NAME, username, Key.TUPLE, auth); + } + + public static ByteBuffer createPacket(Code code, Long syncId, Long schemaId, Object... args) throws IOException { + return createPacket(DEFAULT_INITIAL_REQUEST_SIZE, code, syncId, schemaId, args); + } + + public static ByteBuffer createPacket(int initialRequestSize, + Code code, + Long syncId, + Long schemaId, + Object... args) throws IOException { + return createPacket(initialRequestSize, getMsgPackLite(), code, syncId, schemaId, args); + } + + public static ByteBuffer createPacket(int initialRequestSize, + MsgPackLite msgPackLite, + Code code, + Long syncId, + Long schemaId, + Object... args) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(initialRequestSize); + bos.write(new byte[5]); + DataOutputStream ds = new DataOutputStream(bos); + Map header = new EnumMap<>(Key.class); + Map body = new EnumMap<>(Key.class); + header.put(Key.CODE, code); + header.put(Key.SYNC, syncId); + if (schemaId != null) { + header.put(Key.SCHEMA_ID, schemaId); + } + if (args != null) { + for (int i = 0, e = args.length; i < e; i += 2) { + Object value = args[i + 1]; + body.put((Key) args[i], value); + } + } + msgPackLite.pack(header, ds); + msgPackLite.pack(body, ds); + ds.flush(); + ByteBuffer buffer = bos.toByteBuffer(); + buffer.put(0, (byte) 0xce); + buffer.putInt(1, bos.size() - 5); + return buffer; + } + + private static class ByteArrayOutputStream extends java.io.ByteArrayOutputStream { + public ByteArrayOutputStream(int size) { + super(size); + } + + ByteBuffer toByteBuffer() { + return ByteBuffer.wrap(buf, 0, count); + } + } + + private static MsgPackLite getMsgPackLite() { + return MsgPackLite.INSTANCE; + } +} \ No newline at end of file diff --git a/src/main/java/org/tarantool/server/ByteBufferBackedInputStream.java b/src/main/java/org/tarantool/server/ByteBufferBackedInputStream.java new file mode 100644 index 00000000..c2a7ebb9 --- /dev/null +++ b/src/main/java/org/tarantool/server/ByteBufferBackedInputStream.java @@ -0,0 +1,45 @@ +package org.tarantool.server; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +class ByteBufferBackedInputStream extends InputStream { + + private final ByteBuffer buf; + + /** + * //todo add a comment + * @param buf a buffer have to be ready fo read (flipped) + */ + public ByteBufferBackedInputStream(ByteBuffer buf) { + this.buf = buf; + } + + public int read() throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + return buf.get() & 0xFF; + } + + public int read(byte[] bytes, int off, int len) + throws IOException { + if (!buf.hasRemaining()) { + return -1; + } + + len = Math.min(len, buf.remaining()); + buf.get(bytes, off, len); + return len; + } + + @Override + public int available() { + return buf.remaining(); + } + + public boolean hasAvailable() { + return available() > 0; + } +} \ No newline at end of file diff --git a/src/main/java/org/tarantool/server/ReadableViaSelectorChannel.java b/src/main/java/org/tarantool/server/ReadableViaSelectorChannel.java new file mode 100644 index 00000000..9360b62e --- /dev/null +++ b/src/main/java/org/tarantool/server/ReadableViaSelectorChannel.java @@ -0,0 +1,57 @@ +package org.tarantool.server; + +import org.tarantool.CommunicationException; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.nio.channels.spi.SelectorProvider; + +class ReadableViaSelectorChannel implements ReadableByteChannel { + private final SocketChannel channel; + private final Selector selector; + + public ReadableViaSelectorChannel(SocketChannel channel) throws IOException { + if (channel.isBlocking()) { + throw new IllegalArgumentException("Channel have to be non-blocking"); + } + + this.channel = channel; + selector = SelectorProvider.provider().openSelector(); + channel.register(selector, SelectionKey.OP_READ); + } + + @Override + public int read(ByteBuffer buffer) throws IOException { + int count, n; + count = n = channel.read(buffer); + + if (n < 0) { + throw new CommunicationException("Channel read failed " + n); + } + + while (buffer.remaining() > 0) { + selector.select();//todo think about read timeout + n = channel.read(buffer); + if (n < 0) { + throw new CommunicationException("Channel read failed: " + n); + } + count += n; + } + return count; + } + + @Override + public boolean isOpen() { + return channel.isOpen(); + } + + @Override + public void close() throws IOException { + selector.close(); + channel.close(); + } +} diff --git a/src/main/java/org/tarantool/server/TarantoolBinaryPacket.java b/src/main/java/org/tarantool/server/TarantoolBinaryPacket.java new file mode 100644 index 00000000..e6421b6f --- /dev/null +++ b/src/main/java/org/tarantool/server/TarantoolBinaryPacket.java @@ -0,0 +1,49 @@ +package org.tarantool.server; + +import org.tarantool.Key; + +import java.util.Map; + +public class TarantoolBinaryPacket { + private final Map headers; + private final Map body; + + public TarantoolBinaryPacket(Map headers, Map body) { + this.headers = headers; + this.body = body; + } + + public TarantoolBinaryPacket(Map headers) { + this.headers = headers; + body = null; + } + + public Long getCode() { + Object potenticalCode = headers.get(Key.CODE.getId()); + + if (!(potenticalCode instanceof Long)) { + //noinspection ConstantConditions + throw new IllegalStateException("A value contained in the header by key '" + Key.CODE.name() + "'" + + " is not instance of Long class: " + + potenticalCode != null ? potenticalCode.getClass().toString() : "null"); + } + + return (Long) potenticalCode; + } + + public Long getSync() { + return (Long) getHeaders().get(Key.SYNC.getId()); + } + + public Map getHeaders() { + return headers; + } + + public Map getBody() { + return body; + } + + public boolean hasBody() { + return body != null && body.size() > 0; + } +} diff --git a/src/main/java/org/tarantool/server/TarantoolInstanceConnection.java b/src/main/java/org/tarantool/server/TarantoolInstanceConnection.java new file mode 100644 index 00000000..611c3cfa --- /dev/null +++ b/src/main/java/org/tarantool/server/TarantoolInstanceConnection.java @@ -0,0 +1,98 @@ +package org.tarantool.server; + +import org.tarantool.CommunicationException; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +public class TarantoolInstanceConnection implements Closeable { + + /** + * Information about connection + */ + private final TarantoolInstanceInfo nodeInfo; + + /** + * Connection metadata + */ + private final TarantoolInstanceConnectionMeta meta; + + /** + * Nonnull connection to a tarantool instance + */ + protected final SocketChannel channel; + + /** + * Nonnull connection to a tarantool instance + */ + protected final ReadableViaSelectorChannel readChannel; + + private TarantoolInstanceConnection(TarantoolInstanceInfo nodeInfo, + TarantoolInstanceConnectionMeta meta, + SocketChannel channel) throws IOException { + this.nodeInfo = nodeInfo; + this.meta = meta; + this.channel = channel; + this.readChannel = new ReadableViaSelectorChannel(channel); + } + + /** + * Attempts to connect to a tarantool instance and create correspond TarantoolInstanceConnection + * + * @param tarantoolInstanceInfo information about an instance to connect + * @throws CommunicationException if an error occurred during connection related exchanges + * @throws IOException in case of any IO fail + */ + public static TarantoolInstanceConnection connect(TarantoolInstanceInfo tarantoolInstanceInfo) throws IOException { + SocketChannel channel; + try { + channel = SocketChannel.open(tarantoolInstanceInfo.getSocketAddress()); + + String username = tarantoolInstanceInfo.getUsername(); + String password = tarantoolInstanceInfo.getPassword(); + TarantoolInstanceConnectionMeta meta = BinaryProtoUtils.connect(channel, username, password); + + channel.configureBlocking(false); + + return new TarantoolInstanceConnection(tarantoolInstanceInfo, meta, channel); + } catch (IOException e) { + //todo add toString method to TarantoolInstanceConnection to describe failed attempt to connect properly + throw new IOException("IOException occurred while connecting to node " + tarantoolInstanceInfo, e); + } + } + + public TarantoolInstanceInfo getNodeInfo() { + return nodeInfo; + } + + public TarantoolInstanceConnectionMeta getMeta() { + return meta; + } + + public SocketChannel getChannel() { + return channel; + } + + public void writeBuffer(ByteBuffer writerBuffer) throws IOException { + BinaryProtoUtils.writeFully(getChannel(), writerBuffer); + } + + public TarantoolBinaryPacket readPacket() throws IOException { + return BinaryProtoUtils.readPacket(readChannel); + } + + private void closeConnection() { + try { + readChannel.close();//also closes this.channel + } catch (IOException ignored) { + + } + } + + @Override + public void close() throws IOException { + closeConnection(); + } +} diff --git a/src/main/java/org/tarantool/server/TarantoolInstanceConnectionMeta.java b/src/main/java/org/tarantool/server/TarantoolInstanceConnectionMeta.java new file mode 100644 index 00000000..dba054b8 --- /dev/null +++ b/src/main/java/org/tarantool/server/TarantoolInstanceConnectionMeta.java @@ -0,0 +1,20 @@ +package org.tarantool.server; + +public class TarantoolInstanceConnectionMeta { + + private final String salt; + private final String serverVersion; + + public TarantoolInstanceConnectionMeta(String salt, String serverVersion) { + this.salt = salt; + this.serverVersion = serverVersion; + } + + public String getSalt() { + return salt; + } + + public String getServerVersion() { + return serverVersion; + } +} diff --git a/src/main/java/org/tarantool/server/TarantoolInstanceInfo.java b/src/main/java/org/tarantool/server/TarantoolInstanceInfo.java new file mode 100644 index 00000000..84b16cbe --- /dev/null +++ b/src/main/java/org/tarantool/server/TarantoolInstanceInfo.java @@ -0,0 +1,133 @@ +package org.tarantool.server; + +import java.net.*; +import java.util.*; + +/** + * Holds info about a tarantool instance. + */ +public class TarantoolInstanceInfo { + + private final InetSocketAddress socketAddress; + private final String username; + private final String password; + + private TarantoolInstanceInfo(InetSocketAddress socketAddress, String username, String password) { + this.socketAddress = socketAddress; + this.username = username; + this.password = password; + } + + /** + * @return A socket address that the client can be connected to + */ + public InetSocketAddress getSocketAddress() { + return socketAddress; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TarantoolInstanceInfo node = (TarantoolInstanceInfo) o; + return socketAddress.equals(node.socketAddress); + } + + @Override + public int hashCode() { + return Objects.hash(socketAddress); + } + + /** + * + * @param socketAddress Nonnull socket address + * @return Instance of {@link TarantoolInstanceInfo} + */ + public static TarantoolInstanceInfo create(InetSocketAddress socketAddress, String username, String password) { + if (socketAddress == null) { + throw new IllegalArgumentException("A socket address can not be null."); + } + + return new TarantoolInstanceInfo(socketAddress, username, password); + } + + + /** + * Creates an instance info object with no authentication data. + * + * @param address hostname address as String + * + * @throws IllegalArgumentException if the port parameter is outside the range + * of valid port values, or if the hostname parameter is null. + * @throws SecurityException if a security manager is present and + * permission to resolve the host name is + * denied. + */ + public static TarantoolInstanceInfo create(String address) { + return create(address, null, null); + } + + + + /** + * Creates an instance info object + * @param address hostname address as String + * @param username authentication username + * @param password authentication password + * + * @throws IllegalArgumentException if the port parameter is outside the range + * of valid port values, or if the hostname parameter is null. + * @throws SecurityException if a security manager is present and + * permission to resolve the host name is + * denied. + */ + public static TarantoolInstanceInfo create(String address, String username, String password) { + if (address == null) { + throw new IllegalArgumentException("A hostname address can not be null."); + } + + return new TarantoolInstanceInfo(parseAddress(address), username, password); + } + + /** + * Parse a string address in the form of [host]:[port] + * and builds a socket address. + * + * @param addr Server address as string. + * @throws IllegalArgumentException if the port parameter is outside the range + * of valid port values, or if the hostname parameter is null. + * @throws SecurityException if a security manager is present and + * permission to resolve the host name is + * denied. + */ + private static InetSocketAddress parseAddress(String addr) { + int idx = addr.indexOf(':'); + String host = (idx < 0) ? addr : addr.substring(0, idx); + + int port; + try { + port = (idx < 0) ? 3301 : Integer.parseInt(addr.substring(idx + 1)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Exception while parsing port in address '" + addr + "'", e); + } + + return new InetSocketAddress(host, port); + } + + @Override + public String toString() { + return "TarantoolInstanceInfo{" + + "socketAddress=" + socketAddress + + ", username='" + username + '\'' + + ", password='" + password + '\'' + + '}'; + } +} diff --git a/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java b/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java index 4c494f50..9ce92c34 100644 --- a/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java +++ b/src/test/java/org/tarantool/AbstractTarantoolConnectorIT.java @@ -37,9 +37,13 @@ public abstract class AbstractTarantoolConnectorIT { protected static final int TIMEOUT = 500; protected static final int RESTART_TIMEOUT = 2000; + @Deprecated protected static final SocketChannelProvider socketChannelProvider = new TestSocketChannelProvider(host, port, RESTART_TIMEOUT); + protected static final InstanceConnectionProvider TEST_INSTANCE_CONNECTION_PROVIDER = + new TestInstanceConnectionProvider(host + ":" + port, username, password, RESTART_TIMEOUT); + protected static TarantoolControl control; protected static TarantoolConsole console; @@ -132,14 +136,15 @@ protected void checkTupleResult(Object res, List tuple) { } protected TarantoolClient makeClient() { - return new TarantoolClientImpl(socketChannelProvider, makeClientConfig()); +// return new TarantoolClientImpl(socketChannelProvider, makeClientConfig()); + return new TarantoolClientImpl(TEST_INSTANCE_CONNECTION_PROVIDER, makeClientConfig()); } protected static TarantoolClientConfig makeClientConfig() { return fillClientConfig(new TarantoolClientConfig()); } - protected static TarantoolClusterClientConfig makeClusterClientConfig() { + public static TarantoolClusterClientConfig makeClusterClientConfig() { TarantoolClusterClientConfig config = fillClientConfig(new TarantoolClusterClientConfig()); config.executor = null; config.operationExpiryTimeMillis = TIMEOUT; diff --git a/src/test/java/org/tarantool/ClientReconnectClusterIT.java b/src/test/java/org/tarantool/ClientReconnectClusterIT.java index 35737a32..67046bb4 100644 --- a/src/test/java/org/tarantool/ClientReconnectClusterIT.java +++ b/src/test/java/org/tarantool/ClientReconnectClusterIT.java @@ -2,12 +2,19 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; +import org.mockito.Matchers; +import org.mockito.Mockito; +import org.tarantool.cluster.ClusterTopologyDiscoverer; +import org.tarantool.server.TarantoolInstanceInfo; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -110,8 +117,55 @@ public void execute() throws Throwable { assertEquals("Connection time out.", e.getMessage()); } - private TarantoolClientImpl makeClient(String...addrs) { + private TarantoolClusterClient makeClient(String...addrs) { TarantoolClusterClientConfig config = makeClusterClientConfig(); - return new TarantoolClusterClient(config, addrs); + config.slaveHosts = addrs; + return new TarantoolClusterClient(config); + } + + + @Test + @Disabled("Incomplete implementation") + void testUpdateNodeList() { + control.start(SRV1); + control.start(SRV2); + control.start(SRV3); + + control.waitStarted(SRV1); + control.waitStarted(SRV2); + control.waitStarted(SRV3); + + + String testSchemaCreateScript = "return box.schema.space.create('rr_test').id, " + + "box.space.rr_test:create_index('primary').id"; + control.executeCommand(testSchemaCreateScript, SRV1); + + String srv1_address = "localhost:" + PORTS[0]; + String srv2_address = "127.0.0.1:" + PORTS[1]; + String srv3_address = "localhost:" + PORTS[2]; + + String INFO_FUNCTION_NAME = "returnAddrsExceptSrv1"; + String INFO_FUNCTION_SCRIPT = + "function " + INFO_FUNCTION_NAME + "() return {'" + srv2_address + "', '" + srv3_address + "'} end"; + + control.executeCommand(INFO_FUNCTION_SCRIPT, SRV1); + control.waitReplication(SRV1, TIMEOUT); + + final TarantoolClusterClient client = makeClient( + srv1_address, + srv2_address); + List ids = client.syncOps().eval( + testSchemaCreateScript); + + +//todo + List newInstances = Stream.of(srv2_address, srv3_address) + .map(TarantoolInstanceInfo::create) + .collect(Collectors.toList()); + + ClusterTopologyDiscoverer discovererMock = Mockito.mock(ClusterTopologyDiscoverer.class); + Mockito.when(discovererMock.discoverTarantoolInstances(Matchers.anyInt())).thenReturn(newInstances); + + } } diff --git a/src/test/java/org/tarantool/ClientReconnectIT.java b/src/test/java/org/tarantool/ClientReconnectIT.java index 2472bf05..a2193db8 100644 --- a/src/test/java/org/tarantool/ClientReconnectIT.java +++ b/src/test/java/org/tarantool/ClientReconnectIT.java @@ -4,8 +4,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.function.Executable; +import org.tarantool.server.*; -import java.nio.channels.SocketChannel; +import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Random; @@ -64,7 +65,7 @@ public void execute() { }); assertTrue(CommunicationException.class.isAssignableFrom(e.getClass()) || - IllegalStateException.class.isAssignableFrom(e.getClass())); + IllegalStateException.class.isAssignableFrom(e.getClass())); assertNotNull(((TarantoolClientImpl) client).getThumbstone()); @@ -85,21 +86,28 @@ public void execute() { @Test public void testSpuriousReturnFromPark() { final CountDownLatch latch = new CountDownLatch(2); - SocketChannelProvider provider = new SocketChannelProvider() { + + TarantoolClientConfig config = makeClientConfig(); + + InstanceConnectionProvider instanceConnectionProvider = new InstanceConnectionProvider() { + @Override - public SocketChannel get(int retryNumber, Throwable lastError) { - if (lastError == null) { - latch.countDown(); - } - return socketChannelProvider.get(retryNumber, lastError); + public TarantoolInstanceConnection connect() throws IOException { + latch.countDown(); + return TEST_INSTANCE_CONNECTION_PROVIDER.connect(); + } + + @Override + public String getDescription() { + return TEST_INSTANCE_CONNECTION_PROVIDER.getDescription(); } }; - client = new TarantoolClientImpl(provider, makeClientConfig()); + client = new TarantoolClientImpl(instanceConnectionProvider, config); client.syncOps().ping(); // The park() will return inside connector thread. - LockSupport.unpark(((TarantoolClientImpl)client).connector); + LockSupport.unpark(((TarantoolClientImpl) client).connector); // Wait on latch as a proof that reconnect did not happen. // In case of a failure, latch will reach 0 before timeout occurs. @@ -116,7 +124,7 @@ public SocketChannel get(int retryNumber, Throwable lastError) { */ @Test public void testCloseWhileOperationsAreInProgress() { - client = new TarantoolClientImpl(socketChannelProvider, makeClientConfig()) { + client = new TarantoolClientImpl(TEST_INSTANCE_CONNECTION_PROVIDER, makeClientConfig()) { @Override protected void write(Code code, Long syncId, Long schemaId, Object... args) { // Skip write. @@ -124,7 +132,7 @@ protected void write(Code code, Long syncId, Long schemaId, Object... args) { }; final Future> res = client.asyncOps().select(SPACE_ID, PK_INDEX_ID, Collections.singletonList(1), - 0, 1, Iterator.EQ); + 0, 1, Iterator.EQ); client.close(); @@ -144,7 +152,7 @@ public void execute() throws Throwable { @Test public void testReconnectWhileOperationsAreInProgress() { final AtomicBoolean writeEnabled = new AtomicBoolean(false); - client = new TarantoolClientImpl(socketChannelProvider, makeClientConfig()) { + client = new TarantoolClientImpl(TEST_INSTANCE_CONNECTION_PROVIDER, makeClientConfig()) { @Override protected void write(Code code, Long syncId, Long schemaId, Object... args) throws Exception { if (writeEnabled.get()) { @@ -154,7 +162,7 @@ protected void write(Code code, Long syncId, Long schemaId, Object... args) thro }; final Future> mustFail = client.asyncOps().select(SPACE_ID, PK_INDEX_ID, Collections.singletonList(1), - 0, 1, Iterator.EQ); + 0, 1, Iterator.EQ); stopTarantool(INSTANCE_NAME); @@ -176,7 +184,7 @@ public void execute() throws Throwable { } Future> res = client.asyncOps().select(SPACE_ID, PK_INDEX_ID, Collections.singletonList(1), - 0, 1, Iterator.EQ); + 0, 1, Iterator.EQ); try { res.get(TIMEOUT, TimeUnit.MILLISECONDS); @@ -188,11 +196,11 @@ public void execute() throws Throwable { @Test public void testConcurrentCloseAndReconnect() { final CountDownLatch latch = new CountDownLatch(2); - client = new TarantoolClientImpl(socketChannelProvider, makeClientConfig()) { + client = new TarantoolClientImpl(TEST_INSTANCE_CONNECTION_PROVIDER, makeClientConfig()) { @Override - protected void connect(final SocketChannel channel) throws Exception { + protected void connect(InstanceConnectionProvider communicationProvider) throws Exception { latch.countDown(); - super.connect(channel); + super.connect(communicationProvider); } }; @@ -221,10 +229,10 @@ public void run() { public void testLongParallelCloseReconnects() { int numThreads = 4; int numClients = 4; - int timeBudget = 30*1000; + int timeBudget = 30 * 1000; final AtomicReferenceArray clients = - new AtomicReferenceArray(numClients); + new AtomicReferenceArray(numClients); for (int idx = 0; idx < clients.length(); idx++) { clients.set(idx, makeClient()); @@ -242,7 +250,7 @@ public void testLongParallelCloseReconnects() { @Override public void run() { while (!Thread.currentThread().isInterrupted() && - deadline > System.currentTimeMillis()) { + deadline > System.currentTimeMillis()) { int idx = rnd.nextInt(clients.length()); @@ -284,7 +292,7 @@ public void run() { fail(e); } if (deadline > System.currentTimeMillis()) { - System.out.println("" + (deadline - System.currentTimeMillis())/1000 + "s remains."); + System.out.println("" + (deadline - System.currentTimeMillis()) / 1000 + "s remains."); } } diff --git a/src/test/java/org/tarantool/TarantoolConsole.java b/src/test/java/org/tarantool/TarantoolConsole.java index f664f86b..c03f2618 100644 --- a/src/test/java/org/tarantool/TarantoolConsole.java +++ b/src/test/java/org/tarantool/TarantoolConsole.java @@ -168,7 +168,7 @@ private static class TarantoolTcpConsole extends TarantoolConsole { final Socket socket; TarantoolTcpConsole(String host, int port) { - socket = new TestSocketChannelProvider(host, port, TIMEOUT).get(1, null).socket(); + socket = new TestSocketChannelProvider(host, port, TIMEOUT).get().socket(); try { reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); writer = new OutputStreamWriter(socket.getOutputStream()); diff --git a/src/test/java/org/tarantool/TestInstanceConnectionProvider.java b/src/test/java/org/tarantool/TestInstanceConnectionProvider.java new file mode 100644 index 00000000..5e60bcc2 --- /dev/null +++ b/src/test/java/org/tarantool/TestInstanceConnectionProvider.java @@ -0,0 +1,36 @@ +package org.tarantool; + +import org.tarantool.server.TarantoolInstanceConnection; + +import java.io.IOException; + +public class TestInstanceConnectionProvider extends SingleInstanceConnectionProvider { + private final long restartTimeout; + + public TestInstanceConnectionProvider(String address, String username, String password, long restartTimeout1) { + super(address, username, password); + + this.restartTimeout = restartTimeout1; + } + + @Override + public TarantoolInstanceConnection connect() throws IOException { + + long budget = System.currentTimeMillis() + restartTimeout; + while (!Thread.currentThread().isInterrupted()) { + try { + return super.connect(); + } catch (Exception e) { + if (budget < System.currentTimeMillis()) + throw new RuntimeException(e); + try { + Thread.sleep(100); + } catch (InterruptedException ex) { + // No-op. + Thread.currentThread().interrupt(); + } + } + } + throw new RuntimeException(new InterruptedException()); + } +} diff --git a/src/test/java/org/tarantool/TestSocketChannelProvider.java b/src/test/java/org/tarantool/TestSocketChannelProvider.java index 469bc77c..570fb413 100644 --- a/src/test/java/org/tarantool/TestSocketChannelProvider.java +++ b/src/test/java/org/tarantool/TestSocketChannelProvider.java @@ -18,7 +18,7 @@ public TestSocketChannelProvider(String host, int port, int restart_timeout) { } @Override - public SocketChannel get(int retryNumber, Throwable lastError) { + public SocketChannel get() { long budget = System.currentTimeMillis() + restart_timeout; while (!Thread.currentThread().isInterrupted()) { try { diff --git a/src/test/java/org/tarantool/VshardClusterClientIT.java b/src/test/java/org/tarantool/VshardClusterClientIT.java new file mode 100644 index 00000000..2365b4c1 --- /dev/null +++ b/src/test/java/org/tarantool/VshardClusterClientIT.java @@ -0,0 +1,4 @@ +package org.tarantool; + +public class VshardClusterClientIT { +} diff --git a/src/test/java/org/tarantool/cluster/ClusterTopologyFromFunctionDiscovererImplIT.java b/src/test/java/org/tarantool/cluster/ClusterTopologyFromFunctionDiscovererImplIT.java new file mode 100644 index 00000000..7d1d04ef --- /dev/null +++ b/src/test/java/org/tarantool/cluster/ClusterTopologyFromFunctionDiscovererImplIT.java @@ -0,0 +1,87 @@ +package org.tarantool.cluster; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.tarantool.AbstractTarantoolConnectorIT; +import org.tarantool.TarantoolClusterClientConfig; +import org.tarantool.TarantoolControl; +import org.tarantool.server.TarantoolInstanceInfo; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.*; +import static org.tarantool.TestUtils.makeInstanceEnv; + +@DisplayName("ClusterTopologyFromFunctionDiscovererImpl integration tests") +class ClusterTopologyFromFunctionDiscovererImplIT { + protected static final int INSTANCE_LISTEN_PORT = 3301; + protected static final int INSTANCE_ADMIN_PORT = 3313; + private static final String LUA_FILE = "jdk-testing.lua"; + + private static final String INSTANCE_NAME = "jdk-testing"; + private static TarantoolControl control; + private static TarantoolClusterClientConfig clusterConfig; + + private static String INFO_FUNCTION_NAME = "retAddrs"; + private static String TEST_RETURN_HOSTLIST_SCRIPT = + "function " + INFO_FUNCTION_NAME + "() return {'localhost:80', '127.0.0.1:3301'} end"; + + private static final Integer DISCOVER_TIMEOUT_MILLIS = 1000; + + @BeforeAll + public static void setupEnv() { + control = new TarantoolControl(); + control.createInstance(INSTANCE_NAME, LUA_FILE, makeInstanceEnv(INSTANCE_LISTEN_PORT, INSTANCE_ADMIN_PORT)); + + control.start(INSTANCE_NAME); + control.waitStarted("jdk-testing"); + + clusterConfig = AbstractTarantoolConnectorIT.makeClusterClientConfig(); + clusterConfig.infoHost = "localhost:" + INSTANCE_LISTEN_PORT; + clusterConfig.infoFunctionName = INFO_FUNCTION_NAME; + } + + @AfterAll + public static void tearDownEnv() { + control.stop(INSTANCE_NAME); + } + + @Test + @DisplayName("Discoverer successfully fetches and parses a node list from an info node.") + void testSuccessfulAddressParsing() { + //inject the function + control.openConsole(INSTANCE_NAME).exec(TEST_RETURN_HOSTLIST_SCRIPT); + ClusterTopologyFromFunctionDiscovererImpl discoverer = + new ClusterTopologyFromFunctionDiscovererImpl(clusterConfig); + + + List instances = discoverer.discoverTarantoolInstances(DISCOVER_TIMEOUT_MILLIS); + + + assertNotNull(instances); + assertEquals(2, instances.size()); + assertTrue(instances.contains(TarantoolInstanceInfo.create("localhost:80"))); + assertTrue(instances.contains(TarantoolInstanceInfo.create("127.0.0.1:3301"))); + } + + @Test + @DisplayName("Gracefully process case when the function returns empty node list") + void testFunctionReturnedEmptyList() { + String functionCode = "function " + INFO_FUNCTION_NAME + "() return {} end"; + //inject the function + control.openConsole(INSTANCE_NAME).exec(functionCode); + ClusterTopologyFromFunctionDiscovererImpl discoverer = + new ClusterTopologyFromFunctionDiscovererImpl(clusterConfig); + + + List instances = discoverer.discoverTarantoolInstances(DISCOVER_TIMEOUT_MILLIS); + + + assertNotNull(instances); + assertTrue(instances.isEmpty()); + } +} \ No newline at end of file diff --git a/src/test/java/org/tarantool/cluster/ClusterTopologyFromShardDiscovererImplTest.java b/src/test/java/org/tarantool/cluster/ClusterTopologyFromShardDiscovererImplTest.java new file mode 100644 index 00000000..511b4742 --- /dev/null +++ b/src/test/java/org/tarantool/cluster/ClusterTopologyFromShardDiscovererImplTest.java @@ -0,0 +1,27 @@ +package org.tarantool.cluster; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.tarantool.TarantoolClusterClientConfig; +import org.tarantool.server.TarantoolInstanceInfo; + +@Deprecated +class ClusterTopologyFromShardDiscovererImplTest { + + @DisplayName("Test that a list which describes the topology is fetched correctly") + @Test + void testListFetching() { + //todo + TarantoolInstanceInfo tarantoolInstanceInfo = TarantoolInstanceInfo + .create("localhost:3301", "testUsername", "testPassword"); + TarantoolClusterClientConfig clientConfig = new TarantoolClusterClientConfig(); + + clientConfig.username = "storage"; + clientConfig.password = "storage"; + +// Collection tarantoolNodes = +// new ClusterTopologyFromShardDiscovererImpl(clientConfig) +// .discoverTarantoolInstances(tarantoolInstanceInfo, 5000); + int i = 0; + } +} \ No newline at end of file diff --git a/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java b/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java index e02c0e03..7fc6493e 100644 --- a/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java +++ b/src/test/java/org/tarantool/jdbc/JdbcExceptionHandlingTest.java @@ -5,6 +5,7 @@ import org.junit.jupiter.api.function.ThrowingConsumer; import org.tarantool.CommunicationException; import org.tarantool.TarantoolConnection; +import org.tarantool.server.TarantoolBinaryPacket; import java.io.IOException; import java.net.InetSocketAddress; @@ -32,8 +33,8 @@ import static org.mockito.Mockito.verify; import static org.tarantool.jdbc.SQLDatabaseMetadata.FORMAT_IDX; import static org.tarantool.jdbc.SQLDatabaseMetadata.INDEX_FORMAT_IDX; -import static org.tarantool.jdbc.SQLDatabaseMetadata.SPACE_ID_IDX; import static org.tarantool.jdbc.SQLDatabaseMetadata.SPACES_MAX; +import static org.tarantool.jdbc.SQLDatabaseMetadata.SPACE_ID_IDX; import static org.tarantool.jdbc.SQLDatabaseMetadata._VINDEX; import static org.tarantool.jdbc.SQLDatabaseMetadata._VSPACE; import static org.tarantool.jdbc.SQLDriver.PROP_SOCKET_TIMEOUT; @@ -297,8 +298,8 @@ class TestTarantoolConnection extends TarantoolConnection { super(null, null, mock(Socket.class)); } @Override - protected void sql(String sql, Object[] bind) { - super.sql(sql, bind); + protected TarantoolBinaryPacket sql(String sql, Object[] bind) { + return super.sql(sql, bind); } } } diff --git a/src/test/java/org/tarantool/jdbc/JdbcPreparedStatementIT.java b/src/test/java/org/tarantool/jdbc/JdbcPreparedStatementIT.java index 93f99443..8e117bf7 100644 --- a/src/test/java/org/tarantool/jdbc/JdbcPreparedStatementIT.java +++ b/src/test/java/org/tarantool/jdbc/JdbcPreparedStatementIT.java @@ -189,7 +189,7 @@ public void testSetLong() throws SQLException { @Test public void testSetString() throws SQLException { makeHelper(String.class) - .setColumns(TntSqlType.CHAR, TntSqlType.VARCHAR, TntSqlType.TEXT) + .setColumns(TntSqlType.VARCHAR, TntSqlType.TEXT) .setValues(STRING_VALS) .testSetParameter(); } @@ -213,9 +213,7 @@ public void testSetDouble() throws SQLException { @Test public void testSetBigDecimal() throws SQLException { makeHelper(BigDecimal.class) - .setColumns(TntSqlType.DECIMAL, TntSqlType.DECIMAL_PREC, TntSqlType.DECIMAL_PREC_SCALE, - TntSqlType.NUMERIC, TntSqlType.NUMERIC_PREC, TntSqlType.NUMERIC_PREC_SCALE, - TntSqlType.NUM, TntSqlType.NUM_PREC, TntSqlType.NUM_PREC_SCALE) + .setColumns(TntSqlType.REAL, TntSqlType.FLOAT, TntSqlType.DOUBLE) .setValues(BIGDEC_VALS) .testSetParameter(); } @@ -224,7 +222,7 @@ public void testSetBigDecimal() throws SQLException { @Test public void testSetByteArray() throws SQLException { makeHelper(byte[].class) - .setColumns(TntSqlType.BLOB) + .setColumns(TntSqlType.SCALAR) .setValues(BINARY_VALS) .testSetParameter(); } diff --git a/src/test/java/org/tarantool/jdbc/JdbcResultSetIT.java b/src/test/java/org/tarantool/jdbc/JdbcResultSetIT.java index c81f4edf..d7c53e1d 100644 --- a/src/test/java/org/tarantool/jdbc/JdbcResultSetIT.java +++ b/src/test/java/org/tarantool/jdbc/JdbcResultSetIT.java @@ -85,9 +85,7 @@ public void testGetLongColumn() throws SQLException { @Test public void testGetBigDecimalColumn() throws SQLException { makeHelper(BigDecimal.class) - .setColumns(TntSqlType.DECIMAL, TntSqlType.DECIMAL_PREC, TntSqlType.DECIMAL_PREC_SCALE, - TntSqlType.NUMERIC, TntSqlType.NUMERIC_PREC, TntSqlType.NUMERIC_PREC_SCALE, - TntSqlType.NUM, TntSqlType.NUM_PREC, TntSqlType.NUM_PREC_SCALE) + .setColumns(TntSqlType.REAL, TntSqlType.FLOAT, TntSqlType.DOUBLE) .setValues(BIGDEC_VALS) .testGetColumn(); } @@ -111,7 +109,7 @@ public void testGetDoubleColumn() throws SQLException { @Test public void testGetStringColumn() throws SQLException { makeHelper(String.class) - .setColumns(TntSqlType.CHAR, TntSqlType.VARCHAR, TntSqlType.TEXT) + .setColumns(TntSqlType.VARCHAR, TntSqlType.TEXT) .setValues(STRING_VALS) .testGetColumn(); } @@ -119,7 +117,7 @@ public void testGetStringColumn() throws SQLException { @Test public void testGetByteArrayColumn() throws SQLException { makeHelper(byte[].class) - .setColumns(TntSqlType.BLOB) + .setColumns(TntSqlType.SCALAR) .setValues(BINARY_VALS) .testGetColumn(); } diff --git a/src/test/java/org/tarantool/jdbc/TntSqlType.java b/src/test/java/org/tarantool/jdbc/TntSqlType.java index 66d518c2..16f078ab 100644 --- a/src/test/java/org/tarantool/jdbc/TntSqlType.java +++ b/src/test/java/org/tarantool/jdbc/TntSqlType.java @@ -4,37 +4,19 @@ * Enumeration of SQL types recognizable by tarantool. */ public enum TntSqlType { - FLOAT("FLOAT"), + FLOAT("FLOAT"), DOUBLE("DOUBLE"), - REAL("REAL"), INT("INT"), INTEGER("INTEGER"), - DECIMAL("DECIMAL"), - DECIMAL_PREC("DECIMAL(20)"), - DECIMAL_PREC_SCALE("DECIMAL(20,10)"), - - NUMERIC("NUMERIC"), - NUMERIC_PREC("NUMERIC(20)"), - NUMERIC_PREC_SCALE("NUMERIC(20,10)"), - - NUM("NUM"), - NUM_PREC("NUM(20)"), - NUM_PREC_SCALE("NUM(20,10)"), - - CHAR("CHAR(128)"), - VARCHAR("VARCHAR(128)"), - TEXT("TEXT"), - BLOB("BLOB"); + SCALAR("SCALAR"); - //DATE("DATE"), - //TIME("TIME"), //TIMESTAMP("TIMESTAMP"), public String sqlType; diff --git a/src/test/java/org/tarantool/server/TarantoolInstanceInfoTest.java b/src/test/java/org/tarantool/server/TarantoolInstanceInfoTest.java new file mode 100644 index 00000000..818f25c1 --- /dev/null +++ b/src/test/java/org/tarantool/server/TarantoolInstanceInfoTest.java @@ -0,0 +1,23 @@ +package org.tarantool.server; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import static org.junit.jupiter.api.Assertions.assertThrows; + +class TarantoolInstanceInfoTest { + + @DisplayName("Test that a TarantoolInstanceInfo throws an illegal argument exception" + + "in case when it's being created with wrong address string") + @ParameterizedTest + @ValueSource(strings = { + "hostname: 333", + "127.0.0.1:333333" + }) + void testThrowsExceptionInCaseOfInvalidStringAddress(String address) { + assertThrows(IllegalArgumentException.class, + () -> TarantoolInstanceInfo.create(address, "validUsername", "validPassword"), + "We expect the code under test to throw an IllegalArgumentException, but it didn't"); + } +} \ No newline at end of file diff --git a/src/test/perf/org/tarantool/DodgeSocketChannel.java b/src/test/perf/org/tarantool/DodgeSocketChannel.java new file mode 100644 index 00000000..a8dd86f8 --- /dev/null +++ b/src/test/perf/org/tarantool/DodgeSocketChannel.java @@ -0,0 +1,152 @@ +package org.tarantool; + +import java.io.*; +import java.net.*; +import java.nio.*; +import java.nio.channels.*; +import java.nio.channels.spi.*; +import java.util.*; +import java.util.concurrent.*; + +public class DodgeSocketChannel extends SocketChannel { + + private ArrayBlockingQueue blockingQueue; + + public static String FAKE_WELCOME_STRING = "Tarantool 1.10.2 (Binary) 53b5547d-0560-4383-b303-4861572d4517\n" + + "1bsTc5Ibljs94bsexVze+ZngV1vBJcstoYDxSTa9h8k="; + private boolean isWellcomePerformed = false; + + public DodgeSocketChannel(Integer queueSize) { + super(null); + this.blockingQueue = new ArrayBlockingQueue<>(queueSize); + } + + /** + * Initializes a new instance of this class. + * + * @param provider The provider that created this channel + */ + protected DodgeSocketChannel(SelectorProvider provider) { + super(provider); + } + + @Override + public SocketChannel bind(SocketAddress local) throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public SocketChannel setOption(SocketOption name, T value) throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public T getOption(SocketOption name) throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public Set> supportedOptions() { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public SocketChannel shutdownInput() throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public SocketChannel shutdownOutput() throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public Socket socket() { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public boolean isConnected() { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public boolean isConnectionPending() { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public boolean connect(SocketAddress remote) throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public boolean finishConnect() throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public SocketAddress getRemoteAddress() throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public int read(ByteBuffer dst) throws IOException { + synchronized (this) { + if (isWellcomePerformed) { + return dodgeRead(dst); + } else { + byte[] bytes = FAKE_WELCOME_STRING.getBytes(); + dst.put(bytes); + isWellcomePerformed = true; + return bytes.length; + } + } + } + + private int dodgeRead(ByteBuffer dst) { + try { + byte[] bytes = blockingQueue.take(); + dst.put(bytes); + return bytes.length; + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + @Override + public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public int write(ByteBuffer src) throws IOException { + synchronized (this) { + if (isWellcomePerformed) { + blockingQueue.add(src.array()); + } + return src.array().length; + } +// throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public long write(ByteBuffer[] srcs, int offset, int length) throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + public SocketAddress getLocalAddress() throws IOException { + throw new UnsupportedOperationException("This operation is not implemented"); + } + + @Override + protected void implCloseSelectableChannel() throws IOException { + + } + + @Override + protected void implConfigureBlocking(boolean block) throws IOException { + + } +} diff --git a/src/test/perf/org/tarantool/DodgeSocketChannelTest.java b/src/test/perf/org/tarantool/DodgeSocketChannelTest.java new file mode 100644 index 00000000..76e9dcfe --- /dev/null +++ b/src/test/perf/org/tarantool/DodgeSocketChannelTest.java @@ -0,0 +1,43 @@ +package org.tarantool; + +import org.junit.jupiter.api.*; + +import java.io.*; +import java.nio.*; + +public class DodgeSocketChannelTest { + + + @Test + void testReadWellcome() throws IOException { + DodgeSocketChannel dodgeSocketChannel = new DodgeSocketChannel(10); + + ByteBuffer buffer = readWellcome(dodgeSocketChannel); + + Assertions.assertEquals(DodgeSocketChannel.FAKE_WELCOME_STRING, new String(buffer.array())); + } + + private ByteBuffer readWellcome(DodgeSocketChannel dodgeSocketChannel) throws IOException { + ByteBuffer buffer = ByteBuffer.wrap(new byte[DodgeSocketChannel.FAKE_WELCOME_STRING.length()]); + dodgeSocketChannel.read(buffer); + return buffer; + } + + @Test + void testCorrectDodge() throws IOException { + DodgeSocketChannel dodgeSocketChannel = new DodgeSocketChannel(10); + + readWellcome(dodgeSocketChannel); + + dodgeSocketChannel.write(ByteBuffer.wrap("one".getBytes())); + dodgeSocketChannel.write(ByteBuffer.wrap("two".getBytes())); + + ByteBuffer oneBuffer = ByteBuffer.allocate("one".getBytes().length); + ByteBuffer twoBuffer = ByteBuffer.allocate("two".getBytes().length); + dodgeSocketChannel.read(oneBuffer); + dodgeSocketChannel.read(twoBuffer); + + Assertions.assertEquals("one", new String(oneBuffer.array())); + Assertions.assertEquals("two", new String(twoBuffer.array())); + } +} diff --git a/src/test/perf/org/tarantool/MyBenchmark.java b/src/test/perf/org/tarantool/MyBenchmark.java new file mode 100644 index 00000000..e188e75c --- /dev/null +++ b/src/test/perf/org/tarantool/MyBenchmark.java @@ -0,0 +1,88 @@ +package org.tarantool; + +import org.openjdk.jmh.annotations.*; +import org.tarantool.server.TarantoolInstanceConnection; + +import java.io.IOException; +import java.nio.channels.*; +import java.util.*; +import java.util.concurrent.*; + +public class MyBenchmark { + + private static class DodgeSocketChannelProvider implements SocketChannelProvider { + + private final Integer defaultSocketQueueSize; + + private DodgeSocketChannelProvider(Integer defaultSocketQueueSize) { + this.defaultSocketQueueSize = defaultSocketQueueSize; + } + + @Override + public SocketChannel get() { + return new DodgeSocketChannel(defaultSocketQueueSize); + } + } + + /** + * todo + */ + private static class DodgeCommunication implements InstanceConnectionProvider { + @Override + public TarantoolInstanceConnection connect() throws IOException { + return null; + } + + @Override + public String getDescription() { + return null; + } + } + + + @State(Scope.Benchmark) + public static class SpeedOfWriteAndReadState { + + public TarantoolClientImpl tarantoolClient; + + @Setup(Level.Invocation) + public void init() { + System.out.println("INIT BENCHMARK"); + TarantoolClientConfig config = new TarantoolClientConfig(); + this.tarantoolClient = new TarantoolClientImpl(new DodgeCommunication(), config); + } + + @TearDown(Level.Invocation) + public void tearDown() { + System.out.println("TEARDOWN BENCHMARK"); + tarantoolClient.close(); + } + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(TimeUnit.MINUTES) + public void measureSpeedOfWriteAndReadViaSharedBuffer(SpeedOfWriteAndReadState state) { + try { + System.out.println("RUN BENCHMARK. state is " + state); + state.tarantoolClient.asyncOps().insert(1, Arrays.asList("a", "b")).get(); + } catch (Exception e) { + throw new IllegalArgumentException("Exception occurred while benchmarking", e); + } + } + + @State(Scope.Thread) + public static class MyState { + public int a = 1; + public int b = 2; + public int sum ; + } + + @Benchmark + @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.MINUTES) + public void testMethod(MyState state) { + state.sum = state.a + state.b; + System.out.println("Run from: " + Thread.currentThread().getName()); + } + +}