<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
生产者 -> 消息队列 -> 消费者
工具类
package cn.lacknb.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
/**
* 获取MQ的连接
* @return
*/
public static Connection getConnection () throws IOException, TimeoutException {
// 定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost("192.168.44.130");
// 设置端口号 AMQP协议
factory.setPort(5672);
// vhost 类似于 数据库
factory.setVirtualHost("v_host_guest");
// 设置用户名
factory.setUsername("guest");
// 设置密码
factory.setPassword("guest");
return factory.newConnection();
}
}
生产者
package cn.lacknb.simple;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Send {
private static final String QUEUE_NAME = "test_simple";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取一个连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中 创建 一个通道
Channel channel = connection.createChannel();
// 创建队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello simple !";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());;
System.out.println("-- send msg");
channel.close();
connection.close();;
}
}
消费者
package cn.lacknb.simple;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import com.rabbitmq.client.impl.AMQImpl;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Receiver {
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("监听消息中...................");
Connection connection = ConnectionUtils.getConnection();
// 创建频道
Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 拿到队列里的数据
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
// 拿到的消息
String msg = new String(body, "utf-8");
System.out.println("拿到的数据:" + msg);
}
};
// 监听队列
channel.basicConsume(Send.QUEUE_NAME, true, consumer);
}
}
耦合性高,生产者 一一 对应消费者(如果我想有多个消费者消费队列中消息,这时候就不行了),队列名变更,这时候同时变更。
一个生产者 -> 消息队列 -> 多个消费者
为什么 会出现工作队列?
Simple队列是一一对应的,而且我们实际开发,生产者发送消息是毫不费力的,而消费者一般要跟业务相结合的,消费者接收消息之后就需要处理 可能花费时间,这时候队列就会积压了很多消息。
package cn.lacknb.work;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
// 获取channel
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(cn.lacknb.simple.Send.QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 50 ; i++) {
String msg = "hello " + i;
System.out.println("发送消息:" + msg);
// 发布消息
channel.basicPublish("", cn.lacknb.simple.Send.QUEUE_NAME, null, msg.getBytes());
Thread.sleep(i*20);
}
channel.close();
connection.close();
}
}
package cn.lacknb.work;
import cn.lacknb.simple.Send;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver01 {
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("[1] 开始监听......");
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取channel
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 一旦有消息就会触发该方法。
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[1] 收到的消息:" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done !!!");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);
}
}
package cn.lacknb.work;
import cn.lacknb.simple.Send;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver02 {
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("[2] 开始监听......");
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取channel
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 一旦有消息就会触发该方法。
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[2] 收到的消息:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done !!!");
}
}
};
boolean autoAck = true;
channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);
}
}
消费者1 和 消费者2 处理的消息数据是一样的。
消费者1 处理等待2s,而消费者2是等待1s的。
消费者1:偶数
消费者2 :奇数
这种方式叫做 轮询分发。(round-robin)
结果就是 消费者之间处理的数据 个数 总是相同的,任务消息总是你一个 我一个。
使用basicQos(prefech=1) 来限制 rabbitmq 每次只发不超过一条消息 到同一个消费者。
使用公平分发,必须关闭自动应答。即autoAck改成手动 false。
package cn.lacknb.work;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
// 获取channel
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(cn.lacknb.simple.Send.QUEUE_NAME, false, false, false, null);
/**
* 每个消费者 发送确认消息之前,消息队列不发送下一个消息到消费者
* 一次只处理一个消息。
* 限制发送同一个消费者不得超过一条消息。
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);
for (int i = 0; i < 50 ; i++) {
String msg = "hello " + i;
System.out.println("发送消息:" + msg);
// 发布消息
channel.basicPublish("", cn.lacknb.simple.Send.QUEUE_NAME, null, msg.getBytes());
Thread.sleep(i*5);
}
channel.close();
connection.close();
}
}
package cn.lacknb.work;
import cn.lacknb.simple.Send;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver01 {
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("[1] 开始监听......");
// 获取连接
Connection connection = ConnectionUtils.getConnection();
/**
* channel 在内部类中 访问, 需要添加 final
*/
// 获取channel
final Channel channel = connection.createChannel();
// 保证一次 只 分发一次。
channel.basicQos(1);
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 一旦有消息就会触发该方法。
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[1] 收到的消息:" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[1] done !!!");
// 发送一个 手动的 回执,即确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 为false时,自动应答 为关闭状态。
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);
}
}
package cn.lacknb.work;
import cn.lacknb.simple.Send;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver02 {
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("[2] 开始监听......");
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取channel
final Channel channel = connection.createChannel();
// 保证一次,只分发一次。
channel.basicQos(1);
// 声明队列
channel.queueDeclare(Send.QUEUE_NAME, false, false, false, null);
// 定义一个消费者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
// 一旦有消息就会触发该方法。
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
String msg = new String(body, "utf-8");
System.out.println("[2] 收到的消息:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("[2] done !!!");
// 发送 确认消息。 手动回执一个消息。
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 为false, 关闭自动应答。
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);
}
}
消费者2 处理的消息比消费者1多,也叫能者多劳
boolean autoAck = false;
channel.basicConsume(Send.QUEUE_NAME, autoAck, defaultConsumer);
boolean autoAck = true;
自动确认模式, 一旦rabbitmq将消息发给消费者,就会从内存中删除。
这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息。
boolean autoAck = false;
手动模式, 一旦有一个消费者挂掉,就会交付给其他消费者,rabbitmq支持消息应答,消费者发送一个确认消息,告诉rabbitmq这个消息我已经处理完成,你可以删除了,然后rabbitmq就删除内存中的消息。
消息应答默认是打开的。即 autoAck = false;
Message acknowledgment
如果rabbitmq挂了,我们的消息仍然会丢失。
// 声明队列
boolean durable = false;
channel.queueDeclare(cn.lacknb.simple.Send.QUEUE_NAME, durable, false, false, null);
我们将程序中的boolean durable = false 改成true,是不可以的。尽管代码是正确的,它也不会允许成功。因为我们已经定义了一个叫 test _simple,这个queue是未持久化,rabbitmq不准许重新定义(不同参数)一个已存在的队列
生产者 -> 交换机 -> 多个队列 -> 多个消费者
解读:
package cn.lacknb.subscrible;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 分发
String msg = "Hello exchange";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("Send: " + msg);
channel.close();
connection.close();
}
}
消息去哪了?丢失了!因为交换机没有存储的能力,在rabbitmq里面只有队列有存储能力。因为这时候还没有队列绑定到这个交换机,所以数据丢失了。
package cn.lacknb.subscrible;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver01 {
private static final String QUEUE_NAME = "test_queue_fanout_email";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机转发器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 保证一次只发一次
channel.basicQos(1);
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[1] receive : " + msg);
// 手动发送 一条 确认信息。
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
boolean autoAck = false; // 关闭自动应答
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
package cn.lacknb.subscrible;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Receiver02 {
private static final String QUEUE_NAME = "test_queue_fanout_sms";
private static final String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机转发器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 保证一次只发一次
channel.basicQos(1);
// 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[2] receive : " + msg);
// 手动发送 一条 确认信息。
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
boolean autoAck = false; // 关闭自动应答
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
交换机转发器 绑定 两个队列。
结果:
两个消费者 均 收到消息。
一方面是接收生产者的消息,另一方面是向队列推送消息。
匿名转发:""
fanout(不处理路由键) routingkey
direct(处理路由键)
package cn.lacknb.routemode;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.sun.scenario.effect.impl.sw.sse.SSEBlend_SRC_OUTPeer;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Send {
private static final String EXCHNAGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取一个连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中 创建 一个通道
Channel channel = connection.createChannel();
// 声明 交换机转发器
channel.exchangeDeclare(EXCHNAGE_NAME, "direct");
String msg = "Hello direct";
String routingkey = "error";
channel.basicPublish(EXCHNAGE_NAME, routingkey, null, msg.getBytes());
System.out.println("发送:" + msg);
channel.close();
connection.close();
}
}
package cn.lacknb.routemode;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Receiver01 {
private static final String QUEUE_NAME = "first_queue_routing";
private static final String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("监听消息中...................");
Connection connection = ConnectionUtils.getConnection();
// 创建频道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 绑定 队列和交换机转发器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 拿到队列里的数据
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
// 拿到的消息
String msg = new String(body, "utf-8");
System.out.println("拿到的数据:" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
package cn.lacknb.routemode;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Receiver02 {
private static final String QUEUE_NAME = "second _queue_routing";
private static final String EXCHNAGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("监听消息中...................");
Connection connection = ConnectionUtils.getConnection();
// 创建频道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 绑定 队列和交换机转发器
channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "warning");
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 拿到队列里的数据
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
// 拿到的消息
String msg = new String(body, "utf-8");
System.out.println("拿到的数据:" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
package cn.lacknb.routemode;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Receiver03 {
private static final String QUEUE_NAME = "third_queue_routing";
private static final String EXCHNAGE_NAME = "test_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("监听消息中...................");
Connection connection = ConnectionUtils.getConnection();
// 创建频道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
// 绑定 队列和交换机转发器
channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHNAGE_NAME, "warning");
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 拿到队列里的数据
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
// 拿到的消息
String msg = new String(body, "utf-8");
System.out.println("拿到的数据:" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
结果:
消费者1 和 消费者2 均绑定了 routingkey “eroor”
而消费者3 没有绑定 'error'
最终导致 消费者1 和 消费者2 收到了消息。
消费者3 没有收到任何消息。
将路由键和某模式匹配
#
匹配一个或多个
*
匹配一个
模型
例如 商品:发布 删除 修改 查询...
package cn.lacknb.topicmode;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Send {
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String msgString = "商品....";
// String routingKey = "goods.add";
String routingKey = "goods.delete";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, msgString.getBytes());
System.out.println("--send: " + msgString);
channel.close();
connection.close();
}
}
package cn.lacknb.topicmode;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Receiver01 {
private static final String QUEUE_NAME = "first_queue_routing";
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("监听消息中...................");
Connection connection = ConnectionUtils.getConnection();
// 创建频道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
String roukingKey = "goods.add";
// 绑定 队列和交换机转发器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, roukingKey);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 拿到队列里的数据
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
// 拿到的消息
String msg = new String(body, "utf-8");
System.out.println("[1] 拿到的数据:" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
package cn.lacknb.topicmode;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Receiver02 {
private static final String QUEUE_NAME = "second_queue_routing";
private static final String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
System.out.println("监听消息中...................");
Connection connection = ConnectionUtils.getConnection();
// 创建频道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
String roukingKey = "goods.#"; // # 是个匹配符,匹配多个。
// 绑定 队列和交换机转发器
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, roukingKey);
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
* 拿到队列里的数据
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// super.handleDelivery(consumerTag, envelope, properties, body);
// 拿到的消息
String msg = new String(body, "utf-8");
System.out.println("[2] 拿到的数据:" + msg);
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
在rabbitmq中,我们可以通过持久化数据 解决 rabbitmq服务器异常 的数据丢失问题。
问题:生产者将消息发送出去之后,消息到底有没有到达rabbitmq服务器?默认情况下是不知道的。
两种方式:
txSelect:用户将当前channel设置成transaction模式
txCommit:用于提交事务
txRollback:回滚 事务
package cn.lacknb.transaction;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TxSend {
private static final String QUEUE_NAME = "test_queue_transaction";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello tx message ";
try {
// 开启事务
channel.txSelect();
int xx = 1 / 0;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
System.out.println("send message txRollback!");
} finally {
channel.close();
connection.close();
}
}
}
package cn.lacknb.transaction;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class TxSend {
private static final String QUEUE_NAME = "test_queue_transaction";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "hello tx message ";
try {
// 开启事务
channel.txSelect();
int xx = 1 / 0;
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
channel.txCommit();
} catch (Exception e) {
channel.txRollback();
System.out.println("send message txRollback!");
} finally {
channel.close();
connection.close();
}
}
}
生产者将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一个ID(从1开始),一旦消息被投递到所有匹配的队列后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产的确认消息中deliver-tag域报包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm模式最大的好处在于它是异步
开启confirm模式
channel.confirmSelect()
编程模式:
package cn.lacknb.confirm;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 单条消息
*/
public class Send {
private static final String QUEUE_NAME = "test_queue_confirm_01";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*
* 如果 当前队列 已经设置为了事务模式,就不能再设 confirm 模式,否则会报异常。
* */
// 生产者调用confirmSelect 将 channel设置为confirm模式
channel.confirmSelect();
String msg = "hello confirm message ";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 发送之后, 确认是否发送成功
if (!channel.waitForConfirms()) {
System.out.println("message send failed !");
} else {
System.out.println("message end success");
}
channel.close();
connection.close();
}
}
其实 就在发送消息那 加一个for循环,发完之后,再确认是否发送成功。
估计 如果有一条消息发送失败,channel.waitForConfirms() 就会false
暂时无法测试
都是串行 的。
Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,没publish一条数据,集合中元素加1,每回调一次handAck方法,unconfirm集合删掉 相应的一条(multiple=false) 或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。
package cn.lacknb.confirm;
import cn.lacknb.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.TimeoutException;
/**
* 单条消息
*/
public class SendMore {
private static final String QUEUE_NAME = "test_queue_confirm_01";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
// 声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*
* 如果 当前队列 已经设置为了事务模式,就不能再设 confirm 模式,否则会报异常。
* */
// 生产者调用confirmSelect 将 channel设置为confirm模式
channel.confirmSelect();
/**
* 首先 将所有的未处理消息序号存入集合中,然后每处理一条,集合-1条
*/
// 未确认的消息标识
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.addConfirmListener(new ConfirmListener() {
/**
* 成功了 会调这个
* @param deliveryTag
* @param multiple 这个值为true时,表示处理成功多条消息。
* @throws IOException
*/
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("----handleAck --- multiple");
// headSet方法返回的是指定的元素之前的 元素SortedSet,需要加一才能包含上当前返回的元素
// 将之前的所有元素(+1 包括当前元素) 全部清理掉。
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("-----handleAck --- multiple false");
confirmSet.remove(deliveryTag);
}
}
/**
* 失败了 会调这个。
* @param deliveryTag
* @param multiple 这个值为true时,表示处理成功多条消息。
* @throws IOException
*/
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("----handleNack --- multiple");
confirmSet.headSet(deliveryTag + 1).clear();
} else {
System.out.println("-----handleNack --- multiple false");
confirmSet.remove(deliveryTag);
}
}
});
String msg = "hello confirm message ";
while (true) {
long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
confirmSet.add(seqNo);
}
}
}