Skip to content

Commit ce414d3

Browse files
author
Zhen
committed
Remove the two-direction reference between connection and packing protocol.
Background: A connection holds a socketClient. A socketClient has a protocol which specifies what protocol version the driver should work on. When a connection is created in a pool, the connection will first be `started` (where the socketClient will negotiate protocol version with the server) and then `initialized` (where the server version will be sent back). Problem description: The problem that we need to solve here is that for 3.2+ servers, the protocol the driver is working on should allow bytes, while for server version lower than 3.2.0, the protocol should disallow bytes to be sent to prevent from crashing server connection in a bad way. While the server version is only known after the protocol is created, so after the server version arrived the protocol need either be updated (this commit) or the protocol need to be informed (previous impl). Solution: This commit changed the behaviour in connection to update protocol regarding bytes support right after server version is received. By doing in this way we avoid some two-direction reference between connection and protocol.
1 parent cff2973 commit ce414d3

25 files changed

+142
-200
lines changed

driver/src/main/java/org/neo4j/driver/internal/cluster/RoutingPooledConnection.java

-6
Original file line numberDiff line numberDiff line change
@@ -220,12 +220,6 @@ public BoltServerAddress boltServerAddress()
220220
return delegate.boltServerAddress();
221221
}
222222

223-
@Override
224-
public boolean supportsBytes()
225-
{
226-
return delegate.supportsBytes();
227-
}
228-
229223
@Override
230224
public long lastUsedTimestamp()
231225
{

driver/src/main/java/org/neo4j/driver/internal/messaging/MessageFormat.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package org.neo4j.driver.internal.messaging;
2020

21-
import org.neo4j.driver.internal.spi.Connection;
22-
2321
import java.io.IOException;
2422
import java.nio.channels.ReadableByteChannel;
2523
import java.nio.channels.WritableByteChannel;
@@ -44,9 +42,7 @@ interface Reader
4442

4543
}
4644

47-
Writer newWriter( WritableByteChannel ch, Connection connection);
48-
49-
Writer newWriter( WritableByteChannel ch );
45+
Writer newWriter( WritableByteChannel ch, boolean byteArraySupportEnabled );
5046

5147
Reader newReader( ReadableByteChannel ch );
5248

driver/src/main/java/org/neo4j/driver/internal/messaging/PackStreamMessageFormatV1.java

+14-12
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.neo4j.driver.internal.packstream.PackOutput;
3838
import org.neo4j.driver.internal.packstream.PackStream;
3939
import org.neo4j.driver.internal.packstream.PackType;
40-
import org.neo4j.driver.internal.spi.Connection;
40+
import org.neo4j.driver.internal.packstream.ByteArrayIncompatiblePacker;
4141
import org.neo4j.driver.internal.util.Iterables;
4242
import org.neo4j.driver.internal.value.InternalValue;
4343
import org.neo4j.driver.internal.value.ListValue;
@@ -80,16 +80,10 @@ public class PackStreamMessageFormatV1 implements MessageFormat
8080
private static final Map<String,Value> EMPTY_STRING_VALUE_MAP = new HashMap<>( 0 );
8181

8282
@Override
83-
public MessageFormat.Writer newWriter( WritableByteChannel ch, Connection connection )
83+
public MessageFormat.Writer newWriter( WritableByteChannel ch, boolean byteArraySupportEnabled )
8484
{
85-
ChunkedOutput output = new ChunkedOutput(ch, connection);
86-
return new Writer( output, output.messageBoundaryHook() );
87-
}
88-
89-
@Override
90-
public MessageFormat.Writer newWriter( WritableByteChannel ch )
91-
{
92-
return newWriter( ch, null );
85+
ChunkedOutput output = new ChunkedOutput( ch );
86+
return new Writer( output, output.messageBoundaryHook(), byteArraySupportEnabled );
9387
}
9488

