第一章:activeMQ原理,安装,queue,topic以及topic持久化方式介绍,包括修改ubuntu的jdk环境变量.?

大家好,又见面了,我是你们的朋友全栈君。

转载了好几篇关于mq的博文,但是总感觉对mq的理解使用都不到位。这里打算从原理到使用都从头来一遍。

1,原理

1.1通过类比理解mq

可以理解它是一个秘书,或是助手,你是老板,你告诉秘书说你要开会,那么秘书就会把开会的时间,地点,人员都安排好。你就省去了这些琐事,这有点类似于sping的面向切面。

当添加一个商品时,商品服务只需要告诉消息中间件MQ,MQ便去通知其它服务做各自该做的事情,比如通知搜索服务去同步索引库,通知redis服务去同步缓存,通知生成静态页面等等。

1.2常见的mq种类

mq的也被叫做中间件,种类有ActiveMQ,RabbitMQ,Kafka等,功能都差不多,这里我们学习ActiveMQ.

1,3什么是ActiveMQ?

它是Apache出品,最流行的,能力最强劲的开源消息总线。它完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。

再者mq也可称为分布式消息队列,因为在mq的订阅式中有多个消费者异步处理多个请求,这就已经达到了分布式处理的目的。

1.4特点

(1)多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

(2) 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) (3) 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 (4)通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上 (5) 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA (6) 支持通过JDBC和journal提供高速的消息持久化 (7) 从设计上保证了高性能的集群,客户端-服务器,点对点 (8)支持Ajax (9)支持与Axis的整合 (10) 可以很容易得调用内嵌JMS provider,进行测试

ActiveMQ的消息形式

对于消息的传递有两种类型: 一种是点对点的,即一个生产者和一个消费者一一对应; 另一种是发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。 JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。我们用的最多的也就是TextMessage而已。   · StreamMessage — Java原始值的数据流   · MapMessage–一套名称-值对   · TextMessage–一个字符串对象   · ObjectMessage–一个序列化的 Java对象   · BytesMessage–一个字节的数据流

我们可以通过下面一张图来加深理解,图上半部分是”发布/订阅者”模式,两个发布者各自发布了一条消息,每条消息都可以被多个Consumer接收到。图下半部分是”面对面”模式,两个发布者各自发布了一条消息,压入队列当中,队列的特点是先进先出,一旦有某个消费者拿走了一条消息,队列中就少了一条消息,剩下的消费者就不可能再消费那条消息了,因此也就做到了一对一。

二 安装ActiveMQ

我这里把mq安装在虚拟机上,当然虚拟机要能上网,还有jdk啥的这里就不说了。下载就到apache官网下载,地址:

http://activemq.apache.org/activemq-5112-release.html

找到如下图位置,点击下载:

下载好后上传,解压,

完了后我们到active目录下看到如下内容:

我们可以看到有一个名为activemq-all-5.12.0.jar的jar包,这个jar包,如果不与spring结合,只是简单用来当做activemq客户端的话,可以使用。如果要将activemq与spring整合的话,不要使用这个jar包,因为这个jar包当中包含了spring的包结构,而且里面的类与spring里面的类名称是一样的,但是方法不全,当我们将spring和activemq结合的时候,如果系统使用的是activemq的jar包当中的spring的类的话就会报错,启动都启动不了,而且错误还隐藏的特别深,难以捉摸其原因。因此整合的话,不要用这个jar包!!!activemq有一个版本5.11.2,里面没有spring的包结构,我们可以使用。

我们看下bin目录下的文件列表,如下图所示,其中activemq文件是用来启动activemq的。

conf目录存放的是一些配置文件,我们不用动,data目录存放的是服务端的缓存数据

webapps提供了管理的后台,如下所示。

3,不用做改动,直接启动mq

xiaoye@ubuntu3:~$ ./activemq/bin/activemq start

INFO: Loading ‘/home/xiaoye/activemq/bin/env’ INFO: Using java ‘/usr/lib/jvm/java-1.8.0-openjdk-amd64/bin/java’ INFO: Starting – inspect logfiles specified in logging.properties and log4j.properties to get details

INFO: pidfile created : ‘/home/xiaoye/activemq/data/activemq.pid’ (pid ‘1699’)

这样就启动成功了;

web访问一下,默认端口是8161

