目录
RabbitMQ消息队列入门
/      

RabbitMQ消息队列入门

RabbitMQ消息队列

笔记来源视频:https://www.bilibili.com/video/BV1gW411H7Az

maven依赖

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.7.3</version>
</dependency>

<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
    <scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>

<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>

简单队列

生产者 -> 消息队列 -> 消费者

连接MQ

  • 工具类

    package cn.lacknb.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConnectionUtils {
    
    
        /**
         * 获取MQ的连接
         * @return
         */
        public static Connection getConnection () throws IOException, TimeoutException {
            // 定义一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
    
            // 设置服务地址
            factory.setHost("192.168.44.130");
    
            // 设置端口号    AMQP协议
            factory.setPort(5672);
    
            // vhost 类似于 数据库
            factory.setVirtualHost("v_host_guest");
    
            // 设置用户名
            factory.setUsername("guest");
    
            // 设置密码
            factory.setPassword("guest");
            return factory.newConnection();
        }
    
    }
    
  • 生产者

    package cn.lacknb.simple;
    
    import cn.lacknb.util.ConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    
    /**
     * 生产者
     */
    public class Send {
    
        private static final String QUEUE_NAME = "test_simple";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 获取一个连接
            Connection connection = ConnectionUtils.getConnection();
    
            // 从连接中 创建 一个通道
            Channel channel = connection.createChannel();
    
            // 创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    
            String msg = "hello simple !";
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());;
    
            System.out.println("-- send msg");
            channel.close();
            connection.close();;
    
        }
    
    }
    
    
  • 消费者

    package cn.lacknb.simple;
    
    import cn.lacknb.util.ConnectionUtils;
    import com.rabbitmq.client.*;
    import com.rabbitmq.client.impl.AMQImpl;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * 消费者
     */
    public class Receiver {
    
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("监听消息中...................");
            Connection connection = ConnectionUtils.getConnection();
    
            // 创建频道
            Channel channel = connection.createChannel();
    
            // 队列声明
            channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
    
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                /**
                 * 拿到队列里的数据
                 * @param consumerTag
                 * @param envelope
                 * @param properties
                 * @param body
                 * @throws IOException
                 */
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    //                super.handleDelivery(consumerTag, envelope, properties, body);
                    // 拿到的消息
                    String msg = new String(body, "utf-8");
                    System.out.println("拿到的数据:" + msg);
                }
            };
    
            // 监听队列
            channel.basicConsume(Send.QUEUE_NAME, true, consumer);
    
        }
    
    }
    
    

简单队列的不足

耦合性高,生产者 一一 对应消费者(如果我想有多个消费者消费队列中消息,这时候就不行了),队列名变更,这时候同时变更。

Work queues 工作队列之Round-robin轮询分发

一个生产者 -> 消息队列 -> 多个消费者

为什么 会出现工作队列?

Simple队列是一一对应的,而且我们实际开发,生产者发送消息是毫不费力的,而消费者一般要跟业务相结合的,消费者接收消息之后就需要处理 可能花费时间,这时候队列就会积压了很多消息。

生产者

package cn.lacknb.work;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtils.getConnection();

        // 获取channel
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(cn.lacknb.simple.Send.QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 50 ; i++) {
            String msg = "hello " + i;
            System.out.println("发送消息:" + msg);
            // 发布消息
            channel.basicPublish("", cn.lacknb.simple.Send.QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(i*20);
        }
        channel.close();
        connection.close();
    }

}

消费者1

package cn.lacknb.work;

import cn.lacknb.simple.Send;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver01 {

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("[1] 开始监听......");
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);

        // 定义一个消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // 一旦有消息就会触发该方法。
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[1] 收到的消息:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done !!!");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);


    }

}

消费者2

package cn.lacknb.work;

import cn.lacknb.simple.Send;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver02 {

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("[2] 开始监听......");

        // 获取连接
        Connection connection = ConnectionUtils.getConnection();
        // 获取channel
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);

        // 定义一个消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // 一旦有消息就会触发该方法。
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[2] 收到的消息:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done !!!");
                }
            }
        };
        boolean autoAck = true;
        channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);

    }

}

