`
hellohank
  • 浏览: 143926 次
  • 性别: Icon_minigender_1
  • 来自: 南京
社区版块
存档分类
最新评论

jms与ActiveMQ实践与应用

 
阅读更多

前言

这是我自己从不知道JMS为何物到学习如何使用第三方工具实现跨服务器的知识总结,在整个过程中可能考虑不全。另外,如果想尽快使用JMS,建议直接看实例那一节就可以了。有问题多交流。

词语解释

(有些词可能用的不是很正确,在这里我把自己能意识到的词拿出来解释一下):

1、  跨服务器:专业术语好像叫“跨实例”。意思是,可以在多个服务器(可以是不同的服务器,如resintomcat)之间相互通信。与之对应的是单服务器版。

2、  消息生产者:就是专门制造消息的类。

3、  消息消费者:也叫消息接收者,它主要是实现了消息监听的一个接口,当然,也可以难过Spring提供的一个转换器接口指定任意一个类中的任意方法。

我们都知道,任何一个系统从整体上来看,其实质就是由无数个小的服务或事件(我们可以称之为事务单元)有机地组合起来的。对于系统中任何一个比较复杂的功能,都是通过调用各个独立的事务单元以实现统一的协调运作而实现的。

现在我们的问题是,如果有两个完全独立的服务(比如说两个不同系统间的服务)需要相互交换数据,我们该如何实现?

好吧,我承认,我很傻很天真,我想到的第一个方法就是在需要的系统中将代码再写一遍,但我也知道,这绝对不现实!好吧,那我就应该好好学习学习达人们是如何去解决这样的问题。

第一种方法,估计也是用的最多的,就是rpc模式。这种方法就是在自己的代码中远程调用其它程序中的代码以达到交换数据的目的。但是这种方法很显然地存在了一个问题:就是一定要等到获取了数据之后才能继续下面的操作。当然,如果一些逻辑是需要这些数据才能操作,那这就是我们需要的。

第二种方法就是Hessian,我个人觉得Hessian的实现在本质上与rpc模式的一样,只是它采用了配置,简化了代码。

上面这两个方法,基本上能解决所有的远程调用的问题了。但是美中不足的是,如果我在A系统中有一个操作是需要让B系统做一个响应的,但我又不需要等它响应完才做下面的操作,这该怎么办?于是新的解决方案就需要被提出来,而SUN公司的设计师们也考虑到了,在JAVA中这就被体现为JMSjava message service)。

一、认识JMS

JMS模块的功能只提供了接口,并没有给予实现,实现JMS接口的消息中间件叫JMS Provider,这样的消息中间件可以从Java里通过JMS接口进行调用。

JMS消息由两部分构成:headerbodyheader包含消息的识别信息和路由信息,body包含消息的实际数据。

JMS的通用接口集合以异步方式发送或接收消息。另外, JMS采用一种宽松结合方式整合企业系统的方法,其主要的目的就是创建能够使用跨平台数据信息的、可移植的企业级应用程序,而把开发人力解放出来。

Java消息服务支持两种消息模型:Point-to-Point消息(P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub,也就是广播模式)。

根据数据格式,JMS消息可分为以下五种:

BytesMessage   消息是字节流。

MapMessage   消息是一系列的命名和值的对应组合。

ObjectMessage   消息是一个流化(即继承Serializable)Java对象。

StreamMessage   消息是Java中的输入输出流。

TextMessage   消息是一个字符串,这种类型将会广泛用于XML格式的数据。

二、使用JMS

在使用JMS时,其步骤很像使用JDBC一样,需要的步骤为:

1、建立消息连接(也就是建立连接工厂);

2、设定消息目的地(其实与步骤1中用的类是一样的,只是它是用来指定目的地,而步骤1中是用来指定消息服务器地址的)

3、创建jmsTemplate实例(为下一步构建消息sessin作准备);

4、创建消息生产者(其中就用到了23两步的产物),它就是一个普通的类,一般是通过send方法发送消息,也可以通过MessageListenerAdapter指定发送信息的方法;

5、创建MDP(也就是消息接收者,它是一个必须实现MessageListener接口的类);

6、为每个MDP建立一个监听容器,当有相应的消息传来,则它会自动调用对应的MDP消费消息。

整个过程就像编写JDBC一样,代码维护量很大。为此,让Spring对其进行管理是个不错的选择。

三、Spring整合JMS

Spring框架提供了一个模板机制来隐藏Java APIs的细节。开发人员可以使用JDBCTemplateJNDITemplate类来分别访问后台数据库和JEE资源(数据源,连接池)。JMS也不例外,Spring提供JMSTemplate类,因此开发人员不用为一个JMS实现去编写样本代码。接下来是在开发JMS应用程序时Spring所具有一些的优势。

1. 提供JMS抽象API,简化了访问目标(队列或主题)和向指定目标发布消息时JMS的使用。

2. 开发人员不需要关心JMS不同版本(例如JMS 1.0.2JMS 1.1)之间的差异。

3. 开发人员不必专门处理JMS异常,因为Spring为所有JMS异常提供了一个未经检查的异常,并在JMS代码中重新抛出

具体的详细步骤与方法参考 spring-reference2.5.pdf 中的第十九章。

下面,我就将我在整个学习过程中实践过的例子一一列举出来,并将在其中遇到的问题和心得给出一定的说明,希望对大家能有所帮助。

四、实例

(一)、配置单服务器版消息机制

1、首先,我们需要配置resin下的resin.conf文件,在其中(<server></server>之间)加上:

<!-- The ConnectionFactory resource defines the JMS factory for creating JMS connections -->

<resource jndi-name="jms/factory"

    type="com.caucho.jms.ConnectionFactoryImpl">

</resource>

<!-- Queue configuration with Resin's database  -->

<resource jndi-name="jms/queue"

    type="com.caucho.jms.memory. MemoryQueue">

<init>

    <queue-name>OssQueue</queue-name>

    </init>

</resource>

<!-- Queue configuration with Resin's database  -->

<resource jndi-name="jms/topic"

    type="com.caucho.jms.memory. MemoryTopic">

<init>

    <queue-name>ossTopic</queue-name>

    </init>

</resource>

注:i、我现在只知道JNDI方式配置消息的连接工厂,我并不知道有没有其它的方式,但我看了许多资料上也没提到其它配置方式。

ii、网上很少有关于在resin中配置JMS消息工厂的资料,只有在resin的官网上才能见到。

iii、上面JNDI配置的地方需要注意的是,大家如果在网上看资料的话,可能会发现网上会比我给出的总是会多一些,也就是总是多一些<data-source>的初始化配置,如:

<resource jndi-name="jms/factory"

    type="com.caucho.jms.ConnectionFactoryImpl">

  <init>

    <data-source>jdbc/database</data-source>

  </init>

</resource>

就这样的配置,单独启动resin是没有问题的,但是如果将其按照下面的Spring配置加到系统中,就会出异常(具体的异常名称我忘了,中文的大概意思是:数据库对象不能转换成JMS连接对象,还有一种情况是启动系统时会内存溢出)。我认为这种配置可能是数据库消息模式的配置(因为JMS有内存和数据库两种管理方式,我目前只学习了内存管理的方式,至于数据库管理方式大家要是有兴趣可以参考:

http://www.oracle.com/technology/books/pdfs/2352_Ch06_FINAL.pdf

2、在web.xml文件中配置一个spring用的上下文:

<context-param>

    <param-name>contextConfigLocation</param-name>

    <param-value>/WEB-INF/jmsconfig.xml</param-value>

</context-param>

<!-- 配置Spring容器 -->

<listener>

    <listener-class>

    org.springframework.web.context.ContextLoaderListener

</listener-class>

</listener>

注:我是将jmsconfig.xml加载到service.xml中随系统启动的。

3、创建jmsconfig.xml用来装配jms,内容如下:

<?xml version="1.0" encoding="UTF-8"?>  

<!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN"

    "http://www.springframework.org/dtd/spring-beans.dtd">  

 

<beans>

    <bean id="jmsConnectionFactory"

 class="org.springframework.jndi.JndiObjectFactoryBean">

 <property name="jndiName">

     <value>java:comp/env/jms/factory </value>

 </property>

    </bean>

   

    <bean id="destination" class="org.springframework.jndi.JndiObjectFactoryBean">

 <property name="jndiName">

     <value> java:comp/env/jms/queue</value>

 </property>

    </bean>

   

    <!--  Spring JmsTemplate config -->

    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">

 <property name="connectionFactory">

     <bean

  class="org.springframework.jms.connection.SingleConnectionFactory">

  <property name="targetConnectionFactory"

      ref="jmsConnectionFactory"/>

     </bean>

 </property>

    </bean>

   

    <!-- POJO which send Message uses Spring JmsTemplate --> <!--配置消息生产者-->

    <bean id="messageProducer" class="com.focustech.jms.MessageProducer">

 <property name="template" ref="jmsTemplate"/>

 <property name="destination" ref="destination"/>

    </bean>

   

    <!--  Message Driven POJO (MDP) -->

    <bean id="messageListener" class=" com.focustech.jms.MessageConsumer"/>

   

    <!--  listener containerMDP无需实现接口 -->

    <bean id="listenerContainer"

 class="org.springframework.jms.listener.DefaultMessageListenerContainer">

 <property name="connectionFactory" ref="jmsConnectionFactory"/>

 <property name="destination" ref="destination"/>

 <property name="messageListener" ref="messageListener"/>

    </bean>

</beans>

其中:

1)   jmsConnectionFactorydestination都是使用自定义的,而且你会发现,这两个对象的加载类其实是一样的,都是JndiObjectFactoryBean,这是从JNDI读取连接的意思。

3)  MessageProducer是消息发送方。

4)  MessageConsumer实现了一个MessageListener,监听是否收到消息。

4、发送和接收消息的class如下(主要代码)

MessageProducer.java

public class MessageProducer {

    private JmsTemplate template;

    private Destination destination;

 

    public void send(final String message) {

 template.send(destination, new MessageCreator() {

     public Message createMessage(Session session) throws JMSException {

  Message m = session.createTextMessage(message);

  return m;

     }

 });

    }

 

    public void setDestination(Destination destination) {

 this.destination = destination;

    }

 

    public void setTemplate(JmsTemplate template) {

 this.template = template;

    }

 

}

 

MessageConsumer.java

public class MessageConsumer implements MessageListener {

 

    public void onMessage(Message message) {

        try

       {

           System.out.println(((TextMessage) message).getText());

       }

       catch (JMSException e)

       {

       }

    }

}

注:在上面的实例类中,由于在发送方发送的是文本消息(TextMessage),所以在上面的接收者代码中我直接将其转换成TextMessage就行了。如果是在真正的环境下,应该首先判断一下对方发送的是什么类型,然后才转换成对应的消息。

5、测试消息

为了测试的方便,可以在webroot下新建一个test.jsp,然后将下面的代码放到JSP的代码中,然后在网页地址栏中输入链接(如:http://oss.vemic.com/test.jsp 注:oss.vemic.com是本地服务器链接)就可以看到发送的消息了。

<%

try {

     ServletContext servletContext = this.getServletContext();

     WebApplicationContext wac = WebApplicationContextUtils

      .getRequiredWebApplicationContext(servletContext);

     MessageProducer mp = (MessageProducer) wac.getBean("messageProducer");

           mp.send("JMS TEST!!");

      } catch (JmsException e) {

  }

%>

(二)、配置跨服务器(即两个或多个resin之间)版消息机制

上面介绍的是单服务器的消息模式配置,使用消息模式,是因为我们需要在两个或多个服务器之间进行消息的传递,而不是单个服务器内容的消息传递。看过很多资料才发现,几乎所有的服务器都不支持跨服务器的消息模式,就算有(如JBoss),那也是因为它们本身集成了第三方的工具而实现的。而在第三方软件里面,我最终选择了apache activeMQ

apache activeMQ的简介可以去其官网查看:http://activemq.apache.org/

或参考:http://www.blogjava.net/cctvx1/archive/2007/02/07/98457.html

IActiveMQ的安装与配置

在其官网上下载最新的对应系统的版本。一般来说,下载完解压之后就可能通过运行:apache-activemq-5.2.0\bin\ activemq.bat就可以成功启动。具体详细的信息参考:

http://andyao.javaeye.com/blog/153171,或者也可参考其官网。

II、整合SpringJMS消息发送

在真正操作之前,为了不至于糊涂,我们应该忘掉前面所说的所有配置(当然,JMS的基础知识我们还是应该记住的,因为所有的JMS操作都是基于此的。还有那两个生产消息与消费消息的类与测试页面我们也要保留,因为我们下面还需要它们),好吧,现在将所有的配置回归到开始的状态吧(如:resin.conf, JMSSpring中的配置等等都回到原始状态吧)。

先说一下我运行时所需的环境吧:JDK1.5.0_12JDK1.6.0_05都可以;resin-3.0.25(其它版本没有试过);配置ActiveMQ所需的JAR包有:

activemq-core-5.2.0.jaractivemq-web-5.2.0.jargeronimo-j2ee-management_1.0_spec-1.0.jargeronimo-jms_1.1_spec-1.1.1.jargeronimo-jta_1.0.1B_spec-1.0.1.jarxbean-spring-3.4.jar

好了,一切准备就绪了。那就让ActiveMQ先在系统中运行吧(也就是先单服务器运行,先易后难嘛)。为了让它能够运行起来,我们需要做以下的准备工作:

1、 使用ActiveMQJMSSpring中的配置

其实这里的许多配置和上面说的单服务器的配置是差不多的,只是这里不再需要配置resinweb.xml的配置与上面的一模一样(当然,我还是按照我的方式配置在了service.xml中),好了,现在不同的配置是jmsconfig.xml

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:amq="http://activemq.org/config/1.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

       <!-- 配置ActiveMQ服务 -->

       <amq:broker useJmx="false" persistent="false">

              <amq:transportConnectors>

<!-- ActiveMQ目前支持的transport有:VM TransportTCP TransportSSL TransportPeer TransportUDP TransportMulticast TransportHTTP and HTTPS TransportFailover TransportFanout TransportDiscovery TransportZeroConf Transport-->

                     <amq:transportConnector uri="tcp://test.vemic.com:61616" />

              </amq:transportConnectors>

       </amq:broker>

       <!-- 配置JMS连接工厂(注:brokerURL是关键,它应该是上面的amq:transportConnectors里面的值之一对应,因为这里指定连接的对象) -->

       <amq:connectionFactory id="jmsConnectionFactory"

              brokerURL="tcp://test.vemic.com:61616" />

       <!-- 消息发送的目的地(注:”amq:queue”是用于指定是发送topic还是queue -->

       <amq:queue name="destination" physicalName="ossQueue" />

       <!-- 创建JMSSession生成类,也就是jmsTemplate -->

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

       class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

       </bean>

       <!-- 消息生产者(通过指定目的地, 就可以同时指定其发送的消息模式是topic还是queue) -->

       <bean id="messageProducer"

              class="com.focustech.jms.MessageProducer">

              <property name="template" ref="jmsTemplate" />

              <property name="destination" ref="destination" />

       </bean>

       <!-- 消息接收类(这个类需要继承javax.jms.MessageListener,当然也可以通过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发) -->

       <bean id="messageListener"

              class=" com.focustech.jms.MessageConsumer">

       </bean>

       <!-- 消息监听容器,其各属性的意义为:

              connectionFactory:指定所监听的对象,在这里就是监听连接到tcp://test.vemic.com:61616上面的ActiveMQ

              destination:监听的消息模式;

              messageListener:接收者

              -->

       <bean id="listenerContainer"

       class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="messageListener" />

       </bean>

</beans>

注:test.vemic.com是我本机的URL,和localhost一样。

2、 消息测试

采用上面单机测试的消息就可以了。最终运行的结果为:

JMS TEST!!

注:

注意到了没有?上面的配置从jmsTemplate开始往下就和前面介绍过的单服务器的配置一样了。看到这里,我相信大家对JMS的工作过程应该很清楚了。我个人认为我们可以简单的这样理解其工作过程:

生产者

 

消费者

 

JMS connectionFactory

产生消息并发往指定的目的地

接收消息并给出已消费确认信息,JMS connectionFactory收到确认信息后就将对应的信息从自己的管理库中删除

从上面的图很清楚地看出,要想实现跨服务器的JMS消息机制,JMS connectionFactory是关键的地方,简单地说:connectionFactory就决定了JMS的作用范围。如果connectionFactory是受制于系统(也就是说,当系统停掉之后connectionFactory也就跟着销毁),那么它就不能实现跨服务器功能。要想实现跨服务功能,connectionFactory就必须独立于系统或服务器。由此可见,结合前面的知识,我们就可以知道,能够实现跨服务器的JMS消息机制其实有两种方式:JDBC方式和采用第三方工具。前面我也说过,我选择了后者(而且我也一直这么做了)。

3、  实现多服务器的JMS共享,即实现JMS跨服务器功能

ActiveMQ的单服务器版我们已经成功搭建并能成功运行了。现在让我们实现JMS跨服务器功能吧。等等,我们先准备另一个服务器环境。为了明显的区别两个服务,我将上面所有的环境重新弄了一份(一个新的MyEclipse;一个新的web工程,当然web工程里面的环境与上面的一样;一个新的resin;一个新的resin端口8081,上面的resin端口是80),我称之为client

接下来,我们来配置client中的JMS。在所有的系统配置中只有一个配置文件与上面的有区别,那就是jmsconfig.xml

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:amq="http://activemq.org/config/1.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://people.apache.org/repository/org.apache.activemq/xsds/activemq-core-4.1-incubator-SNAPSHOT.xsd">

       <!-- 配置JMS连接工厂(注:brokerURL是关键,它应该是上面的amq:transportConnectors里面的值之一对应,因为这里指定连接的对象) -->

       <amq:connectionFactory id="jmsConnectionFactory"

              brokerURL="tcp://test.vemic.com:61616" />

       <!-- 消息发送的目的地(注:” amq:queue”是用于指定是发送topic还是queue,对应上面配置中的amq:destinations -->

       <amq:queue name="destination" physicalName="ossQueue" />

       <!-- 创建JMSSession生成类 -->

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

       class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

       </bean>

       <!-- 消息生产者(通过指定目的地, 就可以同时指定其发送的消息模式是topic还是queue) -->

       <bean id="messageProducer"

              class="com.focustech.jms.MessageProducer">

              <property name="template" ref="jmsTemplate" />

              <property name="destination" ref="destination" />

       </bean>

       <!-- 消息接收类(这个类需要继承javax.jms.MessageListener,当然也可以通过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发) -->

       <bean id="messageListener"

              class="com.focustech.jms.MessageConsumer">

       </bean>

       <!-- 消息监听容器,其各属性的意义为:

              connectionFactory:指定所监听的对象,在这里就是监听连接到tcp://test.vemic.com:61616上面的ActiveMQ

              destination:监听的消息模式;

              messageListener:接收者

              -->

       <bean id="listenerContainer"

       class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="messageListener" />

       </bean>

</beans>

注意了!这个配置与上面的ActiveMQ的配置只有一个地方不一样,也就是:这个配置中没有配置ActiveMQ的相关信息。为什么?结合上面所述的JMS简单工作方式,我们应该不难得到答案:因为ActiveMQ要实现跨服务器就必须独立运行,所以我们只需要启动一个就够了。

注:

其实在这个简单的跨服务器的例子中,其中一方只需要配置消息生产者,而另一方只需要配置消息的消费者就可以测试通过了,而且测试效果会很明显:在消息生产者的那个服务器上运行测试程序,在消息接收的服务器上就会有相应的响应!在这里我之所以这样做,是让大家在测试时发现一个奇怪的现象:当在一端运行测试程序,第一个消息会被当前运行测试程序的服务消费掉,而接下来的消息又被另一个服务器消费掉,如此循环(我没有测试过三个及以上的服务器运行情况,我想可能是消息一个个的均摊下去的吧)。之所以要让大家看到这个现象,是为了让大家有个疑问,容易接受后面高级应用中关于JMS的两种消息机制:Point-to-Point消息(P2P)和发布订阅消息(Publish Subscribe messaging,简称Pub/Sub,也就是广播模式)的使用方法。

III、高级应用

从这里开始,我们将进入JMS消息的一些特殊用法,或者叫高级应用。在介绍这些应用的时候,我们会用到上面已经布署好的跨服务器的应用来作例子。为了区别两个服务器,我们把配置有ActiveMQ的叫server,另一个还是叫client

1、使用指定的消息,即消息的P2PPub/Sub的应用

上节内容的最后给大家留下了一个有趣的现象。在这里,我们将针对这个现象进行详细的解析。

我们前面也知道了,消息模式有两种,但怎么使用却一直没提过。但是如果仔细的看过官网的资料也许已经知道一些了。在这里,我将用实例的方式给大家展现一下它的具体使用。(参考:http://andyao.javaeye.com/blog/234101

首先,我们来看Queue消息的使用实例。上面的跨服务器的实例其实就是queue实例的应用,但问题是:如何指定唯一的接收者呢?也就是不能出现上面提到的那个奇怪的循环现象呢?

其实这个现象也并不难回答,首先让我们来仔细看一下queue消息的目的地配置:

<amq:queue name="destination" physicalName="ossQueue" />

对于上面的配置,我们可以一一解读其中各参数的含义就知道奥妙所在了:

amq:queue:表示这是配置是queue消息;

name:指定的消息发送与接收的目的地的名称;

physicalName:指定消息队列的物理名称,在ActiveMQ中它就是一个消息集群的表示形式。

根据上面配置的含义,我们不难发现,其实奥妙就在physicalName这个属性中体现的。具体来说,对于queue消息而言,只要发送方与接收方都使用同一个physicalName,这就是点对点指定了。例如:将上面的例子中client中的:

<amq:queue name="destination" physicalName="ossQueue" />

改成:

<amq:queue name="destination" physicalName="ossQueue1" />

这样的话,我们上面说的“奇怪的现象”就不会存在了。因为server中消息是发送到ossQueue这个消息队列里的,而client中消息目的地是指向ossQueue1的,当然就收不到server里面ossQueue中的消息了。

我们再来看一下如何使用Topic消息。其实很简单,只要将第II节配置中的

<amq:queue name="destination" physicalName="ossQueue" />

改成:

<amq:topic name="destination" physicalName="ossQueue" />

就可以了。运行测试程序时,会发现在两个服务都会收到响应信息。

注:关于topic,有一个概念,叫“订阅”。关于这个词我也不是很了解,但我理解是:在系统中配置了amq:topic并且connectionFactory指定到对应的uri上的前提上,只要amq:topic中对应的physicalNamepublish端相同,这就是订阅,这样的配置之后它就能收到发送的信息了。

总之一句话,点对点(或发布订阅模式)的消息发送关键在于收发双方是否共同指定同一个physicalName

2、自定义消息的收发类

正如前面例子中配置文件中的一行注释说的一样,消息的收发类用户可以自定义,它是通过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发。具体的操作我们还是来看实例吧(为了简单起见,我们以单服务器的配置与运行作实例,跨服务配置是一样的):

jmsconfig.xml

<beans xmlns="http://www.springframework.org/schema/beans"

       xmlns:amq="http://activemq.org/config/1.0"

       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

  http://activemq.org/config/1.0 http://activemq.apache.org/schema/core/activemq-core-5.0.0.xsd">

       <!-- 配置ActiveMQ服务 -->

       <amq:broker useJmx="false" persistent="false">

              <amq:transportConnectors>

                     <!-- 提供的连接方式有:VM TransportTCP TransportSSL Transport

                            Peer TransportUDP TransportMulticast TransportHTTP and HTTPS Transport

                            Failover TransportFanout TransportDiscovery TransportZeroConf Transport -->

                     <amq:transportConnector uri="tcp://test.vemic.com:61616" />

              </amq:transportConnectors>

       </amq:broker>

       <!-- 配置JMS连接工厂(注:brokerURL是关键,

它应该是上面的amq:transportConnectors里面的值之一) -->

       <amq:connectionFactory id="jmsConnectionFactory"

              brokerURL="tcp://test.vemic.com:61616" />

       <!-- 消息发送的目的地(注:amq:queue是用于指定是发送topic不是queue,对应上面配置中的amq:destinations -->

       <amq:queue name="destination" physicalName="ossQueue" />

       <!-- 创建JMSSession生成类 -->

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

                            class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

              <!-- 指定发送信息时使用的消息转换类.

                     这个选项不填的话,默认的是:SimpleMessageConverter,它只支持4种类型的对象:String, byte[],Map,Serializable

              -->

              <!—如果加上下面这段配置就会出错, 错误原因是Book不是一个原始类, 但我已经将它继承Serializable,可还是不行, 我想可能有其他什么原因吧, 但我现在不清楚 -->

              <!-- <property name="messageConverter"

                     ref="resourceMessageConverter" /> -->

       </bean>

       <!-- 发送消息的转换类

(这个类要继承org.springframework.jms.support.converter.MessageConverter -->

       <bean id="resourceMessageConverter"

              class=" com.focustech.jms.ResourceMessageConverter" />

       <!-- 消息生产者(通过指定目的地, 就可以同时指定其发送的消息模式是topic还是queue) -->

       <bean id="resourceMessageProducer"

              class=" com.focustech.jms.ResourceMessageProducer">

              <property name="template" ref="jmsTemplate" />

              <property name="destination" ref="destination" />

       </bean>

       <!-- 消息接收类(这个类需要继承,当然也可以通过MessageListenerAdapter指定消息转换器来实现用户自定义的消息收发) -->

       <bean id="resourceMessageListener"

              class="org.springframework.jms.listener.adapter.MessageListenerAdapter">

              <constructor-arg>

                     <bean

                            class=" com.focustech.jms.ResourceMessageConsumer">

                     </bean>

              </constructor-arg>

              <property name="defaultListenerMethod" value="recieve" />

              <!—自定义接收类与接收的方法 -->

              <property name="messageConverter"

                     ref="resourceMessageConverter" />

       </bean>

       <!-- 消息监听容器,其各属性的意义为:

              connectionFactory:指定所监听的对象,在这里就是监听连接到tcp://test.vemic.com:61616上面的ActiveMQ

              destination:监听的消息模式;

              messageListener:接收者

       -->

       <bean id="listenerContainer"

              class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="resourceMessageListener" />

       </bean>

</beans>

在这里,我们需要发送自己定义的消息格式,这样,我们就需要不同的消息的生产者与消费者,当然,也需要一个自定义的将两者消息进行转换的一个自定义的类,如上面配置文件中指定的一样,这三个自定义的类的主要代码如下:

ResourceMessageProducer

public class ResourceMessageProducer

{

       private JmsTemplate    template;

       private Destination      destination;

       public JmsTemplate getTemplate()

       {

              return template;

       }

       public void setTemplate(JmsTemplate template)

       {

              this.template = template;

       }

       public Destination getDestination()

       {

              return destination;

       }

       public void setDestination(Destination destination)

       {

              this.destination = destination;

       }

       public void send(Book book)

       {

              System.out.println("=======================================");

              System.out.println("do send ......");

              long l1 = System.currentTimeMillis();

              template.convertAndSend(this.destination, book);

              System.out.println("send time:" + (System.currentTimeMillis() - l1) / 1000 + "s");

              System.out.println("=======================================");

       }

}

ResourceMessageConverter

public class ResourceMessageConverter implements MessageConverter

{

       @SuppressWarnings("unchecked")

       public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException

       {

              // check Type

              if (obj instanceof Book)

              {

                     // 采用ActiveMQ的方式传递消息

                     ActiveMQObjectMessage objMsg = (ActiveMQObjectMessage) session.createObjectMessage();

                     Map map = new HashMap();

                     map.put("Book", obj);

                     // objMsg.setObjectProperty里面放置的类型只能是:String, Map, Object, List

                     objMsg.setObjectProperty("book", map);

                     return objMsg;

              }

              else

              {

                     throw new JMSException("Object:[" + obj + "] is not Book");

              }

       }

       public Object fromMessage(Message msg) throws JMSException, MessageConversionException

       {

              if (msg instanceof ObjectMessage)

              {

                     Object obj = ((ObjectMessage) msg).getObject();

                     return obj;

              }

              else

              {

                     throw new JMSException("Msg:[" + msg + "] is not Map");

              }

       }

}

ResourceMessageConsumer

public class ResourceMessageConsumer

{

       public void recieve(Object obj)

       {

              Book book = (Book) obj;

              System.out.println("=======================================");

              System.out.println("receiveing message ...");

              System.out.println(book.toString());

              System.out.println("here to invoke our business method...");

              System.out.println("=======================================");

       }

}

Book

public class Book implements Serializable

{

       /**

        *

        */

       private static final long       serialVersionUID   = -6988445616774288928L;

       long                                    id;

       String                                 name;

       String                                 author;

       public String getAuthor()

       {

              return author;

       }

       public void setAuthor(String author)

       {

              this.author = author;

       }

       public long getId()

       {

              return id;

       }

       public void setId(long id)

       {

              this.id = id;

       }

       public String getName()

       {

              return name;

       }

       public void setName(String name)

       {

              this.name = name;

       }

}

消息测试:将测试JSP中的JAVA代码改成:

<%

try {

     ServletContext servletContext = this.getServletContext();

     WebApplicationContext wac = WebApplicationContextUtils

      .getRequiredWebApplicationContext(servletContext);

ResourceMessageProducer resourceMessageProducer = (ResourceMessageProducer) context.getBean("messageProducer");

Book book = new Book();

book.setId(123);

book.setName("jms test!");

book.setAuthor("taofucheng");

resourceMessageProducer.send(book);

      } catch (JmsException e) {

  }

%>

运行系统,打开测试页面,会发现消息已经成功接收!

注:(1)、通过这种方法,我们就可以发送我们想发送的任何对象了(有些限制:这些对象的类型必须是:String, Map, byte[],Serializable。上面的例子已经注释得很清楚)

2)、如果大家有兴趣的话,看一下MessageListenerAdapter的源码,你就会发现其实它就是MessageListener的实现类,在它实现的onMessage方法中使用了用户自定义的转换类而已。

3、集成事务

Spring提供的JMSAPI中已经有了集成事务的功能,我们只要将上面监听容器的配置改成下面的就行了:

首先,将jmsTemplate设置成支持事务(它默认是不支持事务的):

       <bean id="jmsTemplate"

              class="org.springframework.jms.core.JmsTemplate">

              <property name="connectionFactory">

                     <bean

                            class="org.springframework.jms.connection.SingleConnectionFactory">

                            <property name="targetConnectionFactory"

                                   ref="jmsConnectionFactory" />

                     </bean>

              </property>

              <property name="sessionTransacted" value="true"/>

       </bean>

然后再在消息监听容器中设置指定的事务管理:

    <bean id="listenerContainer"

              class="org.springframework.jms.listener.DefaultMessageListenerContainer">

              <property name="connectionFactory" ref="jmsConnectionFactory" />

              <property name="destination" ref="destination" />

              <property name="messageListener" ref="resourceMessageListener" />

              <!—jtaTransactionManager是系统中的事务管理类,在我们的系统中,是由Spring托管的 -->

              <property name="transactionManager" ref="jtaTransactionManager" />

       </bean>

这样配置之后,当事务发生回滚时,消息也会有回滚,即不发送出去。

4、其它高级应用

ActiveMQ还有许多其它高级的应用,如:自动重连机制,也就是保证当通信双方或多方的链接断裂后它会根据用户的设置自动连接,以保证建立可靠的传输;另外,ActiveMQ还有其它方式嵌入到Spring中,如它可以通过xbean, file等方式建立应用;它还可以通过JMX对消息的发送与接收进行实时查看;消息的确认方式等等,还有很多高级的应用,请参考:《ActiveMQ in Action(网址:http://whitesock.javaeye.com/blog/164925)

分享到:
评论
1 楼 hellohank 2015-07-29  
grefr 写道
楼主写的不错,关于jms和activemq原理性的文章,可以看看这个
http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt127

谢谢,文章一定拜读一下~

相关推荐

Global site tag (gtag.js) - Google Analytics