Browse Source

完善对写指令返回消息的解析,增加手动采集指令接口

chenwen 1 year ago
parent
commit
fa6e62a5c7

+ 8 - 3
src/main/java/com/hb/proj/api/controller/APIController.java

@@ -2,6 +2,7 @@ package com.hb.proj.api.controller;
 
 import java.time.LocalDateTime;
 
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
@@ -10,12 +11,14 @@ import com.hb.proj.gather.model.LiquidParam;
 import com.hb.proj.gather.process.DataTransConfig;
 import com.hb.proj.gather.protocol.ChannelGroupMgr;
 import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
+import com.hb.proj.gather.rep.RedisRepComponent;
 import com.hb.proj.gather.scheduler.GatherLiquidTask;
 import com.hb.proj.gather.scheduler.GatherTaskExecutor;
 import com.hb.proj.gather.scheduler.ManualLiquidTask;
 import com.hb.proj.gather.utils.ByteUtils;
 import com.hb.proj.utils.RespVO;
 import com.hb.proj.utils.RespVOBuilder;
+import com.hb.xframework.util.MapUtils;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -27,7 +30,8 @@ import jakarta.validation.constraints.NotBlank;
 @Validated
 public class APIController {
 	
-	
+	@Autowired
+	private RedisRepComponent repRedis;
 
 	@RequestMapping("/sendCommond")
 	public RespVO<Object> sendCommond(@NotBlank(message="指令不能为空") String cmd){
@@ -110,7 +114,7 @@ public class APIController {
 	 * @return
 	 */
 	@RequestMapping("/liquid/set")
-	public RespVO<Object> setLiquidParams(@NotBlank(message="设备编号不能为空") String serial,@Validated  LiquidParam param){
+	public RespVO<Object> setLiquidParams(@NotBlank(message="设备编号不能为空") String serial,@Validated(LiquidParam.LiquidSetting.class)  LiquidParam param){
 		Channel channel=ChannelGroupMgr.get(serial);
 		return liquidCtr(serial,new ManualLiquidTask(channel,ManualLiquidTask.ACTION_SET,param));
 	}
@@ -127,6 +131,7 @@ public class APIController {
 		}
 		Channel channel=ChannelGroupMgr.get(serial);
 		ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(new GatherLiquidTask(channel)));
+		repRedis.put("liquid_"+serial, MapUtils.build("status","complete","action","gather"),true);
 		return RespVOBuilder.ok();
 	}
 	
@@ -137,7 +142,7 @@ public class APIController {
 	 * @return
 	 */
 	@RequestMapping("/liquid/test")
-	public RespVO<Object> testLiquid(@NotBlank(message="设备编号不能为空") String serial,@Validated  LiquidParam param){
+	public RespVO<Object> testLiquid(@NotBlank(message="设备编号不能为空") String serial,@Validated(LiquidParam.LiquidTesting.class)  LiquidParam param){
 		Channel channel=ChannelGroupMgr.get(serial);
 		return liquidCtr(serial,new ManualLiquidTask(channel,ManualLiquidTask.ACTION_BOOT_TEST,param));
 	}

+ 7 - 7
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtCMDEnum.java

@@ -421,7 +421,7 @@ public enum ZLOpdProtCMDEnum {
 			new byte[] {0x01, 0x06, 0x14, 0x12},
 			0x02,
 			"liquid",
-			new String[] {"avg_pipe"}
+			new String[] {"avg_len_pipe"}
 			),
 	LIQUID_SET_SOUND_SPEED(
 			new byte[] {0x01, 0x06, 0x14, 0x13},
@@ -445,37 +445,37 @@ public enum ZLOpdProtCMDEnum {
 			new byte[] {0x01, 0x06, 0x14, 0x25},
 			0x02,
 			"liquid",
-			new String[] {"time_yyyy"}
+			new String[] {"sensor_yyyy"}
 			),
 	LIQUID_SET_TIME_MM(
 			new byte[] {0x01, 0x06, 0x14, 0x26},
 			0x02,
 			"liquid",
-			new String[] {"time_mm"}
+			new String[] {"sensor_mm"}
 			),
 	LIQUID_SET_TIME_DD(
 			new byte[] {0x01, 0x06, 0x14, 0x27},
 			0x02,
 			"liquid",
-			new String[] {"time_dd"}
+			new String[] {"sensor_dd"}
 			),
 	LIQUID_SET_TIME_HR(
 			new byte[] {0x01, 0x06, 0x14, 0x28},
 			0x02,
 			"liquid",
-			new String[] {"time_hr"}
+			new String[] {"sensor_hr"}
 			),
 	LIQUID_SET_TIME_MI(
 			new byte[] {0x01, 0x06, 0x14, 0x29},
 			0x02,
 			"liquid",
-			new String[] {"time_mi"}
+			new String[] {"sensor_mi"}
 			),
 	LIQUID_SET_TIME_SS(
 			new byte[] {0x01, 0x06, 0x14, 0x2A},
 			0x02,
 			"liquid",
-			new String[] {"time_ss"}
+			new String[] {"sensor_ss"}
 			),
 	LIQUID_BOOT_TEST(
 			new byte[] {0x01, 0x06, 0x14, 0x2D},

+ 18 - 3
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -32,7 +32,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 	
 	/**
 	 * 协议标准:0103[数据区字节数 1字节][数据区 若干字节][CRC16校验 2字节]
-	 * 01:dtu上位地址  03:表示读取  06:写入
+	 * 01:dtu上位地址  03:表示读取  06:写入   原样返回(无法进行校验)
 	 * 
 	 * 心跳2字节,间隔约30s
 	 */
@@ -46,7 +46,9 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 		
 		int byteCount=byteBuf.readableBytes();
 		
-		boolean valiHead=hexmsg.startsWith("0103") || hexmsg.startsWith("0106");
+		boolean isWriteBackMsg=hexmsg.startsWith("0106"); //写指令返回消息
+		boolean isReadBackMsg=hexmsg.startsWith("0103");  //读指令返回消息
+		boolean valiHead=isReadBackMsg || isWriteBackMsg ;
 		
 		//开头两字节且不以0103开头,就认为是心跳数据-作为设备号,该方法并不可靠,有可能把采集的残包数据当作心跳
 		if(byteCount==2 && !valiHead) { 
@@ -56,7 +58,20 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 			}
 			
 		}
-		else if(byteCount>2 && valiHead){ 
+		else if(byteCount>2 && isWriteBackMsg){ //(动液面设备返回消息不规范,不做crc校验)
+			
+			synchronized(ctx.channel()) {  //成功收到消息才通知可发指令,未成功收到消息则等超时
+				
+				String cmd=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_CMD).get();
+				logger.info("解析指令{}返回消息",cmd);
+				
+				GatherRespParserFacade.parse(byteBuf,byteCount,cmd,ctx.channel());
+				
+				ctx.channel().notifyAll(); //已经收到回复消息,通知指令发送进程可以继续,同步块或者同步方法执行完后才释放锁
+				
+			}
+		}
+		else if(byteCount>2 && isReadBackMsg){ 
 			
 			int headBtyCount=3,crc16BtyCount=2; //头部字节数,校验位字节数
 			

+ 9 - 3
src/main/java/com/hb/proj/gather/protocol/ZlA11MsgDecoder.java

@@ -28,7 +28,9 @@ public class ZlA11MsgDecoder extends ByteToMessageDecoder {
 		int byteCount=byteBuf.readableBytes();
 		ByteBuf outByteBuf=null;
 		
-		boolean valiHead=hexmsg.startsWith("0103") || hexmsg.startsWith("0106");
+		boolean isWriteBackMsg=hexmsg.startsWith("0106"); //写指令返回消息(动液面设备返回消息不规范,不做crc校验)
+		boolean isReadBackMsg=hexmsg.startsWith("0103");  //读指令返回消息
+		boolean valiHead=isReadBackMsg || isWriteBackMsg ;
 		
 		logger.debug("ZlA11MsgDecoder 解码前数据:{}", hexmsg);
 		
@@ -44,7 +46,8 @@ public class ZlA11MsgDecoder extends ByteToMessageDecoder {
 			byteBuf.readerIndex(beginIndex+byteCount);
 			outByteBuf=byteBuf.slice(beginIndex, byteCount);
 		}
-		else if(byteCount>2 && valiHead) {  //数据消息
+		else if(byteCount>2 && isReadBackMsg) {  //数据消息
+			
 			int headBtyCount=3,crc16BtyCount=2; //头部字节数,校验位字节数
 			int datalen=byteBuf.getByte(2)&0xff; //数据区字节数 byteBuf.get方法不改变readIndex,writeIndex,readXX方法会
 			if(byteCount<(datalen+headBtyCount+crc16BtyCount)) {  // 读取的字节数量不够---拆包了
@@ -57,7 +60,10 @@ public class ZlA11MsgDecoder extends ByteToMessageDecoder {
 				outByteBuf=byteBuf.slice(beginIndex, datalen+headBtyCount+crc16BtyCount);
 			}
 			
-			
+		}
+		else if(byteCount>2 && isWriteBackMsg) {   //(动液面设备返回消息不规范,不做格式、crc校验)
+			byteBuf.readerIndex(beginIndex+byteCount);
+			outByteBuf=byteBuf.slice(beginIndex, byteCount);
 		}
 		
 		if(outByteBuf!=null) {

+ 1 - 0
src/main/java/com/hb/proj/gather/scheduler/ManualLiquidTask.java

@@ -141,6 +141,7 @@ public class ManualLiquidTask implements Runnable {
 				channel.writeAndFlush(byteBuf);
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
 				repRedis.put(redisKey, MapUtils.build("status","running"));
+				logger.info("liquid set param: {}",cmd.name());
 				channel.wait(cmdTimeout); 
 			}