现象

消费者1 和 消费者2 处理的消息数据是一样的。

消费者1 处理等待2s,而消费者2是等待1s的。

消费者1:偶数

消费者2 :奇数

这种方式叫做 轮询分发。(round-robin)

结果就是 消费者之间处理的数据 个数 总是相同的,任务消息总是你一个 我一个。

工作队列之Fair dipatch 公平分发。

使用basicQos(prefech=1) 来限制 rabbitmq 每次只发不超过一条消息 到同一个消费者。

使用公平分发,必须关闭自动应答。即autoAck改成手动 false。

生产者

package cn.lacknb.work;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtils.getConnection();

        // 获取channel
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(cn.lacknb.simple.Send.QUEUE_NAME, false, false, false, null);

        /**
         * 每个消费者 发送确认消息之前,消息队列不发送下一个消息到消费者
         * 一次只处理一个消息。
         * 限制发送同一个消费者不得超过一条消息。
         */
        int prefetchCount = 1;
        channel.basicQos(prefetchCount);


        for (int i = 0; i < 50 ; i++) {
            String msg = "hello " + i;
            System.out.println("发送消息:" + msg);
            // 发布消息
            channel.basicPublish("", cn.lacknb.simple.Send.QUEUE_NAME, null, msg.getBytes());
            Thread.sleep(i*5);
        }
        channel.close();
        connection.close();
    }

}

消费者1

package cn.lacknb.work;

import cn.lacknb.simple.Send;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver01 {

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("[1] 开始监听......");
        // 获取连接
        Connection connection = ConnectionUtils.getConnection();

        /**
         * channel 在内部类中 访问, 需要添加 final
         */
        // 获取channel
        final Channel channel = connection.createChannel();

        // 保证一次 只 分发一次。
        channel.basicQos(1);

        // 声明队列
        channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);

        // 定义一个消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // 一旦有消息就会触发该方法。
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[1] 收到的消息:" + msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[1] done !!!");
                    // 发送一个 手动的 回执,即确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };

        // 为false时,自动应答 为关闭状态。
        boolean autoAck = false;
        channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);


    }

}

消费者2

package cn.lacknb.work;

import cn.lacknb.simple.Send;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver02 {

    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("[2] 开始监听......");

        // 获取连接
        Connection connection = ConnectionUtils.getConnection();

        // 获取channel
        final Channel channel = connection.createChannel();

        // 保证一次,只分发一次。
        channel.basicQos(1);

        // 声明队列
        channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);

        // 定义一个消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            // 一旦有消息就会触发该方法。
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                String msg = new String(body, "utf-8");
                System.out.println("[2] 收到的消息:" + msg);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    System.out.println("[2] done !!!");
                    // 发送 确认消息。 手动回执一个消息。
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        // 为false, 关闭自动应答。
        boolean autoAck = false;
        channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);

    }

}

现象

消费者2 处理的消息比消费者1多,也叫能者多劳

消息应答与消息持久化

boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);

boolean autoAck = true;自动确认模式, 一旦rabbitmq将消息发给消费者,就会从内存中删除。

这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息。

boolean autoAck = false;手动模式, 一旦有一个消费者挂掉,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个确认消息,告诉rabbitmq这个消息我已经处理完成,你可以删除了,然后rabbitmq就删除内存中的消息。

消息应答默认是打开的。即 autoAck = false;

Message acknowledgment

如果rabbitmq挂了,我们的消息仍然会丢失。

消息的持久化

// 声明队列
boolean durable = false;
channel.queueDeclare(cn.lacknb.simple.Send.QUEUE_NAME, durable, false, false, null);

我们将程序中的boolean durable = false 改成true,是不可以的。尽管代码是正确的,它也不会允许成功。因为我们已经定义了一个叫 test _simple,这个queue是未持久化,rabbitmq不准许重新定义(不同参数)一个已存在的队列