上面这个界面是没有用户登录的界面。下面我们用admin登录,默认的账号和密码都是admin

http://192.168.72.133:8161/admin/

打开这个链接会弹出输入账号密码的框,填进去就行了。

点击Quenes如下,这个是点对点消息发送界面

再点击topic是发布/订阅模式界面

在Send中可以测试发送点对点或发布/订阅两种消息,如下图所示。

三,代码测试ActiveMQ

下面我们要写java代码测试了。

新建一个maven工程。打开eclipse ,右键新建maven project –》

finish

修改pom.xml添加maven依赖,依赖我们到apeche,maven官网去找。可直接百度关键词,active maven,图下:

点开找到,我们下载active版本

把这个maven依赖拷过来:

<
    dependency>
    
    <
    groupId>
    org.apache.activemq<
    /groupId>
    
    <
    artifactId>
    activemq-all<
    /artifactId>
    
    <
    version>
    5.11.2<
    /version>
    
<
    /dependency>
    

这样maven就能够自动帮我们下载active的jar包了。

下面新建一个junit测试类:



finish即可

在类中加入一下内容做简单测试,报错,我们

如下,鼠标防止@Test上,给提示导入Junit包,导入后,就没问题了。右键运行也是OK的。

下面我们就来写测试类,先来测试queue点对点的消息发送方式:

package com.xiaoyexinxin.activeMQTest;
    
import javax.jms.Connection;
    
import javax.jms.ConnectionFactory;
    
import javax.jms.JMSException;
    
import javax.jms.MessageProducer;
    
import javax.jms.Queue;
    
import javax.jms.Session;
    
import javax.jms.TextMessage;
    
import org.apache.activemq.ActiveMQConnectionFactory;
    
import org.junit.Test;
    

import junit.framework.TestCase;


/**
 * 
 * @author liuxin
 * @date   2018年4月10日
 */
public class TestActiveMq extends TestCase {

	
	@Test
	public void testQueueProducer() throws JMSException{
    
		//创建一个链接工厂connectionFactory对象,需要指定mq服务ip和端口,注意brokerURL的开头是  
        //tcp://而不是我们通常的http://,端口是61616而不是我们访问activemq后台管理页面所使用的8161  
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
    
		//使用connectionFactory 链接一个connection对象
		Connection connection=connectionFactory.createConnection();
    
		
		//开启链接,调用connection 对象的start方法
		connection.start();
    
		//使用connection 创建一个session对象
		//第一个参数是是否开启事务,一般不使用分布式事务,因为它特别消耗性能,而且顾客体验特别差,现在互联网的  
        //做法是保证数据的最终一致(也就是允许暂时数据不一致),比如顾客下单购买东西,一旦订单生成完就立刻响应给用户  
        //下单成功。至于下单后一系列的操作,比如通知会计记账、通知物流发货、商品数量同步等等都先不用管,只需要  
        //发送一条消息到消息队列,消息队列来告知各模块进行相应的操作,一次告知不行就两次,直到完成所有相关操作为止,这  
        //也就做到了数据的最终一致性。如果第一个参数为true,那么第二个参数将会被忽略掉。如果第一个参数为false,那么  
        //第二个参数为消息的应答模式,常见的有手动和自动两种模式,我们一般使用自动模式。  
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
		//使用session对象创建一个Destination(目标)对象,两站形式,queue,topic两种,这里我们使用queue
		//参数就是消息队列的名称
		Queue queue=session.createQueue("test-queue");
    
		//创建生成者,
		MessageProducer producer=session.createProducer(queue);
    
		//创建消息内容
		//有两种方式,第一种方式:  
//      TextMessage textMessage = new ActiveMQTextMessage();
      
//      textMessage.setText("hello,activemq!!!");
      
        //第二种方式:  
        TextMessage textMessage = session.createTextMessage("hello,activemq!!");
      
         //发送消息
        producer.send(textMessage);
    
        //关闭资源,由内而外的关闭
        producer.close();
    
        session.close();
    
        connection.close();

	}


}

右键运行程序,成功后,在ActiveMQ的后台管理系统,点击”Queues”,可以看到我们刚才发送的那条消息”test-queue”。我们点击”test-queue”

点击test-queue,如下:Persistence为永久保存,priority优先级是4 ,Redelivered是否重复投递消息,这里是否,

接着点击,长串的id

