java - Active MQ Topic Consumer fails to consume some messages -
i have producer/consumer client topic on active mq using stomp protocol. utilize gozirra.
problem not messages produced @ other end of conversation arrive @ consumer client coded below. messages arrives others fails come in.
i heard loss of messages due asynchronous nature of messaging implemented based on consumer/producer model.
some using receipt mechanism may help.
what think problem?
=========================================================================
code belowimport java.io.ioexception; import java.util.hashmap; import java.util.map; import javax.security.auth.login.loginexception; import com.lnisoft.ontocept.stringescapeutils; import net.ser1.stomp.client; import net.ser1.stomp.listener; import android.os.asynctask; // * active mq stomp client using gozirra public class communicator implements listener { private static communicator instance; private client stomp_client = null; private connecttoontoceptasync connector = null; private string emailasuseridentifier = ""; private string topicname = null; private string messagetosend = "no_message"; private boolean isloggedin = false; private streamconverter converter = null; private string errormessage = ""; private boolean haserror = false; public hashmap<string, messagestack> percommunicatorusermessagestacktbl = new hashmap<string, messagestack>(); public communicator() { converter = new streamconverter(); } public static communicator getinstance() { if ( instance == null ) { instance = new communicator(); } homecoming instance; } @override public void message( map headers, string body ) { string ascii_to_unicode = stringescapeutils.unescapejava( body ); if ( ((string)headers.get("sender")).contains("ontocept") ) { storemessagefromontocept( ascii_to_unicode ); } else { system.out.println("my message : "+ ascii_to_unicode); system.out.println("\n"); } } public boolean getisloggedin() { homecoming this.isloggedin; } public void initializecommunicator( string _emailasuseridentifier ) throws loginexception, ioexception { this.emailasuseridentifier = _emailasuseridentifier; topicname = "/topic/"+ this.emailasuseridentifier; connector = new connecttoontoceptasync(); connector.execute(); } public boolean isintialized() { if ( isloggedin == false ) { homecoming false; } else { homecoming true; } } protected class connecttoontoceptasync extends asynctask<string, void, string> { @override protected string doinbackground( string... params ) { seek { stomp_client = new client( "***.***.**.***", 61613, emailasuseridentifier, "1234" ); stomp_client.subscribe( topicname, communicator.this ); isloggedin = true; } grab (loginexception e) { e.printstacktrace(); } grab (ioexception e) { e.printstacktrace(); } homecoming "executed"; } @override protected void onpostexecute(string text) { // todo } } public boolean haserror() { homecoming this.haserror; } public string geterror() { homecoming this.errormessage; } public void disconnect() { stomp_client.unsubscribe( this.topicname ); stomp_client.disconnect(); this.isloggedin = false; } public void sendmessagetoontocept( string _messagetext ) { this.messagetosend = _messagetext; new sendasyncmessagetoontocept().execute(); } private class sendasyncmessagetoontocept extends asynctask<string, void, string> { @override protected string doinbackground( string... params ) { string unicode_formatted_message = converter.converttounicodetext( messagetosend ); map<string, string> header = new hashmap<string,string>(); // * http://www.germane-software.com/software/java/gozirra/ header.put( "type", "text/plain" ); header.put( "sender", "user" ); stomp_client.send( topicname, unicode_formatted_message, header ); seek { thread.sleep(2000); } grab (interruptedexception e) { e.printstacktrace(); } thread.yield(); homecoming "executed"; } @override protected void onpostexecute(string text) { } } public boolean hasnewmessage( string _messagerecipientidentifier ) { if ( _messagerecipientidentifier.contains("quiz_activity") ) { if ( percommunicatorusermessagestacktbl.get("quiz_activity") == null ) { homecoming false; } else { homecoming percommunicatorusermessagestacktbl.get("quiz_activity").existnewmessage(); } } else if ( _messagerecipientidentifier.contains("vms_activity") ) // visualizememoryspace 액티비티 약자 { if ( percommunicatorusermessagestacktbl.get("vms_activity") == null ) { homecoming false; } else { homecoming percommunicatorusermessagestacktbl.get("vms_activity").existnewmessage(); } } else // _messagerecipientidentifier = ontocept_activity { if ( percommunicatorusermessagestacktbl.get("ontocept_activity") == null ) { homecoming false; } else { homecoming percommunicatorusermessagestacktbl.get("ontocept_activity").existnewmessage(); } } } public message getnewmessage( string _messagerecipientidentifier ) { if ( _messagerecipientidentifier.contains("quiz_activity") ) { if ( percommunicatorusermessagestacktbl.get("quiz_activity") == null ) { homecoming null; } else { homecoming percommunicatorusermessagestacktbl.get("quiz_activity").getnewmessage(); } } else if ( _messagerecipientidentifier.contains("vms_activity") ) { if ( percommunicatorusermessagestacktbl.get("vms_activity") == null ) { homecoming null; } else { homecoming percommunicatorusermessagestacktbl.get("vms_activity").getnewmessage(); } } else // _messagerecipientidentifier = ontocept_activity { if ( percommunicatorusermessagestacktbl.get("ontocept_activity") == null ) { homecoming null; } else { homecoming percommunicatorusermessagestacktbl.get("ontocept_activity").getnewmessage(); } } } public string converttounicodetext( string str ) { stringbuffer ostr = new stringbuffer(); ( int i=0; i<str.length(); i++) { char ch = str.charat(i); if ((ch >= 0x0020) && (ch <= 0x007e)) // char need converted unicode? { ostr.append(ch); // no. } else // yes. { ostr.append("\\u") ; // standard unicode format. string hex = integer.tohexstring(str.charat(i) & 0xffff); // hex value of char. for(int j=0; j<4-hex.length(); j++) // prepend zeros because unicode requires 4 digits ostr.append("0"); ostr.append(hex.tolowercase()); // standard unicode format. //ostr.append(hex.tolowercase(locale.english)); } } homecoming (new string(ostr)); //return stringbuffer cast string. } private void storemessagefromontocept( string messageobjectstring ) { message messageobject = messageobjectgenerator.getinstance().parsemessagestringintomessageobject( messageobjectstring ); if ( messageobject.getrecipient().contains("ontocept_activity") ) { if ( this.percommunicatorusermessagestacktbl.get("ontocept_activity") == null ) { messagestack ms = new messagestack(); ms.addmessage( messageobject ); this.percommunicatorusermessagestacktbl.put( "ontocept_activity", ms ); } else { this.percommunicatorusermessagestacktbl.get("ontocept_activity").addmessage( messageobject ); } } else if ( messageobject.getrecipient().contains("vms_activity") ) { if ( this.percommunicatorusermessagestacktbl.get("vms_activity") == null ) { messagestack ms = new messagestack(); ms.addmessage( messageobject ); this.percommunicatorusermessagestacktbl.put( "vms_activity", ms ); } else { this.percommunicatorusermessagestacktbl.get("vms_activity").addmessage( messageobject ); } } else { if ( this.percommunicatorusermessagestacktbl.get("ontocept_activity") == null ) { messagestack ms = new messagestack(); ms.addmessage( messageobject ); this.percommunicatorusermessagestacktbl.put( "ontocept_activity", ms ); } else { this.percommunicatorusermessagestacktbl.get("ontocept_activity").addmessage( messageobject ); } } } public hashmap<string, messagestack> getmessagestack() { homecoming this.percommunicatorusermessagestacktbl; } }
this seems android app. required app active, or else communication server interrupted, causing message loss.
are sure listener called within background thread? connect server, asynctask connecttoontoceptasync runs , returns. on android, network communication must happen on separate thread. me looks suspicious.
i communication on background thread, stomp client private field of asynctask. instead of using listener interface prefer utilize blocking receive methods, give more detailed control.
java android activemq stomp
No comments:
Post a Comment