订阅模式 publish/subscrible

生产者 -> 交换机 -> 多个队列 -> 多个消费者

解读:

  • 一个生产者, 多个消费者
  • 每一个消费者都有自己的队列
  • 生产者没有直接把消息发送到队列,而是发送到了交换机 转发器exchange
  • 每个队列都要绑定到交换机上。
  • 生产者发送的消息 经过交换机 到达队列 就能实现一个消息被多个消费者消费。

生产者

package cn.lacknb.subscrible;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // 声明交换机
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");  // 分发

        String msg = "Hello exchange";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
        System.out.println("Send: " + msg);

        channel.close();
        connection.close();
    }
}

UTOOLS1594366088915.png

消息去哪了?丢失了!因为交换机没有存储的能力,在rabbitmq里面只有队列有存储能力。因为这时候还没有队列绑定到这个交换机,所以数据丢失了。

消费者1

package cn.lacknb.subscrible;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver01 {

    private static final String QUEUE_NAME = "test_queue_fanout_email";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();

        final Channel channel = connection.createChannel();

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 保证一次只发一次
        channel.basicQos(1);

        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[1] receive : " + msg);
                // 手动发送 一条 确认信息。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;  // 关闭自动应答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);


    }

}

消费者2

package cn.lacknb.subscrible;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver02 {

    private static final String QUEUE_NAME = "test_queue_fanout_sms";
    private static final String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();

        final Channel channel = connection.createChannel();

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 保证一次只发一次
        channel.basicQos(1);

        // 定义一个消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "utf-8");
                System.out.println("[2] receive : " + msg);
                // 手动发送 一条 确认信息。
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        boolean autoAck = false;  // 关闭自动应答
        channel.basicConsume(QUEUE_NAME, autoAck, consumer);
    
    }
}

UTOOLS1594367137007.png

交换机转发器 绑定 两个队列。

结果:


两个消费者 均 收到消息。

Exchange交换机 转发器

一方面是接收生产者的消息,另一方面是向队列推送消息。

匿名转发:""

fanout(不处理路由键) routingkey

direct(处理路由键)

路由模式 - routing key

UTOOLS1594370051566.png

生产者

package cn.lacknb.routemode;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;


/**
 * 生产者
 */
public class Send {

    private static final String EXCHNAGE_NAME = "test_exchange_direct";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 获取一个连接
        Connection connection = ConnectionUtils.getConnection();

        // 从连接中 创建 一个通道
        Channel channel = connection.createChannel();

        // 声明 交换机转发器
        channel.exchangeDeclare(EXCHNAGE_NAME, "direct");

        String msg = "Hello direct";
        String routingkey = "error";
        channel.basicPublish(EXCHNAGE_NAME, routingkey, null, msg.getBytes());
        System.out.println("发送:" + msg);
        channel.close();
        connection.close();
    }

}

消费者1

package cn.lacknb.routemode;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 */
public class Receiver01 {

    private static final String QUEUE_NAME = "first_queue_routing";
    private static final String EXCHANGE_NAME = "test_exchange_direct";


    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("监听消息中...................");
        Connection connection = ConnectionUtils.getConnection();

        // 创建频道
        final Channel channel = connection.createChannel();

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        // 绑定 队列和交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 拿到队列里的数据
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                // 拿到的消息
                String msg = new String(body, "utf-8");
                System.out.println("拿到的数据:" + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);

    }

}

消费者2

package cn.lacknb.routemode;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 */
public class Receiver02 {

    private static final String QUEUE_NAME = "second _queue_routing";
    private static final String EXCHNAGE_NAME = "test_exchange_direct";


    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("监听消息中...................");
        Connection connection = ConnectionUtils.getConnection();

        // 创建频道
        final Channel channel = connection.createChannel();

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        // 绑定 队列和交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "error");
        channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "warning");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 拿到队列里的数据
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                // 拿到的消息
                String msg = new String(body, "utf-8");
                System.out.println("拿到的数据:" + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);

    }

}

