Skip to content

Soft automatic schema reload #200

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Feel free to override any method of `TarantoolClientImpl`. For example, to hook
all the results, you could override this:

```java
protected void complete(TarantoolPacket packet, TarantoolOp<?> future);
protected void complete(TarantoolPacket packet, CompletableFuture<?> future);
```

### Client config options
Expand Down Expand Up @@ -181,6 +181,55 @@ Supported options are follow:
14. `operationExpiryTimeMillis` is a default request timeout in ms.
Default value is `1000` (1 second).

## String space/index resolution

Each operation that requires space or index to be executed, can work with
number ID as well as string name of a space or an index.
Assume, we have `my_space` space with space ID `512` and its primary index
`primary` with index ID `0`. Then, for instance, `select` operations can be
performed using their names:

```java
client.syncOps().select(512, 0, Collections.singletonList(1), 0, 1, Iterator.EQ);
// or using more convenient way
client.syncOps().select("my_space", "primary", Collections.singletonList(1), 0, 1, Iterator.EQ);
```

Because _iproto_ has not yet supported string spaces and indexes, a client caches current server
schema in memory. The client relies on protocol SCHEMA_ID and sends each request with respect to
cached schema version. The schema is used primarily to resolve string names of spaces or indexes
against its integer IDs.

### Schema update

1. Just after a (re-)connection to the Tarantool instance.
The client cannot guarantee that new instance is the same and has same schema,
thus, the client drops the cached schema and fetches new one.
2. Receiving a schema version error as a response to our request.
It's possible some request can be rejected by server because of schema
mismatching between client and server. In this case the schema will be
reloaded and the refused request will be resent using the updated schema
version.
3. Sending a DDL request and receiving a new version in a response.
4. Sending a request against a non-existent space/index name.
The client cannot exactly know whether name was not found because of
it does not exist or it has not the latest schema version. A ping request
is sent in the case to check a schema version and then a client will reload
it if needed. The original request will be retried if a space / an index
name will be found in a new schema.

### Schema support caveats

