ActiveMq

ActiveMq

1.JMS(Java Message Service)

​ 它是一种与厂商无关的API,用来访问消息收发系统消息。它类似于JDBC,JDBC是可以用来访问不同种类关系型数据库的API,而JMS则提供同样与厂商无关的访问消息收发服务的方法,这样就可以通过消息收发服务实现从一个JMS客户机向另一个JMS客户机发送消息,所需要的是厂商支持JMS。换句话说,JMS是Java平台上有关面向消息中间件的技术规范。

2.JMS提供的对象

  • ConnectionFactory 连接工厂,是客户用来创建连接的对象,ActiveMQ提供的是ActiveMQConnectionFactory;

  • Connection , 生产者或者消费者和消息中间件之间的连接

  • Session,会话 , 是发送和接收消息的上下文,用于创建消息生产者,消息消费者,相比rocketMQ会话session是提供事务性的;

  • Destination , 目的地,指定生产消息的目的地和消费消息的来源对象.

3.消息通信机制

1.点对点模式(p2p)

在这里插入图片描述

​ 注意:P2P 目的地 queue

​ 应用场景:订单-商品(扣库存)

2.发布订阅

在这里插入图片描述

​ 注意:pub/sub 目的地 topic

​ 应用场景: 消息推送

4.使用场景

https://blog.csdn.net/wqc19920906/article/details/82193593

1.应用解耦

2.异步消息

3.流量削锋

4.消息通讯

5.ActiveMQ的引言

ActiveMQ是由Apache出品的,一款最流行的,能力强劲的消息中间件(MOM:Message Orient middleware)。并且是对消息通信规范JMS的一种具体实现

6.ActiveMQ的安装

1.安装JDK

1.导入JDK压缩包
2.解压  tar -zxvf ....
3.vi /etc/profile
4.添加
	export JAVA_HOME=JDK安装的路径
	export PATH=$PATH:$JAVA_HOME/bin
5.source /etc/profile
6.java -version

2.安装ActiveMQ

1.导入ActiveMQ的压缩包
2.解压
3.启动
	./activeMq start
4.开启日志
	tail -f activeMq.log  开启实时的日志
5.通过客户端工具连接
	ip:8161/admin
	用户名:admin
	密码:admin
	用户名,密码在哪看
	vi /conf/users.properties
	admin=admin

在这里插入图片描述

7.ActiveMQ与java集成

1.引入的依赖

<dependency>
      <groupId>org.apache.activemq</groupId>
      <artifactId>activemq-core</artifactId>
      <version>5.7.0</version>
</dependency>

2.P2P

​ 1.生产者

@Test
    public void testProduct() throws JMSException {
        //1.创建链接工厂
        String brokerURL = "tcp://192.168.150.142:61616";
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
        //2.创建连接
        Connection connection = connectionFactory.createConnection();
        //3.创建回话    //第一个参数 是否开启事务   第二个参数开启自动回执
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //4.创建生产者
        Destination destination = new ActiveMQQueue("152-queue1");
        MessageProducer producer = session.createProducer(destination);
        //5.创建消息
        TextMessage textMessage = session.createTextMessage("hello-152");
        //6.生产者发布消息
        producer.send(textMessage);
        //7.提交
        session.commit();
        //8释放资源
        connection.close();
        producer.close();
        session.close();
    }

​ 2.消费者

@Test
    public void testProduct() throws JMSException {
        //1.创建链接工厂
        String brokerURL = "tcp://192.168.150.142:61616";
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);

        //2.创建连接
        Connection connection = connectionFactory.createConnection();
        //注意:
        connection.start();

        //3.创建回话    //第一个参数 是否开启事务   第二个参数开启自动回执
        Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        //4.创建消费者
        Destination destination = new ActiveMQQueue("150-queue1");
        MessageConsumer consumer = session.createConsumer(destination);

        //5.消费
        TextMessage message = (TextMessage) consumer.receive();
        System.out.println(message.getText());

        //6.事务提交
        session.commit();

        //7.资源释放
        connection.close();
        consumer.close();
        session.close();
    }

2.发布订阅

