Browse Source

采集程序增加对动液面设备的数据采集、远程控制

chenwen 1 năm trước cách đây
mục cha
commit
086be6ad70

+ 78 - 0
src/main/java/com/hb/proj/api/controller/APIController.java

@@ -6,9 +6,13 @@ import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RestController;
 
+import com.hb.proj.gather.model.LiquidParam;
 import com.hb.proj.gather.process.DataTransConfig;
 import com.hb.proj.gather.protocol.ChannelGroupMgr;
 import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
+import com.hb.proj.gather.scheduler.GatherLiquidTask;
+import com.hb.proj.gather.scheduler.GatherTaskExecutor;
+import com.hb.proj.gather.scheduler.ManualLiquidTask;
 import com.hb.proj.gather.utils.ByteUtils;
 import com.hb.proj.utils.RespVO;
 import com.hb.proj.utils.RespVOBuilder;
@@ -22,6 +26,8 @@ import jakarta.validation.constraints.NotBlank;
 @RequestMapping("/api")
 @Validated
 public class APIController {
+	
+	
 
 	@RequestMapping("/sendCommond")
 	public RespVO<Object> sendCommond(@NotBlank(message="指令不能为空") String cmd){
@@ -74,4 +80,76 @@ public class APIController {
 		DataTransConfig.reloadAlarms(wellId);
 		return RespVOBuilder.ok();
 	}
+	
+	
+	/**
+	 * 同步更新设备(新加的设备能及时识别)
+	 * @return
+	 */
+	@RequestMapping("/synDevs")
+	public RespVO<Object> synDevs(){
+		DataTransConfig.reloadDevs();
+		return RespVOBuilder.ok();
+	}
+	
+	/**
+	 * 读取动液面设备参数
+	 * @param serial
+	 * @return
+	 */
+	@RequestMapping("/liquid/read")
+	public RespVO<Object> readLiquidParams(@NotBlank(message="设备编号不能为空") String serial){
+		Channel channel=ChannelGroupMgr.get(serial);
+		return liquidCtr(serial,new ManualLiquidTask(channel,ManualLiquidTask.ACTION_READ));
+	}
+	
+	/**
+	 * 设置动液面设备参数
+	 * @param serial
+	 * @param param
+	 * @return
+	 */
+	@RequestMapping("/liquid/set")
+	public RespVO<Object> setLiquidParams(@NotBlank(message="设备编号不能为空") String serial,@Validated  LiquidParam param){
+		Channel channel=ChannelGroupMgr.get(serial);
+		return liquidCtr(serial,new ManualLiquidTask(channel,ManualLiquidTask.ACTION_SET,param));
+	}
+	
+	/**
+	 * 采集动液面数据
+	 * @param serial
+	 * @return
+	 */
+	@RequestMapping("/liquid/gather")
+	public RespVO<Object> gatherLiquidData(@NotBlank(message="设备编号不能为空") String serial){
+		if(!ChannelGroupMgr.isDone(serial)) {
+			return RespVOBuilder.error("该设备正在执行其它操作,请稍后再试");
+		}
+		Channel channel=ChannelGroupMgr.get(serial);
+		ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(new GatherLiquidTask(channel)));
+		return RespVOBuilder.ok();
+	}
+	
+	/**
+	 * 启动动液面测试
+	 * @param serial
+	 * @param param
+	 * @return
+	 */
+	@RequestMapping("/liquid/test")
+	public RespVO<Object> testLiquid(@NotBlank(message="设备编号不能为空") String serial,@Validated  LiquidParam param){
+		Channel channel=ChannelGroupMgr.get(serial);
+		return liquidCtr(serial,new ManualLiquidTask(channel,ManualLiquidTask.ACTION_BOOT_TEST,param));
+	}
+	
+	
+	
+	
+	private RespVO<Object> liquidCtr(String serial,ManualLiquidTask task) {
+		if(!ChannelGroupMgr.isDone(serial)) {
+			return RespVOBuilder.error("该设备正在执行其它操作,请稍后再试");
+		}
+		ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(task));
+		return RespVOBuilder.ok();
+	}
 }

+ 129 - 0
src/main/java/com/hb/proj/gather/model/LiquidParam.java

