ActiveMQ(三):ActiveMQ的安全机制、api及订阅模式demo

标签: activemq

一、ActiveMQ安全机制

ActiveMQ是使用jetty部署的,修改密码需要到相应的配置文件
配置文件是这个:

配置文件

在其第123行添加用户名和密码,添加配置如下:

    <plugins>
            <simpleAuthenticationPlugin>
                <users>
                    <authenticationUser username="bhz" password="bhz" groups="users,admins"/>
                </users>
            </simpleAuthenticationPlugin>
        </plugins>

这时候这个需要改为这样才能进行连接:

//第一步:建立ConnectionFactory工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

二、ActiveMQ的api的使用

1. Connection的使用

省略

2. Session方法的使用

一旦从ConnectionFactory中获得一个Connection,必须从Connection中创建一个或多个Session.Session是一个发送或接收消息的线程
session可以被事务化,也可以不被事务化
如果使用事务的话,那么需要commit,如下

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    public static void main(String[] agrs) throws Exception{

        //第一步:建立ConnectionFactory工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

        //第二步:通过ConnectionFactory工厂对象我们创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //第三步:通过connection对象创建Session会话,第一个参数为是否开启事务,第二个参数为签收模式,一般设置为自动签收
        Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);

        //第四步:通过Session创建Destination对象,queue1可看作是放入队列的消息名称,可以自定义
        Destination desiDestination = session.createQueue("queue1");

        //第五步:通过session创建消息的生产者或消费者,下面是创建生产者
        MessageProducer messageProducer = session.createProducer(desiDestination);

        //第六步:使用MessageProducer的set方法为其设置持久化特性和非持久化特性,后面再详细介绍
        messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//这里先设置为非持久化

        //第七步:通过JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送数据
        for(int i =0; i<5;i++){
            TextMessage textMessage =  session.createTextMessage("这是消息内容,id为"+i);
            messageProducer.send(textMessage);
        }
        session.commit();
        if(connection!=null){
            connection.close();
        }

    }

}

注意session.commit();
session的签收模式有三种情况:
1. Session.AUTO_ACKNOWLEDGE :自动签收
2. Session.CLIENT_ACKNOWLEDGE:客户端通过调用消息(Message)的acknowledge方法签收消息,在这种情况下,签收发生在Session层面:签收一个已消费的消息会自动签收这个Session所有已消费消息的收条
3. Session.DUPS_OK_ACKNOWLEDGE:此选项指示Session不必确保对传送消息的签收。它可能引起消息的重复,但是降低了Session的开销,所以只有客户端能容忍重复的消息,才可使用
上面三种模式,推荐使用的是Session.CLIENT_ACKNOWLEDGE
它的操作如下:

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

    public static void main(String[] agrs) throws Exception{

        //第一步:建立ConnectionFactory工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

        //第二步:通过ConnectionFactory工厂对象我们创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //第三步:通过connection对象创建Session会话,第一个参数为是否开启事务,第二个参数为签收模式,一般设置为自动签收
        Session session = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);

        //第四步:通过Session创建Destination对象,queue1可看作是放入队列的消息名称,可以自定义
        Destination desiDestination = session.createQueue("queue1");

        //第五步:通过session创建消息的生产者或消费者,下面是创建消费者
        MessageConsumer messageConsumer = session.createConsumer(desiDestination);

        while(true){
            TextMessage msg = (TextMessage) messageConsumer.receive();
            msg.acknowledge();
            if(msg==null) {
                break;
            }
            System.out.println("收到的内容为"+msg.getText());
        }
        if(connection!=null){
            connection.close();
        }

    }

}

使用该模式,需要手动的设置接收完毕的信号msg.acknowledge();,这可以当做告诉队列,这个消息已经接收完毕,可以销毁了。

3.MessageProducer

MessageProducer详解
下面是改造后的代码:

package test.mq.helloworld;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

    public static void main(String[] agrs) throws Exception{

        //第一步:建立ConnectionFactory工厂对象
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz",
                "bhz","tcp://localhost:61616");

        //第二步:通过ConnectionFactory工厂对象我们创建一个Connection对象
        Connection connection = connectionFactory.createConnection();
        connection.start();

        //第三步:通过connection对象创建Session会话,第一个参数为是否开启事务,第二个参数为签收模式,一般设置为自动签收
        Session session = connection.createSession(Boolean.TRUE,Session.CLIENT_ACKNOWLEDGE);

        //第四步:通过Session创建Destination对象,queue1可看作是放入队列的消息名称,可以自定义
        Destination desiDestination = session.createQueue("queue1");

        //第五步:通过session创建消息的生产者或消费者,下面是创建生产者
        MessageProducer messageProducer = session.createProducer(null);

        //第六步:使用MessageProducer的set方法为其设置持久化特性和非持久化特性,后面再详细介绍
    //  messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//这里先设置为非持久化

        //第七步:通过JMS规范的TextMessage形式创建数据(通过session对象),并用MessageProducer的send方法发送数据
        for(int i =0; i<5;i++){
            //第一个参数:目的地
            //第二个参数:消息文本
            //第三个参数:接收模式
            //第四个参数:优先级
            //第五个参数:消息在消息队列保存的时间,这里指保存2分钟
            TextMessage textMessage =  session.createTextMessage("这是消息内容,id为"+i);
            messageProducer.send(desiDestination,textMessage,DeliveryMode.NON_PERSISTENT, i,1000*60*2);
        }
        session.commit();
        if(connection!=null){
            connection.close();
        }

    }

}

需要注意以下内容:
1. 优先级并不是严格遵循的
2. 消息在队列中超过保存事件后,还没有人取,那么就会自动消失

4.MessageConsumer

MessageConsumer详解

创建临时消息

三、订阅模式的使用

上面及之前的文章的是p2p模式(点对点模式)
那么发布订阅模式又是如何处理的?下面先看发布/订阅模式相关的含义:
发布订阅模式
下面是demo案例
消息生产者