1. Each schema reloading requires at least two extra requests to fetch spaces and
indexes metadata respectively. There is also a ping request followed by reloading
of the schema to check whether the client has outdated version (see point 4 in
[Schema update](#schema-update)).
2. In some circumstance, requests can be rejected several times until both client's
and server's versions matches. It may take significant amount of time or even be
a cause of request timeout.
3. The client guarantees an order of synchronous requests per thread. Other cases such
as asynchronous or multi-threaded requests may be out of order before the execution.

## Spring NamedParameterJdbcTemplate usage example

The JDBC driver uses `TarantoolClient` implementation to provide a communication with server.
Expand Down
192 changes: 167 additions & 25 deletions src/main/java/org/tarantool/AbstractTarantoolOps.java
Original file line number Diff line number Diff line change
@@ -1,62 +1,204 @@
package org.tarantool;

import static org.tarantool.TarantoolRequestArgumentFactory.cacheLookupValue;
import static org.tarantool.TarantoolRequestArgumentFactory.value;

public abstract class AbstractTarantoolOps<Space, Tuple, Operation, Result>
implements TarantoolClientOps<Space, Tuple, Operation, Result> {
import org.tarantool.schema.TarantoolSchemaMeta;

import java.util.List;

public abstract class AbstractTarantoolOps<Result>
implements TarantoolClientOps<Integer, List<?>, Object, Result> {

private Code callCode = Code.CALL;

protected abstract Result exec(Code code, Object... args);
protected abstract Result exec(TarantoolRequest request);

protected abstract TarantoolSchemaMeta getSchemaMeta();

public Result select(Integer space, Integer index, List<?> key, int offset, int limit, Iterator iterator) {
return select(space, index, key, offset, limit, iterator.getValue());
}

public Result select(Space space, Space index, Tuple key, int offset, int limit, Iterator iterator) {
@Override
public Result select(String space, String index, List<?> key, int offset, int limit, Iterator iterator) {
return select(space, index, key, offset, limit, iterator.getValue());
}

public Result select(Space space, Space index, Tuple key, int offset, int limit, int iterator) {
@Override
public Result select(Integer space, Integer index, List<?> key, int offset, int limit, int iterator) {
return exec(
new TarantoolRequest(
Code.SELECT,
value(Key.SPACE), value(space),
value(Key.INDEX), value(index),
value(Key.KEY), value(key),
value(Key.ITERATOR), value(iterator),
value(Key.LIMIT), value(limit),
value(Key.OFFSET), value(offset)
)
);
}

@Override
public Result select(String space, String index, List<?> key, int offset, int limit, int iterator) {
return exec(
new TarantoolRequest(
Code.SELECT,
value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()),
value(Key.INDEX), cacheLookupValue(() -> getSchemaMeta().getSpaceIndex(space, index).getId()),
value(Key.KEY), value(key),
value(Key.ITERATOR), value(iterator),
value(Key.LIMIT), value(limit),
value(Key.OFFSET), value(offset)
)
);
}

@Override
public Result insert(Integer space, List<?> tuple) {
return exec(new TarantoolRequest(
Code.INSERT,
value(Key.SPACE), value(space),
value(Key.TUPLE), value(tuple)
)
);
}

@Override
public Result insert(String space, List<?> tuple) {
return exec(
new TarantoolRequest(
Code.INSERT,
value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()),
value(Key.TUPLE), value(tuple)
)
);
}

@Override
public Result replace(Integer space, List<?> tuple) {
return exec(
new TarantoolRequest(
Code.REPLACE,
value(Key.SPACE), value(space),
value(Key.TUPLE), value(tuple)
)
);
}

@Override
public Result replace(String space, List<?> tuple) {
return exec(
Code.SELECT,
Key.SPACE, space,
Key.INDEX, index,
Key.KEY, key,
Key.ITERATOR, iterator,
Key.LIMIT, limit,
Key.OFFSET, offset
new TarantoolRequest(
Code.REPLACE,
value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()),
value(Key.TUPLE), value(tuple)
)
);
}

public Result insert(Space space, Tuple tuple) {
return exec(Code.INSERT, Key.SPACE, space, Key.TUPLE, tuple);
@Override
public Result update(Integer space, List<?> key, Object... operations) {
return exec(
new TarantoolRequest(
Code.UPDATE,
value(Key.SPACE), value(space),
value(Key.KEY), value(key),
value(Key.TUPLE), value(operations)
)
);
}

public Result replace(Space space, Tuple tuple) {
return exec(Code.REPLACE, Key.SPACE, space, Key.TUPLE, tuple);
@Override
public Result update(String space, List<?> key, Object... operations) {
return exec(
new TarantoolRequest(
Code.UPDATE,
value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()),
value(Key.KEY), value(key),
value(Key.TUPLE), value(operations)
)
);
}

@Override
public Result upsert(Integer space, List<?> key, List<?> defTuple, Object... operations) {
return exec(
new TarantoolRequest(
Code.UPSERT,
value(Key.SPACE), value(space),
value(Key.KEY), value(key),
value(Key.TUPLE), value(defTuple),
value(Key.UPSERT_OPS), value(operations)
)
);
}

public Result update(Space space, Tuple key, Operation... args) {
return exec(Code.UPDATE, Key.SPACE, space, Key.KEY, key, Key.TUPLE, args);
@Override
public Result upsert(String space, List<?> key, List<?> defTuple, Object... operations) {
return exec(
new TarantoolRequest(
Code.UPSERT,
value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()),
value(Key.KEY), value(key),
value(Key.TUPLE), value(defTuple),
value(Key.UPSERT_OPS), value(operations)
)
);
}

public Result upsert(Space space, Tuple key, Tuple def, Operation... args) {
return exec(Code.UPSERT, Key.SPACE, space, Key.KEY, key, Key.TUPLE, def, Key.UPSERT_OPS, args);
@Override
public Result delete(Integer space, List<?> key) {
return exec(
new TarantoolRequest(
Code.DELETE,
value(Key.SPACE), value(space),
value(Key.KEY), value(key)
)
);
}

public Result delete(Space space, Tuple key) {
return exec(Code.DELETE, Key.SPACE, space, Key.KEY, key);
@Override
public Result delete(String space, List<?> key) {
return exec(
new TarantoolRequest(
Code.DELETE,
value(Key.SPACE), cacheLookupValue(() -> getSchemaMeta().getSpace(space).getId()),
value(Key.KEY), value(key)
)
);
}

@Override
public Result call(String function, Object... args) {
return exec(callCode, Key.FUNCTION, function, Key.TUPLE, args);
return exec(
new TarantoolRequest(
callCode,
value(Key.FUNCTION), value(function),
value(Key.TUPLE), value(args)
)
);
}

@Override
public Result eval(String expression, Object... args) {
return exec(Code.EVAL, Key.EXPRESSION, expression, Key.TUPLE, args);
return exec(
new TarantoolRequest(
Code.EVAL,
value(Key.EXPRESSION), value(expression),
value(Key.TUPLE), value(args)
)
);
}

@Override
public void ping() {
exec(Code.PING);
exec(new TarantoolRequest(Code.PING));
}

public void setCallCode(Code callCode) {
this.callCode = callCode;
}

}
10 changes: 10 additions & 0 deletions src/main/java/org/tarantool/Iterator.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.tarantool;

import java.util.Arrays;

// Iterator info was taken from here https://github.com/tarantool/tarantool/blob/f66584c3bcdffe61d6d99a4868a9b72d62338a11/src/box/iterator_type.h#L62
public enum Iterator {
EQ(0), // key == x ASC order
Expand All @@ -24,4 +26,12 @@ public enum Iterator {
public int getValue() {
return value;
}

public static Iterator valueOf(int value) {
return Arrays.stream(Iterator.values())
.filter(v -> value == v.getValue())
.findFirst()
.orElseThrow(IllegalArgumentException::new);
}

}
13 changes: 1 addition & 12 deletions src/main/java/org/tarantool/TarantoolBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
import java.io.IOException;
import java.net.Socket;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Integer, List<?>, Object, Result> {
public abstract class TarantoolBase<Result> extends AbstractTarantoolOps<Result> {
protected String serverVersion;
protected MsgPackLite msgPackLite = MsgPackLite.INSTANCE;
protected AtomicLong syncId = new AtomicLong();
Expand Down Expand Up @@ -42,16 +41,6 @@ protected void closeChannel(SocketChannel channel) {
}
}

protected void validateArgs(Object[] args) {
if (args != null) {
for (int i = 0; i < args.length; i += 2) {
if (args[i + 1] == null) {
throw new NullPointerException(((Key) args[i]).name() + " should not be null");
}
}
}
}

public void setInitialRequestSize(int initialRequestSize) {
this.initialRequestSize = initialRequestSize;
}
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/tarantool/TarantoolClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.tarantool;

import org.tarantool.schema.TarantoolSchemaMeta;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
Expand Down Expand Up @@ -29,4 +31,6 @@ public interface TarantoolClient {

boolean waitAlive(long timeout, TimeUnit unit) throws InterruptedException;

TarantoolSchemaMeta getSchemaMeta();

}
Loading