Showing posts with label hornetq. Show all posts

Friday, April 19, 2013

Running HornetQ Bridge Under JBoss 7

Here is the explanation of a bridge from the HornetQ documentation: "Some messaging systems allow isolated clusters or single nodes to be bridged together, typically over unreliable connections like a wide area network (WAN), or the internet. A bridge normally consumes from a queue on one server and forwards messages to another queue on a different server. Bridges cope with unreliable connections, automatically reconnecting when the connections become available again."

My goal is to set up a bridge that forwards messages between two queues residing on different physical servers over a TCP connection. I plan to use two HornetQ services running as part of the latest JBoss (EAP 6.1.0). The "jms-bridge" example from the standalone HornetQ distribution has a sample JBoss configuration (standalone-example.xml) that sets up a bridge on a single JBoss instance with the HornetQ service. We will have to modify it to support two servers over the network.

In my configuration I have two systems, let's call them "laptop1" and "remoteHost"; each has JBoss with the HornetQ service installed. Below are the changes to the configuration from the "jms-bridge" example:
  1. Define a new HornetQ connector and acceptor. Note that the JBoss messaging configuration provides a "netty-connector" element, but it does not seem to support "host" and "port" parameters and relies on a mandatory "socket-binding" attribute (see jboss-as-messaging_1_3.xsd from the JBoss distribution for more details). That's why I had to redefine both the connector and the acceptor. For simplicity, the acceptor uses "0.0.0.0" as its host to allow connections from everywhere. Also note that on the second server the ports are swapped: 5457 for the acceptor and 5456 for the connector.
     <connector name="netty-bridge">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
         <param key="host" value="remoteHost" />
         <param key="port" value="5457" />
     </connector>
     <acceptor name="netty-bridge">
         <factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
         <param key="host" value="0.0.0.0" />
         <param key="port" value="5456" />
     </acceptor>
  2. Disable HornetQ security, again for simplicity. The proper approach would be to specify "user" and "password" as parameters for the connector above.
     <hornetq-server>
         <security-enabled>false</security-enabled>
         ...
  3. Define two queues: "source" and "target". For the purposes of this example it would be enough to have only the "source" queue on the local server and only the "target" queue on the remote one.
     <jms-queue name="sourceQueue">
         <entry name="queue/sourceQueue"/>
         <entry name="java:jboss/exported/jms/queues/sourceQueue"/>
     </jms-queue>
     <jms-queue name="targetQueue">
         <entry name="java:/queue/targetQueue"/>
         <entry name="java:jboss/exported/jms/queues/targetQueue"/>
     </jms-queue>
  4. Define a new connection factory for the bridge to use.
     <connection-factory name="RemoteConnectionFactoryBridge">
         <connectors>
            <connector-ref connector-name="netty-bridge"/>
         </connectors>
         <entries>
            <entry name="RemoteConnectionFactoryBridge"/>
            <entry name="java:jboss/exported/jms/RemoteConnectionFactoryBridge"/>
         </entries>
     </connection-factory>
  5. Modify the bridge definition, which is added after the "hornetq-system" tag in settings.xml.
     <jms-bridge name="myBridge">
         <source>
             <connection-factory name="ConnectionFactory" />
             <destination name="queue/sourceQueue" />
         </source>
         <target>
             <connection-factory name="RemoteConnectionFactoryBridge" />
             <destination name="queue/targetQueue" />
         </target>
         <quality-of-service>AT_MOST_ONCE</quality-of-service>
         <failure-retry-interval>1000</failure-retry-interval>
         <max-retries>7890</max-retries>
         <max-batch-size>1</max-batch-size>
         <max-batch-time>1000</max-batch-time>
     </jms-bridge>
After starting both JBoss servers, watch for log messages indicating that the bridge has started and that the connection between the two HornetQ instances has been established:
18:54:55,621 INFO [org.hornetq.core.server] (MSC service thread 1-1) HQ221024: Started Netty Acceptor version 3.6.2.Final-c0d783c 0.0.0.0:5456 for CORE protocol
...
18:54:57,953 DEBUG [org.hornetq.core.client] (ServerService Thread Pool -- 61) Remote destination: rokan01-VM3762.ca.com/10.130.248.122:5457
...
18:54:58,723 INFO [org.jboss.messaging] (ServerService Thread Pool -- 61) JBAS011610: Started JMS Bridge myBridge
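If the "Remote destination" line never shows up, it can help to rule out plain network problems before digging into the HornetQ configuration. The sketch below is a hypothetical helper (PortProbe and isReachable are illustrative names, not part of HornetQ or JBoss) that only tries to open a TCP connection to the acceptor port. It assumes the host name and port from the configuration above and says nothing about whether HornetQ itself is healthy on the other end.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper, not part of HornetQ or JBoss: checks that a TCP port accepts connections. */
final class PortProbe {

    /** Returns true if a TCP connection to host:port can be opened within timeoutMillis. */
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout or unknown host: all treated as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the example configuration; override them on the command line.
        String host = args.length > 0 ? args[0] : "remoteHost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5457;
        System.out.println(host + ":" + port + " reachable: " + isReachable(host, port, 2000));
    }
}
```

Run it from "laptop1" against remoteHost and port 5457, and from "remoteHost" against laptop1 and port 5456, to check both directions.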
To test the configuration, send a sample message to "sourceQueue", for example using the Teiid messageSender from one of my previous articles:
call Times.messageSender('queue/sourceQueue', 'My Message1'); 
Now log in to the JBoss Management Console on your remoteHost (http://remoteHost:9990/console/App.html#jms-metrics) and make sure the messages get delivered over the bridge to "targetQueue" on the remote host.


There might be a moment when only the "connector" side of the bridge is up and running. The bridge will automatically retry the connection every second (configurable via the "failure-retry-interval" setting in the bridge definition above) until the "acceptor" side becomes available.
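The retry behaviour described above can be pictured as a plain fixed-interval loop. The sketch below is not HornetQ's bridge code: RetrySketch and connectWithRetry are illustrative names, the connection attempt is simulated, and the code assumes that "max-retries" counts retries made after one initial attempt, which I have not verified against HornetQ.

```java
import java.util.function.BooleanSupplier;

/** Illustrative only: a fixed-interval retry loop, not HornetQ's actual bridge code. */
final class RetrySketch {

    /**
     * Calls connect until it succeeds or the retry budget is spent.
     * Returns the number of attempts made on success, or -1 if every attempt failed.
     */
    static int connectWithRetry(BooleanSupplier connect, long retryIntervalMillis, int maxRetries)
            throws InterruptedException {
        // One initial attempt plus up to maxRetries retries (an assumption of this sketch).
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            if (connect.getAsBoolean()) {
                return attempt;
            }
            if (attempt <= maxRetries) {
                Thread.sleep(retryIntervalMillis); // plays the role of failure-retry-interval
            }
        }
        return -1;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated remote side that becomes available on the third attempt.
        int[] calls = {0};
        int attempts = connectWithRetry(() -> ++calls[0] >= 3, 10, 5);
        System.out.println("connected after " + attempts + " attempts");
    }
}
```

With the values from the bridge definition (an interval of 1000 ms and 7890 retries), a loop like this would keep trying for a little over two hours before giving up.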

Wednesday, September 12, 2012

Teiid and HornetQ Integration

In my previous article "Simple JMS Provider on JBoss Teiid" you can learn how to create a Teiid translator which "converts" your SQL commands into messages and puts them into a queue. This article covers the opposite direction: how to retrieve messages from a queue using SQL SELECT statements, so that our integration pattern is complete. The schema below explains the integration logic.
Our goal is to pass a specific command (CALL... or INSERT...) to Teiid with some parameters. These parameters are converted in Teiid by our custom translator (JMS Producer) into a message, which is immediately placed into the queue. The new player is another custom translator (JMS Consumer). The client can issue a Continuous Execution to Teiid, which automatically repeats itself at some time interval. Each round of execution retrieves messages from the queue and passes them back to the client as execution results. The client registers a callback object with Teiid when the execution is initiated, and Teiid notifies it when a result is available: the StatementCallback.onRow() method is executed whenever data is passed back to the client.
The process can be improved further if, instead of retrieving messages periodically, we listen to the queue and pass each message back to the client as soon as it appears. So instead of polling, we will implement a push approach.

Implementation Details

The custom translator code for the JMS Consumer should implement the MessageListener interface. Its onMessage(Message msg) method is called each time a message appears on the subscribed queue. The translator also implements the ResultSetExecution interface (so it appears as a view in the Teiid Virtual Database) and the ReusableExecution interface. DataNotAvailableException.NO_POLLING is thrown from the translator's next() method to put the translator "to sleep". This is non-blocking and is part of the Continuous Execution functionality. To wake the translator up again, we call ExecutionContext.dataAvailable() from the onMessage() method.
The complete code of JMS Consumer Translator (MessageReceiveExecution.java) is attached, and below are the most important methods:
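      public void execute() throws TranslatorException {
          // Arm the first next() call to throw NO_POLLING, which puts the
          // translator to sleep until onMessage() reports new data.
          this.callNext = DataNotAvailableException.NO_POLLING;
          this.callNext.setStrict(true);
      }

      public List<?> next() throws TranslatorException, DataNotAvailableException {
          // First call after execute(): no message yet, so go to sleep.
          if (callNext != null) {
              DataNotAvailableException e = callNext;
              callNext = null;
              throw e;
          }
          // A message has arrived: hand it over exactly once.
          if (msgRow != null) {
              List<Object> results = msgRow;
              msgRow = null;
              return results;
          }
          // null tells Teiid there are no more rows in this round.
          return null;
      }

      public void dispose() {
          // Release the JMS resources: receiver, then session, then connection.
          try {
              qreceiver.close();
              qsession.close();
              qcon.close();
          } catch (JMSException e) {
              // Nothing to recover here; just report the failure.
              e.printStackTrace();
          }
      }

      public void onMessage(Message msg) {
          try {
              String messageContent;
              if (msg instanceof TextMessage) {
                  messageContent = ((TextMessage) msg).getText();
              } else {
                  messageContent = msg.toString();
              }
              System.out.println("!!!!!! MSG:" + messageContent);
              // Store the content as the row that next() will return.
              setMessage(messageContent);
          } catch (JMSException jmse) {
              jmse.printStackTrace();
          }
          // Wake the translator up so that Teiid calls next() again.
          this.context.dataAvailable();
      }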
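The hand-off between onMessage() and next() can also be tried out without Teiid or JMS on the classpath. The following is a simplified model under my own assumptions, not the attached translator: NoDataYet stands in for DataNotAvailableException, the Runnable stands in for ExecutionContext.dataAvailable(), and HandOffSketch is an illustrative name. Unlike the translator, it always signals "no data" when nothing is pending instead of ending the result set.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Stand-in for Teiid's DataNotAvailableException; hypothetical, for illustration only. */
final class NoDataYet extends RuntimeException {
    NoDataYet() {
        super("no row yet; wait for a wake-up");
    }
}

/** Simplified model of the hand-off between the messaging thread and the engine thread. */
final class HandOffSketch {
    // Single-slot buffer shared by both threads; AtomicReference keeps the swap thread-safe.
    private final AtomicReference<List<Object>> pendingRow = new AtomicReference<>();
    private final Runnable wakeUp; // stand-in for ExecutionContext.dataAvailable()

    HandOffSketch(Runnable wakeUp) {
        this.wakeUp = wakeUp;
    }

    /** Called by the messaging side: publish the row first, then wake the consumer. */
    void onMessage(String text) {
        pendingRow.set(Collections.<Object>singletonList(text));
        wakeUp.run();
    }

    /** Called by the engine side: hand over the row once, or signal that none is ready. */
    List<Object> next() {
        List<Object> row = pendingRow.getAndSet(null);
        if (row == null) {
            throw new NoDataYet();
        }
        return row;
    }

    public static void main(String[] args) {
        HandOffSketch sketch = new HandOffSketch(() -> System.out.println("wake-up requested"));
        try {
            sketch.next();
        } catch (NoDataYet e) {
            System.out.println("sleeping: " + e.getMessage());
        }
        sketch.onMessage("My Message1");
        System.out.println("row: " + sketch.next());
    }
}
```

Because the buffer holds a single row, a second message that arrives before next() is called replaces the first one. A queue in place of the single slot would avoid that, at the cost of a little more code.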

To make sure it works, I used JBoss with the HornetQ configuration from the previous article and wrote a sample Java servlet which connects to Teiid and executes a SELECT statement continuously. I also have another SQL client application (SQuirreL or Eclipse Database Explorer) connected to Teiid, which I use to produce messages (CALL the JMS Producer, as in my previous article).
Testing approach:
  • start JBoss/HornetQ/Teiid with the translator jar module and the servlet war file deployed;
  • open a browser and call the servlet to issue a continuous execution;
  • run a CALL statement from the SQL client to send a message;
  • check the servlet logs to confirm that the message was retrieved by the translator and passed back to the servlet;
 MessageReceiveExecution (Google Docs).