This commit is contained in:
shuhongfan
2023-09-04 16:40:17 +08:00
commit cf5ac25c14
8267 changed files with 1305066 additions and 0 deletions

3
sl-express-mq/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
.idea
target/
*.iml

126
sl-express-mq/README.md Normal file
View File

@@ -0,0 +1,126 @@
# sl-express-mq使用手册
## 1、说明
为了将MQ的使用相关的代码进行统一所以将发送消息的代码、消费者的配置抽取到sl-express-mq工程中。
主要功能:
- 为RabbitTemplate设置了ReturnsCallback如果消息发送到交换机但是没有到达队列会进行日志的记录。
- 统一了配置了消息的消费消费者如果处理消息失败会进行重试如果依然是失败的话会将错误消息发送到error.queue队列后续需要人工进行处理。
- 统一了发送消息代码如果网络等异常情况导致发送消息失败会进行重试如果依然失败的话将消息内容持久化到mysql数据库后续通过xxl-job任务进行重新发送如果其他情况导致失败不会进行重试直接存储消息到mysql数据库中。
## 2、SQL脚本
错误消息记录的SQL脚本
~~~sql
CREATE TABLE `sl_fail_msg` (
`id` bigint NOT NULL,
`msg_id` varchar(32) DEFAULT NULL COMMENT '消息id',
`exchange` varchar(100) DEFAULT NULL COMMENT '交换机',
`routing_key` varchar(50) DEFAULT NULL COMMENT '路由key',
`msg` text COMMENT '消息内容',
`reason` text CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci COMMENT '失败原因',
`created` datetime DEFAULT NULL COMMENT '创建时间',
`updated` datetime DEFAULT NULL COMMENT '更新时间',
PRIMARY KEY (`id`),
KEY `created` (`created`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='失败消息记录表';
~~~
## 3、使用
### 3.1、导入依赖
~~~xml
<dependency>
<groupId>com.sl-express.mq</groupId>
<artifactId>sl-express-mq</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
~~~
### 3.2、启动类增加@EnableRetry
例如:
~~~java
package com.sl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.retry.annotation.EnableRetry;
@EnableRetry //开启重试机制
@EnableFeignClients
@SpringBootApplication
public class DispatchApplication {
public static void main(String[] args) {
SpringApplication.run(DispatchApplication.class, args);
}
}
~~~
### 3.3、编写配置
在springboot的配置文件中bootstrap-*.yml修改配置
~~~yaml
spring:
rabbitmq: #mq的配置
host: ${rabbitmq.host}
port: ${rabbitmq.port}
username: ${rabbitmq.username}
password: ${rabbitmq.password}
virtual-host: ${rabbitmq.virtual-host}
publisher-confirm-type: correlated #发送消息的异步回调,记录消息是否发送成功
publisher-returns: true #开启publish-return功能消息到达交换机但是没有到达对列表
template:
mandatory: true #消息路由失败时的策略, true: 调用ReturnCallback, false丢弃消息
listener:
simple:
acknowledge-mode: auto #出现异常时返回nack消息回滚到mq没有异常返回ack
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态false有状态。如果业务中包含事务这里改为false
~~~
### 3.4、使用
~~~java
//注入Service
@Resource
private MQService mqService;
....................
//发送消息
this.mqService.sendMsg(exchange, null, msg);
~~~
> **消息的消费依然是SpringBoot的写法。例如**
>
> ~~~java
> @Component
> public class OrderMQListener {
>
> @RabbitListener(bindings = @QueueBinding(
> value = @Queue(name = "${rabbitmq.order.queue}"),
> exchange = @Exchange(name = "${rabbitmq.order.exchange}", type = ExchangeTypes.TOPIC),
> key = "#"
> ))
> public void listenOrderMsg(String msg) {
>
> //此处编写业务逻辑
>
> }
> }
> ~~~

45
sl-express-mq/pom.xml Normal file
View File

@@ -0,0 +1,45 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.sl-express</groupId>
<artifactId>sl-express-parent</artifactId>
<version>1.4</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>1.1-SNAPSHOT</version>
<groupId>com.sl-express.mq</groupId>
<artifactId>sl-express-mq</artifactId>
<description>mq微服务</description>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<sl-express-common.version>1.2-SNAPSHOT</sl-express-common.version>
</properties>
<dependencies>
<dependency>
<groupId>com.sl-express.common</groupId>
<artifactId>sl-express-common</artifactId>
<version>${sl-express-common.version}</version>
</dependency>
<!--AMQP依赖包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.0</version>
</dependency>
<!--Spring重试模块-->
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,45 @@
package com.sl.mq.config;
import com.sl.transport.common.constant.Constants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ErrorMessageConfig {
@Value("${spring.application.name}") //获取微服务的名称
private String appName;
@Bean
public TopicExchange errorMessageExchange() {
//定义错误消息的交换机类型为topic
return new TopicExchange(Constants.MQ.Exchanges.ERROR, true, false);
}
@Bean
public Queue errorQueue() {
//【前缀+微服务】名作为错误消息存放的队列名称,并且开启了持久化
return new Queue(Constants.MQ.Queues.ERROR_PREFIX + appName, true);
}
@Bean
public Binding errorBinding(Queue errorQueue, TopicExchange errorMessageExchange) {
//完成绑定关系
return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(appName);
}
@Bean
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
//设置全部重试失败后进行重新发送消息指定了交换机以及路由key
//需要注意的是路由key是应用名称与上述的绑定关系中的路由key一致
return new RepublishMessageRecoverer(rabbitTemplate, Constants.MQ.Exchanges.ERROR, appName);
}
}

View File

@@ -0,0 +1,38 @@
package com.sl.mq.config;
import cn.hutool.core.util.StrUtil;
import com.sl.transport.common.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class MessageConfig implements ApplicationContextAware {
/**
* 发送者回执 没有路由到队列的情况
*
* @param applicationContext 应用上下文
* @throws BeansException 异常
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 获取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 设置ReturnCallback
rabbitTemplate.setReturnsCallback(message -> {
if (StrUtil.contains(message.getExchange(), Constants.MQ.DELAYED_KEYWORD)) {
//延迟消息没有发到队列是正常情况,无需记录日志
return;
}
// 投递失败,记录日志
log.error("消息没有投递到队列,应答码:{},原因:{},交换机:{},路由键:{},消息:{}",
message.getReplyCode(), message.getReplyText(), message.getExchange(), message.getRoutingKey(), message.getMessage());
});
}
}

View File

@@ -0,0 +1,29 @@
package com.sl.mq.entity;
import com.baomidou.mybatisplus.annotation.TableName;
import com.sl.transport.common.entity.BaseEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* 失败消息记录表
*
* @author zzj
* @version 1.0
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@TableName("sl_fail_msg")
public class FailMsgEntity extends BaseEntity {
private String msgId; //消息id
private String exchange; //交换机
private String routingKey; //路由key
private String msg; //消息内容
private String reason; //失败原因
}

View File

@@ -0,0 +1,21 @@
package com.sl.mq.exception;
import com.sl.mq.entity.FailMsgEntity;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author zzj
* @version 1.0
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class MsgException extends RuntimeException {
private FailMsgEntity failMsgEntity;
}

View File

@@ -0,0 +1,54 @@
package com.sl.mq.job;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.sl.mq.entity.FailMsgEntity;
import com.sl.mq.service.FailMsgService;
import com.sl.mq.service.MQService;
import com.xxl.job.core.handler.annotation.XxlJob;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
/**
* 失败消息的处理任务
*
* @author zzj
* @version 1.0
*/
@Slf4j
@Component
@ConditionalOnBean({MQService.class, FailMsgService.class})
public class FailMsgJob {
@Resource
private FailMsgService failMsgService;
@Resource
private MQService mqService;
@XxlJob("failMsgJob")
public void execute() {
//查询失败的数据每次最多处理100条错误消息
LambdaQueryWrapper<FailMsgEntity> queryWrapper = new LambdaQueryWrapper<FailMsgEntity>()
.orderByAsc(FailMsgEntity::getCreated)
.last("limit 100");
List<FailMsgEntity> failMsgEntityList = this.failMsgService.list(queryWrapper);
if (CollUtil.isEmpty(failMsgEntityList)) {
return;
}
for (FailMsgEntity failMsgEntity : failMsgEntityList) {
try {
//发送消息
this.mqService.sendMsg(failMsgEntity.getExchange(), failMsgEntity.getRoutingKey(), failMsgEntity.getMsg());
//删除数据
this.failMsgService.removeById(failMsgEntity.getId());
} catch (Exception e) {
log.error("处理错误消息失败, failMsgEntity = {}", failMsgEntity);
}
}
}
}

View File

@@ -0,0 +1,15 @@
package com.sl.mq.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.sl.mq.entity.FailMsgEntity;
import org.apache.ibatis.annotations.Mapper;
/**
* 失败消息记录mapper
*
* @author zzj
* @version 1.0
*/
@Mapper
public interface FailMsgMapper extends BaseMapper<FailMsgEntity> {
}

View File

@@ -0,0 +1,13 @@
package com.sl.mq.service;
import com.baomidou.mybatisplus.extension.service.IService;
import com.sl.mq.entity.FailMsgEntity;
/**
* 失败消息处理服务
*
* @author zzj
* @version 1.0
*/
public interface FailMsgService extends IService<FailMsgEntity> {
}

View File

@@ -0,0 +1,31 @@
package com.sl.mq.service;
/**
* 消息处理类
*
* @author zzj
* @version 1.0
*/
public interface MQService {
/**
* 发送实时消息
*
* @param exchange 交换机
* @param routingKey 路由key
* @param msg 消息对象会将对象序列化成json字符串发出
* @return 是否成功
*/
Boolean sendMsg(String exchange, String routingKey, Object msg);
/**
* 发送延迟消息
*
* @param exchange 交换机
* @param routingKey 路由key
* @param msg 消息对象会将对象序列化成json字符串发出
* @param delay 延时时间,单位:毫秒
* @return 是否成功
*/
Boolean sendMsg(String exchange, String routingKey, Object msg, int delay);
}

View File

@@ -0,0 +1,21 @@
package com.sl.mq.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.sl.mq.entity.FailMsgEntity;
import com.sl.mq.mapper.FailMsgMapper;
import com.sl.mq.service.FailMsgService;
import com.sl.mq.service.MQService;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Service;
/**
* 失败消息处理服务
*
* @author zzj
* @version 1.0
*/
@Service
@ConditionalOnBean(MQService.class)
public class FailMsgServiceImpl extends ServiceImpl<FailMsgMapper, FailMsgEntity>
implements FailMsgService {
}

View File

@@ -0,0 +1,135 @@
package com.sl.mq.service.impl;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.util.CharsetUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.sl.mq.entity.FailMsgEntity;
import com.sl.mq.exception.MsgException;
import com.sl.mq.service.FailMsgService;
import com.sl.mq.service.MQService;
import com.sl.transport.common.constant.Constants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Recover;
import org.springframework.retry.annotation.Retryable;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
/**
* 消息处理类
*
* @author zzj
* @version 1.0
*/
@Slf4j
@Service
//设置初始化条件只有在配置文件中设置sl.mq.enable=true才能触发
@ConditionalOnProperty(prefix = "sl.mq", value = "enable")
public class MQServiceImpl implements MQService {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private FailMsgService failMsgService;
/**
* 发送消息 重试3次
*
* @param exchange 交换机
* @param routingKey 路由key
* @param msg 消息对象会将对象序列化成json字符串发出
* @return 是否发送成功
*/
@Retryable(value = MsgException.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5))
@Override
public Boolean sendMsg(String exchange, String routingKey, Object msg) {
// 实时发送
return this.sendMsg(exchange, routingKey, msg, Constants.MQ.DEFAULT_DELAY);
}
@Retryable(value = MsgException.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5))
@Override
public Boolean sendMsg(String exchange, String routingKey, Object msg, int delay) {
// 1.获取消息内容,如果非字符串将其序列化
String jsonMsg = (msg instanceof String) ? (String) msg : JSONUtil.toJsonStr(msg);
String msgId = IdUtil.simpleUUID();
log.info("消息发送exchange = {}, routingKey = {}, msg = {}, msgId = {}", exchange, routingKey, jsonMsg, msgId);
// 2.全局唯一的消息ID需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(msgId);
// 3.添加callback
correlationData.getFuture().addCallback(
result -> {
if (result.isAck()) {
// 3.1.ack消息成功
log.debug("消息发送成功, ID:{}", correlationData.getId());
} else {
// 3.2.nack消息失败
log.error("消息发送失败, ID:{}, 原因:{}", correlationData.getId(), result.getReason());
FailMsgEntity failMsgEntity = FailMsgEntity.builder()
.exchange(exchange)
.msgId(correlationData.getId())
.msg(jsonMsg)
.reason(result.getReason()).build();
this.failMsgService.save(failMsgEntity);
}
},
ex -> {
//出现此异常情况,可能是发送消息后无法连接到发送者或其他一些未知的异常情况
String exceptionInfo = ExceptionUtil.getMessage(ex);
log.error("消息发送异常, ID:{}, 原因:{}", correlationData.getId(), exceptionInfo);
FailMsgEntity failMsgEntity = FailMsgEntity.builder()
.exchange(exchange)
.msgId(correlationData.getId())
.msg(jsonMsg)
.reason(exceptionInfo).build();
this.failMsgService.save(failMsgEntity);
}
);
// 4.构造消息对象
Message message = MessageBuilder.withBody(StrUtil.bytes(jsonMsg, CharsetUtil.CHARSET_UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
.build();
try {
// 5.发送消息
this.rabbitTemplate.convertAndSend(exchange, routingKey, message, messagePostProcessor -> {
//设置延时发送时间delay小于0实时发送
messagePostProcessor.getMessageProperties().setDelay(delay);
return messagePostProcessor;
}, correlationData);
} catch (Exception e) {
//出现异常这里将异常转化为自定义异常MsgException主要是为了触发重试机制
throw MsgException.builder()
.failMsgEntity(FailMsgEntity.builder()
.exchange(exchange)
.msgId(correlationData.getId())
.msg(jsonMsg)
.reason(ExceptionUtil.getMessage(e)).build())
.build();
}
return true;
}
/**
* 3次失败后 存入数据库
*
* @param msgException 失败消息
*/
@Recover
public Boolean saveFailMag(MsgException msgException) {
//发送消息失败,需要将消息持久化到数据库,通过任务调度的方式处理失败的消息
this.failMsgService.save(msgException.getFailMsgEntity());
return true;
}
}