9589
@Override
@@ -113,11 +107,19 @@ public static class Writer implements MessageFormat.Writer, MessageHandler
113107
/**
114108
* @param output interface to write messages to
115109
* @param onMessageComplete invoked for each message, after it's done writing to the output
110+
* @param byteArraySupportEnabled specify if support to pack/write byte array to server
116111
*/
117-
public Writer( PackOutput output, Runnable onMessageComplete )
112+
public Writer( PackOutput output, Runnable onMessageComplete, boolean byteArraySupportEnabled )
118113
{
119114
this.onMessageComplete = onMessageComplete;
120-
packer = new PackStream.Packer( output );
115+
if( byteArraySupportEnabled )
116+
{
117+
packer = new PackStream.Packer( output );
118+
}
119+
else
120+
{
121+
packer = new ByteArrayIncompatiblePacker( output );
122+
}
121123
}
122124

123125
@Override

driver/src/main/java/org/neo4j/driver/internal/net/ChunkedOutput.java

+3-12
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.nio.channels.WritableByteChannel;
2424

2525
import org.neo4j.driver.internal.packstream.PackOutput;
26-
import org.neo4j.driver.internal.spi.Connection;
2726
import org.neo4j.driver.v1.exceptions.ClientException;
2827

2928
import static java.lang.Math.max;
@@ -33,7 +32,6 @@ public class ChunkedOutput implements PackOutput
3332
public static final short MESSAGE_BOUNDARY = 0;
3433
public static final int CHUNK_HEADER_SIZE = 2;
3534

36-
private final Connection connection;
3735
private final ByteBuffer buffer;
3836
private final WritableByteChannel channel;
3937

@@ -42,14 +40,13 @@ public class ChunkedOutput implements PackOutput
4240
/** Are currently in the middle of writing a chunk? */
4341
private boolean chunkOpen = false;
4442

45-
public ChunkedOutput( WritableByteChannel ch, Connection connection )
43+
public ChunkedOutput( WritableByteChannel ch )
4644
{
47-
this(8192, ch, connection);
45+
this( 8192, ch );
4846
}
4947

50-
public ChunkedOutput( int bufferSize, WritableByteChannel ch, Connection connection )
48+
public ChunkedOutput( int bufferSize, WritableByteChannel ch )
5149
{
52-
this.connection = connection;
5350
buffer = ByteBuffer.allocate( max( 16, bufferSize ) );
5451
chunkOpen = false;
5552
channel = ch;
@@ -124,12 +121,6 @@ public PackOutput writeBytes( byte[] data, int offset, int length ) throws IOExc
124121
return this;
125122
}
126123

