|
常用的交换机
DirectExchange:直连型交换机,根据消息携带的路由键,将消息转发给对应的队列
FanoutExchange:扇形交换机,接收到消息后会忽略路由键,将消息转发到所有与该交换机绑定的队列
TopicExchange:主题交换机,根据消息携带的路由键和交换机与队列绑定键的规则,将消息转发给对应的队列
规则:
*(星号):匹配恰好一个单词(路由键中以"."分隔的一段),例如 topic.* 可匹配 topic.01,但不匹配 topic.a.b
#(井号):匹配零个或多个单词,例如 topic.# 可匹配 topic、topic.01 和 topic.a.b
准备
两个Spring Boot 项目:
- rabbitmq-provider(生产者)
- rabbitmq-consumer(消费者)
版本号:2.1.7.RELEASE
依赖:
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.yml:
server:
  port: 9000
spring:
  application:
    name: rabbitmq-provider
  rabbitmq:
    host: 192.168.1.45
    port: 5672
    username: admin
    password: admin
    #virtual-host:
    # ------- publisher confirmation settings --------
    # Confirm that a published message reached the exchange
    #publisher-confirms: true
    # Return messages that could not be routed to any queue
    #publisher-returns: true
DirectExchange
rabbitmq-provider
- 1、创建DirectRabbitConfig.java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitConfig {
/**
* 交换机
*/
@Bean
public DirectExchange myDirectExchange() {
// 参数意义:
// name: 名称
// durable: true
// autoDelete: 自动删除
return new DirectExchange(&#34;myDirectExchange&#34;, true, false);
}
/**
* 队列
*/
@Bean
public Queue myDirectQueue() {
return new Queue(&#34;myDirectQueue&#34;, true);
}
/**
* 绑定
*/
@Bean
public Binding bindingDirect() {
return BindingBuilder.bind(myDirectQueue())
.to(myDirectExchange())
.with(&#34;my.direct.routing&#34;);
}
}
@Autowired
private RabbitTemplate rabbitTemplate;
@ResponseBody
@RequestMapping(&#34;/send&#34;)
public String send() {
String msg = &#34;hello&#34;;
rabbitTemplate.convertAndSend(&#34;myDirectExchange&#34;, &#34;my.direct.routing&#34;, msg);
return &#34;success&#34;;
}
- 3、启动项目,使用PostMan调用发送消息接口,观察RabbitMQ控制面板的队列和控制台输出
rabbitmq-consumer
- 1、创建消息处理者DirectReceiver.java
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
//@RabbitListener(queues = &#34;myDirectQueue&#34;)
public class DirectReceiver{
@RabbitHandler
@RabbitListener(queues = &#34;myDirectQueue&#34;)
public void process(String msg) {
System.out.println(msg);
}
}
注意:消费者监听的队列必须在消费者启动之前就已存在,否则启动时会报错
解决方法:先启动生产者并发送一次消息,此时生产者端的 RabbitAdmin 会在首次建立连接时声明配置类中的交换机、队列和绑定,然后再启动消费者接收消息
FanoutExchange
创建多个队列绑定到扇形交换机,生产者发送一次消息,可以观察到多个处理者都收到了消息
rabbitmq-provider
- 1、创建FanoutRabbitConfig.java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
// ----- 交换机 -----
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(&#34;fanoutExchange&#34;, true, false);
}
// ----- 队列 -----
@Bean
public Queue fanoutQueueA() {
return new Queue(&#34;fanoutQueueA&#34;, true);
}
@Bean
public Queue fanoutQueueB() {
return new Queue(&#34;fanoutQueueB&#34;, true);
}
@Bean
public Queue fanoutQueueC() {
return new Queue(&#34;fanoutQueueC&#34;, true);
}
// ----- 绑定 -----
@Bean
public Binding bindingFanoutA() {
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutB() {
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}
@Bean
public Binding bindingFanoutC() {
return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
}
}
@Autowired
private RabbitTemplate rabbitTemplate;
@ResponseBody
@RequestMapping(&#34;/sendByFanout&#34;)
public String sendByFanout() {
String msg = &#34;hello fanout&#34;;
rabbitTemplate.convertAndSend(&#34;fanoutExchange&#34;, null, msg);
return &#34;success&#34;;
}
rabbitmq-consumer
package com.rabbitmq.demo.mq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectReceiver{
@RabbitHandler
@RabbitListener(queues = &#34;fanoutQueueA&#34;)
public void processA(String msg) {
System.out.println(&#34;fanoutQueueA &#34; + msg);
}
@RabbitHandler
@RabbitListener(queues = &#34;fanoutQueueB&#34;)
public void processB(String msg) {
System.out.println(&#34;fanoutQueueB &#34; + msg);
}
@RabbitHandler
@RabbitListener(queues = &#34;fanoutQueueC&#34;)
public void processC(String msg) {
System.out.println(&#34;fanoutQueueC &#34; + msg);
}
}
TopicExchange
创建两个队列,并使用通配符绑定到主题交换机
rabbitmq-provider
- 1、创建TopicRabbitConfig.java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
// 交换机
@Bean
public TopicExchange myTopicExchange() {
return new TopicExchange(&#34;myTopicExchange&#34;, true, false);
}
// ----- 队列 -----
@Bean
public Queue myTopicQueue_01() {
return new Queue(&#34;myTopicQueue_01&#34;, true);
}
@Bean Queue myTopicQueue_02() {
return new Queue(&#34;myTopicQueue_02&#34;, true);
}
/**
* 绑定路由键为topic.01
*/
@Bean
public Binding binding_01() {
return BindingBuilder.bind(myTopicQueue_01()).to(myTopicExchange()).with(&#34;topic.01&#34;);
}
/**
* 绑定路由键为topic.#规则
*/
@Bean
public Binding binding_02() {
return BindingBuilder.bind(myTopicQueue_02()).to(myTopicExchange()).with(&#34;topic.#&#34;);
}
}
@Autowired
private RabbitTemplate rabbitTemplate;
@ResponseBody
@RequestMapping(&#34;/sendByTopic&#34;)
public String sendByTopic() {
String msg = &#34;hello topic&#34;;
rabbitTemplate.convertAndSend(&#34;myTopicExchange&#34;, &#34;topic.01&#34;, msg + &#34; topic.01&#34;);
rabbitTemplate.convertAndSend(&#34;myTopicExchange&#34;, &#34;topic.xxx&#34;, msg + &#34; topic.xxx&#34;);
return &#34;success&#34;;
}
rabbitmq-consumer
1、创建消息处理者
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class DirectReceiver{
@RabbitHandler
@RabbitListener(queues = &#34;myTopicQueue_01&#34;)
public void process_01(String msg) {
System.out.println(&#34;myTopicQueue_01 &#34; + msg);
}
@RabbitHandler
@RabbitListener(queues = &#34;myTopicQueue_02&#34;)
public void process_02(String msg) {
System.out.println(&#34;myTopicQueue_02 &#34; + msg);
}
}
消息确认
- 1、application.yml文件开启rabbitmq消息确认
#确认消息已发送到交换机
publisher-confirms: true
#确认消息已发送到队列
publisher-returns: true
注意:较高版本(Spring Boot 2.2 及以上)中 publisher-confirms 已被废弃,如果报错,请改用: publisher-confirm-type: correlated
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
// 开启Mandatory, 才能触发回调函数,无论消息推送结果如何都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println(&#34;ConfirmCallback: &#34;+&#34;相关数据:&#34; + correlationData);
System.out.println(&#34;ConfirmCallback: &#34;+&#34;确认情况:&#34; + ack);
System.out.println(&#34;ConfirmCallback: &#34;+&#34;原因:&#34; + cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println(&#34;ReturnCallback: &#34;+&#34;消息:&#34; + message);
System.out.println(&#34;ReturnCallback: &#34;+&#34;回应码:&#34; + replyCode);
System.out.println(&#34;ReturnCallback: &#34;+&#34;回应信息:&#34; + replyText);
System.out.println(&#34;ReturnCallback: &#34;+&#34;交换机:&#34; + exchange);
System.out.println(&#34;ReturnCallback: &#34;+&#34;路由键:&#34; + routingKey);
}
});
return rabbitTemplate;
}
}
import com.rabbitmq.demo.mq.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessageListenerConfig {
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Autowired
private MyAckReceiver myAckReceiver;
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cachingConnectionFactory);
// 监听队列名
container.setQueueNames(&#34;myDirectQueue&#34;);
// 当前消费者数量
container.setConcurrentConsumers(1);
// 最大消费者数量
container.setMaxConcurrentConsumers(1);
// 手动确认
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
// 设置监听器
container.setMessageListener(myAckReceiver);
return container;
}
}
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
 * Listener that prints each received message and acknowledges it manually on
 * the channel it was delivered on.
 */
@Component
public class MyAckReceiver implements ChannelAwareMessageListener {

    /**
     * Prints the message and its source queue, then acknowledges the delivery.
     * If processing fails, the delivery is rejected without requeueing.
     *
     * @param message the delivered message
     * @param channel the channel the message was delivered on
     * @throws Exception if rejecting the delivery fails
     */
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // Delivery tag: identifies this delivery on the channel for ack/reject
        // (it is not a globally unique message ID).
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            String msg = message.toString();
            System.out.println("消息: " + msg);
            System.out.println("消息来自: " + message.getMessageProperties().getConsumerQueue());
            // Acknowledge only this delivery; multiple=true would also ack every
            // earlier unacknowledged delivery on the channel.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // Report the failure first so it is not lost if the reject itself throws.
            e.printStackTrace();
            // Reject without requeueing (requeue = false).
            channel.basicReject(deliveryTag, false);
        }
    }
}
5、启动生产者发送消息
6、启动消费者
7、观察生产者和消费者控制台输出 |
|