@@ -0,0 +1,129 @@
+package com.hb.proj.gather.model;
+
+import java.util.Calendar;
+import java.util.Date;
+
+import org.hibernate.validator.constraints.Range;
+
+import jakarta.validation.constraints.Min;
+import jakarta.validation.constraints.NotNull;
+
+public class LiquidParam {
+	
+	public static interface  LiquidSetting {};
+	
+	public static interface  LiquidTesting {};
+
+	private Integer avgLenPipe; //厘米
+	
+	@NotNull(message="音速不能为空",groups=LiquidSetting.class)
+	@Min(value=0,message="音速应大于0")
+	private Integer soundSpeed;   //厘米/秒
+	
+	@NotNull(message="音标深度不能为空",groups=LiquidSetting.class)
+	@Range(min=2000,max=100000,message="音标深度应在【2000-100000】厘米之间")
+	private Integer soundMarkDepth; //单位厘米,20-1000m之间
+	
+	@NotNull(message="测试间隔不能为空",groups=LiquidSetting.class)
+	@Min(value=600,message="测试间隔应大于600秒")
+	private Integer testInterval;  //自动测试间隔,单位秒,最小10分钟
+	
+	@NotNull(message="传感器时间不能为空",groups=LiquidSetting.class)
+	private Date sensorTime; //传感器时间,年份采用yyyy
+	
+	private Integer  casingPress;  //套压,只读
+	
+	private Integer referLiquidDepth1;  //参考液面深度1
+	
+	@NotNull(message="接箍灵敏度不能为空",groups=LiquidTesting.class)
+	@Range(min=0,max=7,message="灵敏度有效值0-7")
+	private Integer hoopSensitivity=0;  //接箍灵敏度
+	
+	@NotNull(message="液面灵敏度不能为空",groups=LiquidTesting.class)
+	@Range(min=0,max=7,message="灵敏度有效值0-7")
+	private Integer liquidSensitivity=0;  //液面灵敏度
+	
+	public int getBootCMDVal() { //获得启动测试指令值
+		return 0x8000 | ((hoopSensitivity & 0x07) << 3) | (liquidSensitivity & 0x07) ;
+	}
+	
+	public int[]  getTimeParts() {
+		Calendar cal=Calendar.getInstance();
+		if(sensorTime!=null) {
+			cal.setTime(sensorTime);
+		}
+		return new int[]{cal.get(Calendar.YEAR),cal.get(Calendar.MONTH)+1,cal.get(Calendar.DAY_OF_MONTH),cal.get(Calendar.HOUR_OF_DAY),cal.get(Calendar.MINUTE),cal.get(Calendar.SECOND)};
+	}
+
+	public Integer getAvgLenPipe() {
+		return avgLenPipe;
+	}
+
+	public void setAvgLenPipe(Integer avgLenPipe) {
+		this.avgLenPipe = avgLenPipe;
+	}
+
+	public Integer getSoundSpeed() {
+		return soundSpeed;
+	}
+
+	public void setSoundSpeed(Integer soundSpeed) {
+		this.soundSpeed = soundSpeed;
+	}
+
+	public Integer getSoundMarkDepth() {
+		return soundMarkDepth;
+	}
+
+	public void setSoundMarkDepth(Integer soundMarkDepth) {
+		this.soundMarkDepth = soundMarkDepth;
+	}
+
+	public Integer getTestInterval() {
+		return testInterval;
+	}
+
+	public void setTestInterval(Integer testInterval) {
+		this.testInterval = testInterval;
+	}
+
+	public Date getSensorTime() {
+		return sensorTime;
+	}
+
+	public void setSensorTime(Date sensorTime) {
+		this.sensorTime = sensorTime;
+	}
+
+	public Integer getCasingPress() {
+		return casingPress;
+	}
+
+	public void setCasingPress(Integer casingPress) {
+		this.casingPress = casingPress;
+	}
+
+	public Integer getReferLiquidDepth1() {
+		return referLiquidDepth1;
+	}
+
+	public void setReferLiquidDepth1(Integer referLiquidDepth1) {
+		this.referLiquidDepth1 = referLiquidDepth1;
+	}
+
+	public Integer getHoopSensitivity() {
+		return hoopSensitivity;
+	}
+
+	public void setHoopSensitivity(Integer hoopSensitivity) {
+		this.hoopSensitivity = hoopSensitivity;
+	}
+
+	public Integer getLiquidSensitivity() {
+		return liquidSensitivity;
+	}
+
+	public void setLiquidSensitivity(Integer liquidSensitivity) {
+		this.liquidSensitivity = liquidSensitivity;
+	}
+}

+ 39 - 2
src/main/java/com/hb/proj/gather/process/DataTransConfig.java

@@ -25,19 +25,23 @@ public class DataTransConfig {
 	
 	private static Map<String,AlarmDefineVO> configerAlarm=null;
 	
+	private static Map<String,Object> configerDevType=null;
+	
 	private static WellConfigService configService=null;
 	
 	public static void init() {
-		logger.info("开始加载参数配置、报警设置...");
+		logger.info("开始加载参数配置、报警设置、设备类型配置...");
 		configService=ApplicationContextUtils.getBean("wellConfigService", WellConfigService.class);
 		configerParam=configService.loadWellParams();
 		configerAlarm=configService.loadAlarmDefines();
-		logger.info("完成加载参数配置、报警设置");
+		configerDevType=configService.loadDevTypeMapping();
+		logger.info("完成加载参数配置、报警设置、设备类型配置");
 	}
 	
 	public static void reloadAll() {
 		configerParam=configService.loadWellParams();
 		configerAlarm=configService.loadAlarmDefines();
+		configerDevType=configService.loadDevTypeMapping();
 	}
 	
 	
@@ -55,7 +59,16 @@ public class DataTransConfig {
 		}
 	}
 	
+	public static void reloadDevs() {
+		configerDevType=configService.loadDevTypeMapping();
+	}
+	
 	
