Skip to content

Byte arrays support #370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ interface Reader

}

Writer newWriter( WritableByteChannel ch );
Writer newWriter( WritableByteChannel ch, boolean byteArraySupportEnabled );

Reader newReader( ReadableByteChannel ch );

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.neo4j.driver.internal.packstream.PackOutput;
import org.neo4j.driver.internal.packstream.PackStream;
import org.neo4j.driver.internal.packstream.PackType;
import org.neo4j.driver.internal.packstream.ByteArrayIncompatiblePacker;
import org.neo4j.driver.internal.util.Iterables;
import org.neo4j.driver.internal.value.InternalValue;
import org.neo4j.driver.internal.value.ListValue;
Expand Down Expand Up @@ -79,10 +80,10 @@ public class PackStreamMessageFormatV1 implements MessageFormat
private static final Map<String,Value> EMPTY_STRING_VALUE_MAP = new HashMap<>( 0 );

@Override
public MessageFormat.Writer newWriter( WritableByteChannel ch )
public MessageFormat.Writer newWriter( WritableByteChannel ch, boolean byteArraySupportEnabled )
{
ChunkedOutput output = new ChunkedOutput( ch );
return new Writer( output, output.messageBoundaryHook() );
return new Writer( output, output.messageBoundaryHook(), byteArraySupportEnabled );
}

@Override
Expand All @@ -106,11 +107,19 @@ public static class Writer implements MessageFormat.Writer, MessageHandler
/**
* @param output interface to write messages to
* @param onMessageComplete invoked for each message, after it's done writing to the output
* @param byteArraySupportEnabled specify if support to pack/write byte array to server
*/
public Writer( PackOutput output, Runnable onMessageComplete )
public Writer( PackOutput output, Runnable onMessageComplete, boolean byteArraySupportEnabled )
{
this.onMessageComplete = onMessageComplete;
packer = new PackStream.Packer( output );
if( byteArraySupportEnabled )
{
packer = new PackStream.Packer( output );
}
else
{
packer = new ByteArrayIncompatiblePacker( output );
}
}

@Override
Expand Down Expand Up @@ -223,6 +232,10 @@ private void packValue( Value value ) throws IOException
packer.packNull();
break;

case BYTES_TyCon:
packer.pack( value.asByteArray() );
break;

