Browse Source

feat:集成消息队列

master
lingjingong 3 years ago
parent
commit
345616565a
  1. 14
      jt808-server/pom.xml
  2. 69
      jt808-server/src/main/java/com/haidapu/jtserver/constant/RabbitConstants.java
  3. 27
      jt808-server/src/main/java/com/haidapu/jtserver/dao/BrokerMessageLogDao.java
  4. 35
      jt808-server/src/main/java/com/haidapu/jtserver/dto/CarLocationDto.java
  5. 27
      jt808-server/src/main/java/com/haidapu/jtserver/dto/CarMessageDto.java
  6. 52
      jt808-server/src/main/java/com/haidapu/jtserver/entity/BrokerMessageLogEntity.java
  7. 37
      jt808-server/src/main/java/com/haidapu/jtserver/enums/MessageStatusEnum.java
  8. 24
      jt808-server/src/main/java/com/haidapu/jtserver/handler/LocationPackHandler.java
  9. 35
      jt808-server/src/main/java/com/haidapu/jtserver/producer/sender/RabbitMqSender.java
  10. 52
      jt808-server/src/main/java/com/haidapu/jtserver/producer/service/DbMessageLogService.java
  11. 27
      jt808-server/src/main/java/com/haidapu/jtserver/producer/service/MessageLogService.java
  12. 16
      jt808-server/src/main/java/com/haidapu/jtserver/service/CarLocationService.java
  13. 40
      jt808-server/src/main/java/com/haidapu/jtserver/service/impl/CarLocationServiceImpl.java
  14. 7
      jt808-server/src/main/resources/application.yml
  15. 2
      jtserver-core/pom.xml
  16. 6
      jtserver-core/src/main/java/com/haidapu/jtserver/entity/LocationInfo.java
  17. 14
      pom.xml

14
jt808-server/pom.xml

@ -10,7 +10,7 @@
<groupId>com.haidapu.jtserver</groupId>
<artifactId>jt808-server</artifactId>
<version>1.3.2</version>
<version>1.3.2.RELEASE</version>
<modelVersion>4.0.0</modelVersion>
<name>jt808-server</name>
@ -50,9 +50,13 @@
</dependency>
<dependency>
<groupId>com.haidapu.core</groupId>
<artifactId>haidapu-core</artifactId>
</dependency>
<dependency>
<groupId>com.haidapu.jtserver</groupId>
<artifactId>jtserver-core</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
@ -84,12 +88,6 @@
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>com.haidapu.jtserver</groupId>
<artifactId>jtserver-core</artifactId>
<version>1.3.2</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>

69
jt808-server/src/main/java/com/haidapu/jtserver/constant/RabbitConstants.java

@ -0,0 +1,69 @@
package com.haidapu.jtserver.constant;
/**
* RabbitMQ常量池
*
* @author SpiderMan
* @version 1.0.0: com.haidapu.garbage.common.constant.RabbitConstants,v 0.1 2021/7/30 09:52 Exp $$
*/
public interface RabbitConstants {
/**
* 分钟超时单位min
*/
int ORDER_TIMEOUT = 1;
/**
* 直接模式1,routing_key 完全匹配交换机与MQ绑定
*/
String DIRECT_MODE_QUEUE_ONE = "queue.direct";
/**
* 直接模式投放记录消息队列
*/
String DIRECT_MODE_QUEUE_LAUNCH = "queue.device.launch";
/**
* 直接模式车辆gps消息队列
*/
String DIRECT_MODE_QUEUE_CAR= "queue.device.car";
/**
* 直接模式车辆位置消息队列
*/
String DIRECT_MODE_QUEUE_CAR_LOCATION= "queue.car.location";
/**
* 队列2
*/
String QUEUE_TWO = "queue.two";
/**
* 队列3
*/
String QUEUE_THREE = "queue.three";
/**
* 主题模式,routing_key 模糊匹配*#交换机与MQ绑定
*/
String TOPIC_MODE_QUEUE = "topic.mode";
/**
* 路由1
*/
String TOPIC_ROUTING_KEY_ONE = "test.#";
/**
* 路由2
*/
String TOPIC_ROUTING_KEY_TWO = "*.test";
/**
* 延迟队列交换器
*/
String DELAY_MODE_QUEUE = "delay.mode";
/**
* 延迟队列
*/
String DELAY_QUEUE = "queue.delay";
}

27
jt808-server/src/main/java/com/haidapu/jtserver/dao/BrokerMessageLogDao.java