+	/**
+	 * 根据设备编号+参数编码 获取参数配置
+	 * @param serialParamCode
+	 * @return
+	 */
 	public static WellParamVO get(String serialParamCode) {
 		if(configerParam==null) {
 			configerParam=configService.loadWellParams();
@@ -63,6 +76,12 @@ public class DataTransConfig {
 		return configerParam.get(serialParamCode);
 	}
 	
+	
+	/**
+	 * 根据井号获取报警定义
+	 * @param wellId
+	 * @return
+	 */
 	public static List<AlarmDefineVO> getAlarmDefines(String wellId) {
 		if(configerAlarm==null) {
 			configerAlarm=configService.loadAlarmDefines();
@@ -75,4 +94,22 @@ public class DataTransConfig {
 		}
 		return rtnAlarms;
 	}
+	
+	/**
+	 * 判断是否为采集设备
+	 * @param devSerial
+	 * @return
+	 */
+	public static boolean isGatherDev(String devSerial) {
+		return configerDevType.containsKey(devSerial)  &&  configerDevType.get(devSerial).equals("gather") ;
+	}
+	
+	/**
+	 * 判断是否为动液面设备
+	 * @param devSerial
+	 * @return
+	 */
+	public static boolean isLiquidDev(String devSerial) {
+		return configerDevType.containsKey(devSerial)  &&  configerDevType.get(devSerial).equals("liquid") ;
+	}
 }

+ 18 - 0
src/main/java/com/hb/proj/gather/protocol/ChannelGroupMgr.java

@@ -1,6 +1,9 @@
 package com.hb.proj.gather.protocol;
 
 import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -25,6 +28,8 @@ public class ChannelGroupMgr {
 
 	private static final ChannelGroup CHANNEL_GROUP = new DefaultChannelGroup("ChannelGroups", GlobalEventExecutor.INSTANCE);
 	
+	private static final Map<String,Future<Boolean>> CHANNEL_FUTURES = new ConcurrentHashMap<>();
+	
 	public  static final AttributeKey<String> ATTR_KEY_SERIAL=AttributeKey.valueOf("serial");  //通道自定义属性key,用于保存属性数据,便于后面通过该属性查找对应channel
 	
 	public  static final AttributeKey<String> ATTR_KEY_CMD=AttributeKey.valueOf("cmd");  //当前正执行的指令 指令枚举对象名称
@@ -92,4 +97,17 @@ public class ChannelGroupMgr {
 		});
 	}
 	
+	
+	public static void addFuture(String serial,Future<Boolean> future) {
+		CHANNEL_FUTURES.put(serial, future);
+	}
+	
+	
+	public static boolean isDone(String serial) {
+		if(!CHANNEL_FUTURES.containsKey(serial)) {
+			return true;
+		}
+		return CHANNEL_FUTURES.get(serial).isDone();
+	}
+	
 }

+ 0 - 109
src/main/java/com/hb/proj/gather/protocol/GatherRespParser.java

@@ -1,109 +0,0 @@
-package com.hb.proj.gather.protocol;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.hb.proj.gather.model.DiagramPO;
-import com.hb.proj.gather.model.SingleCombPO;
-import com.hb.proj.gather.process.DataTransRepSingleTask;
-import com.hb.proj.gather.process.DataTransRepTask;
-import com.hb.proj.gather.scheduler.DataAssembler;
-import com.hb.proj.gather.scheduler.GatherTaskExecutor;
-
-import io.netty.buffer.ByteBuf;
-
-@Deprecated
-public class GatherRespParser {
-
-	private final static  Logger logger = LoggerFactory.getLogger(GatherRespParser.class);
-	
-	/**
-	 * 数据解析入口
-	 * @param byteBuf  接收的消息
-	 * @param startIndex 解析开始索引
-	 * @param dataLen 数据长度字节数
-	 * @param cmd
-	 * @param serial  设备编号
-	 */
-	public static void parse(ByteBuf byteBuf ,int startIndex,int dataLen,String cmd,String serial) {
-		ZLOpdProtCMDEnum  cmdEum=ZLOpdProtCMDEnum.valueOf(cmd);
-		if("single".equals(cmdEum.getType())) {
-			 Map<String,Float> dataMap=parseFloat(byteBuf,startIndex,dataLen,cmdEum.getParamCodes());
-			 SingleCombPO po=DataAssembler.putPieceData(serial, dataMap);
-			 if(po!=null) {
-				 GatherTaskExecutor.execute(new DataTransRepSingleTask(po)); 
-			 }
-		}
-		else if("diagram".equals(cmdEum.getType())) { //默认为功图数据解析
-			 List<Float> datas=parseShort2Float(byteBuf,startIndex,dataLen);
-			 DiagramPO po=DataAssembler.putPieceData(serial,(cmdEum.getParamCodes())[0], datas);
-			 if(po!=null) {
-				 GatherTaskExecutor.execute(new DataTransRepTask(po)); 
-			 }
-			 logger.info("解析完数据{}",cmd);
-		}
-	}
-			
-	/**
-	 * 解析消息中的数据部分 每个数据项 4字节
-	 * @param byteBuf
-	 * @param  startIndex 数据区开始索引
-	 * @param  dataLen  数据区长度
-	 * @param  paramCodes  数据项编码
-	 */
-	public static Map<String,Float> parseFloat(ByteBuf byteBuf ,int startIndex,int dataLen,String[] paramCodes) {
-		byteBuf.readerIndex(startIndex);
-		Map<String,Float> rtnData=new HashMap<String,Float>(paramCodes.length);
-		int i=0;
-		while(true) {
-			rtnData.put(paramCodes[i++],byteBuf.readFloat() );   //顺序读取,readIndex 自动后移
-			if(i>=paramCodes.length || byteBuf.readerIndex()>=(startIndex+dataLen)) {
-				break;
-			}
-		}
-		logger.info("数据解析完:{}",rtnData);
-		return rtnData;
-		
-		
-	}
-	
-	
-	/**
-	 * 解析消息中的数据部分 每个数据项 2字节
-	 * @param byteBuf
-	 * @param startIndex
-	 * @param dataLen
-	 * @return
-	 */
-	public static List<Float> parseShort2Float(ByteBuf byteBuf ,int startIndex,int dataLen) {
-		byteBuf.readerIndex(startIndex);
-		List<Float> rtns=new ArrayList<Float>();
-		while(true) {
-			rtns.add(byteBuf.readShort()+0.0f); //顺序读取,readIndex 自动后移
-			if(byteBuf.readerIndex()>=(startIndex+dataLen)) {
-				break;
-			}
-		}
-		logger.info("数据解析完:{}",rtns);
-		return rtns;
-		
-		
-	}
-	
-	/**
-	 * 专为功图点数检测解析
-	 * @param byteBuf
-	 * @param startIndex
-	 * @param dataLen
-	 * @return
-	 */
-	public static short parseDiagramPoint(ByteBuf byteBuf ,int startIndex) {
-		byteBuf.readerIndex(startIndex);
-		return byteBuf.readShort();
-	}
-}

