1
1
package org .tarantool ;
2
2
3
- import java .io .DataInputStream ;
4
- import java .io .DataOutputStream ;
3
+ import org .tarantool .protocol .ProtoUtils ;
4
+ import org .tarantool .protocol .TarantoolGreeting ;
5
+
5
6
import java .io .IOException ;
6
- import java .io .OutputStream ;
7
7
import java .net .Socket ;
8
- import java .nio .ByteBuffer ;
9
8
import java .nio .channels .SocketChannel ;
10
- import java .security .MessageDigest ;
11
- import java .security .NoSuchAlgorithmException ;
12
- import java .util .ArrayList ;
13
- import java .util .EnumMap ;
14
- import java .util .LinkedHashMap ;
15
9
import java .util .List ;
16
- import java .util .Map ;
17
10
import java .util .concurrent .atomic .AtomicLong ;
18
11
19
12
public abstract class TarantoolBase <Result > extends AbstractTarantoolOps <Integer , List <?>, Object , Result > {
20
- protected static final String WELCOME = "Tarantool " ;
21
13
protected String serverVersion ;
22
14
/**
23
15
* Connection state
24
16
*/
25
- protected String salt ;
26
17
protected MsgPackLite msgPackLite = MsgPackLite .INSTANCE ;
27
18
protected AtomicLong syncId = new AtomicLong ();
28
19
protected int initialRequestSize = 4096 ;
29
- /**
30
- * Read properties
31
- */
32
- protected DataInputStream is ;
33
- protected CountInputStream cis ;
34
- protected Map <Integer , Object > headers ;
35
- protected Map <Integer , Object > body ;
36
20
37
21
public TarantoolBase () {
38
22
}
39
23
40
24
public TarantoolBase (String username , String password , Socket socket ) {
41
25
super ();
42
26
try {
43
- cis = new CountInputStreamImpl (socket .getInputStream ());
44
- is = new DataInputStream (cis );
45
- byte [] bytes = new byte [64 ];
46
- is .readFully (bytes );
47
- String firstLine = new String (bytes );
48
- if (!firstLine .startsWith (WELCOME )) {
49
- closeStreams ();
50
- close ();
51
- throw new CommunicationException ("Welcome message should starts with tarantool but starts with '" + firstLine + "'" , new IllegalStateException ("Invalid welcome packet" ));
52
- }
53
- serverVersion = firstLine .substring (WELCOME .length ());
54
- is .readFully (bytes );
55
- this .salt = new String (bytes );
56
- if (username != null && password != null ) {
57
- ByteBuffer authPacket = createAuthPacket (username , password );
58
- OutputStream os = socket .getOutputStream ();
59
- os .write (authPacket .array (), 0 , authPacket .remaining ());
60
- os .flush ();
61
- readPacket ();
62
- Long code = (Long ) headers .get (Key .CODE .getId ());
63
- if (code != 0 ) {
64
- closeStreams ();
65
- throw serverError (code , body .get (Key .ERROR .getId ()));
66
- }
67
- }
27
+ TarantoolGreeting greeting = ProtoUtils .connect (socket , username , password );
28
+ this .serverVersion = greeting .getServerVersion ();
68
29
} catch (IOException e ) {
69
- closeStreams ();
70
30
throw new CommunicationException ("Couldn't connect to tarantool" , e );
71
31
}
72
32
}
73
33
74
-
75
- protected ByteBuffer createAuthPacket (String username , final String password ) throws IOException {
76
- final MessageDigest sha1 ;
77
- try {
78
- sha1 = MessageDigest .getInstance ("SHA-1" );
79
- } catch (NoSuchAlgorithmException e ) {
80
- throw new IllegalStateException (e );
81
- }
82
- List auth = new ArrayList (2 );
83
- auth .add ("chap-sha1" );
84
-
85
- byte [] p = sha1 .digest (password .getBytes ());
86
-
87
- sha1 .reset ();
88
- byte [] p2 = sha1 .digest (p );
89
-
90
- sha1 .reset ();
91
- sha1 .update (Base64 .decode (salt ), 0 , 20 );
92
- sha1 .update (p2 );
93
- byte [] scramble = sha1 .digest ();
94
- for (int i = 0 , e = 20 ; i < e ; i ++) {
95
- p [i ] ^= scramble [i ];
96
- }
97
- auth .add (p );
98
- return createPacket (Code .AUTH , 0L , null , Key .USER_NAME , username , Key .TUPLE , auth );
99
- }
100
-
101
- protected ByteBuffer createPacket (Code code , Long syncId , Long schemaId , Object ... args ) throws IOException {
102
- TarantoolClientImpl .ByteArrayOutputStream bos = new TarantoolClientImpl .ByteArrayOutputStream (initialRequestSize );
103
- bos .write (new byte [5 ]);
104
- DataOutputStream ds = new DataOutputStream (bos );
105
- Map <Key , Object > header = new EnumMap <Key , Object >(Key .class );
106
- Map <Key , Object > body = new EnumMap <Key , Object >(Key .class );
107
- header .put (Key .CODE , code );
108
- header .put (Key .SYNC , syncId );
109
- if (schemaId != null ) {
110
- header .put (Key .SCHEMA_ID , schemaId );
111
- }
112
- if (args != null ) {
113
- for (int i = 0 , e = args .length ; i < e ; i += 2 ) {
114
- Object value = args [i + 1 ];
115
- body .put ((Key ) args [i ], value );
116
- }
117
- }
118
- msgPackLite .pack (header , ds );
119
- msgPackLite .pack (body , ds );
120
- ds .flush ();
121
- ByteBuffer buffer = bos .toByteBuffer ();
122
- buffer .put (0 , (byte ) 0xce );
123
- buffer .putInt (1 , bos .size () - 5 );
124
- return buffer ;
125
- }
126
-
127
- protected void readPacket () throws IOException {
128
- int size = ((Number ) msgPackLite .unpack (is )).intValue ();
129
- long mark = cis .getBytesRead ();
130
- headers = (Map <Integer , Object >) msgPackLite .unpack (is );
131
- if (cis .getBytesRead () - mark < size ) {
132
- body = (Map <Integer , Object >) msgPackLite .unpack (is );
133
- }
134
- is .skipBytes ((int ) (cis .getBytesRead () - mark - size ));
135
- }
136
-
137
34
protected static class SQLMetaData {
138
35
protected String name ;
139
36
@@ -153,56 +50,10 @@ public String toString() {
153
50
}
154
51
}
155
52
156
- protected List <SQLMetaData > getSQLMetadata () {
157
- List <Map <Integer , Object >> meta = (List <Map <Integer , Object >>) body .get (Key .SQL_METADATA .getId ());
158
- List <SQLMetaData > values = new ArrayList <SQLMetaData >(meta .size ());
159
- for (Map <Integer ,Object > c :meta ) {
160
- values .add (new SQLMetaData ((String ) c .get (Key .SQL_FIELD_NAME .getId ())));
161
- }
162
- return values ;
163
- }
164
-
165
- protected List <List <Object >> getSQLData () {
166
- return (List <List <Object >>) body .get (Key .DATA .getId ());
167
- }
168
-
169
- protected List <Map <String , Object >> readSqlResult (List <List <?>> data ) {
170
- List <Map <String , Object >> values = new ArrayList <Map <String , Object >>(data .size ());
171
- List <SQLMetaData > metaData = getSQLMetadata ();
172
- LinkedHashMap <String , Object > value = new LinkedHashMap <String , Object >();
173
- for (List row : data ) {
174
- for (int i = 0 ; i < row .size (); i ++) {
175
- value .put (metaData .get (i ).getName (), row .get (i ));
176
- }
177
- values .add (value );
178
- }
179
- return values ;
180
- }
181
-
182
- protected Long getSqlRowCount () {
183
- Map <Key , Object > info = (Map <Key , Object >) body .get (Key .SQL_INFO .getId ());
184
- Number rowCount ;
185
- if (info != null && (rowCount = ((Number ) info .get (Key .SQL_ROW_COUNT .getId ()))) != null ) {
186
- return rowCount .longValue ();
187
- }
188
- return null ;
189
- }
190
-
191
-
192
53
protected TarantoolException serverError (long code , Object error ) {
193
54
return new TarantoolException (code , error instanceof String ? (String ) error : new String ((byte []) error ));
194
55
}
195
56
196
- protected class ByteArrayOutputStream extends java .io .ByteArrayOutputStream {
197
- public ByteArrayOutputStream (int size ) {
198
- super (size );
199
- }
200
-
201
- ByteBuffer toByteBuffer () {
202
- return ByteBuffer .wrap (buf , 0 , count );
203
- }
204
- }
205
-
206
57
protected void closeChannel (SocketChannel channel ) {
207
58
if (channel != null ) {
208
59
try {
@@ -213,21 +64,6 @@ protected void closeChannel(SocketChannel channel) {
213
64
}
214
65
}
215
66
216
- protected void closeStreams () {
217
- if (is != null ) {
218
- try {
219
- is .close ();
220
- } catch (IOException ignored ) {
221
- }
222
- }
223
- if (cis != null ) {
224
- try {
225
- cis .close ();
226
- } catch (IOException ignored ) {
227
- }
228
- }
229
- }
230
-
231
67
protected void validateArgs (Object [] args ) {
232
68
if (args != null ) {
233
69
for (int i = 0 ; i < args .length ; i += 2 ) {
0 commit comments