2、Rabbitmq Work Queues

这里写图片描述
在第一个教程中,我们编写了从指定队列发送和接收消息的程序。在这个过程中,我们将创建一个工作队列,它将用于在多个工人之间分配耗时的任务。

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并必须等待它完成。相反,我们将任务安排在以后完成。我们将一个任务封装为一条消息并将其发送到队列。在后台运行的工作进程将会弹出这些任务并最终执行任务。当你运行许多 Consumer 时,任务将在他们之间共享。

这个概念在 web 应用程序中特别有用,因为在短的 HTTP 请求窗口中不可能处理复杂的任务。

1、准备

在本教程的前一部分中,我们发送了一个包含“Hello World!”的消息。现在我们将发送支持复杂任务的字符串。我们没有一个真实的任务,比如要调整的图像或者pdf文件,所以让我们假装我们很忙 —— 通过使用thread.sleep()函数来假装它。我们将把弦上的点数作为它的复杂度;每个点都将占“工作”的一秒。例如,一个由 Hello 所描述的虚假任务。需要三秒钟。

我们将稍微修改 Send.java 从我们前面的例子中使用的java代码,允许从命令行发送任意消息。这个程序将把任务安排到我们的工作队列中,所以让我们把它命名为 NewTask.java:

NewTask.java

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.20.128");
        factory.setPort(5672);
        factory.setUsername("carl");
        factory.setPassword("198918");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        for(int i = 1; i <= 6; i++) {
            StringBuilder message = new StringBuilder("Hello World!");
            Thread.sleep(2000);
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.append(i).toString().getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }

        channel.close();
        connection.close();
    }

}

我们的之前的 Recv.java 程序还需要一些改变:它需要为消息体中的每个点伪造一秒钟的工作。它将处理传递的消息并执行任务,所以我们称它为 Worker.java:

Worker.java

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.20.128");
        factory.setPort(5672);
        factory.setUsername("carl");
        factory.setPassword("198918");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                }
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, true, consumer);
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

2、循环分发消息

使用任务队列的优点之一是能够轻松地并行工作。如果我们正在积累大量的工作,我们可以增加更多的 Worker ,这样就可以很容易地进行横向扩展,扩大规模。

首先,让我们同时运行两个 orker 实例。它们都将从队列中获取消息,但具体如何呢?我们在 NewTask.java 里面发送了 6 次消息:

这里写图片描述

让我们看看给这两个 Worker 实例是如何接收消息的:

consumer-1

这里写图片描述

consumer-2

这里写图片描述

默认情况下,RabbitMQ 将依次向下一个使用者发送每条消息。平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环。读者可以试着和三个或更多的 consumer 一起尝试。

3、消息确认

完成一项任务可能需要几秒钟。你可能想知道,如果一个消费者开始了一项长期的任务,并且只完成了部分的工作,会发生什么。有了我们当前的代码,一旦 RabbitMQ 向客户发送一条消息,它立即将其标记为删除。在这种情况下,如果你杀死一个 Worker,我们就会失去它只是处理的信息。我们还会丢失发送给这个特定工作人员的所有消息,但还没有处理。

但我们不想丢失任何任务。如果一个 Worker 死了,我们希望这个任务被交付给另一个 Worker。

为了确保消息永远不会丢失,RabbitMQ 支持 消息确认。一个ack(nowledgement)被消费者送回,告诉 RabbitMQ,一个特定的消息已经被接收、处理,并且 RabbitMQ 可以自由地删除它。

如果一个消费者死了(它的通道关闭了,连接关闭了,或者TCP连接丢失了),而不发送ack,RabbitMQ就会明白消息没有被完全处理,并且会重新排队。如果同时在网上有其他消费者,它就会迅速将其重新交付给另一个消费者。这样即使 Worker 偶尔会死亡, 你也可以确保没有信息丢失。

没有任何消息超时;RabbitMQ 将在使用者死亡时重新传送消息。即使处理一条消息需要很长时间,也可以。

手动消息确认 在默认情况下是打开的。在前面的例子中,我们显式地通过 autoAck=true 这个标签关闭它们。现在是时候把这个标志设置为false,并在完成任务后向worker发送适当的确认信息。

channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");
    try {
      doWork(message);
    } finally {
      System.out.println(" [x] Done");
      channel.basicAck(envelope.getDeliveryTag(), false);
    }
  }
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

使用这段代码,我们可以确定,即使你在处理消息时使用杀死了一个 worker 实例,也不会丢失任何东西。在 worker 死后不久,所有未被确认的信息将被重新交付。

确认必须在接收到的同一通道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。请参阅 文档指南以了解更多信息。

4、被遗忘的确认

忘记进行 acs 确认这是一个容易犯的错误,但后果是严重的。当你的客户端退出时,消息将被重新发送(这可能看起来像是随机的重新交付),但是 RabbitMQ 将会吃掉越来越多的内存,因为它将无法释放任何未被删除的消息。

为了调试这种错误,您可以使用 rabbitmqctl 打印 messages_unacknowledged 字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

在Windows上,sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

5、持久化消息

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非你告诉它不要。要确保消息不会丢失,需要两件事:我们需要将队列和消息标记为持久。

首先,我们需要确保 RabbitMQ 永远不会丢失我们的队列。为了做到这一点,我们需要声明它是持久的:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

尽管这个命令本身是正确的,但是它在我们现在的设置中是行不通的。那是因为我们已经定义了一个名为 hello 的队列,它不是持久的。RabbitMQ 不允许你重新定义具有不同参数的现有队列,并将向任何试图这样做的程序返回一个错误。但是有一个快速的解决方案 —— 让我们声明一个具有不同名称的队列,例如 task_queue

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

这个 queueDeclare 更改需要同时应用于生产者和消费者代码。

在这一点上,我们确信即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。现在,我们需要将我们的消息标记为持久性 —— 通过将 MessageProperties(实现了BasicProperties)设置值为 PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;

channel.basicPublish("", "task_queue",
            MessageProperties.PERSISTENT_TEXT_PLAIN,
            message.getBytes());

注意消息的持久性

将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉 RabbitMQ 将消息保存到磁盘上,但 RabbitMQ 已经接受了一条消息,并且还没有保存它,仍然有一个很短的时间窗口。此外,RabbitMQ 不为每条消息执行fsync(2)——它可能被保存到缓存中,而不是真正写入磁盘。持久性保证并不强大,但对于我们的简单任务队列来说已经足够了。如果你需要一个更强的保证那么你可以使用 publisher confirms

6、公平的分配

你可能已经注意到,分派仍然不能完全按照我们的要求工作。例如,在 worker 工作的情况下,当所有的奇怪信息都很重,甚至消息都很轻时,一个 worker 就会一直很忙,而另一个 worker 几乎不会做任何工作。好吧,RabbitMQ对此一无所知,仍然会均匀地发送消息。

之所以发生这种情况,是因为 RabbitMQ 在消息进入队列时才会发送一条消息。它不关注消费者未被认可的消息的数量。它只是盲目地将每一个 n 条信息发送给第 n 个消费者。

这里写图片描述

在这种情况下我们可以使用 basicQos 方法和 prefetchCount=1 设置。这告诉 RabbitMQ,不要一次给一个 worker 发送多个消息。或者,换句话说,在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会把它分派给下一个不太忙的 worker。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列大小

如果所有的 worker 都很忙,你的队伍就会被填满。你会想要关注这一点,也许会增加更多的wroker,或者有其他的策略。

NewTask.java

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.20.128");
        factory.setPort(5672);
        factory.setUsername("carl");
        factory.setPassword("198918");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

        for(int i = 1; i <= 6; i++) {
            StringBuilder message = new StringBuilder("Hello World!");
            Thread.sleep(2000);
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.append(i).toString().getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }

        channel.close();
        connection.close();
    }

}

Worker.java

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.20.128");
        factory.setPort(5672);
        factory.setUsername("carl");
        factory.setPassword("198918");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);

        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");

                System.out.println(" [x] Received '" + message + "'");
                try {
                    doWork(message);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
    }

    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

使用消息确认和 prefetchCount,你可以设置一个工作队列。即使 RabbitMQ 重新启动,持久性选项也能让任务存活下来。

有关 Channel 方法和 MessageProperties 的更多信息,你可以在网上浏览 在线JavaDocs

版权声明:本文为u012410733原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u012410733/article/details/81589076