生产端可靠性消息投递方案,消息落库,对消息状态打标
+ * + * @author SpiderMan + * @version 1.0.0: com.haidapu.garbage.producer.message.manager.BrokerMessageLogManager,v 0.1 2021/8/2 22:31 Exp $$ + */ +@Service +public class DbMessageLogService { + @Autowired + private BrokerMessageLogDao brokerMessageLogDao; + + /** + * 消息落库,对消息状态打标 + * @param messageId 消息唯一ID + * @param message 保存消息整体 转为JSON 格式存储入库 + * @param time 超时时间 + * @param status 消息投递状态 0投递中,1投递成功,2投递失败 + * @param nextRetryTime 下一次重试时间 + */ + public void addBrokerMessageLog(String messageId, String message, LocalDateTime time,int status,LocalDateTime nextRetryTime){ + BrokerMessageLogEntity brokerMessageLog = new BrokerMessageLogEntity(); + brokerMessageLog.setId(messageId); + // 保存消息整体 转为JSON 格式存储入库 + brokerMessageLog.setMessage(message); + brokerMessageLog.setStatus(status); + // 设置消息未确认超时时间窗口为 一分钟 + brokerMessageLog.setNextRetry(nextRetryTime); + brokerMessageLog.setCreateTime(LocalDateTime.now()); + brokerMessageLog.setUpdateTime(LocalDateTime.now()); + brokerMessageLogDao.insert(brokerMessageLog); + } + + /** + * 更新最终消息发送结果 成功 or 失败 + * @param messageId 消息唯一ID + * @param status 消息状态 + * @param updateTime 更新时间 + */ + public void changeBrokerMessageLogStatus(String messageId, int status, LocalDateTime updateTime) { + brokerMessageLogDao.changeBrokerMessageLogStatus(messageId,status,updateTime); + } +} diff --git a/jt808-server/src/main/java/com/haidapu/jtserver/producer/service/MessageLogService.java b/jt808-server/src/main/java/com/haidapu/jtserver/producer/service/MessageLogService.java new file mode 100644 index 0000000..97baad0 --- /dev/null +++ b/jt808-server/src/main/java/com/haidapu/jtserver/producer/service/MessageLogService.java @@ -0,0 +1,27 @@ +package com.haidapu.jtserver.producer.service; + +import cn.hutool.core.date.LocalDateTimeUtil; +import com.haidapu.jtserver.constant.RabbitConstants; +import com.haidapu.jtserver.enums.MessageStatusEnum; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.time.temporal.ChronoUnit; + +/** + * mq消息日志 service 实现 + * + * @author SpiderMan + * @version : com.haidapu.garbage.modules.base.service.MessageLogServiceImpl,v 0.1 2021/8/8 10:18 Exp $$ + */ +@Service +public class MessageLogService { + @Autowired + private DbMessageLogService dbMessageLogService; + + public void addBrokerMessageLog(String messageId, String jsonStr, LocalDateTime time) { + LocalDateTime nextRetryTime = LocalDateTimeUtil.offset(time, RabbitConstants.ORDER_TIMEOUT, ChronoUnit.MINUTES); + dbMessageLogService.addBrokerMessageLog(messageId,jsonStr,time, MessageStatusEnum.MSG_SENDING.getValue(),nextRetryTime); + } +} diff --git a/jt808-server/src/main/java/com/haidapu/jtserver/service/CarLocationService.java b/jt808-server/src/main/java/com/haidapu/jtserver/service/CarLocationService.java new file mode 100644 index 0000000..09030e6 --- /dev/null +++ b/jt808-server/src/main/java/com/haidapu/jtserver/service/CarLocationService.java @@ -0,0 +1,16 @@ +package com.haidapu.jtserver.service; + +import com.haidapu.jtserver.dto.CarMessageDto; + +/** + * 车辆位置 service + */ +public interface CarLocationService { + + /** + * 发送车辆位置消息 + * @param + */ + void sendCarLocationMsg(CarMessageDto carMessageDto); + +} diff --git a/jt808-server/src/main/java/com/haidapu/jtserver/service/impl/CarLocationServiceImpl.java b/jt808-server/src/main/java/com/haidapu/jtserver/service/impl/CarLocationServiceImpl.java new file mode 100644 index 0000000..b646c57 --- /dev/null +++ b/jt808-server/src/main/java/com/haidapu/jtserver/service/impl/CarLocationServiceImpl.java @@ -0,0 +1,40 @@ +package com.haidapu.jtserver.service.impl; + + +import com.haidapu.core.common.utils.FastJsonUtil; +import com.haidapu.jtserver.dto.CarMessageDto; +import com.haidapu.jtserver.producer.sender.RabbitMqSender; +import com.haidapu.jtserver.service.CarLocationService; +import com.haidapu.jtserver.producer.service.MessageLogService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.time.LocalDateTime; +import java.util.UUID; + +/** + * 车辆位置对接实现 + * @author :ljg + * @date :Created in 2021/9/2 18:26 + */ +@Service +public class CarLocationServiceImpl implements CarLocationService { + + @Autowired + private MessageLogService messageLogService; + @Autowired + private RabbitMqSender rabbitMqSender; + + + @Override + public void sendCarLocationMsg(CarMessageDto carMessageDto) { + String messageId = UUID.randomUUID().toString().replaceAll("-",""); + carMessageDto.setMessageId(messageId); + String jsonStr = FastJsonUtil.toJSONString(carMessageDto); + messageLogService.addBrokerMessageLog(messageId, jsonStr, LocalDateTime.now()); + //发送消息到RabbitMQ + rabbitMqSender.sendCarLocationMsg(carMessageDto); + } + + +} diff --git a/jt808-server/src/main/resources/application.yml b/jt808-server/src/main/resources/application.yml index 2c77fb1..fee480a 100644 --- a/jt808-server/src/main/resources/application.yml +++ b/jt808-server/src/main/resources/application.yml @@ -6,13 +6,18 @@ server: port: 10001 jt808: enabled: true + # mina | netty 两个框架选其一 use: mina + # 使用协议 protocol: tcp + # 服务端口 port: 10001 processCount: 2 + # 核心线程池的线程数量 corePoolSize: 1 + # 最大的线程池线程数量 maximumPoolSize: 10 - # 单位毫秒 + # 线程活动保持时间,线程池的工作线程空闲后,保持存活的时间 单位毫秒 keepAliveTime: 1000 # 单位秒 idleTime: 10 diff --git a/jtserver-core/pom.xml b/jtserver-core/pom.xml index e710c71..7c1c730 100644 --- a/jtserver-core/pom.xml +++ b/jtserver-core/pom.xml @@ -11,7 +11,7 @@