保存了,打开日志看看是啥错;

xiaoye@ubuntu3:~/activemq cd data xiaoye@ubuntu3:~/activemq/data ls activemq.log activemq.pid audit.log kahadb

xiaoye@ubuntu3:~/activemq/data$ tail -200 activemq.log

看到日志有下图的错:

这个是activemq不支持jdk1.8造成的,这里我把虚拟机的jdk换成1.7的试试

官网下载后上传,我这里用的是谷歌浏览器下载,谷歌浏览器下载的jdk不知道为何,应该是tar.gz结尾的jdk,却只有gz.于是百度一圈,直接修改压缩包后缀,改为.tar结尾解压

xiaoye@ubuntu3:~/activemq/data cd ~ xiaoye@ubuntu3:~ cd Downloads/ xiaoye@ubuntu3:~/Downloads

xiaoye@ubuntu3:~/Downloads$ ls apache-activemq-5.11.2-bin.tar.gz apache-activemq-5.15.3-bin.tar.gz hbase-1.0.0-cdh5.5.1.tar.gz jdk-7u80-linux-x64.tar.gz

apache-activemq-5.12.0-bin.tar.gz hadoop-2.5.0-cdh5.2.0.tar.gz hive-0.13.1-cdh5.2.0.tar.gz sqoop-1.4.6-cdh5.5.4.tar.gz

解压到当前目录。

xiaoye@ubuntu3:~ cd Downloads/ xiaoye@ubuntu3:~/Downloads ls apache-activemq-5.11.2-bin.tar.gz hive-0.13.1-cdh5.2.0.tar.gz apache-activemq-5.12.0-bin.tar.gz jdk1.7.0_79 apache-activemq-5.15.3-bin.tar.gz jdk-7u79-linux-x64.tar.gz hadoop-2.5.0-cdh5.2.0.tar.gz sqoop-1.4.6-cdh5.5.4.tar.gz

hbase-1.0.0-cdh5.5.1.tar.gz

下面设置环境变量。

切换到root用户。

root@ubuntu3:~# vim /etc/profile

export JAVA_HOME=/home/xiaoye/Downloads/jdk1.7.0_79

修改保存后,soruce /etc/profile

这个改完后,在修改当前用户下的环境变量。

xiaoye@ubuntu3:~$ vim .bashrc

export JAVA_HOME=/home/xiaoye/Downloads/jdk1.7.0_79 export CLASSPATH=${ JAVA_HOME} /lib

export PATH={ JAVA_HOME} /bin:PATH

修改保存后,source .bashrc

启动activemq

xiaoye@ubuntu3:~$ ./activemq/bin/activemq start INFO: Loading ‘/home/xiaoye/activemq/bin/env’ INFO: Using java ‘/home/xiaoye/Downloads/jdk1.7.0_79/bin/java’

INFO: Process with pid ‘1454’ is already running

显示还在运行,那就kill -9 1454

再次重启,就好了。

这样在点击ID的时候就不会报错了。

四,消费者

下面我们写消费者方法,写在同生产者一个类里面;

内容如下:

/*
	 * 消费者
	 */
	@Test
	public void testQueueConsumer() throws Exception{
    
		//跟创建生产者一样,先连接mq
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
    
		//连接一个connection 对象
		Connection connection=connectionFactory.createConnection();
    
		//开启链接
		connection.start();
    
		//创建一个session会话对象
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
		//使用session对象创建一个Destination对象,两种形式queue,topic ,这里我们使用queue
		//参数是消息队列的名称
		Queue queue=session.createQueue("test-queue");
    
		//使用session创建一个consumer对象
		MessageConsumer consumer=session.createConsumer(queue);

		//向Consumer对象中设置一个Messagelistener对象,用来接受消息
		consumer.setMessageListener(new MessageListener() {

			public void onMessage(Message arg0) {

				// TODO Auto-generated method stub
				if(arg0 instanceof TextMessage){
    
					TextMessage textMessage=(TextMessage) arg0;
    
					String text;

					try {
    
						text = textMessage.getText();
    
						System.out.println(text);

					}
 catch (JMSException e) {
    
						// TODO Auto-generated catch block
						e.printStackTrace();

					}

				}

			}

		}
    );
    
		//程序等待接收用户结束操作  
        //程序自己并不知道什么时候有消息,也不知道什么时候不再发送消息了,这就需要手动干预,  
        //当我们想停止接收消息时,可以在控制台输入任意键,然后回车即可结束接收操作(也可以直接按回车)。 
		System.in.read();
    
		//关闭资源
		consumer.close();
    
		session.close();
    
		connection.close();

	}