消费者3

package cn.lacknb.routemode;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 */
public class Receiver03 {

    private static final String QUEUE_NAME = "third_queue_routing";
    private static final String EXCHNAGE_NAME = "test_exchange_direct";


    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("监听消息中...................");
        Connection connection = ConnectionUtils.getConnection();

        // 创建频道
        final Channel channel = connection.createChannel();

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        // 绑定 队列和交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "info");
        channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "warning");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 拿到队列里的数据
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                // 拿到的消息
                String msg = new String(body, "utf-8");
                System.out.println("拿到的数据:" + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);

    }

}

结果:

消费者1 和 消费者2 均绑定了 routingkey “eroor”

而消费者3 没有绑定 'error'

最终导致 消费者1 和 消费者2 收到了消息。

消费者3 没有收到任何消息。

topic exchange 主题模式

将路由键和某模式匹配

# 匹配一个或多个

* 匹配一个

UTOOLS1594370418463.png

模型

UTOOLS1594370538920.png

例如 商品:发布 删除 修改 查询...

生产者

package cn.lacknb.topicmode;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Send {

    private static final String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        String msgString = "商品....";
//        String routingKey = "goods.add";
        String routingKey = "goods.delete";
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, msgString.getBytes());

        System.out.println("--send: " + msgString);
        channel.close();
        connection.close();

    }

}

消费者1

package cn.lacknb.topicmode;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 */
public class Receiver01 {

    private static final String QUEUE_NAME = "first_queue_routing";
    private static final String EXCHANGE_NAME = "test_exchange_topic";


    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("监听消息中...................");
        Connection connection = ConnectionUtils.getConnection();

        // 创建频道
        final Channel channel = connection.createChannel();

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        String roukingKey = "goods.add";
        // 绑定 队列和交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, roukingKey);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 拿到队列里的数据
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                // 拿到的消息
                String msg = new String(body, "utf-8");
                System.out.println("[1] 拿到的数据:" + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);

    }

}

消费者2

package cn.lacknb.topicmode;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 消费者
 */
public class Receiver02 {

    private static final String QUEUE_NAME = "second_queue_routing";
    private static final String EXCHANGE_NAME = "test_exchange_topic";


    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("监听消息中...................");
        Connection connection = ConnectionUtils.getConnection();

        // 创建频道
        final Channel channel = connection.createChannel();

        // 队列声明
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        String roukingKey = "goods.#";  // # 是个匹配符,匹配多个。
        // 绑定 队列和交换机转发器
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, roukingKey);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * 拿到队列里的数据
             * @param consumerTag
             * @param envelope
             * @param properties
             * @param body
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                super.handleDelivery(consumerTag, envelope, properties, body);
                // 拿到的消息
                String msg = new String(body, "utf-8");
                System.out.println("[2] 拿到的数据:" + msg);
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 监听队列
        channel.basicConsume(QUEUE_NAME, false, consumer);

    }

}

RabbitMQ 的消息确认机制(事务 + confirm)

在rabbitmq中,我们可以通过持久化数据 解决 rabbitmq服务器异常 的数据丢失问题。

问题:生产者将消息发送出去之后,消息到底有没有到达rabbitmq服务器?默认情况下是不知道的。

两种方式:

  • AMQP协议实现了事务机制(类似于MySQL的事务)
  • confirm模式

事务机制

txSelect:用户将当前channel设置成transaction模式

txCommit:用于提交事务

txRollback:回滚 事务

生产者

package cn.lacknb.transaction;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TxSend {

    private static final String QUEUE_NAME = "test_queue_transaction";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();

        // 声明一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "hello tx message ";

        try {
            // 开启事务
            channel.txSelect();
            int xx = 1 / 0;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            System.out.println("send message txRollback!");
        } finally {
            channel.close();
            connection.close();
        }

    }

}

消费者

