Wednesday, September 12, 2012

Teiid and HornetQ Integration

In my previous article "Simpe JMS Provider on JBoss Teiid" you can learn about how to create a Teiid translator which will "convert" your SQL commands into messages and put them into queue. This article talks about opposite approach: how to retrieve messages from queue using SQL SELECT statements, so our integration pattern will be complete. The schema below explains the integration logic.
Our goal is to pass a specific command (CALL... or INSERT...) to Teiid with some parameters. This parameters will be converted in Teiid by our custom translator (JMS Producer) into the message, which will be immediately placed into queue. New player is another custom translator (JMS Consumer). Client can issue a Continuous Execution to Teiid, which will automatically repeat itself with some time interval. Each round of execution will retrieve messages from queue and pass back to client as execution results. Client gets notified by Teiid when result is available - client obtains a callback object from Teiid when the execution initiated. The StatementCallback.onRow() method gets executed when data is passed back to client.
The process can be further improved, if instead of retrieving messages periodically we will listen queue and retrieve message back to client right after it will appear. So instead of polling we will implement pushing approach.

Implementation Details

The custom translator code for JMS Consumer should implement MessageListener interface. The provided onMessage(Message msg) method will be called each time the message will appear on subscribed queue. Translator also implements ResultSetExecution (so it will appear as a view in Teiid Virtual Database), and ReusableExecution interfaces. The DataNotAvailableException.NO_POLLING will be thrown from the translator next() method, to put translator "into sleep". This is a non-blocking process and part of Continuous Execution functionality. To bring translator back to life, we will call ExecutionContext.dataAvailable() from onMessage() method.
The complete code of JMS Consumer Translator (MessageReceiveExecution.java) is attached, and below are the most important methods:
      public void execute() throws TranslatorException {  
           this.callNext = DataNotAvailableException.NO_POLLING;  
           this.callNext.setStrict(true);  
      }  
      public List<?> next() throws TranslatorException, DataNotAvailableException {  
           if (callNext != null) {  
                DataNotAvailableException e = callNext;  
                callNext = null;  
                throw e;  
           }  
           if (msgRow != null) {  
                List<Object> results = msgRow;  
                msgRow = null;  
                return results;  
           }  
           return null;  
      }  
      public void dispose() {  
           try {  
                qreceiver.close();  
                qsession.close();  
                qcon.close();  
           } catch (JMSException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
           }  
      }  
      public void onMessage(Message msg) {  
           try {  
                String messageContent;  
                if (msg instanceof TextMessage) {  
                     messageContent = ((TextMessage) msg).getText();  
                } else {  
                     messageContent = msg.toString();  
                }  
                System.out.println("!!!!!! MSG:" + messageContent);  
                setMessage(messageContent);  
           } catch (JMSException jmse) {  
                jmse.printStackTrace();  
           }  
           this.context.dataAvailable();            
      }  

To make sure it works I used a JBoss with HornetQ configuration from the previous article, and made a sample java servlet which connects to Teiid and executes a SELECT statement continuously. Also I have another SQL client application (SQuirreL or Eclipse Database Explorer) connected to Teiid, which I use to produce messages (CALL JMS Producer, like in my previous article).
Testing approach:
  • start JBoss/HornetQ/Teiid with deployed translator jar module and war file with servlet;
  • open browser, call servlet to issue a continuous execution;
  • run call statement from SQL client to send a message;
  • check servlet logs that message successfully retrieved by translator and passed back to servlet;
 MessageReceiveExecution (Google Docs).

No comments:

Post a Comment