+ 99 - 1
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtCMDEnum.java

@@ -385,7 +385,105 @@ public enum ZLOpdProtCMDEnum {
 			 0xfa,
 			 "liquid",
 			 new String[] {"serial_40"}
-			);
+			),
+	
+	LIQUID_READ_PIPE_SOUND(
+			new byte[] {0x01, 0x03, 0x14, 0x12, 0x00, 0x03,(byte)0xa0,0x3e},
+			0x06,
+			"liquid",
+			new String[] {"avg_len_pipe","sound_speed","sound_mark_depth"}
+			),
+	LIQUID_READ_SENSORTIME(
+			new byte[] {0x01, 0x03, 0x14, 0x25, 0x00, 0x06,(byte)0xd1,(byte)0xf3},
+			0x0c,
+			"liquid",
+			new String[]{"sensor_yyyy","sensor_mm","sensor_dd","sensor_hr","sensor_mi","sensor_ss"}
+			),
+	LIQUID_READ_TEST_INTERVAL(
+			new byte[] {0x01, 0x03, 0x14, 0x20, 0x00, 0x01,(byte)0x80,0x30},
+			0x02,
+			"liquid",
+			new String[] {"test_interval"}
+			),
+	LIQUID_READ_SENSITI(
+			new byte[] {0x01, 0x03, 0x14, 0x2d, 0x00, 0x01,0x11,(byte)0xf3},
+			0x02,
+			"liquid",
+			new String[] {"sensiti_hoop","sensiti_liquid"}
+			),
+	LIQUID_READ_CASING_PRE(
+			new byte[] {0x01, 0x03, 0x14, 0x2c, 0x00, 0x01,0x40,0x33},
+			0x02,
+			"liquid",
+			new String[] {"casing_pre"}
+			),
+	LIQUID_SET_AVG_PIPE(
+			new byte[] {0x01, 0x06, 0x14, 0x12},
+			0x02,
+			"liquid",
+			new String[] {"avg_pipe"}
+			),
+	LIQUID_SET_SOUND_SPEED(
+			new byte[] {0x01, 0x06, 0x14, 0x13},
+			0x02,
+			"liquid",
+			new String[] {"sound_speed"}
+			),
+	LIQUID_SET_SOUND_MARK(
+			new byte[] {0x01, 0x06, 0x14, 0x14},
+			0x02,
+			"liquid",
+			new String[] {"sound_mark_depth"}
+			),
+	LIQUID_SET_TEST_INTERVAL(
+			new byte[] {0x01, 0x06, 0x14, 0x20},
+			0x02,
+			"liquid",
+			new String[] {"test_interval"}
+			),
+	LIQUID_SET_TIME_YYYY(
+			new byte[] {0x01, 0x06, 0x14, 0x25},
+			0x02,
+			"liquid",
+			new String[] {"time_yyyy"}
+			),
+	LIQUID_SET_TIME_MM(
+			new byte[] {0x01, 0x06, 0x14, 0x26},
+			0x02,
+			"liquid",
+			new String[] {"time_mm"}
+			),
+	LIQUID_SET_TIME_DD(
+			new byte[] {0x01, 0x06, 0x14, 0x27},
+			0x02,
+			"liquid",
+			new String[] {"time_dd"}
+			),
+	LIQUID_SET_TIME_HR(
+			new byte[] {0x01, 0x06, 0x14, 0x28},
+			0x02,
+			"liquid",
+			new String[] {"time_hr"}
+			),
+	LIQUID_SET_TIME_MI(
+			new byte[] {0x01, 0x06, 0x14, 0x29},
+			0x02,
+			"liquid",
+			new String[] {"time_mi"}
+			),
+	LIQUID_SET_TIME_SS(
+			new byte[] {0x01, 0x06, 0x14, 0x2A},
+			0x02,
+			"liquid",
+			new String[] {"time_ss"}
+			),
+	LIQUID_BOOT_TEST(
+			new byte[] {0x01, 0x06, 0x14, 0x2D},
+			0x02,
+			"liquid",
+			new String[] {"boot_test"}
+			)
+	;
 	
 	private byte[]  cmd;  //读取指令 最后字节CRC16
 	

+ 4 - 16
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -32,7 +32,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 	
 	/**
 	 * 协议标准:0103[数据区字节数 1字节][数据区 若干字节][CRC16校验 2字节]
-	 * 01:dtu上位地址  03:表示读取  
+	 * 01:dtu上位地址  03:表示读取  06:写入
 	 * 
 	 * 心跳2字节,间隔约30s
 	 */
@@ -41,34 +41,22 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 		//msg:如果设置了decoder编码器,则msg为编码后的类型,可强制转换
 		ByteBuf byteBuf=(ByteBuf)msg;
 		
-		if(!byteBuf.isReadable()) {
-			logger.info("没有数据可接收");
-			return;
-		}
-		
 		
 		String hexmsg=ByteBufUtil.hexDump(byteBuf);
 		
-		//logger.debug("接收到数据:{}",hexmsg);
-		
-		//byte[] temp=ByteBufUtil.getBytes(byteBuf, 0, 2);
-		
-		//两字节都没有既不是心跳也不是采集数据,忽略
 		int byteCount=byteBuf.readableBytes();
-		if(byteCount<2) {
-			return ;
-		}
 		
+		boolean valiHead=hexmsg.startsWith("0103") || hexmsg.startsWith("0106");
 		
 		//开头两字节且不以0103开头,就认为是心跳数据-作为设备号,该方法并不可靠,有可能把采集的残包数据当作心跳