127-
@Override
128-
public boolean supportsBytes()
129-
{
130-
return connection != null && connection.supportsBytes();
131-
}
132-
133124
private void closeChunkIfOpen()
134125
{
135126
if( chunkOpen )

driver/src/main/java/org/neo4j/driver/internal/net/ConcurrencyGuardingConnection.java

-6
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,4 @@ public BoltServerAddress boltServerAddress()
223223
{
224224
return delegate.boltServerAddress();
225225
}
226-
227-
@Override
228-
public boolean supportsBytes()
229-
{
230-
return delegate.supportsBytes();
231-
}
232226
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketClient.java

+20-8
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,15 @@
2727
import org.neo4j.driver.internal.messaging.Message;
2828
import org.neo4j.driver.internal.messaging.MessageFormat;
2929
import org.neo4j.driver.internal.security.SecurityPlan;
30-
import org.neo4j.driver.internal.spi.Connection;
3130
import org.neo4j.driver.internal.util.BytePrinter;
3231
import org.neo4j.driver.v1.Logger;
3332
import org.neo4j.driver.v1.exceptions.ClientException;
3433
import org.neo4j.driver.v1.exceptions.ServiceUnavailableException;
3534

3635
import static java.lang.String.format;
3736
import static java.nio.ByteOrder.BIG_ENDIAN;
37+
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
38+
import static org.neo4j.driver.internal.util.ServerVersion.version;
3839

3940
public class SocketClient
4041
{
@@ -44,7 +45,6 @@ public class SocketClient
4445
private static final int NO_VERSION = 0;
4546
private static final int[] SUPPORTED_VERSIONS = new int[]{VERSION1, NO_VERSION, NO_VERSION, NO_VERSION};
4647

47-
private final Connection connection;
4848
private final BoltServerAddress address;
4949
private final SecurityPlan securityPlan;
5050
private final int timeoutMillis;
@@ -56,9 +56,8 @@ public class SocketClient
5656

5757
private ByteChannel channel;
5858

59-
public SocketClient( Connection connection, BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logger logger )
59+
public SocketClient( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logger logger )
6060
{
61-
this.connection = connection;
6261
this.address = address;
6362
this.securityPlan = securityPlan;
6463
this.timeoutMillis = timeoutMillis;
@@ -126,9 +125,7 @@ public void start()
126125
{
127126
setChannel( ChannelFactory.create( address, securityPlan, timeoutMillis, logger ) );
128127
}
129-
protocol = negotiateProtocol();
130-
reader = protocol.reader();
131-
writer = protocol.writer();
128+
setProtocol( negotiateProtocol() );
132129
}
133130
catch ( ConnectException e )
134131
{
@@ -142,6 +139,21 @@ public void start()
142139
}
143140
}
144141

142+
public void updateProtocol( String serverVersion )
143+
{
144+
if( version( serverVersion ).lessThan( v3_2_0 ) )
145+
{
146+
setProtocol( SocketProtocolV1.create( channel, false ) );
147+
}
148+
}
149+
150+
private void setProtocol( SocketProtocol protocol )
151+
{
152+
this.protocol = protocol;
153+
this.reader = protocol.reader();
154+
this.writer = protocol.writer();
155+
}
156+
145157
public void send( Queue<Message> messages ) throws IOException
146158
{
147159
int messageCount = 0;
@@ -258,7 +270,7 @@ private SocketProtocol negotiateProtocol() throws IOException
258270
{
259271
case VERSION1:
260272
logger.debug( "S: [HANDSHAKE] -> 1" );
261-
return new SocketProtocolV1( connection, channel );
273+
return SocketProtocolV1.create( channel );
262274
case NO_VERSION:
263275
throw new ClientException( "The server does not support any of the protocol versions supported by " +
264276
"this driver. Ensure that you are using driver and server versions that " +

driver/src/main/java/org/neo4j/driver/internal/net/SocketConnection.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class SocketConnection implements Connection
6161
SocketConnection( BoltServerAddress address, SecurityPlan securityPlan, int timeoutMillis, Logging logging )
6262
{
6363
Logger logger = new DelegatingLogger( logging.getLog( LOG_NAME ), String.valueOf( hashCode() ) );
64-
this.socket = new SocketClient( this, address, securityPlan, timeoutMillis, logger );
64+
this.socket = new SocketClient( address, securityPlan, timeoutMillis, logger );
6565
this.responseHandler = createResponseHandler( logger );
6666

6767
startSocketClient();
@@ -117,6 +117,7 @@ public void init( String clientName, Map<String,Value> authToken )
117117
queueMessage( new InitMessage( clientName, authToken ), initCollector );
118118
sync();
119119
this.serverInfo = new InternalServerInfo( socket.address(), initCollector.serverVersion() );
120+
socket.updateProtocol( serverInfo.version() );
120121
}
121122

122123
@Override
@@ -167,6 +168,7 @@ public synchronized void flush()
167168
}
168169
catch ( IOException e )
169170
{
171+
close();
170172
throw new ServiceUnavailableException( "Unable to send messages to server: " + e.getMessage(), e );
171173
}
172174
}
@@ -303,10 +305,4 @@ public BoltServerAddress boltServerAddress()
303305
{
304306
return this.serverInfo.boltServerAddress();
305307
}
306-
307-
@Override
308-
public boolean supportsBytes()
309-
{
310-
return this.serverInfo.atLeast("Neo4j", 3, 2);
311-
}
312308
}

driver/src/main/java/org/neo4j/driver/internal/net/SocketProtocolV1.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -18,30 +18,35 @@
1818
*/
1919
package org.neo4j.driver.internal.net;
2020

21-
import java.io.IOException;
2221
import java.nio.channels.ByteChannel;
2322

2423
import org.neo4j.driver.internal.messaging.MessageFormat;
2524
import org.neo4j.driver.internal.messaging.MessageFormat.Reader;
2625
import org.neo4j.driver.internal.messaging.MessageFormat.Writer;
2726
import org.neo4j.driver.internal.messaging.PackStreamMessageFormatV1;
28-
import org.neo4j.driver.internal.spi.Connection;
2927

3028
public class SocketProtocolV1 implements SocketProtocol
3129
{
3230
private final MessageFormat messageFormat;
3331
private final Reader reader;
3432
private final Writer writer;
3533

36-
public SocketProtocolV1( Connection connection, ByteChannel channel ) throws IOException
34+
public static SocketProtocol create( ByteChannel channel )
3735
{
38-
messageFormat = new PackStreamMessageFormatV1();
36+
/*by default the byte array support is enabled*/
37+
return create( channel, true );
38+
}
3939

40-
ChunkedOutput output = new ChunkedOutput(channel, connection);
41-
BufferingChunkedInput input = new BufferingChunkedInput( channel );
40+
public static SocketProtocol create( ByteChannel channel, boolean byteArraySupportEnabled )
41+
{
42+
return new SocketProtocolV1( channel, byteArraySupportEnabled );
43+
}
4244

43-
this.writer = new PackStreamMessageFormatV1.Writer( output, output.messageBoundaryHook() );
44-
this.reader = new PackStreamMessageFormatV1.Reader( input, input.messageBoundaryHook() );
45+
public SocketProtocolV1( ByteChannel channel, boolean byteArraySupportEnabled )
46+
{
47+
messageFormat = new PackStreamMessageFormatV1();
48+
this.writer = messageFormat.newWriter( channel, byteArraySupportEnabled );
49+
this.reader = messageFormat.newReader( channel );
4550
}
4651

4752
@Override

driver/src/main/java/org/neo4j/driver/internal/net/pooling/PooledSocketConnection.java

-12
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,12 @@
2222

2323
import org.neo4j.driver.internal.SessionResourcesHandler;
2424
import org.neo4j.driver.internal.net.BoltServerAddress;
25-
import org.neo4j.driver.internal.packstream.PackStream;
2625
import org.neo4j.driver.internal.spi.Collector;
2726
import org.neo4j.driver.internal.spi.Connection;
2827
import org.neo4j.driver.internal.spi.PooledConnection;
2928
import org.neo4j.driver.internal.util.Clock;
3029
import org.neo4j.driver.internal.util.Consumer;
3130
import org.neo4j.driver.v1.Value;
32-
import org.neo4j.driver.v1.exceptions.ClientException;
3331
import org.neo4j.driver.v1.exceptions.Neo4jException;
3432
import org.neo4j.driver.v1.summary.ServerInfo;
3533

@@ -171,10 +169,6 @@ public void flush()
171169
{
172170
delegate.flush();
173171
}
174-
catch ( PackStream.BytesNotSupportedException e )
175-
{
176-
throw new ClientException("PackStream BYTES are not supported by this server");
177-
}
178172
catch ( RuntimeException e )
179173
{
180174
onDelegateException( e );
@@ -251,12 +245,6 @@ public BoltServerAddress boltServerAddress()
251245
return delegate.boltServerAddress();
252246
}
253247

254-
@Override
255-
public boolean supportsBytes()
256-
{
257-
return delegate.supportsBytes();
258-
}
259-
260248
@Override
261249
public void dispose()
262250
{
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright (c) 2002-2017 "Neo Technology,"
3+
* Network Engine for Objects in Lund AB [http://neotechnology.com]
4+
*
5+
* This file is part of Neo4j.
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
package org.neo4j.driver.internal.packstream;
20+
21+
import java.io.IOException;
22+
23+
public class ByteArrayIncompatiblePacker extends PackStream.Packer
24+
{
25+
public ByteArrayIncompatiblePacker( PackOutput out )
26+
{
27+
super( out );
28+
}
29+
30+
public void packBytesHeader( int size ) throws IOException
31+
{
32+
throw new PackStream.UnPackable( "Packing bytes is not supported " +
33+
"as the current server this driver connected to does not support unpack bytes." );
34+
}
35+
}

driver/src/main/java/org/neo4j/driver/internal/packstream/PackOutput.java

-3
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,4 @@ public interface PackOutput
4545

4646
/** Produce an 8-byte IEEE 754 "double format" floating-point number */
4747
PackOutput writeDouble( double value ) throws IOException;
48-
49-
/** Return a boolean indicating whether or not this output channel supports the PackStream BYTES type. */
50-
boolean supportsBytes();
5148
}

0 commit comments

Comments
 (0)