RabbitMQ发布确认和交换机基础总结与实战

1、发布确认
 
1.1、发布确认的引出
 
一个消息的持久化需要经历的步骤:
 
设置要求队列持久化。
 
设置要求队列中的消息必须持久化。
 
发布确认
 
如果缺少了发布确认的话,那么消息在磁盘上持久化之前会发生丢失,从而不能满足消息持久化的目的。
 
1.2、发布确认的策略
 
1.2.1、开启发布确认的方法
 
Channel channel = RabbitmqUtil.getChannel();
 
//开启发布确认
 
channel.confirmSelect();
 
发布确认默认是没有开启的,如果需要开启需要调用confirmSelect,每当需要使用发布确认的时候,都需要调用该方法。
 
1.2.2、单个确认发布
 
单个确认发布是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布。
 
该确认方式主要通过waitForConfirms方法实现,这个方法只有在消息被确认的时候才会返回,如果在指定时间范围内这个消息没有被确认那么它将会抛出异常。
 
这种确认方式的最大的缺点就是:发布速度特别慢。
 
public static void ConfirmMessageIndividually() throws Exception{
 
        Channel channel = RabbitmqUtil.getChannel();
 
        String QUEUE_NAME = UUID.randomUUID()。toString();
 
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 
        channel.confirmSelect();
 
        long begin = System.currentTimeMillis();
 
        for (int i = 0; i < MESSAGE_COUNT; i++) {
 
            String message = i + “”;
 
            channel.basicPublish(””,QUEUE_NAME,null,message.getBytes());
 
            // 进行单个发布确认
 
            boolean flag = channel.waitForConfirms();
 
            if(flag){
 
                System.out.println(”消息发送成功”);
 
            }
 
        }
 
        long end = System.currentTimeMillis();
 
        System.out.println(”单个确认发送” + MESSAGE_COUNT + “条消息所消耗的时间是” + (end – begin) + “ms”);
 
    }
 
1.2.3、批量确认发布
 
先发布一批消息然后一起确认。
 
缺点:当发生故障导致发布出现问题时,不知道那个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的消息而后重新发布消息。
 
    public static void ConfirmMessageBatch() throws IOException, TimeoutException, InterruptedException {
 
        Channel channel = RabbitmqUtil.getChannel();
 
        String QUEUE_NAME = UUID.randomUUID()。toString();
 
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 
        channel.confirmSelect();
 
        long begin = System.currentTimeMillis();
 
        // 批量处理消息的个数
 
        int batchSize = 100;
 
        for (int i = 1; i <= MESSAGE_COUNT; i++) {
 
            String message = i + “”;
 
            channel.basicPublish(””,QUEUE_NAME,null,message.getBytes());
 
            // 进行批量发布确认
 
            if(i % batchSize == 0){
 
                channel.waitForConfirms();
 
                System.out.println(”批量处理消息成功”);
 
            }
 
        }
 
        long end = System.currentTimeMillis();
 
        System.out.println(”批量确认发送” + MESSAGE_COUNT + “条消息所消耗的时间是” + (end – begin) + “ms”);
 
    }

如需转载,请注明文章出处和来源网址:http://www.divcss5.com/html/h64834.shtml

张贴在3