Monday, September 3, 2012

Simple JMS Producer on JBoss Teiid

With its Continuous Execution feature, the new Teiid 8.1 has moved closer to functionality that reminds me of SQL Server Service Broker. In particular, it might be possible to write Teiid extensions that serve as integration points between relational data in a VDB and asynchronous data available in message-oriented middleware.
For example, one Teiid translator can be a JMS client that sends messages to a queue, while another translator running in Continuous Execution mode retrieves messages from the queue.
Below are the results of my experiments with the first approach: a stored procedure that acts as a JMS producer.

Configure JMS in JBoss

JBoss 7.1.1 has built-in JMS support through HornetQ, so it is only a matter of editing the config files properly. The HornetQ configuration is included in the standalone-full.xml profile, and the Teiid configuration is in standalone-teiid.xml, so I took the simplest approach and merged all JMS-related parts from standalone-full.xml into standalone-teiid.xml. I also defined a test queue, "MyQueue":
 ....  
      <address-setting match="jms.queue.MyQueue">  
        <dead-letter-address>jms.queue.MyDLQ</dead-letter-address>  
        <redelivery-delay>0</redelivery-delay>  
        <max-delivery-attempts>3</max-delivery-attempts>  
        <max-size-bytes>10485760</max-size-bytes>  
        <address-full-policy>BLOCK</address-full-policy>  
        <message-counter-history-day-limit>10</message-counter-history-day-limit>  
      </address-setting>  
 ....  
  <jms-destinations>  
    <jms-queue name="MyQueue">  
      <entry name="java:jboss/exported/MyQueue"/>  
      <durable>false</durable>  
    </jms-queue>  
 ....  


It is worth writing small producer and consumer Java classes to test the queue and make sure everything is configured correctly. The JBoss Management console and this post might be helpful.
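
For such a standalone test client, the JNDI settings are usually the part that is easiest to get wrong. Here is a minimal sketch of the environment a remote client would need, assuming JBoss AS 7.1.1 defaults (remoting port 4447, jboss-client.jar on the client classpath, an application user created with add-user). The class name `RemoteJndiSettings` and the credentials are hypothetical:

```java
import java.util.Properties;
import javax.naming.Context;

/** Hypothetical helper: builds the JNDI settings for a remote JMS test client. */
final class RemoteJndiSettings {

    // Assumed JBoss AS 7.1.1 defaults; adjust them to your installation.
    static final String CONTEXT_FACTORY =
            "org.jboss.naming.remote.client.InitialContextFactory";
    static final String CONNECTION_FACTORY = "jms/RemoteConnectionFactory";

    // Remote clients see names relative to java:jboss/exported, so the
    // queue bound at java:jboss/exported/MyQueue is looked up as "MyQueue".
    static final String QUEUE = "MyQueue";

    static Properties build(String host, int port, String user, String password) {
        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY, CONTEXT_FACTORY);
        env.put(Context.PROVIDER_URL, "remote://" + host + ":" + port);
        env.put(Context.SECURITY_PRINCIPAL, user);
        env.put(Context.SECURITY_CREDENTIALS, password);
        return env;
    }

    public static void main(String[] args) {
        Properties env = build("localhost", 4447, "testuser", "testpassword");
        // A real client would pass env to new InitialContext(env) and then
        // look up CONNECTION_FACTORY and QUEUE through that context.
        System.out.println(env.getProperty(Context.PROVIDER_URL));
    }
}
```

With these settings in place, the rest of the test client is ordinary JMS code: look up the connection factory and the queue, create a session, then send or receive.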

Write Teiid Translator

I made a simple translator that exposes a stored procedure: its execution class implements ProcedureExecution and acts as a JMS producer. The queue name and the message are passed as the stored procedure's IN parameters.

 package com.rokhmanov.test.teiid.translator.async;  
   
 import java.util.ArrayList;  
 import java.util.List;  
   
 import javax.jms.JMSException;  
 import javax.jms.Queue;  
 import javax.jms.QueueConnection;  
 import javax.jms.QueueConnectionFactory;  
 import javax.jms.QueueSender;  
 import javax.jms.QueueSession;  
 import javax.jms.Session;  
 import javax.jms.TextMessage;  
 import javax.naming.Context;  
 import javax.naming.InitialContext;  
 import javax.naming.NamingException;  
   
 import org.teiid.language.Argument;  
 import org.teiid.language.Call;  
 import org.teiid.translator.DataNotAvailableException;  
 import org.teiid.translator.ExecutionContext;  
 import org.teiid.translator.ProcedureExecution;  
 import org.teiid.translator.TranslatorException;  
   
 public class MessageSendExecution implements ProcedureExecution {  
        
      public final static String CNN_FACTORY="/ConnectionFactory";  
        
      private String queueName;  
      private String messageContent;  
      private Call cmd;  
        
      private QueueConnectionFactory qconFactory;  
      private QueueConnection qcon;  
      private QueueSession qsession;  
      private QueueSender qsender;  
      private Queue queue;  
      private TextMessage msg;  
        
      public MessageSendExecution(Call command,  
                ExecutionContext executionContext, Object connection) {  
           this.cmd = command;  
             
           final List<Argument> procArguments = new ArrayList<Argument>(this.cmd.getArguments());  
           this.queueName = procArguments.get(0).getArgumentValue().getValue().toString();  
           this.messageContent = procArguments.get(1).getArgumentValue().getValue().toString();  
      }  
   
      public List<?> next() throws TranslatorException, DataNotAvailableException {  
           return null;  
      }  
   
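      public void close() {  
           // Release the JMS connection if it was opened. Closing an  
           // already closed connection is a no-op.  
           if (qcon != null) {  
                try {  
                     qcon.close();  
                } catch (JMSException e) {  
                     // Nothing useful can be done at this point.  
                }  
           }  
      }  
   
      public void cancel() throws TranslatorException {  
           // execute() may not have run yet, so qcon can still be null.  
           if (qcon == null) {  
                return;  
           }  
           try {  
                qcon.close();  
           } catch (JMSException e) {  
                throw new TranslatorException(e);  
           }  
      }  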
   
      public void execute() throws TranslatorException {            
           InitialContext ic = getInitialContext();  
           initJMS(ic, queueName);  
           sendMsg(messageContent);  
      }  
   
      public List<?> getOutputParameterValues() throws TranslatorException {  
           return null;  
      }  
   
        
      private InitialContext getInitialContext() throws TranslatorException  
      {  
           try {  
                return new InitialContext();  
           } catch (NamingException e) {  
                throw new TranslatorException(e);  
           }  
      }  
        
        
      public void initJMS(Context ctx, String queueName) throws TranslatorException  
      {  
           try {  
                qconFactory = (QueueConnectionFactory) ctx.lookup(CNN_FACTORY);  
                qcon = qconFactory.createQueueConnection();  
                qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);  
                queue = (Queue) ctx.lookup(queueName);  
                qsender = qsession.createSender(queue);  
                msg = qsession.createTextMessage();  
                qcon.start();  
           } catch (NamingException e) {  
                throw new TranslatorException(e);  
           } catch (JMSException e) {  
                throw new TranslatorException(e);  
           }  
      }  
        
        
      private void sendMsg(String userMessage) throws TranslatorException  
      {  
           try {  
                msg.setText(userMessage);  
                qsender.send(msg);  
                qcon.close();  
           } catch (JMSException e) {  
                throw new TranslatorException(e);  
           }  
      }  
 }  
   

These are the important parts of the Execution Factory:

      @Override  
      public ProcedureExecution createProcedureExecution(Call command,  
                ExecutionContext executionContext, RuntimeMetadata metadata,  
                Object connection) throws TranslatorException   
      {  
           return new MessageSendExecution(command, executionContext, connection);  
      }  
   
        
      @Override  
      public void getMetadata(MetadataFactory metadataFactory, Object conn)  
                throws TranslatorException   
      {      
           final Procedure messageSender = metadataFactory.addProcedure("messageSender");  
           metadataFactory.addProcedureParameter("queue", TypeFacility.RUNTIME_NAMES.STRING, Type.In, messageSender);  
           metadataFactory.addProcedureParameter("message", TypeFacility.RUNTIME_NAMES.STRING, Type.In, messageSender);  
      }  
   

To post the message, use this SQL statement:

 call Times.messageSender('java:jboss/exported/MyQueue', 'My Message');  
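
From a Java client, the same call can go through the Teiid JDBC driver. This is a sketch only, assuming the driver jar is on the classpath and the VDB is reachable on the default Teiid JDBC port 31000. The VDB name `MyVdb`, the class name `MessageSenderClient` and the credentials are hypothetical; `Times` is the model name taken from the statement above:

```java
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/** Hypothetical JDBC client that invokes the messageSender procedure. */
final class MessageSenderClient {

    // JDBC escape syntax for a procedure call with two bound IN parameters.
    static final String CALL_SQL = "{call Times.messageSender(?, ?)}";

    // Builds a Teiid JDBC URL of the form jdbc:teiid:<vdb>@mm://<host>:<port>.
    static String teiidUrl(String vdb, String host, int port) {
        return "jdbc:teiid:" + vdb + "@mm://" + host + ":" + port;
    }

    // Binds the queue JNDI name and the message text, then runs the call.
    static void send(Connection con, String queueJndiName, String text)
            throws SQLException {
        try (CallableStatement cs = con.prepareCall(CALL_SQL)) {
            cs.setString(1, queueJndiName);
            cs.setString(2, text);
            cs.execute();
        }
    }

    public static void main(String[] args) throws SQLException {
        String url = teiidUrl("MyVdb", "localhost", 31000); // assumed VDB name
        try (Connection con = DriverManager.getConnection(url, "user", "password")) {
            send(con, "java:jboss/exported/MyQueue", "My Message");
        }
    }
}
```

Binding the two values as parameters avoids having to escape quotes in the message text by hand.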

Then monitor the JBoss Management console or use your sample Java JMS client to retrieve the message from the queue:

 Sep 03, 2012 10:34:45 PM org.xnio.Xnio <clinit>  
 INFO: XNIO Version 3.0.3.GA  
 Sep 03, 2012 10:34:45 PM org.xnio.nio.NioXnio <clinit>  
 INFO: XNIO NIO Implementation Version 3.0.3.GA  
 Sep 03, 2012 10:34:45 PM org.jboss.remoting3.EndpointImpl <clinit>  
 INFO: JBoss Remoting version 3.2.3.GA  
 JMS Ready To Receive Messages (To quit, send a "quit" message from QueueSender.class).  
   
       My Message  

Note: this translator code is not optimized for performance or thread safety; I wrote it for illustration purposes only. Areas of potential improvement:
  • implement UpdateExecution so that the translator handles INSERT SQL statements (a more intuitive approach);
  • reuse the connection to the queue.
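
The second point could take the shape of a small cache keyed by queue name, so that the JNDI lookup and connection setup happen once per queue rather than once per call. Below is a sketch using only the JDK. The class `QueueResourceCache` and its factory function are hypothetical; a real translator would cache an object wrapping the JMS connection and would still need to handle broken connections and the fact that JMS sessions are not thread-safe:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical cache that creates one resource per queue name and reuses it. */
final class QueueResourceCache<R> {

    private final ConcurrentMap<String, R> cache = new ConcurrentHashMap<>();
    private final Function<String, R> factory;

    // The factory stands in for the expensive setup, for example the
    // JNDI lookup plus createQueueConnection() from initJMS().
    QueueResourceCache(Function<String, R> factory) {
        this.factory = factory;
    }

    // Returns the cached resource, creating it on first use. With
    // ConcurrentHashMap the factory runs at most once per key.
    R get(String queueName) {
        return cache.computeIfAbsent(queueName, factory);
    }

    int size() {
        return cache.size();
    }

    public static void main(String[] args) {
        QueueResourceCache<Object> cache =
                new QueueResourceCache<>(name -> new Object());
        Object first = cache.get("java:jboss/exported/MyQueue");
        Object second = cache.get("java:jboss/exported/MyQueue");
        System.out.println("same instance: " + (first == second));
    }
}
```

The execution factory would own one such cache and hand it to each MessageSendExecution, which then no longer closes the connection after every message.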