右键方法运行消费者方法,会在控制台看到生产者发送的hello world消息,如下:

这里执行消费者方法后并没有停止运行,还在等待新新的消息进来,那么我们右键生产者方法再次运行会发现有两个hello输出。

我们修改一下生产者内容,再次运行。

发现有一个语句输出,说明没有问题。

我们到activeMQ后台管理页面看看

说一下这几个标签的含义,number of pending messages 待发送消息数

Number Of Consumers 消费者消息数

Messages Enqueued 压入队列的消息数量

Messages Dequeued 出队列的消息数量,也就是被消费的消息数

五,topic 发布/订阅模式

生产者代码:

	/*
	 * 订阅模式的生产者
	 */
	@Test
	public void testTopicProducer() throws JMSException{
    
		//创建工程连接mq
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
    
		//使用工程创建一个连接对象
		Connection connection=connectionFactory.createConnection();
    
		//开启链接
		connection.start();
    
		//使用链接创建一个会话
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
		//使用会话创建一个目标对象
		Topic topic=session.createTopic("test-topic");
    
		//创建一个生产者
		MessageProducer producer=session.createProducer(topic);
    
		 //7.创建一个TextMessage对象  ,并写入要传输的消息内容
        //有两种方式,第一种方式:  
//      TextMessage textMessage = new ActiveMQTextMessage();
      
//      textMessage.setText("hello,activemq!!!");
      
        //第二种方式:  
		TextMessage textMessage=session.createTextMessage("hello topic!");
    
		//发送消息
		producer.send(textMessage);
    
		//关闭资源
		producer.close();
    
		session.close();
    
		connection.close();

		
	}

运行上面的测试方法,运行成功后,我们访问activemq的管理后台页面,点击”Topics”,可以看到有”test-topic”这一行,压入消息队列一条消息,但由于没有消费者,因此没有消费掉该消息。

点开test-topic发现:消息体里并没有我们发送的内容。

而queue就不同,queue有持久化一栏,发送的消息会被保存下来。这样的话,就会有个问题,那就是如果发送topic消息时没有消费者,那么这条消息便不存在了,不会再被消费了。因此我们要想消息不会被遗失掉,我们要先打开消费者,然后再发送topic消息。

我们来写消费topic消息的方法,如下图所示,该方法与我们上面学习的消费队列消息的方法不同的是创建Destination的时候不一样,同时为了模拟多个消费者,在该方法中添加一条输出信息,标明该方法是第几个消费者。

消费者代码:

/*
	 * 订阅模式的消费者
	 */
	@Test
	public void testTopicConsumer() throws JMSException{
    
		//创建工厂,链接mq
		ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
    
		//使用工程创建一个链接
		Connection connection=connectionFactory.createConnection();
    
		//打开链接
		connection.start();
    
		//使用链接创建一个会话
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
		//创建消息队列
		Topic topic=session.createTopic("test-topic");
    
		//消费者
		MessageConsumer consumer=session.createConsumer(topic);

		//消费者设置监听器,监听传来的消息
		consumer.setMessageListener(new MessageListener() {

			
			public void onMessage(Message arg0) {

				// TODO Auto-generated method stub
				if(arg0 instanceof TextMessage){
    
					TextMessage textMessage=(TextMessage) arg0;

					try {
    
						String text=textMessage.getText();
    
						System.out.println(text);

					}
 catch (JMSException e) {
    
						// TODO Auto-generated catch block
						e.printStackTrace();

					}

				}

			}

		}
    );
    
		
		//程序等待接收用户结束操作  
        //程序自己并不知道什么时候有消息,也不知道什么时候不再发送消息了,这就需要手动干预,  
        //当我们想停止接收消息时,可以在控制台输入任意键,然后回车即可结束接收操作(也可以直接按回车)。  
        System.out.println("topic消费者1111。。。。。");
  
        try {
    
			System.in.read();

		}
 catch (IOException e) {
    
			// TODO Auto-generated catch block
			e.printStackTrace();

		}
      
        //9.关闭资源  
        consumer.close();
      
        session.close();
      
        connection.close();
 
		
	}

