ActiveMQ

一、ActiveMQ安装

1.windows安装

首先去http://activemq.apache.org/download.html 下载最新版本
目录如下:
+bin (windows下面的bat和unix/linux下面的sh)
+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
+data (默认是空的)
+docs (index,replease版本里面没有文档,-.-b不知道为啥不带)
+example (几个例子)
+lib (activemMQ使用到的lib)
-apache-activemq-4.1-incubator.jar (ActiveMQ的binary)
-LICENSE.txt
运行信息
运行信息
-NOTICE.txt
-README.txt
-user-guide.html
这里写图片描述
启动bin/win64/activemq.bat
启动成功就可以访问管理员界面:http://localhost:8161/admin

默认用户名和密码admin/admin。如果你想修改用户名和密码的话,在conf/jetty-realm.properties中修改即可。

2.Linux安装

wget http://archive.apache.org/dist/activemq/5.14.1/apache-activemq-5.14.1-bin.tar.gz
cd apache-activemq-5.14.1/bin/linux-x86-64/
./activemq start

二、ActiveMQ应用

1.点对点方式(point to point Queue)

  点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和”发送消息已接受”到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

2.发布/订阅方式(public/subscriber Messaging Topic)

   发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

发送消息的基本步骤:

  • 创建连接使用的工厂类JMS ConnectionFactory
  • 使用管理对象JMS ConnectionFactory建立连接Connection,并启动
  • 使用连接Connection 建立会话Session
  • 使用会话Session和管理对象Destination创建消息生产者MessageSender
  • 使用消息生产者MessageSender发送消息

消息接收者从JMS接受消息的步骤 :

  • 创建连接使用的工厂类JMS ConnectionFactory
  • 使用管理对象JMS ConnectionFactory建立连接Connection,并启动
  • 使用连接Connection 建立会话Session
  • 使用会话Session和管理对象Destination创建消息接收者MessageReceiver
  • 使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。
    这里写图片描述

3.JMS方式

接受者

public class MessageReceiver {
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "hoo.mq.queue";
    public static void run() throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            // 创建链接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(DESTINATION);
            // 创建消息制作者
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                // 接收数据的时间(等待) 100 ms
                Message message = consumer.receive(1000 * 100);

                TextMessage text = (TextMessage) message;
                if (text != null) {
                    System.out.println("接收:" + text.getText());
                } else {
                    break;
                }
            }
            // 提交会话
            session.commit();
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }    

    public static void main(String[] args) throws Exception {
        MessageReceiver.run();
    }
}

发送者

