RabbitMQ 拓展篇

jackson

在 rabbitMq 中还有个重要的组件是 MessageConverter,用于消息格式的设置。
默认使用amqp的 SimpleMessageConverter 使用text传输,在传输量较大的数据时比较消耗性能。
另一种就是 Jackson2JsonMessageConverter,使用json传输。

全局配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Configuration
public class RabbitMqConfig {

/* 设置消息传输形式 使用jackson 相对默认SimpleMessageConverter 提高性能 */

/**
* 发送消息设置用json的形式序列化
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}

/**
* 接受的时候使用jackson 反序列化
* @param connectionFactory
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}

接受者配置

1
2
3
4
5
6
7
8
9
10
@Component
@RabbitListener(queues = "dcloud.fanout.queue",containerFactory = "rabbitListenerContainerFactory")
public class FanoutReceiverA {

@RabbitHandler
public void process(@Payload String message) {
System.out.println("receive" + message);
}

}

贴出simple和jack ,string和Object 数据传输格式

javabean_jackson_messagejavabean_jackson_message

javabean_simple_messagejavabean_simple_message

string_jackson_messagestring_jackson_message

string_simple_messagestring_simple_message

消息高可用

消息不管在生成,传输,队列,消费中都可能存在问题,丢失或者重复消费等,因此需要配置一些参数或功能以达到消息高可用

消息持久化

持久化设置时三个缺一不可。

queue 持久化

设置queue为 durable ,new Queue() 和 RabbitMQ Managemnet 默认持久化

消息 持久化

核心为设置 Message 的 MessageDeliveryMode 为 PERSISTENT。
使用 rabbitTemplate.convertAndSend 方法中 默认为此模式

exchange 持久化

同queue 设置 exchange Type 为 durable …

消息确认

确认发送(生产者)

继承 RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback 两个接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 是否正确到达 Exchange
*
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)

/**
* Exchange 发送到 queue 发送确定
*
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey)

并设置 指定监听回调

1
2
3
4
5
6
template.setConfirmCallback(this);
template.setReturnCallback(this);
// 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,
// 那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);
// 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉
template.setMandatory(true);

确认发送(消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RabbitHandler
public void process(@Payload String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println("receive" + message);

//确认消息,deliveryTag为相对channel的消息唯一标识,
//multiple 批处理true 可以一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(tag, false);

//拒绝消息 requeue 是否重新进入队列, true 重新进入队列 false 消息被丢弃
channel.basicReject(tag, true);

//否认消息
channel.basicNack(tag, false, true);

}

死信和延时队列

保证消息高可用的场景还应包含对异常信息的处理,这部分数据在死信交换机中,
延时队列实现异步延迟操作的功能。

应用场景

  • 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单
  • 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用
  • 延迟重试。比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试
  • 物联网系统经常会遇到向终端下发命令,如果命令一段时间没有应答,就需要设置成超时

死信触发条件

  • 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false
  • 消息/队列 因为设置了TTL而过期
  • 消息进入了一条已经达到最大长度的队列

设置方法

  • 新增死信交换器和队列(和普通的无差)
  • 新建延时队列设置args (x-dead-letter-exchange/x-dead-letter-routing-key)
  • 设置队列延时 args(x-message-ttl)或消息延时(message.getMessageProperties().setExpiration)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 延时队列
*
* @return
*/
@Bean
public Queue deployQueue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-dead-letter-exchange", "dcloud.dlxExchange");
args.put("x-dead-letter-routing-key", "dlx");
// 设置消息的过期时间, 单位是毫秒
args.put("x-message-ttl", 5000);
return new Queue("dcloud.deployQueue", true, false, false, args);
}

发送测试

this.rabbitTemplate.convertAndSend(exchange, routingKey, message, message -> {
// 设置消息延时 单位 毫秒
message.getMessageProperties().setExpiration(10000 + "");
return message;
}, correlationDataExtend);

RabbitMQ 基础篇

消息队列

单线程中间件,主要用于异步通知、消息分发、缓存、分布式事务等场景。

RabbitMQ

主要有Exchange 交换器 和 Queue 队列功能组件。
生产者会向Exchange发送消息并且绑定一个RoutingKey,
Exchange 用来接收生产者发送的消息并通过模式和规则将这些消息路由给服务器中的队列,
Exchange通过BindingKey找到匹配的队列,Queue 用来保存消息直到发送给消费者。
rabbitmq_model

四种运行模式

  • fanout

每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。
fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,
每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。
很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout
类型转发消息是最快的,忽略RoutingKey
fanout 模式

  • direct

Exchange 通过比配RoutingKey和BindingKey相同的队列进行转发
direct 模式

  • topic