右键运行消费者方法:

修改为2222.。。。。。

再次运行消费者方法。

修改为33333.。。。。

再次运行消费者方法

发现这里消费者的数量是3 了

启动了三个消费者后,我们再发送一次topic消息,发完之后,我们看各个控制台的信息。如下图所示。可以看到都打印出了我们发送的topic信息。

三个进程控制台都有打印生产者消息。

六,topic消息持久化

topic消息没有持久化,也就意味着,如果消息发送者发送消息的时候,如果消费者没有运行的话,它将无法消费这个消息了(即使它启动也无法再接收到那条topic消息了),这样问题就来了,如果那条消息非常重要呢?我们不能容忍接收不到消息的情况。

生产者代码:

/*
	 * 订阅发布式 可持久化生产者
	 */
	@Test
	public void TestTopicPersistenceProducer() throws JMSException{
    
		//创建工程连接mq
		ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
    
		//设置异步发送消息可显著提高发送性能
		connectionFactory.setUseAsyncSend(true);
    
		//使用工程创建一个连接对象
		Connection connection=connectionFactory.createConnection();
    
		//对每个生产者来说其clientID值必须唯一
		connection.setClientID("producerTopic2");
    
		//开启链接
		connection.start();
    
		//使用链接创建一个会话
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
		//使用会话创建一个目标对象
		Topic topic=session.createTopic("test-topic");
    
		//创建一个生产者
		MessageProducer producer=session.createProducer(topic);
    
		//DelieveryMode设置为PERSISTENCE(持久化)
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
		 //创建一个TextMessage对象  ,并写入要传输的消息内容
        //有两种方式,第一种方式:  
//		      TextMessage textMessage = new ActiveMQTextMessage();
      
//		      textMessage.setText("hello,activemq!!!");
      
        //第二种方式:  
		TextMessage textMessage=session.createTextMessage("hello topic!persistence2");
    
		//发送消息
		producer.send(textMessage);
    
		//关闭资源
		producer.close();
    
		session.close();
    
		connection.close();

	}

消费者代码:

/*
	 * 订阅发布式 可持久化消费者
	 */

	@Test
	public void TestTopicPersistenceConsumer() throws JMSException{
    
		//创建工厂,链接mq
		ActiveMQConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.72.133:61616");
    
		//设置异步接受消息,可提高接受性能
		connectionFactory.setUseAsyncSend(true);
    
		//使用工程创建一个链接
		Connection connection=connectionFactory.createConnection();
    
		//设置每个消费者id,每个都要不同
		connection.setClientID("consumer1");
    
		//打开链接
		connection.start();
    
		//使用链接创建一个会话
		Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
		//创建消息队列
		Topic topic=session.createTopic("test-topic");
    
		//消费者
		MessageConsumer consumer=session.createDurableSubscriber(topic, "consumer1");

		//消费者设置监听器,监听传来的消息
		consumer.setMessageListener(new MessageListener() {

			
			public void onMessage(Message arg0) {

				// TODO Auto-generated method stub
				if(arg0 instanceof TextMessage){
    
					TextMessage textMessage=(TextMessage) arg0;

					try {
    
						String text=textMessage.getText();
    
						System.out.println(text);

					}
 catch (JMSException e) {
    
						// TODO Auto-generated catch block
						e.printStackTrace();

					}

				}

			}

		}
    );
    
		
		//程序等待接收用户结束操作  
        //程序自己并不知道什么时候有消息,也不知道什么时候不再发送消息了,这就需要手动干预,  
        //当我们想停止接收消息时,可以在控制台输入任意键,然后回车即可结束接收操作(也可以直接按回车)。  
        System.out.println("topic消费者3333。。。。。");
  
        try {
    
			System.in.read();

		}
 catch (IOException e) {
    
			// TODO Auto-generated catch block
			e.printStackTrace();

		}
      
        //9.关闭资源  
        consumer.close();
      
        session.close();
      
        connection.close();
 
	}
    

我们还需要配置下activemq的activemq.xml文件,只需要添加一句配置,就是在< broker的末尾添加一句关于持久化的配置persistent=”true”即可。如下:

<
broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${
activemq.data}
    " persistent="true">
      

然后重新启动mq;

这样设置持久化了就无所谓哪个先启动了。