​ 1.发布

 @Test
    public void testProduct() throws JMSException {
        //1.创建发布发布的连接工厂
        String brokerURL = "tcp://192.168.150.142:61616";
        TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);

        //2.创建连接
        TopicConnection topicConnection = connectionFactory.createTopicConnection();

        //3.创建回话
        TopicSession topicSession = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);

        //4.创建发布
        Topic topic = new ActiveMQTopic("150-Topic");
        TopicPublisher publisher = topicSession.createPublisher(topic);

        //5.创建主题
        TextMessage textMessage = topicSession.createTextMessage("Hello-TestTopic");

        //6.发布
        publisher.send(textMessage);

        //7.提交
        topicSession.commit();

        //8.释放
        topicConnection.close();
        topicSession.close();
        publisher.close();
    }

​ 2.订阅

 @Test
    public void testProduct() throws JMSException {
        //1.创建发布发布的连接工厂
        String brokerURL = "tcp://192.168.150.142:61616";
        TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);

        //2.创建连接
        TopicConnection topicConnection = connectionFactory.createTopicConnection();
        topicConnection.start();

        //3.创建回话
        TopicSession topicSession = topicConnection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
        Topic topic = new ActiveMQTopic("144-Topic");
        MessageConsumer consumer = topicSession.createConsumer(topic);

        while (true) {
            Message receive = consumer.receive();
            TextMessage textMessage = (TextMessage) receive;
            if (textMessage != null) {
                System.out.println("1 号"+textMessage.getText());
            }else{
                break;
            }
        }
    }

8.现有项目存在的问题

消费者需要访问才能消费 —监听器

9.ActiveMq和Springboot集成

1.依赖

	   <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <version>2.1.5.RELEASE</version>
        </dependency>

2.相关配置

server:
  port: 9090
spring:
  activemq:
    broker-url: tcp://192.168.150.142:61616
    user: admin
    password: admin

3.相关代码

controller
@Component
public class ProductSend {
    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMsg(String msg){
        Destination destination = new ActiveMQQueue("Springboot-Queue");
        jmsTemplate.convertAndSend(destination,msg);
    }
}
@Component
public class ProductSend {
    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMsg(String msg){
        Destination destination = new ActiveMQQueue("Springboot-Queue");
        jmsTemplate.convertAndSend(destination,msg);
    }
}
@Component
public class ConsumerListenner {
    @JmsListener(destination = "Springboot-Queue")
    public void consum(Message message){
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("springboot被监听"+textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

4.解决springboot集成ActiveMq不支持发布订阅模式

​ 以及配置两种模式同时监听 如何配置 如下链接

https://www.cnblogs.com/sjq0928/p/11371620.html

5.添加配置类

@Configuration
public class ConsumerConfiguration {

/*    @Value("tcp://192.168.44.121:61616")
    private String host;*/

/*    @Bean
    public ConnectionFactory getActiveMqConnection(){
        return new ActiveMQConnectionFactory(host);
    }*/

    @Bean(name="queueListenerContainerFactory")
    public JmsListenerContainerFactory queueListenerContailerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }
    @Bean(name="topicListenerContainerFactory")
    public JmsListenerContainerFactory topicListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}

6.监听器类上加如下注解(消费者消费消息)

@Component
public class QueueListener {
    @JmsListener(destination = "queue",containerFactory =
            "queueListenerContainerFactory")
    public void listener(String message){
        System.out.println(message);
    }
    @JmsListener(destination = "topic1",containerFactory =
            "topicListenerContainerFactory")
    public void listener1(String message){
        System.out.println(message);
    }
}

7.消息服务者相关代码

@RestController
@RequestMapping("producer")
public class ProducerController {
    @Autowired
    private JmsTemplate jmsTemplate;
    @RequestMapping("send")
    public void send(){
        String message = "测试";
        jmsTemplate.convertAndSend("queue",message);
    }
    @RequestMapping("send1")
    public void send1(){
        String message = "测试topic";
        Destination destination = new ActiveMQTopic("topic1");
        jmsTemplate.convertAndSend(destination,message);
    }
}
版权声明:本文为FishLearning原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/FishLearning/article/details/103177190