与direct类似,更加灵活,RoutingKey 类似com.aa.bb./com.aa,bb.# ,用于
匹配BindingKey复合的队列(“
”用于匹配一个单词,“#”用于匹配多个单词(可以是零个))
topic 模式

  • headers

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。
该类型的Exchange没有用到过(不过也应该很有用武之地),所以不做介绍。

参考:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

安装 下载 命令

linux/centos : 需现安装Erlang 环境 再通过官网 rpm 文件安装或yum install

1
2
3
4
5
6
7
8
9
10
11
12
13
14
系统服务: server rabbitmq-server start
执行文件:
rabbitmq-server -detached # 使用守护进程方式启动
rabbitmq-server start # 使用阻塞方式启动
rabbitmqctl stop # 关闭rabbitmq
rabbitmqctl list_users # 查看后台管理员名单
rabbitmqctl list_queues # 查看当前的所有的队列
rabbitmqctl list_exchanges # 查看所有的交换机
rabbitmqctl list_bindings # 查看所有的绑定
rabbitmqctl list_connections # 查看所有的tcp连接
rabbitmqctl list_channels # 查看所有的信道
rabbitmqctl stop_app # 关闭应用
rabbitmqctl start_app # 打开应用
rabbitmqctl reset # 清空队列

Spring boot 集成

生成 Exchange 和 Queue 可以通过ip:15672 管理页面手动添加,也可以通过项目启动配置文件生成,
两种方式取并集。消息类型可以发送json字符串或对象。

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.dzdy.dcloud.dcloud.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
* @author : wangzhiyong
* @date : 2019/1/17 14:47
* description : fanout 模式
*/
//@Configuration
public class FanoutRabbitConfig {

@Bean
public Queue fanoutQueueA(){
return new Queue("dcloud.fanout.queue");
}

@Bean
public Queue fanoutQueueB() {
return new Queue("dcloud.fanout.queue2");
}

@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("dcloud.fanout");
}

@Bean
public Binding bindingExchangeA(Queue fanoutQueueA,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueA).to(fanoutExchange);
}

@Bean
public Binding bindingExchangeB(Queue fanoutQueueB,FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueueB).to(fanoutExchange);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.dzdy.dcloud.dcloud.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author : wangzhiyong
* @date : 2019/1/17 16:26
* description :
*/
//@Configuration
public class DirectRabbitConfig {

@Bean
public DirectExchange directExchange(){
return new DirectExchange("dcloud.direct");
}

@Bean
public Queue directQueueA(){
return new Queue("dcloud.direct.queue1");
}

@Bean
public Queue directQueueB() {
return new Queue("dcloud.direct.queue2");
}

@Bean
public Binding bindingExchangeA(Queue directQueueA, DirectExchange directExchange){
return BindingBuilder.bind(directQueueA).to(directExchange).with("aa");
}

@Bean
public Binding bindingExchangeBA(Queue directQueueB,DirectExchange directExchange){
return BindingBuilder.bind(directQueueB).to(directExchange).with("aa");
}

@Bean
public Binding bindingExchangeBB(Queue directQueueB,DirectExchange directExchange){
return BindingBuilder.bind(directQueueB).to(directExchange).with("bb");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.dzdy.dcloud.dcloud.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;

/**
* @author : wangzhiyong
* @date : 2019/1/17 16:54
* description :
*/
//@Configuration
public class TopicRabbitConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("dcloud.topic");
}

@Bean
public Queue topicQueueA() {
return new Queue("dcloud.topic.queue10");
}

@Bean
public Queue topicQueueB() {
return new Queue("dcloud.topic.queue11");
}

@Bean
public Binding bindingExchangeA(Queue topicQueueA, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueA).to(topicExchange).with("com.ys");
}

//星号(*) :只能匹配一个单词 井号(#):可以匹配0个或多个单词
@Bean
public Binding bindingExchangeBA(Queue topicQueueB, TopicExchange topicExchange) {
return BindingBuilder.bind(topicQueueB).to(topicExchange).with("com.dzdy.#");
}

}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.dzdy.dcloud.dcloud;

import com.alibaba.fastjson.JSONObject;
import com.dzdy.dcloud.dcloud.vo.MqUserVo;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class DcloudMqApplicationTests {

@Resource
private AmqpTemplate rabbitTemplate;

private MqUserVo userVo = new MqUserVo("张氏", 12, 33.23, LocalDateTime.now());
@Test
public void contextLoads() {
}

// config 可以不用配置 如果配置会和网页配置的想重合

@Test
public void fanoutSendMsg() {
// rabbitTemplate.convertAndSend("dcloud.fanout","",new MqUserVo("张氏",12,33.23,LocalDateTime.now()));
rabbitTemplate.convertAndSend("dcloud.fanout", "", JSONObject.toJSONString(userVo));
// 直接发送到队列
// rabbitTemplate.convertAndSend("dcloud.fanout.queue","hello mq");
log.info("fanoutSendMsg - success");
}

@Test
public void directSendMsg() {
// 必须匹配 routingKey 不能为空
rabbitTemplate.convertAndSend("dcloud.direct", "bb", JSONObject.toJSONString(userVo));
log.info("directSendMsg - success");
}

@Test
public void topicSendMsg(){
List<MqUserVo> userVos = new ArrayList<>();
userVos.add(userVo);
userVos.add(userVo);
userVos.add(userVo);
rabbitTemplate.convertAndSend("dcloud.topic","com.dzdy",userVos);
log.info("topicSendMsg - success");
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.dzdy.dcloud.dcloud.receive;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author : wangzhiyong
* @date : 2019/1/17 15:03
* description :
*/
@Component
@RabbitListener(queues = "dcloud.direct.queue1")
public class DirectReceiverA {

@RabbitHandler
public void process(String message) {
System.out.println("receive3" + message);
}

// @RabbitHandler
// public void process(MqUserVo mqUserVo) {
// System.out.println("receive" + mqUserVo.toString());
// }
}