@ -0,0 +1,27 @@
package com.haidapu.jtserver.dao;
import com.baomidou.mybatisplus.mapper.BaseMapper;
import com.haidapu.jtserver.entity.BrokerMessageLogEntity;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.time.LocalDateTime;
/**
* rabbitmq 消息日志表Dao
*
* @author SpiderMan
* @email spiderMan@mail.com
* @date 2021-08-02 22:17:05
*/
@Mapper
public interface BrokerMessageLogDao extends BaseMapper<BrokerMessageLogEntity> {
/**
* 更新最终消息发送结果 成功 or 失败
* @param id 消息唯一ID
* @param status 消息状态
* @param updateTime 更新时间
*/
void changeBrokerMessageLogStatus(@Param("id") String id, @Param("status") int status, @Param("updateTime") LocalDateTime updateTime);
}

35
jt808-server/src/main/java/com/haidapu/jtserver/dto/CarLocationDto.java

@ -0,0 +1,35 @@
package com.haidapu.jtserver.dto;
import lombok.Data;
import java.io.Serializable;
/**
* 车辆位置消息体
* @author ljg
* @date Created in 2021/9/3 10:25
*/
@Data
public class CarLocationDto implements Serializable {
/**
* 设备终端标识
*/
private String deviceId;
/**
* 经度
*/
private double longitude;
/**
* 纬度
*/
private double latitude;
/**
* 速度
*/
private double speed;
/**
* 时间 YY-MM-DD-hh-mm-ss GMT+8 时间
*/
private String datetime;
}

27
jt808-server/src/main/java/com/haidapu/jtserver/dto/CarMessageDto.java

@ -0,0 +1,27 @@
package com.haidapu.jtserver.dto;
import lombok.Data;
import java.io.Serializable;
/**
* 车辆位置上报
* @author ljg
* @date Created in 2021/9/2 17:34
*/
@Data
public class CarMessageDto implements Serializable{
/**
* 消息唯一ID主要用于解决消息幂等
*/
private String messageId;
/**
* 车辆位置信息
*/
private CarLocationDto carLocationDto;
}

52
jt808-server/src/main/java/com/haidapu/jtserver/entity/BrokerMessageLogEntity.java

@ -0,0 +1,52 @@
package com.haidapu.jtserver.entity;
import com.baomidou.mybatisplus.annotations.TableName;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* rabbitmq 消息日志表
*
* @author SpiderMan
* @email spiderMan@mail.com
* @date 2021-08-02 22:17:05
*/
@TableName("mq_broker_message_log")
@Data
public class BrokerMessageLogEntity implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 消息唯一ID
*/
private String id;
/**
* 消息内容
*/
private String message;
/**
* 重试次数
*/
private Integer tryCount;
/**
* 消息投递状态 0投递中1投递成功2投递失败
*/
private Integer status;
/**
* 下一次重试时间
*/
private LocalDateTime nextRetry;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}

37
jt808-server/src/main/java/com/haidapu/jtserver/enums/MessageStatusEnum.java

@ -0,0 +1,37 @@
package com.haidapu.jtserver.enums;
/**
* 消息状态
*
* @author SpiderMan
* @version 1.0.0: com.haidapu.garbage.common.enums.MessageStatusEnum,v 0.1 2021/8/2 22:39 Exp $$
*/
public enum MessageStatusEnum {
/**
* 发送中
*/
MSG_SENDING(0),
/**
* 成功
*/
MSG_SEND_SUCCESS(1),
/**
* 失败
*/
MSG_SEND_FAILURE(2);
private int value;
/**
* 联运垃圾投递任务执行时间间隔毫秒key
*/
public static String LIANYUN_LAUNCH_TIMER = "LIANYUN_LAUNCH_TIMER";
MessageStatusEnum(int value) {
this.value = value;
}
public int getValue() {
return value;
}
}

24
jt808-server/src/main/java/com/haidapu/jtserver/handler/LocationPackHandler.java