package cn.lacknb.transaction;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class TxSend {

    private static final String QUEUE_NAME = "test_queue_transaction";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();

        Channel channel = connection.createChannel();

        // 声明一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String msg = "hello tx message ";

        try {
            // 开启事务
            channel.txSelect();
            int xx = 1 / 0;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

            channel.txCommit();
        } catch (Exception e) {
            channel.txRollback();
            System.out.println("send message txRollback!");
        } finally {
            channel.close();
            connection.close();
        }

    }

}

Confirm模式

生产者端confirm模式的实现原理

生产者将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一个ID(从1开始),一旦消息被投递到所有匹配的队列后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产的确认消息中deliver-tag域报包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。

confirm模式最大的好处在于它是异步

开启confirm模式

channel.confirmSelect()

编程模式:

  • 普通 发一条 waitForConfirms()
  • 批量的 发一批 waitForConfirms
  • 异步confirm模式:提供一个回调方法。

Confirm单挑消息确认

package cn.lacknb.confirm;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;


/**
 * 单条消息
 */
public class Send {

    private static final String QUEUE_NAME = "test_queue_confirm_01";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // 声明一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /*
        * 如果 当前队列 已经设置为了事务模式,就不能再设 confirm 模式,否则会报异常。
        *  */

        // 生产者调用confirmSelect 将 channel设置为confirm模式
        channel.confirmSelect();

        String msg = "hello confirm message ";
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());

        // 发送之后, 确认是否发送成功
        if (!channel.waitForConfirms()) {
            System.out.println("message send failed !");
        } else {
            System.out.println("message end success");
        }
        channel.close();
        connection.close();
    }

}

批量

其实 就在发送消息那 加一个for循环,发完之后,再确认是否发送成功。

估计 如果有一条消息发送失败,channel.waitForConfirms() 就会false

暂时无法测试

都是串行 的。

消息确认机制之confirm异步

Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,没publish一条数据,集合中元素加1,每回调一次handAck方法,unconfirm集合删掉 相应的一条(multiple=false) 或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。

package cn.lacknb.confirm;

import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;


/**
 * 单条消息
 */
public class SendMore {

    private static final String QUEUE_NAME = "test_queue_confirm_01";

    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {

        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // 声明一个队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        /*
        * 如果 当前队列 已经设置为了事务模式,就不能再设 confirm 模式,否则会报异常。
        *  */

        // 生产者调用confirmSelect 将 channel设置为confirm模式
        channel.confirmSelect();

        /**
         * 首先 将所有的未处理消息序号存入集合中,然后每处理一条,集合-1条
         */

        // 未确认的消息标识
        final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());

        channel.addConfirmListener(new ConfirmListener() {

            /**
             * 成功了 会调这个
             * @param deliveryTag
             * @param multiple  这个值为true时,表示处理成功多条消息。
             * @throws IOException
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("----handleAck --- multiple");
                    // headSet方法返回的是指定的元素之前的 元素SortedSet,需要加一才能包含上当前返回的元素
                    // 将之前的所有元素(+1 包括当前元素) 全部清理掉。
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    System.out.println("-----handleAck --- multiple false");
                    confirmSet.remove(deliveryTag);
                }
            }

            /**
             *  失败了 会调这个。
             * @param deliveryTag
             * @param multiple 这个值为true时,表示处理成功多条消息。
             * @throws IOException
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                if (multiple) {
                    System.out.println("----handleNack --- multiple");
                    confirmSet.headSet(deliveryTag + 1).clear();
                } else {
                    System.out.println("-----handleNack --- multiple false");
                    confirmSet.remove(deliveryTag);
                }
            }
        });


        String msg = "hello confirm message ";
        while (true) {
            long seqNo = channel.getNextPublishSeqNo();
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            confirmSet.add(seqNo);
        }

    }

}

Spring集成rabbitmq-client


标题:RabbitMQ消息队列入门
作者:gitsilence
地址:https://blog.lacknb.cn/articles/2020/07/12/1594516869295.html