JEE. Mensajería JMS

Introducción JMS

En JEE el concepto de mensajería se refiere a la comunicación con poco acoplamiento entre emisor y receptor y totalmente asíncrona. Se podría definir su comportamiento como buzón de voz, donde un emisor deja un mensaje para que un receptor lo recoja cuando pueda. En terminología JEE tenemos un producer (emisor) que almacena un mensaje en un destino, un almacén específico. Posteriormente un consumer (receptor) comprueba si hay algún mensaje para él en el destino.

 

Modelos de mensajería

Un modelo se define por la forma en que se comunican los producers y los consumers:

 

  • point-to-point (punto a punto): el mensaje va de un emisor A a un receptor B, quedando almacenado en una cola (queue). La cola no garantiza el orden de entrega de los mensajes (sí, mala elección del nombre). Si existe más de un receptor potencial se elige uno de forma aleatoria. Una vez entregado el mensaje desaparece de la cola. La analogía sería un mensaje en una botella, lanzada al mar (queue) que es recibida por alguien desconocido en una playa. Toda persona en la playa es un posible receptor pero no sabemos quién la recibirá.
  • publish-subscriber: lo más parecido a publicar algo en un chat. El emisor (publisher) envía el mensaje a un “topic”, un almacen temático de mensajes. Los múltiples receptores (subscribers) que estén conectados en ese momento reciben el mensaje. Una vez enviado el mensaje se pierde.

La API JMSes para la mensajería lo que JDBC es para las bases de datos. Proporciona un acceso simple y uniforme a los servicios de mensajería y es soportado por la mayoría de sistemas.

 

Message-Driven Bean

Los MDB están pensados para consumir los mensajes creados usando JMS. De hecho son más potentes, usando JCA (Java EE Connector Arquitecture) pueden recibir mensajes de cualquier EIS (Enterprise Information Service) lo que los hace muy versátiles y facilita la integración con sistemas obsolescentes. Pero la mayoría de las veces los usareis para recibir mensajes desde JMS.

Existen varias razones para usar MDB frente a otras soluciones. La primera es su sistema de pooling, transparente al programador, que facilita el proceso de mensajes en paralelo al usar múltiples instancias del MDB. También automatizan el proceso de conectar al almacén de mensajes y leer los pendientes, reduciendo los errores y el tiempo de desarrollo. Y por último gestionan el proceso de lectura de mensajes (saber cuando hay que leer el almacén ya que hay nuevos mensajes) de forma interna, sin necesidad de configuración extra.Como los Session EJB, los MDB son POJOs con ciertas normas:

  • Deben implementar una interficie de escucha de mensajes, directamente (implements) o mediante anotaciones
  • No puede ser una clase final o abstract
  • Debe ser un POJO, nunca una subclase de otro MDB
  • Debe ser una clase pública
  • Debe tener un constructor sin argumentos
  • No puede tener un método “finalize”
  • Debe implementar todos los métodos de la interface como métodos públicos no estáticos ni finales
  • No debe lanzar ninguna excepción del tipo RemoteException o RuntimeException

 

Ciclo de vida de un MDB

Ahora que ya hemos visto como funcionan los MDB es hora de explicar brevemente su ciclo de vida. El ciclo de vida de los MDB tiene tan sólo 3 fases: no existe, desocupado y ocupado. Un bean en estado “no existe” no ha sido instanciado por el contenedor. Una vez instanciado pasa a la fase “desocupado” donde espera un mensaje. Cuando lo recibe, pasa a la fase “ocupado” mientras lo procesa. Una vez lo ha procesado vuelve a la fase “desocupado”. Si se necesita liberar recursos en el contenedor los beans de la fase “desocupado” pueden ser destruidos e ir a la fase “no existe”.

El control de estas fases del ciclo se hace mediante dos callback:

@PostConstruct y @PreDestroy. El primero se ejecuta cuando el bean llega a la fase “desocupado” por primera vez. El segundo cuando estamos a punto de volver a la fase “no existe”.

 

Ejemplo de implementación

 

package app3.beans;

 

import javax.ejb.ActivationConfigProperty;

import javax.ejb.MessageDriven;

import javax.jms.Message;

import javax.jms.MessageListener;

import javax.jms.TextMessage;

 

@MessageDriven(mappedName = “jms/demoQueue”, activationConfig =  {

@ActivationConfigProperty(propertyName = “acknowledgeMode”, propertyValue = “Auto-acknowledge”),

@ActivationConfigProperty(propertyName = “destinationType”, propertyValue = “javax.jms.Queue”)

})

 