-		if(byteCount==2&&(!hexmsg.startsWith("0103"))) { 
+		if(byteCount==2 && !valiHead) { 
 			if(!ChannelGroupMgr.contains(ctx.channel())) {
 				ChannelGroupMgr.add(ctx.channel(),ByteUtils.toIntStr(ByteBufUtil.getBytes(byteBuf,0,byteCount)));
 				return;
 			}
 			
 		}
-		else if(byteCount>2&&hexmsg.startsWith("0103")){ 
+		else if(byteCount>2 && valiHead){ 
 			
 			int headBtyCount=3,crc16BtyCount=2; //头部字节数,校验位字节数
 			

+ 5 - 3
src/main/java/com/hb/proj/gather/protocol/ZlA11MsgDecoder.java

@@ -28,21 +28,23 @@ public class ZlA11MsgDecoder extends ByteToMessageDecoder {
 		int byteCount=byteBuf.readableBytes();
 		ByteBuf outByteBuf=null;
 		
+		boolean valiHead=hexmsg.startsWith("0103") || hexmsg.startsWith("0106");
+		
 		logger.debug("ZlA11MsgDecoder 解码前数据:{}", hexmsg);
 		
 		if(byteCount<2) {
 			return;
 		}
-		if(byteCount>2 && !hexmsg.startsWith("0103")) { //定义为乱流,舍弃
+		if(byteCount>2 && !valiHead) { //定义为乱流,舍弃
 			byteBuf.readerIndex(beginIndex+byteCount);
 			return;
 		}
 		
-		if(byteCount==2&&(!hexmsg.startsWith("0103"))) { //心跳
+		if(byteCount==2 && !valiHead) { //心跳
 			byteBuf.readerIndex(beginIndex+byteCount);
 			outByteBuf=byteBuf.slice(beginIndex, byteCount);
 		}
-		else if(byteCount>2&&hexmsg.startsWith("0103")) {  //数据消息
+		else if(byteCount>2 && valiHead) {  //数据消息
 			int headBtyCount=3,crc16BtyCount=2; //头部字节数,校验位字节数
 			int datalen=byteBuf.getByte(2)&0xff; //数据区字节数 byteBuf.get方法不改变readIndex,writeIndex,readXX方法会
 			if(byteCount<(datalen+headBtyCount+crc16BtyCount)) {  // 读取的字节数量不够---拆包了

+ 41 - 0
src/main/java/com/hb/proj/gather/protocol/parser/LiquidParser.java

@@ -12,6 +12,7 @@ import com.hb.proj.gather.model.LiquidPO;
 import com.hb.proj.gather.process.DataTransRepLiquidTask;
 import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
 import com.hb.proj.gather.rep.GatherDataRepService;
+import com.hb.proj.gather.rep.RedisRepComponent;
 import com.hb.proj.gather.scheduler.GatherTaskExecutor;
 import com.hb.xframework.util.ApplicationContextUtils;
 
@@ -55,9 +56,49 @@ public class LiquidParser {
 			}
 		}
 		
+		else {
+			RedisRepComponent repRedis=ApplicationContextUtils.getBean("redisRepComponent", RedisRepComponent.class);
+			String redisKey="liquid_"+serial;
+			if(cmdEnum.name().startsWith("LIQUID_READ")) {
+				//设备参数读取不用入库,不用等待组装,直接进入redis
+				if(cmdEnum.name().startsWith("LIQUID_READ_SENSITI")) {
+					repRedis.put(redisKey, parseSensiti(byteBuf,dataLen,cmdEnum.getParamCodes()));
+				}
+				else {
+					repRedis.put(redisKey, parseOther(byteBuf,dataLen,cmdEnum.getParamCodes()));
+				}
+				
+			}
+			else if(cmdEnum.name().startsWith("LIQUID_SET") || cmdEnum.name().startsWith("LIQUID_BOOT_TEST")) {
+				//设备参数设置结果不用入库,不用等待组装,直接进入redis
+				repRedis.put("liquid_"+serial, cmdEnum.getParamCodes()[0],"1"); //0:超时或失败;1:成果,指令发送时重置为0
+			}
+			
+		}
+		
 		return true;
 	}
 	
+	/**
+	 * 灵敏度解析
+	 * @param byteBuf
+	 * @param dataLen
+	 * @param paramCodes
+	 * @return
+	 */
+	private static Map<String,Integer> parseSensiti(ByteBuf byteBuf,int dataLen,String[] paramCodes){
+		Map<String,Integer> rtnData=new HashMap<>(2);
+		int val=byteBuf.readUnsignedShort();
+		
+		int s1=val & 0x07,s2=(val & 0x38) >> 3;
+		
+		rtnData.put(paramCodes[0], s2); //接箍灵敏度
+		rtnData.put(paramCodes[1], s1); //液面灵敏度
+		
+		logger.info("灵敏度解析完:{}",rtnData);
+		return rtnData;
+	}
+	
 	
 	/**
 	 * 解析动液面单值数据(测量时间、音速、套压)

+ 4 - 0
src/main/java/com/hb/proj/gather/rep/RedisRepComponent.java

@@ -18,6 +18,10 @@ public class RedisRepComponent {
 	private RedisTemplate<String,Object> redisTemplate;
 	
 	
+	public void put(String key,String hashKey,Object val) {
+		redisTemplate.opsForHash().put(key, hashKey, val);
+	}
+	
 	/**
 	 * 实时数据入redis
 	 * @param wellId

+ 13 - 0
src/main/java/com/hb/proj/gather/rep/WellConfigService.java

@@ -22,6 +22,19 @@ public class WellConfigService {
 	@Autowired
 	private SpringJdbcDAO  dao;
 	
+	/**
+	 * 加载所有设备的类型配置
+	 * @return
+	 */
+	public Map<String,Object> loadDevTypeMapping(){
+		String sql="""
+				select device_code,device_type
+				from tzl_gather_device dev
+				where dev.del_if=false
+				""";
+		return dao.queryForMapping(sql, "device_code", "device_type");
+	}
+	
 	/**
 	 * 加载所有井的参数配置
 	 * @return

+ 6 - 3
src/main/java/com/hb/proj/gather/scheduler/GatherLiquidTask.java

@@ -21,7 +21,7 @@ import io.netty.channel.Channel;
  */
 public class GatherLiquidTask implements Runnable {
 	
-	private final static  Logger logger = LoggerFactory.getLogger(GatherLiquidTask.class);
+	private static final Logger logger = LoggerFactory.getLogger(GatherLiquidTask.class);
 
 	private Channel  channel;
 	
@@ -38,7 +38,10 @@ public class GatherLiquidTask implements Runnable {
 	 */
 	@Override
 	public void run() {
-		logger.info("动液面采集开始...");
+		
+		String serialNum=channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
+				
+		logger.info("动液面采集开始{}...",serialNum);
 		
 		List<ZLOpdProtCMDEnum> cmds=new ArrayList<>(41);
 		cmds.add(ZLOpdProtCMDEnum.LIQUID_OTHER);
@@ -68,7 +71,7 @@ public class GatherLiquidTask implements Runnable {
 					
 					channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
 					channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
-					logger.info("发送完指令:{}",cmd.name());
+					logger.info("【{}】发送完指令:{}",serialNum,cmd.name());
 					
 					channel.wait(cmdTimeout);  //等待接收返回数据后继续,最多等待cmdTimeout,此处释放锁,回复还未收到就被多值任务获得锁并发指令,会导致两个指令间隔很短
 				

+ 1 - 1
src/main/java/com/hb/proj/gather/scheduler/GatherMultiTask.java

@@ -17,7 +17,7 @@ import io.netty.channel.Channel;
  * @author cwen
  *
  */
-
+@Deprecated
 public class GatherMultiTask implements Runnable {
 
 	private final static  Logger logger = LoggerFactory.getLogger(GatherMultiTask.class);

+ 14 - 16
src/main/java/com/hb/proj/gather/scheduler/GatherScheduler.java

@@ -8,6 +8,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
+import com.hb.proj.gather.process.DataTransConfig;
 import com.hb.proj.gather.protocol.ChannelGroupMgr;
 
 import io.netty.channel.Channel;
@@ -38,43 +39,40 @@ public class GatherScheduler {
 	 * 单值1分钟执行一次采集,首次等待1分钟,等待服务启动,设备连接上
 	 */
 	//@Scheduled(fixedRate=60*1000,initialDelay= 60000)  //每分钟执行一次
-	public void startSingleGather() {
+	public void startGather() {
 		logger.info("定时采集启动...");
 		scheNum+=1;
 		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
 		Channel channel=null;
 		while(iterator.hasNext()) {
 			channel=iterator.next();
-			GatherTaskExecutor.execute(new GatherTask(channel));  //复合任务包含单值、多值
+			if(DataTransConfig.isGatherDev(channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get())) {
+				GatherTaskExecutor.execute(new GatherTask(channel));  //复合任务包含单值、多值
+			}
+			
 		}
 		
 		scheNum=scheNum%5==0?0:scheNum;
 	}
 	
 	
+	
 	/**
-	 * 多值3分钟执行一次采集,首次延时30s
+	 * 动液面定时采集
 	 */
-	//@Scheduled(fixedRate = 300 * 1000,initialDelay= 30000)  
-	public void startMultiGather() {
-		logger.info("多值定时采集启动...");
-		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
-		Channel channel=null;
-		while(iterator.hasNext()) {
-			channel=iterator.next();
-			GatherTaskExecutor.execute(new GatherMultiTask(channel));
-		}
-	}
-	
-	
 	@Scheduled(fixedRate = 60 * 60 * 1000,initialDelay= 60000)  
 	public void startLiquidGather() {
 			logger.info("动液面定时采集启动...");
 			Iterator<Channel> iterator=ChannelGroupMgr.iterator();
 			Channel channel=null;
+			String serial=null;
 			while(iterator.hasNext()) {
 				channel=iterator.next();
-				GatherTaskExecutor.execute(new GatherLiquidTask(channel));
+				serial=channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
+				if(DataTransConfig.isLiquidDev(serial) && ChannelGroupMgr.isDone(serial)) {
+					ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(new GatherLiquidTask(channel)));
+				}
+				
 			}
 	}
 	

+ 1 - 0
src/main/java/com/hb/proj/gather/scheduler/GatherSingleTask.java

@@ -15,6 +15,7 @@ import io.netty.channel.Channel;
  * @author cwen
  *
  */
+@Deprecated
 public class GatherSingleTask implements Runnable {
 	
 	private final static  Logger logger = LoggerFactory.getLogger(GatherSingleTask.class);

+ 6 - 0
src/main/java/com/hb/proj/gather/scheduler/GatherTaskExecutor.java

@@ -1,6 +1,7 @@
 package com.hb.proj.gather.scheduler;
 
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -24,6 +25,11 @@ public class GatherTaskExecutor {
 		executor.submit(task);  
 	}
 	
+	//可获取返回结果的任务提交
+	public static Future<Boolean>  submit(Runnable task){
+		return executor.submit(task, Boolean.TRUE);
+	}
+	
 	public static void shutdown() {
 		logger.info("关闭采集执行器");
 		executor.shutdownNow();

+ 246 - 0
src/main/java/com/hb/proj/gather/scheduler/ManualLiquidTask.java

@@ -0,0 +1,246 @@
+package com.hb.proj.gather.scheduler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.model.LiquidParam;
+import com.hb.proj.gather.protocol.ChannelGroupMgr;
+import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
+import com.hb.proj.gather.rep.RedisRepComponent;
+import com.hb.proj.gather.utils.Crc16Utils;
+import com.hb.xframework.util.ApplicationContextUtils;
+import com.hb.xframework.util.MapUtils;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+
+/**
+ * 手动控制动液面 任务(参数读取、设置;启动测试、读取数据)
+ * @author cwen
+ *
+ */
+public class ManualLiquidTask implements Runnable {
+
+	private static final Logger logger = LoggerFactory.getLogger(ManualLiquidTask.class);
+	
+	public static final int ACTION_READ=0;
+	
+	public static final int ACTION_SET=1;
+	
+	public static final int ACTION_BOOT_TEST=2;
+	
+	private Channel  channel;
+	
+	private int action=ACTION_READ;
+	
+	private LiquidParam  liquidParam;
+	
+	private RedisRepComponent repRedis;
+	
+	private String redisKey;
+	
+	private long cmdTimeout=25*1000; 
+	
+	public ManualLiquidTask(Channel  channel,int action) {
+		this.channel=channel;
+		this.action=action;
+		this.repRedis=ApplicationContextUtils.getBean("redisRepComponent", RedisRepComponent.class);
+		this.redisKey="liquid_"+channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
+	}
+	
+	public ManualLiquidTask(Channel  channel,int action,LiquidParam liquidParam) {
+		this(channel,action);
+		this.liquidParam=liquidParam;
+	}
+			
+	@Override
+	public void run() {
+		ByteBufAllocator alloc=channel.alloc(); 
+		synchronized(channel) {
+			logger.info("手动操作动液面设备...");
+			if(this.action==ACTION_READ) {
+				readParams(alloc);
+			}
+			else if(this.action==ACTION_SET) {
+				setParams(alloc);
+			}
+			else if(this.action==ACTION_BOOT_TEST) {
+				bootTest(alloc);
+			}
+		}
+
+	}
+	
+	
+	/**
+	 * 读取参数
+	 * @param alloc
+	 */
+	private void readParams(ByteBufAllocator alloc) {
+		try {
+			ZLOpdProtCMDEnum[]  cmds= {ZLOpdProtCMDEnum.LIQUID_READ_PIPE_SOUND,
+					ZLOpdProtCMDEnum.LIQUID_READ_SENSORTIME,
+					ZLOpdProtCMDEnum.LIQUID_READ_TEST_INTERVAL,
+					ZLOpdProtCMDEnum.LIQUID_READ_SENSITI,
+					ZLOpdProtCMDEnum.LIQUID_READ_CASING_PRE
+			};
+			ByteBuf byteBuf=null;
+			
+			repRedis.put(redisKey, MapUtils.build("status","start","action","read"),true);
+			
+			for(ZLOpdProtCMDEnum cmd : cmds) {
+				
+				byteBuf=alloc.directBuffer();
+				byteBuf.writeBytes(cmd.getCmd());
+				channel.writeAndFlush(byteBuf);
+				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+				repRedis.put(redisKey, MapUtils.build("status","running"));
+				channel.wait(cmdTimeout); 
+			}
+			
+			repRedis.put(redisKey, MapUtils.build("status","complete"));
+		}
+		catch (InterruptedException e) {
+			e.printStackTrace();
+			logger.error("读取动液面设备参数异常:{}",e.getMessage());
+		} 
+	}
+	
+	
+	/**
+	 * 设置参数
+	 * @param alloc
+	 */
+	private void setParams(ByteBufAllocator alloc) {
+		logger.info("动液面远程设置参数...");
+		try {
+			ZLOpdProtCMDEnum[]  cmds= {
+					ZLOpdProtCMDEnum.LIQUID_SET_AVG_PIPE,
+					
+					ZLOpdProtCMDEnum.LIQUID_SET_SOUND_SPEED,
+					ZLOpdProtCMDEnum.LIQUID_SET_SOUND_MARK,
+					ZLOpdProtCMDEnum.LIQUID_SET_TEST_INTERVAL,
+					ZLOpdProtCMDEnum.LIQUID_SET_TIME_YYYY,
+					ZLOpdProtCMDEnum.LIQUID_SET_TIME_MM,
+					ZLOpdProtCMDEnum.LIQUID_SET_TIME_DD,
+					ZLOpdProtCMDEnum.LIQUID_SET_TIME_HR,
+					ZLOpdProtCMDEnum.LIQUID_SET_TIME_MI,
+					ZLOpdProtCMDEnum.LIQUID_SET_TIME_SS
+					
+			};
+			ByteBuf byteBuf=null;
+			
+			repRedis.put(redisKey, MapUtils.build("status","start","action","set"),true);
+			
+			for(ZLOpdProtCMDEnum cmd : cmds) {
+				byteBuf=alloc.directBuffer();
+				byteBuf.writeBytes(bindCMDVal(cmd));
+				channel.writeAndFlush(byteBuf);
+				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+				repRedis.put(redisKey, MapUtils.build("status","running"));
+				channel.wait(cmdTimeout); 
+			}
+			
+			repRedis.put(redisKey, MapUtils.build("status","complete"));
+		}
+		catch (InterruptedException e) {
+			e.printStackTrace();
+			logger.error("读取动液面设备参数异常:{}",e.getMessage());
+		} 
+	}
+	
+	/**
+	 * 启动测试
+	 * @param alloc
+	 */
+	private void bootTest(ByteBufAllocator alloc) {
+		try {
+			ZLOpdProtCMDEnum  cmd=ZLOpdProtCMDEnum.LIQUID_BOOT_TEST;
+			
+			repRedis.put(redisKey, MapUtils.build("status","start","action","boot"),true);
+			
+			ByteBuf byteBuf=alloc.directBuffer();
+			byteBuf.writeBytes(bindCMDVal(cmd));
+			channel.writeAndFlush(byteBuf);
+			channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+			repRedis.put(redisKey, MapUtils.build("status","running"));
+			channel.wait(cmdTimeout);
+			
+			repRedis.put(redisKey, MapUtils.build("status","complete"));
+		}
+		catch (InterruptedException e) {
+			e.printStackTrace();
+			logger.error("启动动液面测试异常:{}",e.getMessage());
+		} 
+	}
+	
+	//绑定指令的参数值,2字节,对应无符号的short
+	private byte[]  bindCMDVal(ZLOpdProtCMDEnum cmd) {
+		try {
+			logger.info("动液面指令构建:{}",cmd.name());
+			ByteBuf byteBuf=Unpooled.buffer(8);
+			byteBuf.writeBytes(cmd.getCmd());
+			int[] times=liquidParam.getTimeParts();
+			switch (cmd.name()) {
+				case "LIQUID_SET_AVG_PIPE":
+					  byteBuf.writeShort(liquidParam.getAvgLenPipe());
+					  break;
+				case "LIQUID_SET_SOUND_SPEED":
+					  byteBuf.writeShort(liquidParam.getSoundSpeed());
+					  break;
+				case "LIQUID_SET_SOUND_MARK":
+					  byteBuf.writeShort(liquidParam.getSoundMarkDepth());
+					  break;
+				case "LIQUID_SET_TEST_INTERVAL":
+					  byteBuf.writeShort(liquidParam.getTestInterval());
+					  break;
+				case "LIQUID_BOOT_TEST":
+					  byteBuf.writeShort(liquidParam.getBootCMDVal());
+					  break;
+				case "LIQUID_SET_TIME_YYYY":
+					  byteBuf.writeShort(times[0]);
+					  break;
+				case "LIQUID_SET_TIME_MM":
+					  byteBuf.writeShort(times[1]);
+					  break;
+				case "LIQUID_SET_TIME_DD":
+					  byteBuf.writeShort(times[2]);
+					  break;
+				case "LIQUID_SET_TIME_HR":
+					  byteBuf.writeShort(times[3]);
+					  break;
+				case "LIQUID_SET_TIME_MI":
+					  byteBuf.writeShort(times[4]);
+					  break;
+				case "LIQUID_SET_TIME_SS":
+					  byteBuf.writeShort(times[5]);
+					  break;
+			}
+			int crc16=Crc16Utils.getCRC(ByteBufUtil.getBytes(byteBuf,0,byteBuf.readableBytes()));
+			byteBuf.writeShort(crc16);
+			logger.info("参数设置指令:{}",ByteBufUtil.hexDump(byteBuf));
+			return byteBuf.array();
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			logger.error(e.getMessage());
+			return null;
+		}
+		
+	
+	}
+	
+	/*
+	public static void main(String[] args) {
+		LiquidParam  param=new LiquidParam();
+		param.setSensorTime(new Date());
+		param.setAvgLenPipe(960);
+		ManualLiquidTask  task=new ManualLiquidTask(null,ManualLiquidTask.ACTION_SET,param);
+		task.bindCMDVal(ZLOpdProtCMDEnum.LIQUID_SET_AVG_PIPE);
+		System.out.println("over");
+	}*/
+
+}

+ 0 - 2
src/main/java/com/hb/proj/gather/test/GatherProtocolHandler.java

@@ -1,13 +1,11 @@
 package com.hb.proj.gather.test;
 
 import java.util.Arrays;
-import java.util.List;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.hb.proj.gather.protocol.ChannelGroupMgr;
-import com.hb.proj.gather.protocol.GatherRespParser;
 import com.hb.proj.gather.utils.ByteUtils;
 import com.hb.proj.gather.utils.Crc16Utils;
 

+ 9 - 1
src/main/java/com/hb/proj/gather/utils/Crc16Utils.java

@@ -67,6 +67,8 @@ public class Crc16Utils {
     public static void main(String[] args) {
        // int[] data = new int[]{0x01, 0x03, 0x03, 0xd7, 0x00, 0x01};
        // System.out.println(Crc16Utils.crc16(data));
+    	
+    	/*
     	String[] addrs= {"1810","188D","190A","1987","1A04","1A81","1AFE","1B7B","1BF8","1C75","1CF2","1D6F","1DEC","1E69","1EE6","1F63","1FE0","205D","20DA","2157","21D4","2251","22CE","234B","23C8","2445","24C2","253F","25BC","2639","26B6","2733","27B0","282D","28AA","2927","29A4","2A21","2A9E","2B1B"};
         int[]  cmds=null;
         int ad1=0,ad2=0;
@@ -99,8 +101,14 @@ public class Crc16Utils {
     		
     		
     		System.out.println(cmdstr);
-    	}
+    	}*/
        
        // System.out.println(Integer.toHexString(Crc16Utils.getCRC(data2)));
+    	
+    	System.out.println(Integer.toHexString(Crc16Utils.getCRC(new byte[] {0x01, 0x03, 0x14, 0x2c, 0x00, 0x01})));
+    	
+    	//int val= 0x8000 | ((3 & 0x07) << 3) | (4 & 0x07) ;
+    	
+    	//System.out.println(val);
     }
 }