From ba280b408143145e87184171351eae2e2724f0a3 Mon Sep 17 00:00:00 2001 From: zhenglu Date: Wed, 30 Sep 2015 15:23:31 +0800 Subject: [PATCH] =?UTF-8?q?=E9=BB=98=E8=AE=A4=E8=B0=83=E7=94=A8=E7=9A=84?= =?UTF-8?q?=E6=97=B6=E5=80=99errorHandler=20=E9=83=BD=E4=B8=BA=E7=A9=BA?= =?UTF-8?q?=EF=BC=8C=E5=A6=82=E6=9E=9Cmemcache=E6=9C=8D=E5=8A=A1=E4=B8=8D?= =?UTF-8?q?=E5=8F=AF=E7=94=A8=E7=9A=84=E6=97=B6=E5=80=99=EF=BC=8C=E6=B2=A1?= =?UTF-8?q?=E6=9C=89log.error=E6=97=A5=E5=BF=97=EF=BC=8C=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E4=BA=86log.error=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/com/meetup/memcached/MemcachedClient.java | 1659 ++++++++--------- 1 file changed, 814 insertions(+), 845 deletions(-) diff --git a/src/com/meetup/memcached/MemcachedClient.java b/src/com/meetup/memcached/MemcachedClient.java index c17c1fa..99d68fe 100644 --- a/src/com/meetup/memcached/MemcachedClient.java +++ b/src/com/meetup/memcached/MemcachedClient.java @@ -1,27 +1,27 @@ /** * Copyright (c) 2008 Greg Whalin * All rights reserved. - * + *

* This library is free software; you can redistribute it and/or * modify it under the terms of the BSD license - * + *

* This library is distributed in the hope that it will be * useful, but WITHOUT ANY WARRANTY; without even the implied * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * PURPOSE. - * + *

* You should have received a copy of the BSD License along with this * library. * - * @author Greg Whalin + * @author Greg Whalin */ package com.meetup.memcached; import java.util.*; import java.util.zip.*; -import java.nio.*; +import java.nio.*; import java.net.InetAddress; -import java.nio.charset.*; +import java.nio.charset.*; import java.nio.channels.*; import java.nio.channels.spi.*; import java.io.*; @@ -158,48 +158,48 @@ public class MemcachedClient { // logger private static Logger log = - Logger.getLogger( MemcachedClient.class.getName() ); + Logger.getLogger(MemcachedClient.class.getName()); // return codes - private static final String VALUE = "VALUE"; // start of value line from server - private static final String STATS = "STAT"; // start of stats line from server - private static final String ITEM = "ITEM"; // start of item line from server - private static final String DELETED = "DELETED"; // successful deletion - private static final String NOTFOUND = "NOT_FOUND"; // record not found for delete or incr/decr - private static final String STORED = "STORED"; // successful store of data - private static final String NOTSTORED = "NOT_STORED"; // data not stored - private static final String OK = "OK"; // success - private static final String END = "END"; // end of data from server - - private static final String ERROR = "ERROR"; // invalid command name from client - private static final String CLIENT_ERROR = "CLIENT_ERROR"; // client error in input line - invalid protocol - private static final String SERVER_ERROR = "SERVER_ERROR"; // server error - - private static final byte[] B_END = "END\r\n".getBytes(); - private static final byte[] B_NOTFOUND = "NOT_FOUND\r\n".getBytes(); - private static final byte[] B_DELETED = "DELETED\r\r".getBytes(); - private static final byte[] B_STORED = "STORED\r\r".getBytes(); + private static final String VALUE = "VALUE"; // start of value line from server + private static final String STATS = "STAT"; // start of stats line from server + private static final String ITEM = "ITEM"; // start of item line from server + private static final String DELETED = "DELETED"; // successful deletion + private static final String NOTFOUND = "NOT_FOUND"; // record not found for delete or incr/decr + private static final String STORED = "STORED"; // successful store of data + private static final String NOTSTORED = "NOT_STORED"; // data not stored + private static final String OK = "OK"; // success + private static final String END = "END"; // end of data from server + + private static final String ERROR = "ERROR"; // invalid command name from client + private static final String CLIENT_ERROR = "CLIENT_ERROR"; // client error in input line - invalid protocol + private static final String SERVER_ERROR = "SERVER_ERROR"; // server error + + private static final byte[] B_END = "END\r\n".getBytes(); + private static final byte[] B_NOTFOUND = "NOT_FOUND\r\n".getBytes(); + private static final byte[] B_DELETED = "DELETED\r\r".getBytes(); + private static final byte[] B_STORED = "STORED\r\r".getBytes(); // default compression threshold private static final int COMPRESS_THRESH = 30720; - + // values for cache flags - public static final int MARKER_BYTE = 1; - public static final int MARKER_BOOLEAN = 8192; - public static final int MARKER_INTEGER = 4; - public static final int MARKER_LONG = 16384; - public static final int MARKER_CHARACTER = 16; - public static final int MARKER_STRING = 32; - public static final int MARKER_STRINGBUFFER = 64; - public static final int MARKER_FLOAT = 128; - public static final int MARKER_SHORT = 256; - public static final int MARKER_DOUBLE = 512; - public static final int MARKER_DATE = 1024; - public static final int MARKER_STRINGBUILDER = 2048; - public static final int MARKER_BYTEARR = 4096; - public static final int F_COMPRESSED = 2; - public static final int F_SERIALIZED = 8; - + public static final int MARKER_BYTE = 1; + public static final int MARKER_BOOLEAN = 8192; + public static final int MARKER_INTEGER = 4; + public static final int MARKER_LONG = 16384; + public static final int MARKER_CHARACTER = 16; + public static final int MARKER_STRING = 32; + public static final int MARKER_STRINGBUFFER = 64; + public static final int MARKER_FLOAT = 128; + public static final int MARKER_SHORT = 256; + public static final int MARKER_DOUBLE = 512; + public static final int MARKER_DATE = 1024; + public static final int MARKER_STRINGBUILDER = 2048; + public static final int MARKER_BYTEARR = 4096; + public static final int F_COMPRESSED = 2; + public static final int F_SERIALIZED = 8; + // flags private boolean sanitizeKeys; private boolean primitiveAsString; @@ -226,119 +226,119 @@ public MemcachedClient() { init(); } - /** + /** * Creates a new instance of MemCachedClient * accepting a passed in pool name. - * + * * @param poolName name of SockIOPool */ - public MemcachedClient( String poolName ) { + public MemcachedClient(String poolName) { this.poolName = poolName; init(); } - /** + /** * Creates a new instance of MemCacheClient but * acceptes a passed in ClassLoader. - * + * * @param classLoader ClassLoader object. */ - public MemcachedClient( ClassLoader classLoader ) { + public MemcachedClient(ClassLoader classLoader) { this.classLoader = classLoader; init(); } - /** + /** * Creates a new instance of MemCacheClient but * acceptes a passed in ClassLoader and a passed * in ErrorHandler. - * + * * @param classLoader ClassLoader object. * @param errorHandler ErrorHandler object. */ - public MemcachedClient( ClassLoader classLoader, ErrorHandler errorHandler ) { - this.classLoader = classLoader; + public MemcachedClient(ClassLoader classLoader, ErrorHandler errorHandler) { + this.classLoader = classLoader; this.errorHandler = errorHandler; init(); } - /** + /** * Creates a new instance of MemCacheClient but * acceptes a passed in ClassLoader, ErrorHandler, * and SockIOPool name. - * + * * @param classLoader ClassLoader object. * @param errorHandler ErrorHandler object. * @param poolName SockIOPool name */ - public MemcachedClient( ClassLoader classLoader, ErrorHandler errorHandler, String poolName ) { - this.classLoader = classLoader; + public MemcachedClient(ClassLoader classLoader, ErrorHandler errorHandler, String poolName) { + this.classLoader = classLoader; this.errorHandler = errorHandler; - this.poolName = poolName; + this.poolName = poolName; init(); } - /** + /** * Initializes client object to defaults. * * This enables compression and sets compression threshhold to 15 KB. */ private void init() { - this.sanitizeKeys = true; - this.primitiveAsString = false; - this.compressEnable = true; - this.compressThreshold = COMPRESS_THRESH; - this.defaultEncoding = "UTF-8"; - this.poolName = ( this.poolName == null ) ? "default" : this.poolName; + this.sanitizeKeys = true; + this.primitiveAsString = false; + this.compressEnable = true; + this.compressThreshold = COMPRESS_THRESH; + this.defaultEncoding = "UTF-8"; + this.poolName = (this.poolName == null) ? "default" : this.poolName; // get a pool instance to work with for the life of this instance - this.pool = SockIOPool.getInstance( poolName ); + this.pool = SockIOPool.getInstance(poolName); } - /** + /** * Sets an optional ClassLoader to be used for * serialization. - * - * @param classLoader + * + * @param classLoader */ - public void setClassLoader( ClassLoader classLoader ) { + public void setClassLoader(ClassLoader classLoader) { this.classLoader = classLoader; } - /** + /** * Sets an optional ErrorHandler. - * - * @param errorHandler + * + * @param errorHandler */ - public void setErrorHandler( ErrorHandler errorHandler ) { + public void setErrorHandler(ErrorHandler errorHandler) { this.errorHandler = errorHandler; } - /** + /** * Enables/disables sanitizing keys by URLEncoding. - * + * * @param sanitizeKeys if true, then URLEncode all keys */ - public void setSanitizeKeys( boolean sanitizeKeys ) { + public void setSanitizeKeys(boolean sanitizeKeys) { this.sanitizeKeys = sanitizeKeys; } - /** + /** * Enables storing primitive types as their String values. - * + * * @param primitiveAsString if true, then store all primitives as their string value. */ - public void setPrimitiveAsString( boolean primitiveAsString ) { + public void setPrimitiveAsString(boolean primitiveAsString) { this.primitiveAsString = primitiveAsString; } - /** + /** * Sets default String encoding when storing primitives as Strings. * Default is UTF-8. - * - * @param defaultEncoding + * + * @param defaultEncoding */ - public void setDefaultEncoding( String defaultEncoding ) { + public void setDefaultEncoding(String defaultEncoding) { this.defaultEncoding = defaultEncoding; } @@ -355,10 +355,10 @@ public void setDefaultEncoding( String defaultEncoding ) { * * @param compressEnable true to enable compression, false to disable compression */ - public void setCompressEnable( boolean compressEnable ) { + public void setCompressEnable(boolean compressEnable) { this.compressEnable = compressEnable; } - + /** * Sets the required length for data to be considered for compression. * @@ -369,18 +369,18 @@ public void setCompressEnable( boolean compressEnable ) { * * @param compressThreshold required length of data to consider compression */ - public void setCompressThreshold( long compressThreshold ) { + public void setCompressThreshold(long compressThreshold) { this.compressThreshold = compressThreshold; } - /** + /** * Checks to see if key exists in cache. - * + * * @param key the key to look for * @return true if key found in cache, false if not (or if cache is down) */ - public boolean keyExists( String key ) { - return ( this.get( key, null, true ) != null ); + public boolean keyExists(String key) { + return (this.get(key, null, true) != null); } /** @@ -389,19 +389,19 @@ public boolean keyExists( String key ) { * @param key the key to be removed * @return true, if the data was deleted successfully */ - public boolean delete( String key ) { - return delete( key, null, null ); + public boolean delete(String key) { + return delete(key, null, null); } - /** + /** * Deletes an object from cache given cache key and expiration date. - * + * * @param key the key to be removed * @param expiry when to expire the record. * @return true, if the data was deleted successfully */ - public boolean delete( String key, Date expiry ) { - return delete( key, null, expiry ); + public boolean delete(String key, Date expiry) { + return delete(key, null, expiry); } /** @@ -418,95 +418,91 @@ public boolean delete( String key, Date expiry ) { * @param expiry when to expire the record. * @return true, if the data was deleted successfully */ - public boolean delete( String key, Integer hashCode, Date expiry ) { + public boolean delete(String key, Integer hashCode, Date expiry) { - if ( key == null ) { - log.error( "null value for key passed to delete()" ); + if (key == null) { + log.error("null value for key passed to delete()"); return false; } try { - key = sanitizeKey( key ); - } - catch ( UnsupportedEncodingException e ) { + key = sanitizeKey(key); + } catch (UnsupportedEncodingException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnDelete( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnDelete(this, e, key); - log.error( "failed to sanitize your key!", e ); + log.error("failed to sanitize your key!", e); return false; } // get SockIO obj from hash or from key - SockIOPool.SockIO sock = pool.getSock( key, hashCode ); + SockIOPool.SockIO sock = pool.getSock(key, hashCode); // return false if unable to get SockIO obj - if ( sock == null ) { - if ( errorHandler != null ) - errorHandler.handleErrorOnDelete( this, new IOException( "no socket to server available" ), key ); + if (sock == null) { + log.error("unable to get connection to memcache ! key is {}", key); + if (errorHandler != null) + errorHandler.handleErrorOnDelete(this, new IOException("no socket to server available"), key); return false; } // build command - StringBuilder command = new StringBuilder( "delete " ).append( key ); - if ( expiry != null ) - command.append( " " + expiry.getTime() / 1000 ); + StringBuilder command = new StringBuilder("delete ").append(key); + if (expiry != null) + command.append(" " + expiry.getTime() / 1000); + + command.append("\r\n"); - command.append( "\r\n" ); - try { - sock.write( command.toString().getBytes() ); + sock.write(command.toString().getBytes()); sock.flush(); - + // if we get appropriate response back, then we return true String line = sock.readLine(); - if ( DELETED.equals( line ) ) { - if ( log.isInfoEnabled() ) - log.info( "++++ deletion of key: " + key + " from cache was a success" ); + if (DELETED.equals(line)) { + if (log.isInfoEnabled()) + log.info("++++ deletion of key: " + key + " from cache was a success"); // return sock to pool and bail here sock.close(); sock = null; return true; + } else if (NOTFOUND.equals(line)) { + if (log.isInfoEnabled()) + log.info("++++ deletion of key: " + key + " from cache failed as the key was not found"); + } else { + log.error("++++ error deleting key: " + key); + log.error("++++ server response: " + line); } - else if ( NOTFOUND.equals( line ) ) { - if ( log.isInfoEnabled() ) - log.info( "++++ deletion of key: " + key + " from cache failed as the key was not found" ); - } - else { - log.error( "++++ error deleting key: " + key ); - log.error( "++++ server response: " + line ); - } - } - catch ( IOException e ) { + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnDelete( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnDelete(this, e, key); // exception thrown - log.error( "++++ exception thrown while writing bytes to server on delete" ); - log.error( e.getMessage(), e ); + log.error("++++ exception thrown while writing bytes to server on delete"); + log.error(e.getMessage(), e); try { sock.trueClose(); - } - catch ( IOException ioe ) { - log.error( "++++ failed to close socket : " + sock.toString() ); + } catch (IOException ioe) { + log.error("++++ failed to close socket : " + sock.toString()); } sock = null; } - if ( sock != null ) { + if (sock != null) { sock.close(); sock = null; } return false; } - + /** * Stores data on the server; only the key and the value are specified. * @@ -514,8 +510,8 @@ else if ( NOTFOUND.equals( line ) ) { * @param value value to store * @return true, if the data was successfully stored */ - public boolean set( String key, Object value ) { - return set( "set", key, value, null, null, primitiveAsString ); + public boolean set(String key, Object value) { + return set("set", key, value, null, null, primitiveAsString); } /** @@ -526,8 +522,8 @@ public boolean set( String key, Object value ) { * @param hashCode if not null, then the int hashcode to use * @return true, if the data was successfully stored */ - public boolean set( String key, Object value, Integer hashCode ) { - return set( "set", key, value, null, hashCode, primitiveAsString ); + public boolean set(String key, Object value, Integer hashCode) { + return set("set", key, value, null, hashCode, primitiveAsString); } /** @@ -538,8 +534,8 @@ public boolean set( String key, Object value, Integer hashCode ) { * @param expiry when to expire the record * @return true, if the data was successfully stored */ - public boolean set( String key, Object value, Date expiry ) { - return set( "set", key, value, expiry, null, primitiveAsString ); + public boolean set(String key, Object value, Date expiry) { + return set("set", key, value, expiry, null, primitiveAsString); } /** @@ -551,8 +547,8 @@ public boolean set( String key, Object value, Date expiry ) { * @param hashCode if not null, then the int hashcode to use * @return true, if the data was successfully stored */ - public boolean set( String key, Object value, Date expiry, Integer hashCode ) { - return set( "set", key, value, expiry, hashCode, primitiveAsString ); + public boolean set(String key, Object value, Date expiry, Integer hashCode) { + return set("set", key, value, expiry, hashCode, primitiveAsString); } /** @@ -562,8 +558,8 @@ public boolean set( String key, Object value, Date expiry, Integer hashCode ) { * @param value value to store * @return true, if the data was successfully stored */ - public boolean add( String key, Object value ) { - return set( "add", key, value, null, null, primitiveAsString ); + public boolean add(String key, Object value) { + return set("add", key, value, null, null, primitiveAsString); } /** @@ -574,8 +570,8 @@ public boolean add( String key, Object value ) { * @param hashCode if not null, then the int hashcode to use * @return true, if the data was successfully stored */ - public boolean add( String key, Object value, Integer hashCode ) { - return set( "add", key, value, null, hashCode, primitiveAsString ); + public boolean add(String key, Object value, Integer hashCode) { + return set("add", key, value, null, hashCode, primitiveAsString); } /** @@ -586,8 +582,8 @@ public boolean add( String key, Object value, Integer hashCode ) { * @param expiry when to expire the record * @return true, if the data was successfully stored */ - public boolean add( String key, Object value, Date expiry ) { - return set( "add", key, value, expiry, null, primitiveAsString ); + public boolean add(String key, Object value, Date expiry) { + return set("add", key, value, expiry, null, primitiveAsString); } /** @@ -599,8 +595,8 @@ public boolean add( String key, Object value, Date expiry ) { * @param hashCode if not null, then the int hashcode to use * @return true, if the data was successfully stored */ - public boolean add( String key, Object value, Date expiry, Integer hashCode ) { - return set( "add", key, value, expiry, hashCode, primitiveAsString ); + public boolean add(String key, Object value, Date expiry, Integer hashCode) { + return set("add", key, value, expiry, hashCode, primitiveAsString); } /** @@ -610,8 +606,8 @@ public boolean add( String key, Object value, Date expiry, Integer hashCode ) { * @param value value to store * @return true, if the data was successfully stored */ - public boolean replace( String key, Object value ) { - return set( "replace", key, value, null, null, primitiveAsString ); + public boolean replace(String key, Object value) { + return set("replace", key, value, null, null, primitiveAsString); } /** @@ -622,8 +618,8 @@ public boolean replace( String key, Object value ) { * @param hashCode if not null, then the int hashcode to use * @return true, if the data was successfully stored */ - public boolean replace( String key, Object value, Integer hashCode ) { - return set( "replace", key, value, null, hashCode, primitiveAsString ); + public boolean replace(String key, Object value, Integer hashCode) { + return set("replace", key, value, null, hashCode, primitiveAsString); } /** @@ -634,8 +630,8 @@ public boolean replace( String key, Object value, Integer hashCode ) { * @param expiry when to expire the record * @return true, if the data was successfully stored */ - public boolean replace( String key, Object value, Date expiry ) { - return set( "replace", key, value, expiry, null, primitiveAsString ); + public boolean replace(String key, Object value, Date expiry) { + return set("replace", key, value, expiry, null, primitiveAsString); } /** @@ -647,11 +643,11 @@ public boolean replace( String key, Object value, Date expiry ) { * @param hashCode if not null, then the int hashcode to use * @return true, if the data was successfully stored */ - public boolean replace( String key, Object value, Date expiry, Integer hashCode ) { - return set( "replace", key, value, expiry, hashCode, primitiveAsString ); + public boolean replace(String key, Object value, Date expiry, Integer hashCode) { + return set("replace", key, value, expiry, hashCode, primitiveAsString); } - /** + /** * Stores data to cache. * * If data does not already exist for this key on the server, or if the key is being
@@ -662,7 +658,7 @@ public boolean replace( String key, Object value, Date expiry, Integer hashCode * the data will be stored in compressed form.
*
* As of the current release, all objects stored will use java serialization. - * + * * @param cmdname action to take (set, add, replace) * @param key key to store cache under * @param value object to cache @@ -671,112 +667,108 @@ public boolean replace( String key, Object value, Date expiry, Integer hashCode * @param asString store this object as a string? * @return true/false indicating success */ - private boolean set( String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString ) { + private boolean set(String cmdname, String key, Object value, Date expiry, Integer hashCode, boolean asString) { - if ( cmdname == null || cmdname.trim().equals( "" ) || key == null ) { - log.error( "key is null or cmd is null/empty for set()" ); + if (cmdname == null || cmdname.trim().equals("") || key == null) { + log.error("key is null or cmd is null/empty for set()"); return false; } try { - key = sanitizeKey( key ); - } - catch ( UnsupportedEncodingException e ) { + key = sanitizeKey(key); + } catch (UnsupportedEncodingException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnSet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnSet(this, e, key); - log.error( "failed to sanitize your key!", e ); + log.error("failed to sanitize your key!", e); return false; } - if ( value == null ) { - log.error( "trying to store a null value to cache" ); + if (value == null) { + log.error("trying to store a null value to cache"); return false; } // get SockIO obj - SockIOPool.SockIO sock = pool.getSock( key, hashCode ); - - if ( sock == null ) { - if ( errorHandler != null ) - errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key ); + SockIOPool.SockIO sock = pool.getSock(key, hashCode); + + if (sock == null) { + log.error("unable to get connection to memcache ! key is {}", key); + if (errorHandler != null) + errorHandler.handleErrorOnSet(this, new IOException("no socket to server available"), key); return false; } - - if ( expiry == null ) + + if (expiry == null) expiry = new Date(0); // store flags int flags = 0; - + // byte array to hold data byte[] val; - if ( NativeHandler.isHandled( value ) ) { - - if ( asString ) { + if (NativeHandler.isHandled(value)) { + + if (asString) { // useful for sharing data between java and non-java // and also for storing ints for the increment method try { - if ( log.isInfoEnabled() ) - log.info( "++++ storing data as a string for key: " + key + " for class: " + value.getClass().getName() ); - val = value.toString().getBytes( defaultEncoding ); - } - catch ( UnsupportedEncodingException ue ) { + if (log.isInfoEnabled()) + log.info("++++ storing data as a string for key: " + key + " for class: " + value.getClass() + .getName()); + val = value.toString().getBytes(defaultEncoding); + } catch (UnsupportedEncodingException ue) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnSet( this, ue, key ); + if (errorHandler != null) + errorHandler.handleErrorOnSet(this, ue, key); - log.error( "invalid encoding type used: " + defaultEncoding, ue ); + log.error("invalid encoding type used: " + defaultEncoding, ue); sock.close(); sock = null; return false; } - } - else { + } else { try { - if ( log.isInfoEnabled() ) - log.info( "Storing with native handler..." ); - flags |= NativeHandler.getMarkerFlag( value ); - val = NativeHandler.encode( value ); - } - catch ( Exception e ) { + if (log.isInfoEnabled()) + log.info("Storing with native handler..."); + flags |= NativeHandler.getMarkerFlag(value); + val = NativeHandler.encode(value); + } catch (Exception e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnSet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnSet(this, e, key); - log.error( "Failed to native handle obj", e ); + log.error("Failed to native handle obj", e); sock.close(); sock = null; return false; } } - } - else { + } else { // always serialize for non-primitive types try { - if ( log.isInfoEnabled() ) - log.info( "++++ serializing for key: " + key + " for class: " + value.getClass().getName() ); + if (log.isInfoEnabled()) + log.info("++++ serializing for key: " + key + " for class: " + value.getClass().getName()); ByteArrayOutputStream bos = new ByteArrayOutputStream(); - (new ObjectOutputStream( bos )).writeObject( value ); + (new ObjectOutputStream(bos)).writeObject(value); val = bos.toByteArray(); flags |= F_SERIALIZED; - } - catch ( IOException e ) { + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnSet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnSet(this, e, key); // if we fail to serialize, then // we bail - log.error( "failed to serialize obj", e ); - log.error( value.toString() ); + log.error("failed to serialize obj", e); + log.error(value.toString()); // return socket to pool and bail sock.close(); @@ -784,90 +776,86 @@ private boolean set( String cmdname, String key, Object value, Date expiry, Inte return false; } } - + // now try to compress if we want to // and if the length is over the threshold - if ( compressEnable && val.length > compressThreshold ) { + if (compressEnable && val.length > compressThreshold) { try { - if ( log.isInfoEnabled() ) { - log.info( "++++ trying to compress data" ); - log.info( "++++ size prior to compression: " + val.length ); + if (log.isInfoEnabled()) { + log.info("++++ trying to compress data"); + log.info("++++ size prior to compression: " + val.length); } - ByteArrayOutputStream bos = new ByteArrayOutputStream( val.length ); - GZIPOutputStream gos = new GZIPOutputStream( bos ); - gos.write( val, 0, val.length ); + ByteArrayOutputStream bos = new ByteArrayOutputStream(val.length); + GZIPOutputStream gos = new GZIPOutputStream(bos); + gos.write(val, 0, val.length); gos.finish(); gos.close(); - + // store it and set compression flag val = bos.toByteArray(); flags |= F_COMPRESSED; - if ( log.isInfoEnabled() ) - log.info( "++++ compression succeeded, size after: " + val.length ); - } - catch ( IOException e ) { + if (log.isInfoEnabled()) + log.info("++++ compression succeeded, size after: " + val.length); + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnSet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnSet(this, e, key); - log.error( "IOException while compressing stream: " + e.getMessage() ); - log.error( "storing data uncompressed" ); + log.error("IOException while compressing stream: " + e.getMessage()); + log.error("storing data uncompressed"); } } // now write the data to the cache server try { - String cmd = String.format( "%s %s %d %d %d\r\n", cmdname, key, flags, (expiry.getTime() / 1000), val.length ); - sock.write( cmd.getBytes() ); - sock.write( val ); - sock.write( "\r\n".getBytes() ); + String cmd = String + .format("%s %s %d %d %d\r\n", cmdname, key, flags, (expiry.getTime() / 1000), val.length); + sock.write(cmd.getBytes()); + sock.write(val); + sock.write("\r\n".getBytes()); sock.flush(); // get result code String line = sock.readLine(); - if ( log.isInfoEnabled() ) - log.info( "++++ memcache cmd (result code): " + cmd + " (" + line + ")" ); + if (log.isInfoEnabled()) + log.info("++++ memcache cmd (result code): " + cmd + " (" + line + ")"); - if ( STORED.equals( line ) ) { - if ( log.isInfoEnabled() ) - log.info("++++ data successfully stored for key: " + key ); + if (STORED.equals(line)) { + if (log.isInfoEnabled()) + log.info("++++ data successfully stored for key: " + key); sock.close(); sock = null; return true; + } else if (NOTSTORED.equals(line)) { + if (log.isInfoEnabled()) + log.info("++++ data not stored in cache for key: " + key); + } else { + log.error("++++ error storing data in cache for key: " + key + " -- length: " + val.length); + log.error("++++ server response: " + line); } - else if ( NOTSTORED.equals( line ) ) { - if ( log.isInfoEnabled() ) - log.info( "++++ data not stored in cache for key: " + key ); - } - else { - log.error( "++++ error storing data in cache for key: " + key + " -- length: " + val.length ); - log.error( "++++ server response: " + line ); - } - } - catch ( IOException e ) { + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnSet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnSet(this, e, key); // exception thrown - log.error( "++++ exception thrown while writing bytes to server on set" ); - log.error( e.getMessage(), e ); + log.error("++++ exception thrown while writing bytes to server on set"); + log.error(e.getMessage(), e); try { sock.trueClose(); - } - catch ( IOException ioe ) { - log.error( "++++ failed to close socket : " + sock.toString() ); + } catch (IOException ioe) { + log.error("++++ failed to close socket : " + sock.toString()); } sock = null; } - if ( sock != null ) { + if (sock != null) { sock.close(); sock = null; } @@ -875,159 +863,156 @@ else if ( NOTSTORED.equals( line ) ) { return false; } - /** + /** * Store a counter to memcached given a key - * + * * @param key cache key * @param counter number to store * @return true/false indicating success */ - public boolean storeCounter( String key, long counter ) { - return set( "set", key, new Long( counter ), null, null, true ); + public boolean storeCounter(String key, long counter) { + return set("set", key, new Long(counter), null, null, true); } - /** + /** * Store a counter to memcached given a key - * + * * @param key cache key * @param counter number to store * @return true/false indicating success */ - public boolean storeCounter( String key, Long counter ) { - return set( "set", key, counter, null, null, true ); + public boolean storeCounter(String key, Long counter) { + return set("set", key, counter, null, null, true); } - - /** + + /** * Store a counter to memcached given a key - * + * * @param key cache key * @param counter number to store * @param hashCode if not null, then the int hashcode to use * @return true/false indicating success */ - public boolean storeCounter( String key, Long counter, Integer hashCode ) { - return set( "set", key, counter, null, hashCode, true ); + public boolean storeCounter(String key, Long counter, Integer hashCode) { + return set("set", key, counter, null, hashCode, true); } - /** + /** * Returns value in counter at given key as long. * * @param key cache ket * @return counter value or -1 if not found */ - public long getCounter( String key ) { - return getCounter( key, null ); + public long getCounter(String key) { + return getCounter(key, null); } - /** + /** * Returns value in counter at given key as long. * * @param key cache ket * @param hashCode if not null, then the int hashcode to use * @return counter value or -1 if not found */ - public long getCounter( String key, Integer hashCode ) { + public long getCounter(String key, Integer hashCode) { - if ( key == null ) { - log.error( "null key for getCounter()" ); + if (key == null) { + log.error("null key for getCounter()"); return -1; } long counter = -1; try { - counter = Long.parseLong( (String)get( key, hashCode, true ) ); - } - catch ( Exception ex ) { + counter = Long.parseLong((String) get(key, hashCode, true)); + } catch (Exception ex) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, ex, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, ex, key); // not found or error getting out - if ( log.isInfoEnabled() ) - log.info( String.format( "Failed to parse Long value for key: %s", key ) ); + if (log.isInfoEnabled()) + log.info(String.format("Failed to parse Long value for key: %s", key)); } - + return counter; } - /** + /** * Thread safe way to initialize and increment a counter. - * + * * @param key key where the data is stored * @return value of incrementer */ - public long addOrIncr( String key ) { - return addOrIncr( key, 0, null ); + public long addOrIncr(String key) { + return addOrIncr(key, 0, null); } - /** + /** * Thread safe way to initialize and increment a counter. - * + * * @param key key where the data is stored * @param inc value to set or increment by * @return value of incrementer */ - public long addOrIncr( String key, long inc ) { - return addOrIncr( key, inc, null ); + public long addOrIncr(String key, long inc) { + return addOrIncr(key, inc, null); } - /** + /** * Thread safe way to initialize and increment a counter. - * + * * @param key key where the data is stored * @param inc value to set or increment by * @param hashCode if not null, then the int hashcode to use * @return value of incrementer */ - public long addOrIncr( String key, long inc, Integer hashCode ) { - boolean ret = set( "add", key, new Long( inc ), null, hashCode, true ); + public long addOrIncr(String key, long inc, Integer hashCode) { + boolean ret = set("add", key, new Long(inc), null, hashCode, true); - if ( ret ) { + if (ret) { return inc; - } - else { - return incrdecr( "incr", key, inc, hashCode ); + } else { + return incrdecr("incr", key, inc, hashCode); } } - /** + /** * Thread safe way to initialize and decrement a counter. - * + * * @param key key where the data is stored * @return value of incrementer */ - public long addOrDecr( String key ) { - return addOrDecr( key, 0, null ); + public long addOrDecr(String key) { + return addOrDecr(key, 0, null); } - /** + /** * Thread safe way to initialize and decrement a counter. - * + * * @param key key where the data is stored * @param inc value to set or increment by * @return value of incrementer */ - public long addOrDecr( String key, long inc ) { - return addOrDecr( key, inc, null ); + public long addOrDecr(String key, long inc) { + return addOrDecr(key, inc, null); } - /** + /** * Thread safe way to initialize and decrement a counter. - * + * * @param key key where the data is stored * @param inc value to set or increment by * @param hashCode if not null, then the int hashcode to use * @return value of incrementer */ - public long addOrDecr( String key, long inc, Integer hashCode ) { - boolean ret = set( "add", key, new Long( inc ), null, hashCode, true ); + public long addOrDecr(String key, long inc, Integer hashCode) { + boolean ret = set("add", key, new Long(inc), null, hashCode, true); - if ( ret ) { + if (ret) { return inc; - } - else { - return incrdecr( "decr", key, inc, hashCode ); + } else { + return incrdecr("decr", key, inc, hashCode); } } @@ -1037,19 +1022,19 @@ public long addOrDecr( String key, long inc, Integer hashCode ) { * @param key key where the data is stored * @return -1, if the key is not found, the value after incrementing otherwise */ - public long incr( String key ) { - return incrdecr( "incr", key, 1, null ); + public long incr(String key) { + return incrdecr("incr", key, 1, null); } - /** + /** * Increment the value at the specified key by passed in val. - * + * * @param key key where the data is stored * @param inc how much to increment by * @return -1, if the key is not found, the value after incrementing otherwise */ - public long incr( String key, long inc ) { - return incrdecr( "incr", key, inc, null ); + public long incr(String key, long inc) { + return incrdecr("incr", key, inc, null); } /** @@ -1060,18 +1045,18 @@ public long incr( String key, long inc ) { * @param hashCode if not null, then the int hashcode to use * @return -1, if the key is not found, the value after incrementing otherwise */ - public long incr( String key, long inc, Integer hashCode ) { - return incrdecr( "incr", key, inc, hashCode ); + public long incr(String key, long inc, Integer hashCode) { + return incrdecr("incr", key, inc, hashCode); } - + /** * Decrement the value at the specified key by 1, and then return it. * * @param key key where the data is stored * @return -1, if the key is not found, the value after incrementing otherwise */ - public long decr( String key ) { - return incrdecr( "decr", key, 1, null ); + public long decr(String key) { + return incrdecr("decr", key, 1, null); } /** @@ -1081,8 +1066,8 @@ public long decr( String key ) { * @param inc how much to increment by * @return -1, if the key is not found, the value after incrementing otherwise */ - public long decr( String key, long inc ) { - return incrdecr( "decr", key, inc, null ); + public long decr(String key, long inc) { + return incrdecr("decr", key, inc, null); } /** @@ -1093,13 +1078,13 @@ public long decr( String key, long inc ) { * @param hashCode if not null, then the int hashcode to use * @return -1, if the key is not found, the value after incrementing otherwise */ - public long decr( String key, long inc, Integer hashCode ) { - return incrdecr( "decr", key, inc, hashCode ); + public long decr(String key, long inc, Integer hashCode) { + return incrdecr("decr", key, inc, hashCode); } - /** + /** * Increments/decrements the value at the specified key by inc. - * + * * Note that the server uses a 32-bit unsigned integer, and checks for
* underflow. In the event of underflow, the result will be zero. Because
* Java lacks unsigned types, the value is returned as a 64-bit integer.
@@ -1112,92 +1097,87 @@ public long decr( String key, long inc, Integer hashCode ) { * @param hashCode if not null, then the int hashcode to use * @return new value or -1 if not exist */ - private long incrdecr( String cmdname, String key, long inc, Integer hashCode ) { + private long incrdecr(String cmdname, String key, long inc, Integer hashCode) { - if ( key == null ) { - log.error( "null key for incrdecr()" ); + if (key == null) { + log.error("null key for incrdecr()"); return -1; } try { - key = sanitizeKey( key ); - } - catch ( UnsupportedEncodingException e ) { + key = sanitizeKey(key); + } catch (UnsupportedEncodingException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); - log.error( "failed to sanitize your key!", e ); + log.error("failed to sanitize your key!", e); return -1; } // get SockIO obj for given cache key - SockIOPool.SockIO sock = pool.getSock( key, hashCode ); + SockIOPool.SockIO sock = pool.getSock(key, hashCode); - if ( sock == null ) { - if ( errorHandler != null ) - errorHandler.handleErrorOnSet( this, new IOException( "no socket to server available" ), key ); + if (sock == null) { + log.error("unable to get connection to memcache ! key is {}", key); + if (errorHandler != null) + errorHandler.handleErrorOnSet(this, new IOException("no socket to server available"), key); return -1; } - + try { - String cmd = String.format( "%s %s %d\r\n", cmdname, key, inc ); - if ( log.isDebugEnabled() ) - log.debug( "++++ memcache incr/decr command: " + cmd ); + String cmd = String.format("%s %s %d\r\n", cmdname, key, inc); + if (log.isDebugEnabled()) + log.debug("++++ memcache incr/decr command: " + cmd); - sock.write( cmd.getBytes() ); + sock.write(cmd.getBytes()); sock.flush(); // get result back String line = sock.readLine(); - if ( line.matches( "\\d+" ) ) { + if (line.matches("\\d+")) { // return sock to pool and return result sock.close(); try { - return Long.parseLong( line ); - } - catch ( Exception ex ) { + return Long.parseLong(line); + } catch (Exception ex) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, ex, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, ex, key); - log.error( String.format( "Failed to parse Long value for key: %s", key ) ); + log.error(String.format("Failed to parse Long value for key: %s", key)); } - } - else if ( NOTFOUND.equals( line ) ) { - if ( log.isInfoEnabled() ) - log.info( "++++ key not found to incr/decr for key: " + key ); - } - else { - log.error( "++++ error incr/decr key: " + key ); - log.error( "++++ server response: " + line ); + } else if (NOTFOUND.equals(line)) { + if (log.isInfoEnabled()) + log.info("++++ key not found to incr/decr for key: " + key); + } else { + log.error("++++ error incr/decr key: " + key); + log.error("++++ server response: " + line); } - } - catch ( IOException e ) { + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); // exception thrown - log.error( "++++ exception thrown while writing bytes to server on incr/decr" ); - log.error( e.getMessage(), e ); + log.error("++++ exception thrown while writing bytes to server on incr/decr"); + log.error(e.getMessage(), e); try { sock.trueClose(); - } - catch ( IOException ioe ) { - log.error( "++++ failed to close socket : " + sock.toString() ); + } catch (IOException ioe) { + log.error("++++ failed to close socket : " + sock.toString()); } sock = null; } - - if ( sock != null ) { + + if (sock != null) { sock.close(); sock = null; } @@ -1217,11 +1197,11 @@ else if ( NOTFOUND.equals( line ) ) { * @param key key where data is stored * @return the object that was previously stored, or null if it was not previously stored */ - public Object get( String key ) { - return get( key, null, false ); + public Object get(String key) { + return get(key, null, false); } - /** + /** * Retrieve a key from the server, using a specific hash. * * If the data was compressed or serialized when compressed, it will automatically
@@ -1234,8 +1214,8 @@ public Object get( String key ) { * @param hashCode if not null, then the int hashcode to use * @return the object that was previously stored, or null if it was not previously stored */ - public Object get( String key, Integer hashCode ) { - return get( key, hashCode, false ); + public Object get(String key, Integer hashCode) { + return get(key, hashCode, false); } /** @@ -1252,176 +1232,173 @@ public Object get( String key, Integer hashCode ) { * @param asString if true, then return string val * @return the object that was previously stored, or null if it was not previously stored */ - public Object get( String key, Integer hashCode, boolean asString ) { + public Object get(String key, Integer hashCode, boolean asString) { - if ( key == null ) { - log.error( "key is null for get()" ); + if (key == null) { + log.error("key is null for get()"); return null; } try { - key = sanitizeKey( key ); - } - catch ( UnsupportedEncodingException e ) { + key = sanitizeKey(key); + } catch (UnsupportedEncodingException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); - log.error( "failed to sanitize your key!", e ); + log.error("failed to sanitize your key!", e); return null; } // get SockIO obj using cache key - SockIOPool.SockIO sock = pool.getSock( key, hashCode ); - - if ( sock == null ) { - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, new IOException( "no socket to server available" ), key ); + SockIOPool.SockIO sock = pool.getSock(key, hashCode); + + if (sock == null) { + log.error("unable to get connection to memcache ! key is {}", key); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, new IOException("no socket to server available"), key); return null; } try { String cmd = "get " + key + "\r\n"; - if ( log.isDebugEnabled() ) + if (log.isDebugEnabled()) log.debug("++++ memcache get command: " + cmd); - - sock.write( cmd.getBytes() ); + + sock.write(cmd.getBytes()); sock.flush(); // ready object Object o = null; - while ( true ) { + while (true) { String line = sock.readLine(); - if ( log.isDebugEnabled() ) - log.debug( "++++ line: " + line ); + if (log.isDebugEnabled()) + log.debug("++++ line: " + line); - if ( line.startsWith( VALUE ) ) { + if (line.startsWith(VALUE)) { String[] info = line.split(" "); - int flag = Integer.parseInt( info[2] ); - int length = Integer.parseInt( info[3] ); + int flag = Integer.parseInt(info[2]); + int length = Integer.parseInt(info[3]); - if ( log.isDebugEnabled() ) { - log.debug( "++++ key: " + key ); - log.debug( "++++ flags: " + flag ); - log.debug( "++++ length: " + length ); + if (log.isDebugEnabled()) { + log.debug("++++ key: " + key); + log.debug("++++ flags: " + flag); + log.debug("++++ length: " + length); } - + // read obj into buffer byte[] buf = new byte[length]; - sock.read( buf ); + sock.read(buf); sock.clearEOL(); - if ( (flag & F_COMPRESSED) == F_COMPRESSED ) { + if ((flag & F_COMPRESSED) == F_COMPRESSED) { try { // read the input stream, and write to a byte array output stream since // we have to read into a byte array, but we don't know how large it // will need to be, and we don't want to resize it a bunch - GZIPInputStream gzi = new GZIPInputStream( new ByteArrayInputStream( buf ) ); - ByteArrayOutputStream bos = new ByteArrayOutputStream( buf.length ); - + GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buf)); + ByteArrayOutputStream bos = new ByteArrayOutputStream(buf.length); + int count; byte[] tmp = new byte[2048]; - while ( (count = gzi.read(tmp)) != -1 ) { - bos.write( tmp, 0, count ); + while ((count = gzi.read(tmp)) != -1) { + bos.write(tmp, 0, count); } // store uncompressed back to buffer buf = bos.toByteArray(); gzi.close(); - } - catch ( IOException e ) { + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); - - log.error( "++++ IOException thrown while trying to uncompress input stream for key: " + key + " -- " + e.getMessage() ); - throw new NestedIOException( "++++ IOException thrown while trying to uncompress input stream for key: " + key, e ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); + + log.error("++++ IOException thrown while trying to uncompress input stream for key: " + key + + " -- " + e.getMessage()); + throw new NestedIOException( + "++++ IOException thrown while trying to uncompress input stream for key: " + key, + e); } } // we can only take out serialized objects - if ( ( flag & F_SERIALIZED ) != F_SERIALIZED ) { - if ( primitiveAsString || asString ) { + if ((flag & F_SERIALIZED) != F_SERIALIZED) { + if (primitiveAsString || asString) { // pulling out string value - if ( log.isInfoEnabled() ) - log.info( "++++ retrieving object and stuffing into a string." ); - o = new String( buf, defaultEncoding ); - } - else { + if (log.isInfoEnabled()) + log.info("++++ retrieving object and stuffing into a string."); + o = new String(buf, defaultEncoding); + } else { // decoding object try { - o = NativeHandler.decode( buf, flag ); - } - catch ( Exception e ) { + o = NativeHandler.decode(buf, flag); + } catch (Exception e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); - log.error( "++++ Exception thrown while trying to deserialize for key: " + key, e ); - throw new NestedIOException( e ); + log.error("++++ Exception thrown while trying to deserialize for key: " + key, e); + throw new NestedIOException(e); } } - } - else { + } else { // deserialize if the data is serialized ContextObjectInputStream ois = - new ContextObjectInputStream( new ByteArrayInputStream( buf ), classLoader ); + new ContextObjectInputStream(new ByteArrayInputStream(buf), classLoader); try { o = ois.readObject(); - if ( log.isInfoEnabled() ) - log.info( "++++ deserializing " + o.getClass() ); - } - catch ( Exception e ) { - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (log.isInfoEnabled()) + log.info("++++ deserializing " + o.getClass()); + } catch (Exception e) { + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); o = null; - log.error( "++++ Exception thrown while trying to deserialize for key: " + key + " -- " + e.getMessage() ); + log.error("++++ Exception thrown while trying to deserialize for key: " + key + " -- " + e + .getMessage()); } } - } - else if ( END.equals( line ) ) { - if ( log.isDebugEnabled() ) - log.debug( "++++ finished reading from cache server" ); + } else if (END.equals(line)) { + if (log.isDebugEnabled()) + log.debug("++++ finished reading from cache server"); break; } } - + sock.close(); sock = null; return o; - } - catch ( IOException e ) { + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); // exception thrown - log.error( "++++ exception thrown while trying to get object from cache for key: " + key + " -- " + e.getMessage() ); + log.error("++++ exception thrown while trying to get object from cache for key: " + key + " -- " + e + .getMessage()); try { sock.trueClose(); - } - catch ( IOException ioe ) { - log.error( "++++ failed to close socket : " + sock.toString() ); + } catch (IOException ioe) { + log.error("++++ failed to close socket : " + sock.toString()); } sock = null; - } + } - if ( sock != null ) + if (sock != null) sock.close(); return null; } - /** + /** * Retrieve multiple objects from the memcache. * * This is recommended over repeated calls to {@link #get(String) get()}, since it
@@ -1430,11 +1407,11 @@ else if ( END.equals( line ) ) { * @param keys String array of keys to retrieve * @return Object array ordered in same order as key array containing results */ - public Object[] getMultiArray( String[] keys ) { - return getMultiArray( keys, null, false ); + public Object[] getMultiArray(String[] keys) { + return getMultiArray(keys, null, false); } - /** + /** * Retrieve multiple objects from the memcache. * * This is recommended over repeated calls to {@link #get(String) get()}, since it
@@ -1444,11 +1421,11 @@ public Object[] getMultiArray( String[] keys ) { * @param hashCodes if not null, then the Integer array of hashCodes * @return Object array ordered in same order as key array containing results */ - public Object[] getMultiArray( String[] keys, Integer[] hashCodes ) { - return getMultiArray( keys, hashCodes, false ); + public Object[] getMultiArray(String[] keys, Integer[] hashCodes) { + return getMultiArray(keys, hashCodes, false); } - /** + /** * Retrieve multiple objects from the memcache. * * This is recommended over repeated calls to {@link #get(String) get()}, since it
@@ -1459,16 +1436,16 @@ public Object[] getMultiArray( String[] keys, Integer[] hashCodes ) { * @param asString if true, retrieve string vals * @return Object array ordered in same order as key array containing results */ - public Object[] getMultiArray( String[] keys, Integer[] hashCodes, boolean asString ) { + public Object[] getMultiArray(String[] keys, Integer[] hashCodes, boolean asString) { - Map data = getMulti( keys, hashCodes, asString ); + Map data = getMulti(keys, hashCodes, asString); - if ( data == null ) + if (data == null) return null; - Object[] res = new Object[ keys.length ]; - for ( int i = 0; i < keys.length; i++ ) { - res[i] = data.get( keys[i] ); + Object[] res = new Object[keys.length]; + for (int i = 0; i < keys.length; i++) { + res[i] = data.get(keys[i]); } return res; @@ -1485,10 +1462,10 @@ public Object[] getMultiArray( String[] keys, Integer[] hashCodes, boolean asStr * keys that are not found are not entered into the hashmap, but attempting to * retrieve them from the hashmap gives you null. */ - public Map getMulti( String[] keys ) { - return getMulti( keys, null, false ); + public Map getMulti(String[] keys) { + return getMulti(keys, null, false); } - + /** * Retrieve multiple keys from the memcache. * @@ -1501,8 +1478,8 @@ public Map getMulti( String[] keys ) { * keys that are not found are not entered into the hashmap, but attempting to * retrieve them from the hashmap gives you null. */ - public Map getMulti( String[] keys, Integer[] hashCodes ) { - return getMulti( keys, hashCodes, false ); + public Map getMulti(String[] keys, Integer[] hashCodes) { + return getMulti(keys, hashCodes, false); } /** @@ -1518,293 +1495,291 @@ public Map getMulti( String[] keys, Integer[] hashCodes ) { * keys that are not found are not entered into the hashmap, but attempting to * retrieve them from the hashmap gives you null. */ - public Map getMulti( String[] keys, Integer[] hashCodes, boolean asString ) { + public Map getMulti(String[] keys, Integer[] hashCodes, boolean asString) { - if ( keys == null || keys.length == 0 ) { - log.error( "missing keys for getMulti()" ); + if (keys == null || keys.length == 0) { + log.error("missing keys for getMulti()"); return null; } - Map cmdMap = - new HashMap(); + Map cmdMap = + new HashMap(); - for ( int i = 0; i < keys.length; ++i ) { + for (int i = 0; i < keys.length; ++i) { String key = keys[i]; - if ( key == null ) { - log.error( "null key, so skipping" ); + if (key == null) { + log.error("null key, so skipping"); continue; } Integer hash = null; - if ( hashCodes != null && hashCodes.length > i ) - hash = hashCodes[ i ]; + if (hashCodes != null && hashCodes.length > i) + hash = hashCodes[i]; String cleanKey = key; try { - cleanKey = sanitizeKey( key ); - } - catch ( UnsupportedEncodingException e ) { + cleanKey = sanitizeKey(key); + } catch (UnsupportedEncodingException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); - log.error( "failed to sanitize your key!", e ); + log.error("failed to sanitize your key!", e); continue; } // get SockIO obj from cache key - SockIOPool.SockIO sock = pool.getSock( cleanKey, hash ); + SockIOPool.SockIO sock = pool.getSock(cleanKey, hash); - if ( sock == null ) { - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, new IOException( "no socket to server available" ), key ); + if (sock == null) { + log.error("unable to get connection to memcache ! key is {}", key); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, new IOException("no socket to server available"), key); continue; } // store in map and list if not already - if ( !cmdMap.containsKey( sock.getHost() ) ) - cmdMap.put( sock.getHost(), new StringBuilder( "get" ) ); + if (!cmdMap.containsKey(sock.getHost())) + cmdMap.put(sock.getHost(), new StringBuilder("get")); - cmdMap.get( sock.getHost() ).append( " " + cleanKey ); + cmdMap.get(sock.getHost()).append(" " + cleanKey); // return to pool sock.close(); } - - if ( log.isInfoEnabled() ) - log.info( "multi get socket count : " + cmdMap.size() ); + + if (log.isInfoEnabled()) + log.info("multi get socket count : " + cmdMap.size()); // now query memcache - Map ret = - new HashMap( keys.length ); + Map ret = + new HashMap(keys.length); // now use new NIO implementation - (new NIOLoader( this )).doMulti( asString, cmdMap, keys, ret ); + (new NIOLoader(this)).doMulti(asString, cmdMap, keys, ret); // fix the return array in case we had to rewrite any of the keys - for ( String key : keys ) { + for (String key : keys) { String cleanKey = key; try { - cleanKey = sanitizeKey( key ); - } - catch ( UnsupportedEncodingException e ) { + cleanKey = sanitizeKey(key); + } catch (UnsupportedEncodingException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); - log.error( "failed to sanitize your key!", e ); + log.error("failed to sanitize your key!", e); continue; } - if ( ! key.equals( cleanKey ) && ret.containsKey( cleanKey ) ) { - ret.put( key, ret.get( cleanKey ) ); - ret.remove( cleanKey ); + if (!key.equals(cleanKey) && ret.containsKey(cleanKey)) { + ret.put(key, ret.get(cleanKey)); + ret.remove(cleanKey); } // backfill missing keys w/ null value - if ( ! ret.containsKey( key ) ) - ret.put( key, null ); + if (!ret.containsKey(key)) + ret.put(key, null); } - if ( log.isDebugEnabled() ) - log.debug( "++++ memcache: got back " + ret.size() + " results" ); + if (log.isDebugEnabled()) + log.debug("++++ memcache: got back " + ret.size() + " results"); return ret; } - /** + /** * This method loads the data from cache into a Map. * * Pass a SockIO object which is ready to receive data and a HashMap
* to store the results. - * + * * @param sock socket waiting to pass back data * @param hm hashmap to store data into * @param asString if true, and if we are using NativehHandler, return string val * @throws IOException if io exception happens while reading from socket */ - private void loadMulti( LineInputStream input, Map hm, boolean asString ) throws IOException { + private void loadMulti(LineInputStream input, Map hm, boolean asString) throws IOException { - while ( true ) { + while (true) { String line = input.readLine(); - if ( log.isDebugEnabled() ) - log.debug( "++++ line: " + line ); + if (log.isDebugEnabled()) + log.debug("++++ line: " + line); - if ( line.startsWith( VALUE ) ) { + if (line.startsWith(VALUE)) { String[] info = line.split(" "); - String key = info[1]; - int flag = Integer.parseInt( info[2] ); - int length = Integer.parseInt( info[3] ); - - if ( log.isDebugEnabled() ) { - log.debug( "++++ key: " + key ); - log.debug( "++++ flags: " + flag ); - log.debug( "++++ length: " + length ); + String key = info[1]; + int flag = Integer.parseInt(info[2]); + int length = Integer.parseInt(info[3]); + + if (log.isDebugEnabled()) { + log.debug("++++ key: " + key); + log.debug("++++ flags: " + flag); + log.debug("++++ length: " + length); } - + // read obj into buffer byte[] buf = new byte[length]; - input.read( buf ); + input.read(buf); input.clearEOL(); // ready object Object o; - + // check for compression - if ( (flag & F_COMPRESSED) == F_COMPRESSED ) { + if ((flag & F_COMPRESSED) == F_COMPRESSED) { try { // read the input stream, and write to a byte array output stream since // we have to read into a byte array, but we don't know how large it // will need to be, and we don't want to resize it a bunch - GZIPInputStream gzi = new GZIPInputStream( new ByteArrayInputStream( buf ) ); - ByteArrayOutputStream bos = new ByteArrayOutputStream( buf.length ); - + GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buf)); + ByteArrayOutputStream bos = new ByteArrayOutputStream(buf.length); + int count; byte[] tmp = new byte[2048]; - while ( (count = gzi.read(tmp)) != -1 ) { - bos.write( tmp, 0, count ); + while ((count = gzi.read(tmp)) != -1) { + bos.write(tmp, 0, count); } // store uncompressed back to buffer buf = bos.toByteArray(); gzi.close(); - } - catch ( IOException e ) { + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); - log.error( "++++ IOException thrown while trying to uncompress input stream for key: " + key + " -- " + e.getMessage() ); - throw new NestedIOException( "++++ IOException thrown while trying to uncompress input stream for key: " + key, e ); + log.error("++++ IOException thrown while trying to uncompress input stream for key: " + key + + " -- " + e.getMessage()); + throw new NestedIOException( + "++++ IOException thrown while trying to uncompress input stream for key: " + key, e); } } // we can only take out serialized objects - if ( ( flag & F_SERIALIZED ) != F_SERIALIZED ) { - if ( primitiveAsString || asString ) { + if ((flag & F_SERIALIZED) != F_SERIALIZED) { + if (primitiveAsString || asString) { // pulling out string value - if ( log.isInfoEnabled() ) - log.info( "++++ retrieving object and stuffing into a string." ); - o = new String( buf, defaultEncoding ); - } - else { + if (log.isInfoEnabled()) + log.info("++++ retrieving object and stuffing into a string."); + o = new String(buf, defaultEncoding); + } else { // decoding object try { - o = NativeHandler.decode( buf, flag ); - } - catch ( Exception e ) { + o = NativeHandler.decode(buf, flag); + } catch (Exception e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); - log.error( "++++ Exception thrown while trying to deserialize for key: " + key + " -- " + e.getMessage() ); - throw new NestedIOException( e ); + log.error("++++ Exception thrown while trying to deserialize for key: " + key + " -- " + e + .getMessage()); + throw new NestedIOException(e); } } - } - else { + } else { // deserialize if the data is serialized ContextObjectInputStream ois = - new ContextObjectInputStream( new ByteArrayInputStream( buf ), classLoader ); + new ContextObjectInputStream(new ByteArrayInputStream(buf), classLoader); try { o = ois.readObject(); - if ( log.isInfoEnabled() ) - log.info( "++++ deserializing " + o.getClass() ); - } - catch ( InvalidClassException e ) { + if (log.isInfoEnabled()) + log.info("++++ deserializing " + o.getClass()); + } catch (InvalidClassException e) { /* Errors de-serializing are to be expected in the case of a * long running server that spans client restarts with updated * classes. */ // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); o = null; - log.error( "++++ InvalidClassException thrown while trying to deserialize for key: " + key + " -- " + e.getMessage() ); - } - catch ( ClassNotFoundException e ) { + log.error( + "++++ InvalidClassException thrown while trying to deserialize for key: " + key + " -- " + + e.getMessage()); + } catch (ClassNotFoundException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this, e, key ); + if (errorHandler != null) + errorHandler.handleErrorOnGet(this, e, key); o = null; - log.error( "++++ ClassNotFoundException thrown while trying to deserialize for key: " + key + " -- " + e.getMessage() ); + log.error("++++ ClassNotFoundException thrown while trying to deserialize for key: " + key + + " -- " + e.getMessage()); } } // store the object into the cache - if ( o != null ) - hm.put( key, o ); - } - else if ( END.equals( line ) ) { - if ( log.isDebugEnabled() ) - log.debug( "++++ finished reading from cache server" ); + if (o != null) + hm.put(key, o); + } else if (END.equals(line)) { + if (log.isDebugEnabled()) + log.debug("++++ finished reading from cache server"); break; } } } - private String sanitizeKey( String key ) throws UnsupportedEncodingException { - return ( sanitizeKeys ) ? URLEncoder.encode( key, "UTF-8" ) : key; + private String sanitizeKey(String key) throws UnsupportedEncodingException { + return (sanitizeKeys) ? URLEncoder.encode(key, "UTF-8") : key; } - /** + /** * Invalidates the entire cache. * * Will return true only if succeeds in clearing all servers. - * + * * @return success true/false */ public boolean flushAll() { - return flushAll( null ); + return flushAll(null); } - /** + /** * Invalidates the entire cache. * * Will return true only if succeeds in clearing all servers. * If pass in null, then will try to flush all servers. - * + * * @param servers optional array of host(s) to flush (host:port) * @return success true/false */ - public boolean flushAll( String[] servers ) { + public boolean flushAll(String[] servers) { // get SockIOPool instance // return false if unable to get SockIO obj - if ( pool == null ) { - log.error( "++++ unable to get SockIOPool instance" ); + if (pool == null) { + log.error("++++ unable to get SockIOPool instance"); return false; } // get all servers and iterate over them - servers = ( servers == null ) - ? pool.getServers() - : servers; + servers = (servers == null) + ? pool.getServers() + : servers; // if no servers, then return early - if ( servers == null || servers.length <= 0 ) { - log.error( "++++ no servers to flush" ); + if (servers == null || servers.length <= 0) { + log.error("++++ no servers to flush"); return false; } boolean success = true; - for ( int i = 0; i < servers.length; i++ ) { + for (int i = 0; i < servers.length; i++) { - SockIOPool.SockIO sock = pool.getConnection( servers[i] ); - if ( sock == null ) { - log.error( "++++ unable to get connection to : " + servers[i] ); + SockIOPool.SockIO sock = pool.getConnection(servers[i]); + if (sock == null) { + log.error("++++ unable to get connection to : " + servers[i]); success = false; - if ( errorHandler != null ) - errorHandler.handleErrorOnFlush( this, new IOException( "no socket to server available" ) ); + if (errorHandler != null) + errorHandler.handleErrorOnFlush(this, new IOException("no socket to server available")); continue; } @@ -1812,37 +1787,35 @@ public boolean flushAll( String[] servers ) { String command = "flush_all\r\n"; try { - sock.write( command.getBytes() ); + sock.write(command.getBytes()); sock.flush(); // if we get appropriate response back, then we return true String line = sock.readLine(); - success = ( OK.equals( line ) ) - ? success && true - : false; - } - catch ( IOException e ) { + success = (OK.equals(line)) + ? success && true + : false; + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnFlush( this, e ); + if (errorHandler != null) + errorHandler.handleErrorOnFlush(this, e); // exception thrown - log.error( "++++ exception thrown while writing bytes to server on flushAll" ); - log.error( e.getMessage(), e ); + log.error("++++ exception thrown while writing bytes to server on flushAll"); + log.error(e.getMessage(), e); try { sock.trueClose(); - } - catch ( IOException ioe ) { - log.error( "++++ failed to close socket : " + sock.toString() ); + } catch (IOException ioe) { + log.error("++++ failed to close socket : " + sock.toString()); } success = false; sock = null; } - if ( sock != null ) { + if (sock != null) { sock.close(); sock = null; } @@ -1851,210 +1824,207 @@ public boolean flushAll( String[] servers ) { return success; } - /** + /** * Retrieves stats for all servers. * * Returns a map keyed on the servername. * The value is another map which contains stats * with stat name as key and value as value. - * + * * @return Stats map */ public Map stats() { - return stats( null ); + return stats(null); } - /** + /** * Retrieves stats for passed in servers (or all servers). * * Returns a map keyed on the servername. * The value is another map which contains stats * with stat name as key and value as value. - * + * * @param servers string array of servers to retrieve stats from, or all if this is null * @return Stats map */ - public Map stats( String[] servers ) { - return stats( servers, "stats\r\n", STATS ); - } + public Map stats(String[] servers) { + return stats(servers, "stats\r\n", STATS); + } - /** + /** * Retrieves stats items for all servers. * * Returns a map keyed on the servername. * The value is another map which contains item stats * with itemname:number:field as key and value as value. - * + * * @return Stats map */ public Map statsItems() { - return statsItems( null ); + return statsItems(null); } - - /** + + /** * Retrieves stats for passed in servers (or all servers). * * Returns a map keyed on the servername. * The value is another map which contains item stats * with itemname:number:field as key and value as value. - * + * * @param servers string array of servers to retrieve stats from, or all if this is null * @return Stats map */ - public Map statsItems( String[] servers ) { - return stats( servers, "stats items\r\n", STATS ); + public Map statsItems(String[] servers) { + return stats(servers, "stats items\r\n", STATS); } - - /** + + /** * Retrieves stats items for all servers. * * Returns a map keyed on the servername. * The value is another map which contains slabs stats * with slabnumber:field as key and value as value. - * + * * @return Stats map */ public Map statsSlabs() { - return statsSlabs( null ); + return statsSlabs(null); } - - /** + + /** * Retrieves stats for passed in servers (or all servers). * * Returns a map keyed on the servername. * The value is another map which contains slabs stats * with slabnumber:field as key and value as value. - * + * * @param servers string array of servers to retrieve stats from, or all if this is null * @return Stats map */ - public Map statsSlabs( String[] servers ) { - return stats( servers, "stats slabs\r\n", STATS ); + public Map statsSlabs(String[] servers) { + return stats(servers, "stats slabs\r\n", STATS); } - - /** + + /** * Retrieves items cachedump for all servers. * * Returns a map keyed on the servername. * The value is another map which contains cachedump stats * with the cachekey as key and byte size and unix timestamp as value. - * + * * @param slabNumber the item number of the cache dump * @return Stats map */ - public Map statsCacheDump( int slabNumber, int limit ) { - return statsCacheDump( null, slabNumber, limit ); + public Map statsCacheDump(int slabNumber, int limit) { + return statsCacheDump(null, slabNumber, limit); } - - /** + + /** * Retrieves stats for passed in servers (or all servers). * * Returns a map keyed on the servername. * The value is another map which contains cachedump stats * with the cachekey as key and byte size and unix timestamp as value. - * + * * @param servers string array of servers to retrieve stats from, or all if this is null * @param slabNumber the item number of the cache dump * @return Stats map */ - public Map statsCacheDump( String[] servers, int slabNumber, int limit ) { - return stats( servers, String.format( "stats cachedump %d %d\r\n", slabNumber, limit ), ITEM ); + public Map statsCacheDump(String[] servers, int slabNumber, int limit) { + return stats(servers, String.format("stats cachedump %d %d\r\n", slabNumber, limit), ITEM); } - - private Map stats( String[] servers, String command, String lineStart ) { - if ( command == null || command.trim().equals( "" ) ) { - log.error( "++++ invalid / missing command for stats()" ); + private Map stats(String[] servers, String command, String lineStart) { + + if (command == null || command.trim().equals("")) { + log.error("++++ invalid / missing command for stats()"); return null; } // get all servers and iterate over them servers = (servers == null) - ? pool.getServers() - : servers; + ? pool.getServers() + : servers; // if no servers, then return early - if ( servers == null || servers.length <= 0 ) { - log.error( "++++ no servers to check stats" ); + if (servers == null || servers.length <= 0) { + log.error("++++ no servers to check stats"); return null; } // array of stats Maps - Map statsMaps = - new HashMap(); + Map statsMaps = + new HashMap(); - for ( int i = 0; i < servers.length; i++ ) { + for (int i = 0; i < servers.length; i++) { - SockIOPool.SockIO sock = pool.getConnection( servers[i] ); - if ( sock == null ) { - log.error( "++++ unable to get connection to : " + servers[i] ); - if ( errorHandler != null ) - errorHandler.handleErrorOnStats( this, new IOException( "no socket to server available" ) ); + SockIOPool.SockIO sock = pool.getConnection(servers[i]); + if (sock == null) { + log.error("++++ unable to get connection to : " + servers[i]); + if (errorHandler != null) + errorHandler.handleErrorOnStats(this, new IOException("no socket to server available")); continue; } // build command try { - sock.write( command.getBytes() ); + sock.write(command.getBytes()); sock.flush(); // map to hold key value pairs - Map stats = new HashMap(); + Map stats = new HashMap(); // loop over results - while ( true ) { + while (true) { String line = sock.readLine(); - if ( log.isDebugEnabled() ) - log.debug( "++++ line: " + line ); + if (log.isDebugEnabled()) + log.debug("++++ line: " + line); - if ( line.startsWith( lineStart ) ) { - String[] info = line.split( " ", 3 ); - String key = info[1]; - String value = info[2]; + if (line.startsWith(lineStart)) { + String[] info = line.split(" ", 3); + String key = info[1]; + String value = info[2]; - if ( log.isDebugEnabled() ) { - log.debug( "++++ key : " + key ); - log.debug( "++++ value: " + value ); + if (log.isDebugEnabled()) { + log.debug("++++ key : " + key); + log.debug("++++ value: " + value); } - stats.put( key, value ); - } - else if ( END.equals( line ) ) { + stats.put(key, value); + } else if (END.equals(line)) { // finish when we get end from server - if ( log.isDebugEnabled() ) - log.debug( "++++ finished reading from cache server" ); + if (log.isDebugEnabled()) + log.debug("++++ finished reading from cache server"); break; - } - else if ( line.startsWith( ERROR ) || line.startsWith( CLIENT_ERROR ) || line.startsWith( SERVER_ERROR ) ) { - log.error( "++++ failed to query stats" ); - log.error( "++++ server response: " + line ); + } else if (line.startsWith(ERROR) || line.startsWith(CLIENT_ERROR) || line + .startsWith(SERVER_ERROR)) { + log.error("++++ failed to query stats"); + log.error("++++ server response: " + line); break; } - statsMaps.put( servers[i], stats ); + statsMaps.put(servers[i], stats); } - } - catch ( IOException e ) { + } catch (IOException e) { // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnStats( this, e ); + if (errorHandler != null) + errorHandler.handleErrorOnStats(this, e); // exception thrown - log.error( "++++ exception thrown while writing bytes to server on stats" ); - log.error( e.getMessage(), e ); + log.error("++++ exception thrown while writing bytes to server on stats"); + log.error(e.getMessage(), e); try { sock.trueClose(); - } - catch ( IOException ioe ) { - log.error( "++++ failed to close socket : " + sock.toString() ); + } catch (IOException ioe) { + log.error("++++ failed to close socket : " + sock.toString()); } sock = null; } - if ( sock != null ) { + if (sock != null) { sock.close(); sock = null; } @@ -2069,125 +2039,126 @@ protected final class NIOLoader { protected MemcachedClient mc; protected Connection[] conns; - public NIOLoader( MemcachedClient mc ) { + public NIOLoader(MemcachedClient mc) { this.mc = mc; } private final class Connection { - + public List incoming = new ArrayList(); public ByteBuffer outgoing; public SockIOPool.SockIO sock; public SocketChannel channel; private boolean isDone = false; - - public Connection( SockIOPool.SockIO sock, StringBuilder request ) throws IOException { - if ( log.isDebugEnabled() ) - log.debug( "setting up connection to "+sock.getHost() ); - + + public Connection(SockIOPool.SockIO sock, StringBuilder request) throws IOException { + if (log.isDebugEnabled()) + log.debug("setting up connection to " + sock.getHost()); + this.sock = sock; - outgoing = ByteBuffer.wrap( request.append( "\r\n" ).toString().getBytes() ); - + outgoing = ByteBuffer.wrap(request.append("\r\n").toString().getBytes()); + channel = sock.getChannel(); - if ( channel == null ) - throw new IOException( "dead connection to: " + sock.getHost() ); + if (channel == null) + throw new IOException("dead connection to: " + sock.getHost()); - channel.configureBlocking( false ); - channel.register( selector, SelectionKey.OP_WRITE, this ); + channel.configureBlocking(false); + channel.register(selector, SelectionKey.OP_WRITE, this); } - + public void close() { try { - if ( isDone ) { + if (isDone) { // turn off non-blocking IO and return to pool - if ( log.isDebugEnabled() ) - log.debug( "++++ gracefully closing connection to "+sock.getHost() ); - - channel.configureBlocking( true ); + if (log.isDebugEnabled()) + log.debug("++++ gracefully closing connection to " + sock.getHost()); + + channel.configureBlocking(true); sock.close(); return; } + } catch (IOException e) { + log.warn("++++ memcache: unexpected error closing normally"); } - catch ( IOException e ) { - log.warn( "++++ memcache: unexpected error closing normally" ); - } - + try { - if ( log.isDebugEnabled() ) - log.debug("forcefully closing connection to "+sock.getHost()); + if (log.isDebugEnabled()) + log.debug("forcefully closing connection to " + sock.getHost()); channel.close(); sock.trueClose(); + } catch (IOException ignoreMe) { } - catch ( IOException ignoreMe ) { } } - + public boolean isDone() { // if we know we're done, just say so - if ( isDone ) + if (isDone) return true; - + // else find out the hard way - int strPos = B_END.length-1; + int strPos = B_END.length - 1; int bi = incoming.size() - 1; - while ( bi >= 0 && strPos >= 0 ) { - ByteBuffer buf = incoming.get( bi ); - int pos = buf.position()-1; - while ( pos >= 0 && strPos >= 0 ) { - if ( buf.get( pos-- ) != B_END[strPos--] ) + while (bi >= 0 && strPos >= 0) { + ByteBuffer buf = incoming.get(bi); + int pos = buf.position() - 1; + while (pos >= 0 && strPos >= 0) { + if (buf.get(pos--) != B_END[strPos--]) return false; } bi--; } - + isDone = strPos < 0; return isDone; } - + public ByteBuffer getBuffer() { - int last = incoming.size()-1; - if ( last >= 0 && incoming.get( last ).hasRemaining() ) { - return incoming.get( last ); - } - else { - ByteBuffer newBuf = ByteBuffer.allocate( 8192 ); - incoming.add( newBuf ); + int last = incoming.size() - 1; + if (last >= 0 && incoming.get(last).hasRemaining()) { + return incoming.get(last); + } else { + ByteBuffer newBuf = ByteBuffer.allocate(8192); + incoming.add(newBuf); return newBuf; } } - + public String toString() { return "Connection to " + sock.getHost() + " with " + incoming.size() + " bufs; done is " + isDone; } } - - public void doMulti( boolean asString, Map sockKeys, String[] keys, Map ret ) { - + + public void doMulti(boolean asString, Map sockKeys, String[] keys, + Map ret) { + long timeRemaining = 0; try { selector = Selector.open(); - + // get the sockets, flip them to non-blocking, and set up data // structures conns = new Connection[sockKeys.keySet().size()]; numConns = 0; - for ( Iterator i = sockKeys.keySet().iterator(); i.hasNext(); ) { + for (Iterator i = sockKeys.keySet().iterator(); i.hasNext(); ) { // get SockIO obj from hostname String host = i.next(); - SockIOPool.SockIO sock = pool.getConnection( host ); + SockIOPool.SockIO sock = pool.getConnection(host); - if ( sock == null ) { - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( this.mc, new IOException( "no socket to server available" ), keys ); + if (sock == null) { + log.error("unable to get connection to memcache ! key is {}", key); + if (errorHandler != null) + errorHandler + .handleErrorOnGet(this.mc, new IOException("no socket to server available"), keys); return; } - conns[numConns++] = new Connection( sock, sockKeys.get( host ) ); + conns[numConns++] = new Connection(sock, sockKeys.get(host)); } - + // the main select loop; ends when // 1) we've received data from all the servers, or // 2) we time out @@ -2195,116 +2166,114 @@ public void doMulti( boolean asString, Map sockKeys, Stri long timeout = pool.getMaxBusy(); timeRemaining = timeout; - - while ( numConns > 0 && timeRemaining > 0 ) { - int n = selector.select( Math.min( timeout, 5000 ) ); - if ( n > 0 ) { - // we've got some activity; handle it - Iterator it = selector.selectedKeys().iterator(); - while ( it.hasNext() ) { - SelectionKey key = it.next(); - it.remove(); - handleKey( key ); - } - } - else { - // timeout likely... better check + + while (numConns > 0 && timeRemaining > 0) { + int n = selector.select(Math.min(timeout, 5000)); + if (n > 0) { + // we've got some activity; handle it + Iterator it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = it.next(); + it.remove(); + handleKey(key); + } + } else { + // timeout likely... better check // TODO: This seems like a problem area that we need to figure out how to handle. - log.error( "selector timed out waiting for activity" ); + log.error("selector timed out waiting for activity"); } - + timeRemaining = timeout - (System.currentTimeMillis() - startTime); } - } - catch ( IOException e ) { + } catch (IOException e) { // errors can happen just about anywhere above, from // connection setup to any of the mechanics - handleError( e, keys ); + handleError(e, keys); return; - } - finally { - if ( log.isDebugEnabled() ) - log.debug( "Disconnecting; numConns=" + numConns + " timeRemaining=" + timeRemaining ); - + } finally { + if (log.isDebugEnabled()) + log.debug("Disconnecting; numConns=" + numConns + " timeRemaining=" + timeRemaining); + // run through our conns and either return them to the pool // or forcibly close them try { - if ( selector != null ) + if (selector != null) selector.close(); + } catch (IOException ignoreMe) { } - catch ( IOException ignoreMe ) { } - - for ( Connection c : conns ) { - if ( c != null ) + + for (Connection c : conns) { + if (c != null) c.close(); } } - + // Done! Build the list of results and return them. If we get // here by a timeout, then some of the connections are probably // not done. But we'll return what we've got... - for ( Connection c : conns ) { + for (Connection c : conns) { try { - if ( c.incoming.size() > 0 && c.isDone() ) - loadMulti( new ByteBufArrayInputStream( c.incoming ), ret, asString ); - } - catch ( Exception e ) { + if (c.incoming.size() > 0 && c.isDone()) + loadMulti(new ByteBufArrayInputStream(c.incoming), ret, asString); + } catch (Exception e) { // shouldn't happen; we have all the data already - log.warn( "Caught the aforementioned exception on "+c ); + log.warn("Caught the aforementioned exception on " + c); } } } - - private void handleError( Throwable e, String[] keys ) { - // if we have an errorHandler, use its hook - if ( errorHandler != null ) - errorHandler.handleErrorOnGet( MemcachedClient.this, e, keys ); - - // exception thrown - log.error( "++++ exception thrown while getting from cache on getMulti" ); - log.error( e.getMessage() ); + + private void handleError(Throwable e, String[] keys) { + // if we have an errorHandler, use its hook + if (errorHandler != null) + errorHandler.handleErrorOnGet(MemcachedClient.this, e, keys); + + // exception thrown + log.error("++++ exception thrown while getting from cache on getMulti"); + log.error(e.getMessage()); } - - private void handleKey( SelectionKey key ) throws IOException { - if ( log.isDebugEnabled() ) - log.debug( "handling selector op " + key.readyOps() + " for key " + key ); - - if ( key.isReadable() ) - readResponse( key ); - else if ( key.isWritable() ) - writeRequest( key ); + + private void handleKey(SelectionKey key) throws IOException { + if (log.isDebugEnabled()) + log.debug("handling selector op " + key.readyOps() + " for key " + key); + + if (key.isReadable()) + readResponse(key); + else if (key.isWritable()) + writeRequest(key); } - - public void writeRequest( SelectionKey key ) throws IOException { + + public void writeRequest(SelectionKey key) throws IOException { ByteBuffer buf = ((Connection) key.attachment()).outgoing; - SocketChannel sc = (SocketChannel)key.channel(); - - if ( buf.hasRemaining() ) { - if ( log.isDebugEnabled() ) - log.debug( "writing " + buf.remaining() + "B to " + ((SocketChannel) key.channel()).socket().getInetAddress() ); + SocketChannel sc = (SocketChannel) key.channel(); - sc.write( buf ); + if (buf.hasRemaining()) { + if (log.isDebugEnabled()) + log.debug("writing " + buf.remaining() + "B to " + ((SocketChannel) key.channel()).socket() + .getInetAddress()); + + sc.write(buf); } - - if ( !buf.hasRemaining() ) { - if ( log.isDebugEnabled() ) - log.debug( "switching to read mode for server " + ((SocketChannel)key.channel()).socket().getInetAddress() ); - key.interestOps( SelectionKey.OP_READ ); + if (!buf.hasRemaining()) { + if (log.isDebugEnabled()) + log.debug("switching to read mode for server " + ((SocketChannel) key.channel()).socket() + .getInetAddress()); + + key.interestOps(SelectionKey.OP_READ); } } - - public void readResponse( SelectionKey key ) throws IOException { - Connection conn = (Connection)key.attachment(); + + public void readResponse(SelectionKey key) throws IOException { + Connection conn = (Connection) key.attachment(); ByteBuffer buf = conn.getBuffer(); - int count = conn.channel.read( buf ); - if ( count > 0 ) { - if ( log.isDebugEnabled() ) - log.debug( "read " + count + " from " + conn.channel.socket().getInetAddress() ); - - if ( conn.isDone() ) { - if ( log.isDebugEnabled() ) - log.debug( "connection done to " + conn.channel.socket().getInetAddress() ); + int count = conn.channel.read(buf); + if (count > 0) { + if (log.isDebugEnabled()) + log.debug("read " + count + " from " + conn.channel.socket().getInetAddress()); + + if (conn.isDone()) { + if (log.isDebugEnabled()) + log.debug("connection done to " + conn.channel.socket().getInetAddress()); key.cancel(); numConns--;