public class MessageSender {
    // 发送次数
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "hoo.mq.queue";
    @SuppressWarnings("unused")
    private static void sendMessage(Session session, MessageProducer producer) throws JMSException{
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "发送消息第" + (i + 1) + "条";
            TextMessage text = session.createTextMessage(message);

            System.out.println(message);
            producer.send(text);
        }       
    }

    public static void run() throws Exception {
        Connection connection = null;
        Session session = null;
        try {
            // 创建链接工厂
            ConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Destination destination = session.createQueue(DESTINATION);
            // 创建消息制作者
            MessageProducer producer = session.createProducer(destination);
            // 设置持久化模式
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, producer);
            // 提交会话
            session.commit();

        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }   
    public static void main(String[] args) {
        try {
            MessageSender.run();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Queue方式

接受者

public class QueueReceiver {
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String TARGET = "test.queue";
    public static void run() throws Exception {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 创建链接工厂
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createQueueConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Queue queue = session.createQueue(TARGET);
            // 创建消息制作者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);

            receiver.setMessageListener(new MessageListener() { 
                public void onMessage(Message msg) { 
                    if (msg != null) {
                        MapMessage map = (MapMessage) msg;
                        try {
                            System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                } 
            }); 
            // 休眠100ms再关闭
            Thread.sleep(1000 * 100); 
            // 提交会话
            session.commit();
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        QueueReceiver.run();
    }
}

发送者

public class QueueSender {
    // 发送次数
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "test.queue";
    public static void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "发送消息第" + (i + 1) + "条";

            MapMessage map = session.createMapMessage();
            map.setString("text", message);
            map.setLong("time", System.currentTimeMillis());
            System.out.println(map);

            sender.send(map);
        }
    }

    public static void run() throws Exception {
        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 创建链接工厂
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createQueueConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Queue queue = session.createQueue(DESTINATION);
            // 创建消息发送者
            javax.jms.QueueSender sender = session.createSender(queue);
            // 设置持久化模式
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, sender);
            // 提交会话
            session.commit();

        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }
    public static void main(String[] args) throws Exception {
        QueueSender.run();
    }
}

topic方式

接受者

public class TopicReceiver {
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String TARGET = "hoo.mq.topic";

    public static void run() throws Exception {
        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 创建链接工厂
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createTopicConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Topic topic = session.createTopic(TARGET);
            // 创建消息制作者
            TopicSubscriber subscriber = session.createSubscriber(topic);

            subscriber.setMessageListener(new MessageListener() { 
                public void onMessage(Message msg) { 
                    if (msg != null) {
                        MapMessage map = (MapMessage) msg;
                        try {
                            System.out.println(map.getLong("time") + "接收#" + map.getString("text"));
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                } 
            }); 
            // 休眠100ms再关闭
            Thread.sleep(1000 * 100); 
            // 提交会话
            session.commit();

        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        TopicReceiver.run();
    }
}

发送者

public class TopicSender {
    // 发送次数
    public static final int SEND_NUM = 5;
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION = "hoo.mq.topic";
    public static void sendMessage(TopicSession session, TopicPublisher publisher) throws Exception {
        for (int i = 0; i < SEND_NUM; i++) {
            String message = "发送消息第" + (i + 1) + "条";

            MapMessage map = session.createMapMessage();
            map.setString("text", message);
            map.setLong("time", System.currentTimeMillis());
            System.out.println(map);

            publisher.send(map);
        }
    }

    public static void run() throws Exception {

        TopicConnection connection = null;
        TopicSession session = null;
        try {
            // 创建链接工厂
            TopicConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createTopicConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createTopicSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            Topic topic = session.createTopic(DESTINATION);
            // 创建消息发送者
            TopicPublisher publisher = session.createPublisher(topic);
            // 设置持久化模式
            publisher.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, publisher);
            // 提交会话
            session.commit();
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        TopicSender.run();
    }
}

四、整合activeMQ

集成spring:引入jar包

<dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.14.1</version>
     </dependency> 
     <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>4.0.8.RELEASE</version>
     </dependency>
     <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-messaging</artifactId>
        <version>4.0.8.RELEASE</version>
     </dependency>   
     <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-spring</artifactId>
        <version>5.14.1</version>
     </dependency> 

配置文件spring-context-activemq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:amq="http://activemq.apache.org/schema/core"  
    xmlns:jms="http://www.springframework.org/schema/jms"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans     
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd     
        http://www.springframework.org/schema/context     
        http://www.springframework.org/schema/context/spring-context-4.0.xsd  
        http://www.springframework.org/schema/jms  
        http://www.springframework.org/schema/jms/spring-jms-4.0.xsd  
        http://activemq.apache.org/schema/core  
        http://activemq.apache.org/schema/core/activemq-core-5.14.1.xsd"> 
        <description>spring-activeMQ</description>
    <!-- 获取ActiveMQ提供的ConnectionFactory -->
    <amq:connectionFactory id="amqConnectionFactory" brokerURL="tcp://127.0.0.1:61616" userName="admin" password="admin"></amq:connectionFactory>

    <!-- spring连接activeMQ的conneciotnFactory --> 
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory"></constructor-arg>
        <property name="sessionCacheSize" value="100" />  
    </bean>

    <!-- 定义JmsTemplate的Queue类型 -->
    <bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->    
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <!-- 非pub/sub模型(发布/订阅),即队列模式 -->  
        <property name="pubSubDomain" value="false" />  
    </bean> 

    <!-- 定义JmsTemplate的Topic类型 -->  
    <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">  
        <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->    
        <constructor-arg ref="connectionFactory"></constructor-arg>
        <!-- pub/sub模型(发布/订阅) -->  
        <property name="pubSubDomain" value="true" />  
    </bean>


    <bean id="queueReceiver" class="com.drink.modules.activemq.QueueReceiver"></bean>
    <!-- 定义Queue监听器 -->  
    <jms:listener-container destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">  
        <jms:listener destination="test.queue" ref="queueReceiver"/>  
    </jms:listener-container>  

    <bean id="topicReceiver" class="com.drink.modules.activemq.TopicReceiver"></bean>
    <!-- 定义Topic监听器 -->  
    <jms:listener-container destination-type="topic" container-type="default" connection-factory="connectionFactory" acknowledge="auto">  
        <jms:listener destination="test.topic" ref="topicReceiver"/>  
    </jms:listener-container>               
</beans>

Queue接收文件:

public class QueueReceiver implements MessageListener {
    @Override
    public void onMessage(Message message) {
         try {
            System.out.println("QueueReceiver接收到消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }       
    }
}

Topic接收文件:

public class TopicReceiver implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("TopicReceiver接收到消息:"+((TextMessage)message).getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }       
    }
}

Queue发送文件:

@Component("queueSender") 
public class QueueSender {
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsTemplate; 

    public void send(String queueName, final String message){
        jmsTemplate.send(queueName,new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

Topic发送文件:

@Component("topicSender")
public class TopicSender {
    @Autowired  
    @Qualifier("jmsTopicTemplate")  
    private JmsTemplate jmsTemplate; 
    public void send(String queueName, final String message){
        jmsTemplate.send(queueName,new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(message);
            }
        });
    }
}

持久化到数据配置

在mysql中手动创建一个名称为activemq的数据库,注意编码格式。
将mysql驱动的jar包拷贝到apache-activemq-5.14.1\lib目录下
修改conf下的activemq.xml文件
将默认的

<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true" />  

修改为

<!--createTablesOnStartup首次启动创建表时设置为true,其余情况改为false-->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>

在broker节点后添加数据源

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
<property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
<property name="url" value="jdbc:mysql://localhost/activemq"/> 
<property name="username" value="root"/> 
<property name="password" value="123456"/> 
<property name="poolPreparedStatements" value="true"/> 
</bean> 

注意:这里的版本是5.14.1,dbcp的包是commons-dbcp2-2.1.1.jar,所以这里的classorg.apache.commons.dbcp2.BasicDataSource;
低版本的MQ配置为org.apache.commons.dbcp.BasicDataSource

首次启动后就会看到数据库的创建的三张表
这里写图片描述

编码中发送者需要设置持久化的方式

 // 创建消息发送者
        javax.jms.QueueSender sender = session.createSender(queue);
        // 设置持久化模式
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);

spring中在JmsTemplate模板配置中设置

<bean id="jmsQueueTemplate" class="org.springframework.jms.core.JmsTemplate">  
    <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->    
    <constructor-arg ref="connectionFactory"></constructor-arg>
    <!-- 非pub/sub模型(发布/订阅),即队列模式 -->  
    <property name="pubSubDomain" value="false" />  
    <!--1为非持久化,2为持久化-->
    <property name="deliveryMode" value="2" /> 
</bean> 
版权声明:本文为HQZ820844012原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/HQZ820844012/article/details/80720023