@ -2,16 +2,21 @@ package com.haidapu.jtserver.handler;
import com.haidapu.jtserver.core.Jt808Pack;
import com.haidapu.jtserver.core.PackHandler;
import com.haidapu.jtserver.dto.CarLocationDto;
import com.haidapu.jtserver.dto.CarMessageDto;
import com.haidapu.jtserver.entity.LocationAttachInfo;
import com.haidapu.jtserver.entity.LocationInfo;
import com.haidapu.jtserver.helper.Analyzer;
import com.haidapu.jtserver.helper.ByteArrHelper;
import com.haidapu.jtserver.helper.ResHelper;
import com.haidapu.jtserver.service.CarLocationService;
import com.haidapu.jtserver.service.DataService;
import com.haidapu.jtserver.util.DataHelper;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
@ -28,6 +33,9 @@ public class LocationPackHandler implements PackHandler {
private Analyzer analyzer;
private ResHelper resHelper;
@Autowired
private CarLocationService carLocationService;
@Override
public byte[] handle( byte[] deviceId, byte[] streamNum, byte[] msgId, byte[] msgBody) {
tpe.execute(() -> {
@ -47,12 +55,26 @@ public class LocationPackHandler implements PackHandler {
// 方向
log.info("0200 终端位置信息汇报 方向:"+locationInfo.getDirection());
// 高度
log.info("0200 终端位置信息汇报 高度:"+locationInfo.getHeight());
// log.info("0200 终端位置信息汇报 高度:"+locationInfo.getHeight());
List<LocationAttachInfo> attachInfo = locationInfo.getAttachInfo();
String locationAttachInfo = DataHelper.toHexString(attachInfo.get(attachInfo.size()-2).getData());
// 载重
log.info("0200 终端位置信息汇报 载重:"+ DataHelper.covert(locationAttachInfo.substring(30,38)));
log.info("============ 0200 终端位置信息汇报 结束 =============");
// 构造车辆位置消息
CarMessageDto carMessage = new CarMessageDto();
CarLocationDto carLocation = new CarLocationDto();
carLocation.setDeviceId(strDeviceId);
carLocation.setLongitude(locationInfo.getLongitude());
carLocation.setLatitude(locationInfo.getLatitude());
carLocation.setSpeed(locationInfo.getSpeed());
carLocation.setDatetime(locationInfo.getDatetime());
carMessage.setCarLocationDto(carLocation);
// 发送车辆位置消息到RabbitMQ
carLocationService.sendCarLocationMsg(carMessage);
log.info("============ 0200 终端位置信息汇报 发送消息到RabbitMQ 结束 =============");
dataService.terminalLocation(strDeviceId, locationInfo, null);
});

35
jt808-server/src/main/java/com/haidapu/jtserver/producer/sender/RabbitMqSender.java

@ -0,0 +1,35 @@
package com.haidapu.jtserver.producer.sender;
import com.haidapu.jtserver.constant.RabbitConstants;
import com.haidapu.jtserver.dto.CarMessageDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* rabbit生产者测试
*
* @author SpiderMan
* @version 1.0.0: com.haidapu.garbage.producer.sender.RabbitTestSender,v 0.1 2021/8/1 11:20 Exp $$
*/
@Component
@Slf4j
public class RabbitMqSender {
private final RabbitTemplate rabbitTemplate;
public RabbitMqSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* 车辆位置信息
* @param carMessageDto 消息内容
*/
public void sendCarLocationMsg(CarMessageDto carMessageDto) {
CorrelationData correlationData = new CorrelationData(carMessageDto.getMessageId());
rabbitTemplate.convertAndSend(RabbitConstants.DIRECT_MODE_QUEUE_CAR_LOCATION, carMessageDto, correlationData);
}
}

52
jt808-server/src/main/java/com/haidapu/jtserver/producer/service/DbMessageLogService.java

@ -0,0 +1,52 @@
package com.haidapu.jtserver.producer.service;
import com.haidapu.jtserver.dao.BrokerMessageLogDao;
import com.haidapu.jtserver.entity.BrokerMessageLogEntity;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
/**
* Mq消息统一处理
* <p>生产端可靠性消息投递方案消息落库对消息状态打标</p>
*
* @author SpiderMan
* @version 1.0.0: com.haidapu.garbage.producer.message.manager.BrokerMessageLogManager,v 0.1 2021/8/2 22:31 Exp $$
*/
@Service
public class DbMessageLogService {
@Autowired
private BrokerMessageLogDao brokerMessageLogDao;
/**
* 消息落库对消息状态打标
* @param messageId 消息唯一ID
* @param message 保存消息整体 转为JSON 格式存储入库
* @param time 超时时间
* @param status 消息投递状态 0投递中1投递成功2投递失败
* @param nextRetryTime 下一次重试时间
*/
public void addBrokerMessageLog(String messageId, String message, LocalDateTime time,int status,LocalDateTime nextRetryTime){
BrokerMessageLogEntity brokerMessageLog = new BrokerMessageLogEntity();
brokerMessageLog.setId(messageId);
// 保存消息整体 转为JSON 格式存储入库
brokerMessageLog.setMessage(message);
brokerMessageLog.setStatus(status);
// 设置消息未确认超时时间窗口为 一分钟
brokerMessageLog.setNextRetry(nextRetryTime);
brokerMessageLog.setCreateTime(LocalDateTime.now());
brokerMessageLog.setUpdateTime(LocalDateTime.now());
brokerMessageLogDao.insert(brokerMessageLog);
}
/**
* 更新最终消息发送结果 成功 or 失败
* @param messageId 消息唯一ID
* @param status 消息状态
* @param updateTime 更新时间
*/
public void changeBrokerMessageLogStatus(String messageId, int status, LocalDateTime updateTime) {
brokerMessageLogDao.changeBrokerMessageLogStatus(messageId,status,updateTime);
}
}

27
jt808-server/src/main/java/com/haidapu/jtserver/producer/service/MessageLogService.java

@ -0,0 +1,27 @@
package com.haidapu.jtserver.producer.service;
import cn.hutool.core.date.LocalDateTimeUtil;
import com.haidapu.jtserver.constant.RabbitConstants;
import com.haidapu.jtserver.enums.MessageStatusEnum;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
/**
* mq消息日志 service 实现
*
* @author SpiderMan
* @version : com.haidapu.garbage.modules.base.service.MessageLogServiceImpl,v 0.1 2021/8/8 10:18 Exp $$
*/
@Service
public class MessageLogService {
@Autowired
private DbMessageLogService dbMessageLogService;
public void addBrokerMessageLog(String messageId, String jsonStr, LocalDateTime time) {
LocalDateTime nextRetryTime = LocalDateTimeUtil.offset(time, RabbitConstants.ORDER_TIMEOUT, ChronoUnit.MINUTES);
dbMessageLogService.addBrokerMessageLog(messageId,jsonStr,time, MessageStatusEnum.MSG_SENDING.getValue(),nextRetryTime);
}
}

16
jt808-server/src/main/java/com/haidapu/jtserver/service/CarLocationService.java

@ -0,0 +1,16 @@
package com.haidapu.jtserver.service;
import com.haidapu.jtserver.dto.CarMessageDto;
/**
* 车辆位置 service
*/
public interface CarLocationService {
/**
* 发送车辆位置消息
* @param
*/
void sendCarLocationMsg(CarMessageDto carMessageDto);
}

40
jt808-server/src/main/java/com/haidapu/jtserver/service/impl/CarLocationServiceImpl.java

@ -0,0 +1,40 @@
package com.haidapu.jtserver.service.impl;
import com.haidapu.core.common.utils.FastJsonUtil;
import com.haidapu.jtserver.dto.CarMessageDto;
import com.haidapu.jtserver.producer.sender.RabbitMqSender;
import com.haidapu.jtserver.service.CarLocationService;
import com.haidapu.jtserver.producer.service.MessageLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.UUID;
/**
* 车辆位置对接实现
* @author ljg
* @date Created in 2021/9/2 18:26
*/
@Service
public class CarLocationServiceImpl implements CarLocationService {
@Autowired
private MessageLogService messageLogService;
@Autowired
private RabbitMqSender rabbitMqSender;
@Override
public void sendCarLocationMsg(CarMessageDto carMessageDto) {
String messageId = UUID.randomUUID().toString().replaceAll("-","");
carMessageDto.setMessageId(messageId);
String jsonStr = FastJsonUtil.toJSONString(carMessageDto);
messageLogService.addBrokerMessageLog(messageId, jsonStr, LocalDateTime.now());
//发送消息到RabbitMQ
rabbitMqSender.sendCarLocationMsg(carMessageDto);
}
}

7
jt808-server/src/main/resources/application.yml

@ -6,13 +6,18 @@ server:
port: 10001
jt808:
enabled: true
# mina | netty 两个框架选其一
use: mina
# 使用协议
protocol: tcp
# 服务端口
port: 10001
processCount: 2
# 核心线程池的线程数量
corePoolSize: 1
# 最大的线程池线程数量
maximumPoolSize: 10
# 单位毫秒
# 线程活动保持时间,线程池的工作线程空闲后,保持存活的时间 单位毫秒
keepAliveTime: 1000
# 单位秒
idleTime: 10

2
jtserver-core/pom.xml

@ -11,7 +11,7 @@
<groupId>com.haidapu.jtserver</groupId>
<artifactId>jtserver-core</artifactId>
<version>1.3.2</version>
<version>1.3.2.RELEASE</version>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>

6
jtserver-core/src/main/java/com/haidapu/jtserver/entity/LocationInfo.java

@ -26,17 +26,17 @@ public class LocationInfo {
private StatusInfo statusInfo;
/**
*
*
*/
private double longitude;
/**
*
*
*/
private double latitude;
/**
*
*
*/
private int height;

14
pom.xml

@ -10,7 +10,7 @@
<packaging>pom</packaging>
<modules>
<module>jt808-core</module>
<module>jtserver-core</module>
<module>jt808-server</module>
</modules>
@ -80,6 +80,7 @@
<artifactId>kaptcha</artifactId>
<version>${kaptcha.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
@ -99,7 +100,16 @@
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<dependency>
<groupId>com.haidapu.jtserver</groupId>
<artifactId>jtserver-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.haidapu.core</groupId>
<artifactId>haidapu-core</artifactId>
<version>1.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>

Loading…
Cancel
Save