package bhz.mq.pd;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Publish {

    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageProducer producer;

    public Publish(){
        try {
            factory = new ActiveMQConnectionFactory("bhz",
                    "bhz","tcp://localhost:61616");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            producer = session.createProducer(null);
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    public void sendMessage() throws Exception{
        Destination destination = session.createTopic("topic");
        TextMessage textMessage = session.createTextMessage("我是内容");
        producer.send(destination, textMessage);
    }

    public static void main(String[] agrs) throws Exception{
        Publish p = new Publish();
        p.sendMessage();
    }
}

上面的主题是:topic
当发布一条消息的时候,便会在后台topics上显示
后台多了一条主题

下面是消费者的代码,消费者订阅了topic主题

package bhz.mq.pd;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Consumer1 {

    private ConnectionFactory factory;
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;

    public Consumer1(){
        try {
            factory = new ActiveMQConnectionFactory("bhz",
                    "bhz","tcp://localhost:61616");
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }
    public void receive() throws Exception{
        Destination destionation = session.createTopic("topic");
        consumer = session.createConsumer(destionation);
        consumer.setMessageListener(new Listener());
    }

    public static void main(String[] agrs) throws Exception{
        Consumer1 consumer1 = new Consumer1();
        consumer1.receive();
    }
    class Listener implements MessageListener{
        @Override
        public void onMessage(Message msg) {
            // TODO Auto-generated method stub
            System.out.println("这是接收的内容:"+msg);
        }

    }
}

这时候只要生产者发布消息,那么消费者就会接收到消息

版权声明:本文为milsevol原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/milsevol/article/details/72634181

智能推荐

DC-8靶机教程

多喝热水:http://hackergu.com/ 一、主机发现 ​ 使用netdiscover,发现主机IP为192.168.203.134。 二、端口扫描 ​ 目标开放22端口和80端口,web站点是一个DrupalCMS的站点,版本为7。 三、漏洞利用 ​ 我们访问此站点: ​ 关注点为两个红框,如果点击上面那个红框的内容,url显示为: ​ 倘若点击的是下面的红框,则url显示为: ​ 看...

Android VIEW简单绘图

画布canvas 提供两个构造函数: Canvas():创建一个空的Canvas对象 Canvas(Bitmap bitmap):创建一个以bitmap位图为背景的Canvas 常用的Canvas方法: void drawBitmap(Bitmap bitmap,float left,float top,Paint paint):在指定坐标绘制位图 void drawLine(float star...

数据去重的各种方法汇总(三)

Pandas去重DataFrame 接上篇,这篇就剩最后的用Pandas对DataFrame里面的数据去重,这里也有两种方法,可以直接看官方文档(但是掌柜觉得其实就是一种方法,因为第一种只是显示哪些重复。。。): 使用duplicated方法,它会返回一个布尔向量,其长度为行数,表示行是否重复,但是并不会删除重复数据。来看实例: 然后会得到这样一个表格: 现在使用duplicated方法: 会得到...

Linux环境下RabbitMQ消息队列的安装和配置

一、什么是RabbitMQ? RabbitMQ就是一个在AMQP基础上实现的企业级消息系统,简单的说,就是一个消息队列系统。具体的介绍,可以网上去搜!目前只介绍RabbitMQ在Linux系统的安装。 二、RabbitMQ的安装 1、 RabbitMQ是基于Erlang开发,所以使用之前必须安装Erlang语言开发包。 wget http://www.erlang.org/download/otp...

train_test_split切分数据集工具

顾名思义,这是一个切分训练集与测试集的工具   如果我们不使用,而是手动进行划分,要么进行简单的操作——划去前80%为训练集,后20%为测试集,这样会带来很多的问题,因为这样做,我们切出来的会让训练集和测试集的分布很不一致,我们可以看一下简单粗暴方法切出来的分布图: 红色的训练集,蓝色是测试集(点击图片放大可以看得很清楚,直接看博客好像图片模糊)   然...

猜你喜欢

shell编程第一节 和shell

shell编程看的博客感觉写的挺好的:http://www.cnblogs.com/dongying/p/6262935.html 以及  https://www.cnblogs.com/clsn/p/8028337.html#auto_id_0 简单总结:shell编程就是对一堆Linux命令的逻辑化处理。 chmod +x hello_world.sh ./hello_world.s...

微信开发:js sdk 分享(java)

今天记录一下微信jssdk 的分享给朋友的功能,获取config接口注入。 1.需要绑定域名(注意:设置js安全域名的时候,需要设置微信ip白名单,ip白名单新出来的,非白名单内的ip无法获取access_token 更无法获取jsapi) 在设置js 安全域名在 设置–>公众号设置–>功能设置里边 appid appSercret 在开发–>...

js--HTML美术馆

前言 因为之前库房合作的时候交给我一个任务,就是在点击某一项物品的时候显示出几张相印的小图片,然后点击小图片之后显示出一张大图片,因为当时还没有接触JavaScript,所以这方面的知识不是很了解,一直拖着,大概有两天吧,是在是解决不了,于是将这个任务交给了老付和建华,今天在学习JavaScript的时候突然之后就看到了有这么一项功能,于是就有感而发。 内容 首先向大家展示代码。 这部分是HTML...

Jenkins持续集成环境部署(入门篇)

为什么要持续集成 持续集成是一种软件开发实践,即团队开发成员经常集成它们的工作,通过每个成员每天至少集成一次,也就意味着每天可能会发生多次集成。每次集成都通过自动化的构建(包括编译,发布,自动化测试)来验证,从而尽早地发现集成错误。 价值: 1、减少风险 一天中进行多次的集成,并做了相应的测试,有利于检查缺陷,了解软件的健康状况,减少假定。 2、减少重复过程 节省时间、费用和工作量,通过自动化的持...

linux 下rabbitmq的安装以及设置远程用户访问

安装过程中会有提示,一路输入“y”即可。 完成后安装RabbitMQ: 3.自己建个文件夹 进行下载安装也可  直接执行也可 先下载rpm: 完成后启动服务: 可以查看服务状态: 这里可以看到log文件的位置,转到文件位置,打开文件: 这里显示的是没有找到配置文件,我们可以自己创建这个文件 编辑内容如下: 这里的意思是开放使用,rabbitmq默认创建的用户gue...