case STRING_TyCon:
packer.pack( value.asString() );
break;
Expand Down Expand Up @@ -502,8 +515,6 @@ private Value unpackValue() throws IOException
PackType type = unpacker.peekNextType();
switch ( type )
{
case BYTES:
break;
case NULL:
return value( unpacker.unpackNull() );
case BOOLEAN:
Expand All @@ -512,6 +523,8 @@ private Value unpackValue() throws IOException
return value( unpacker.unpackLong() );
case FLOAT:
return value( unpacker.unpackDouble() );
case BYTES:
return value( unpacker.unpackBytes() );
case STRING:
return value( unpacker.unpackString() );
case MAP:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public class ChunkedOutput implements PackOutput
/** Are currently in the middle of writing a chunk? */
private boolean chunkOpen = false;


public ChunkedOutput( WritableByteChannel ch )
{
this( 8192, ch );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import static java.lang.String.format;
import static java.nio.ByteOrder.BIG_ENDIAN;
import static org.neo4j.driver.internal.util.ServerVersion.v3_2_0;
import static org.neo4j.driver.internal.util.ServerVersion.version;

public class SocketClient
{
Expand Down Expand Up @@ -123,9 +125,7 @@ public void start()
{
setChannel( ChannelFactory.create( address, securityPlan, timeoutMillis, logger ) );
}
protocol = negotiateProtocol();
reader = protocol.reader();
writer = protocol.writer();
setProtocol( negotiateProtocol() );
}
catch ( ConnectException e )
{
Expand All @@ -139,6 +139,21 @@ public void start()
}
}

public void updateProtocol( String serverVersion )
{
if( version( serverVersion ).lessThan( v3_2_0 ) )
{
setProtocol( SocketProtocolV1.createWithoutByteArraySupport( channel ) );
}
}

private void setProtocol( SocketProtocol protocol )
{
this.protocol = protocol;
this.reader = protocol.reader();
this.writer = protocol.writer();
}

public void send( Queue<Message> messages ) throws IOException
{
int messageCount = 0;
Expand Down Expand Up @@ -255,7 +270,7 @@ private SocketProtocol negotiateProtocol() throws IOException
{
case VERSION1:
logger.debug( "S: [HANDSHAKE] -> 1" );
return new SocketProtocolV1( channel );
return SocketProtocolV1.create( channel );
case NO_VERSION:
throw new ClientException( "The server does not support any of the protocol versions supported by " +
"this driver. Ensure that you are using driver and server versions that " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public void init( String clientName, Map<String,Value> authToken )
queueMessage( new InitMessage( clientName, authToken ), initCollector );
sync();
this.serverInfo = new InternalServerInfo( socket.address(), initCollector.serverVersion() );
socket.updateProtocol( serverInfo.version() );
}

@Override
Expand Down Expand Up @@ -167,6 +168,7 @@ public synchronized void flush()
}
catch ( IOException e )
{
close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to close here?
I think connection will be properly close by the session if it throws from here.

throw new ServiceUnavailableException( "Unable to send messages to server: " + e.getMessage(), e );
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.neo4j.driver.internal.net;

import java.io.IOException;
import java.nio.channels.ByteChannel;

import org.neo4j.driver.internal.messaging.MessageFormat;
Expand All @@ -32,15 +31,22 @@ public class SocketProtocolV1 implements SocketProtocol
private final Reader reader;
private final Writer writer;

public SocketProtocolV1( ByteChannel channel ) throws IOException
public static SocketProtocol create( ByteChannel channel )
{
messageFormat = new PackStreamMessageFormatV1();
/*by default the byte array support is enabled*/
return new SocketProtocolV1( channel, true );
}

ChunkedOutput output = new ChunkedOutput( channel );
BufferingChunkedInput input = new BufferingChunkedInput( channel );
public static SocketProtocol createWithoutByteArraySupport( ByteChannel channel )
{
return new SocketProtocolV1( channel, false );
}

this.writer = new PackStreamMessageFormatV1.Writer( output, output.messageBoundaryHook() );
this.reader = new PackStreamMessageFormatV1.Reader( input, input.messageBoundaryHook() );
private SocketProtocolV1( ByteChannel channel, boolean byteArraySupportEnabled )
{
messageFormat = new PackStreamMessageFormatV1();
this.writer = messageFormat.newWriter( channel, byteArraySupportEnabled );
this.reader = messageFormat.newReader( channel );
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.neo4j.driver.internal.packstream;

import java.io.IOException;

public class ByteArrayIncompatiblePacker extends PackStream.Packer
{
public ByteArrayIncompatiblePacker( PackOutput out )
{
super( out );
}

public void packBytesHeader( int size ) throws IOException
{
throw new PackStream.UnPackable( "Packing bytes is not supported " +
"as the current server this driver connected to does not support unpack bytes." );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -501,39 +501,38 @@ public double unpackDouble() throws IOException
throw new Unexpected( "Expected a double, but got: " + toHexString( markerByte ));
}

public String unpackString() throws IOException
public byte[] unpackBytes() throws IOException
{
final byte markerByte = in.readByte();
if( markerByte == TINY_STRING ) // Note no mask, so we compare to 0x80.
switch(markerByte)
{
return EMPTY_STRING;
case BYTES_8: return unpackRawBytes( unpackUINT8() );
case BYTES_16: return unpackRawBytes( unpackUINT16() );
case BYTES_32:
{
long size = unpackUINT32();
if ( size <= Integer.MAX_VALUE )
{
return unpackRawBytes( (int) size );
}
else
{
throw new Overflow( "BYTES_32 too long for Java" );
}
}
default: throw new Unexpected( "Expected bytes, but got: 0x" + toHexString( markerByte & 0xFF ));
}

return new String(unpackUtf8(markerByte), UTF_8);
}

public byte[] unpackBytes() throws IOException
public String unpackString() throws IOException
{
final byte markerByte = in.readByte();

switch(markerByte)
if( markerByte == TINY_STRING ) // Note no mask, so we compare to 0x80.
{
case BYTES_8: return unpackBytes( unpackUINT8() );
case BYTES_16: return unpackBytes( unpackUINT16() );
case BYTES_32:
{
long size = unpackUINT32();
if ( size <= Integer.MAX_VALUE )
{
return unpackBytes( (int) size );
}
else
{
throw new Overflow( "BYTES_32 too long for Java" );
}
}
default: throw new Unexpected( "Expected binary data, but got: 0x" + toHexString( markerByte & 0xFF ));
return EMPTY_STRING;
}

return new String(unpackUtf8(markerByte), UTF_8);
}

/**
Expand All @@ -558,17 +557,17 @@ private byte[] unpackUtf8(byte markerByte) throws IOException
final byte markerHighNibble = (byte) (markerByte & 0xF0);
final byte markerLowNibble = (byte) (markerByte & 0x0F);

if ( markerHighNibble == TINY_STRING ) { return unpackBytes( markerLowNibble ); }
if ( markerHighNibble == TINY_STRING ) { return unpackRawBytes( markerLowNibble ); }
switch(markerByte)
{
case STRING_8: return unpackBytes( unpackUINT8() );
case STRING_16: return unpackBytes( unpackUINT16() );
case STRING_8: return unpackRawBytes( unpackUINT8() );
case STRING_16: return unpackRawBytes( unpackUINT16() );
case STRING_32:
{
long size = unpackUINT32();
if ( size <= Integer.MAX_VALUE )
{
return unpackBytes( (int) size );
return unpackRawBytes( (int) size );
}
else
{
Expand Down Expand Up @@ -608,7 +607,7 @@ private long unpackUINT32() throws IOException
return in.readInt() & 0xFFFFFFFFL;
}

private byte[] unpackBytes( int size ) throws IOException
private byte[] unpackRawBytes(int size ) throws IOException
{
byte[] heapBuffer = new byte[size];
in.readBytes( heapBuffer, 0, heapBuffer.length );
Expand Down Expand Up @@ -711,5 +710,4 @@ public UnPackable( String message )
super( message );
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.neo4j.driver.internal.summary;

import org.neo4j.driver.internal.net.BoltServerAddress;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.neo4j.driver.internal.types.TypeConstructor.NUMBER_TyCon;
import static org.neo4j.driver.internal.types.TypeConstructor.PATH_TyCon;
import static org.neo4j.driver.internal.types.TypeConstructor.RELATIONSHIP_TyCon;
import static org.neo4j.driver.internal.types.TypeConstructor.BYTES_TyCon;
import static org.neo4j.driver.internal.types.TypeConstructor.STRING_TyCon;

/**
Expand All @@ -47,6 +48,7 @@ public class InternalTypeSystem implements TypeSystem

private final TypeRepresentation anyType = constructType( ANY_TyCon );
private final TypeRepresentation booleanType = constructType( BOOLEAN_TyCon );
private final TypeRepresentation bytesType = constructType( BYTES_TyCon );
private final TypeRepresentation stringType = constructType( STRING_TyCon );
private final TypeRepresentation numberType = constructType( NUMBER_TyCon );
private final TypeRepresentation integerType = constructType( INTEGER_TyCon );
Expand Down Expand Up @@ -76,6 +78,13 @@ public Type BOOLEAN()
return booleanType;
}

/** the Cypher type BYTES */
@Override
public Type BYTES()
{
return bytesType;
}

/** the Cypher type STRING */
@Override
public Type STRING()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ public String typeName()
}
},

BYTES_TyCon {
@Override
public String typeName()
{
return "BYTES";
}
},

STRING_TyCon {
@Override
public String typeName()
Expand Down
Loading