public class MensajeroBean implements MessageListener {

public MensajeroBean() { }

 

public void onMessage(Message message) {

try{

if (message instanceof TextMessage){

System.out.println( “>>” +((TextMessage)message).getText() );

System.out.println(“<tratado>”);

}

}catch(Exception e){

System.out.println(“—-> error: ” + e.getMessage());

}

}

}

El anterior código muestra un mensaje de Texto que ha llegado a la cola de mensajería. Es evidente pensar que no sólo puede ser texto lo que el MDB puede tratar, sino cualquier tipo de mensaje que el cliente emita.

Como se puede apreciar la principal diferencia con un Session Bean esta en la anotación inicial, más compleja. La complejidad adicional es debido a que la anotación además de definir el nombre indica a que “almacén” estamos escuchando, en este caso una Queue. El resto del código es muy similar. La clase implementa la inteface MessageListener dela API JMS, para indicar que recibiremos mensajes. Se usa inyección de dependencias para crear un contexto (MessageDrivenContext) y dar acceso a unos recursos JNDI (la conexión al datasource). Tenemos dos métodos para ejecutar en el ciclo de vida del bean, uno después de construir el objeto (initialize) y otro antes de destruirlo (cleanup). Y por último un método exigido por la interficie (onMessage) que contiene la lógica de negocio del bean.

Por supuesto  los recursos físicos tales como las colas de mensajería  las factorías de conexiones  etc., deberán estar dadas de alta y disponibles bajo la atenta mirada del administrador del servidor de aplicaciones.

Sobre las propiedades de la anotacion ActivationConfigProperty, cada una indica un parámetro de configuración. Existen las siguientes:

  •  destinationType: indica el tipo de almacén (Queue, Topic)
  •  connectionFactoryJndiName: indica la factoría de conexiones JMS que usará el MDB
  •  destinationName: dirección del almacén de mensajes (jndi)
  •  acknowledgeMode: cómo se indica al almacén que hemos procesado el mensaje para que lo borre. Hay dos valores: AUTO_ACKNOWLEDGE (por defecto, lo hace solo) y DUPS_OK_ACKNOWLEDGE (como auto pero menos riguroso, permite la existencia de envíos duplicados)
  •  subscriptionDurability: sólo para Topics, puede ser “non_durable” (default) o “durable”. En el segundo caso el mensaje no se borra hasta que todo subscriptor del tipo “durable” lo ha leído, aunque no estuviera online en el momento de su emisión.
  • messageSelector: permite filtrar los mensajes que consume nuestro MDB

 

Envío de mensajes

A continuación vemos un código que actúa de cliente para un servicio de mensajería. Debemos hacer notar el hecho de que el cliente no tiene sentido práctico, sino más bien conceptual. Pretende ilustrar un servlet que, tras la invocación por el cliente web, generará un mensaje de tipo texto y lo enviará al servicio de mensajería, a un destino físico que será el mismo al que el MDB hemos asociado en el ejercicio anterior.

 

package app3.web;

import java.io.*;

import javax.annotation.Resource;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import javax.servlet.*;

import javax.servlet.http.*;

 

 

public class ClienteMensajeServlet extends HttpServlet {

@Resource(name=”jms/demoConnectionFactory”)

ConnectionFactory cf;

@Resource(name=”jms/demoQueue”)

Destination destino;

protected void processRequest(HttpServletRequest request, HttpServletResponse response)

throws ServletException, IOException {

// Connection y Session

Connection connection = null;

Session session = null;

response.setContentType(“text/html;charset=UTF-8″);

PrintWriter out = response.getWriter();

try {

// Creamos la conexion

connection = cf.createConnection();

 

// Creamos la session

session = connection.createSession(true,

Session.AUTO_ACKNOWLEDGE);

 

// Creamos el productor

MessageProducer productor = session.createProducer(destino);

 

// Creamos el ObjectMessage

TextMessage obj = session.createTextMessage();

 

obj.setText(“Hola son las “+new java.util.Date());

// Establecemos el ObjetMessage

// Enviamos el mensaje

productor.send(obj);

 

// Cerramos la sision y la conexion

session.close();

connection.close();

out.println(“<pre>”);

out.println(“Mensaje eviado”);

out.println(“</pre>”);

}

 

catch (JMSException ex)   {

ex.printStackTrace(out);

}

finally {

out.close();

}

}

 

protected void doGet(HttpServletRequest request, HttpServletResponse response)  throws ServletException, IOException {

processRequest(request, response);

}

protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {

processRequest(request, response);

}

 

}

Comments are closed.