瀏覽代碼

增加动液面数据采集,处理、入库

chenwen 1 年之前
父節點
當前提交
0a11e8fdf6
共有 21 個文件被更改,包括 505 次插入268 次删除
  1. 106 0
      src/main/java/com/hb/proj/gather/model/LiquidPO.java
  2. 45 0
      src/main/java/com/hb/proj/gather/process/DataTransRepLiquidTask.java
  3. 2 4
      src/main/java/com/hb/proj/gather/process/DataTransRepTask.java
  4. 1 1
      src/main/java/com/hb/proj/gather/protocol/ChannelGroupMgr.java
  5. 3 2
      src/main/java/com/hb/proj/gather/protocol/GatherRespParser.java
  6. 28 73
      src/main/java/com/hb/proj/gather/protocol/GatherRespParserFacade.java
  7. 67 67
      src/main/java/com/hb/proj/gather/protocol/ZLOpdProtCMDEnum.java
  8. 3 10
      src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java
  9. 56 23
      src/main/java/com/hb/proj/gather/protocol/parser/DataPieceLiquidBox.java
  10. 3 3
      src/main/java/com/hb/proj/gather/protocol/parser/DataPieceSingleBox.java
  11. 18 10
      src/main/java/com/hb/proj/gather/protocol/parser/DiagramParser.java
  12. 42 28
      src/main/java/com/hb/proj/gather/protocol/parser/LiquidParser.java
  13. 5 10
      src/main/java/com/hb/proj/gather/protocol/parser/SingleParser.java
  14. 43 8
      src/main/java/com/hb/proj/gather/rep/GatherDataRepService.java
  15. 32 0
      src/main/java/com/hb/proj/gather/rep/WellConfigService.java
  16. 40 4
      src/main/java/com/hb/proj/gather/scheduler/GatherLiquidTask.java
  17. 1 1
      src/main/java/com/hb/proj/gather/scheduler/GatherScheduler.java
  18. 3 1
      src/main/java/com/hb/proj/gather/scheduler/GatherSingleTask.java
  19. 3 1
      src/main/java/com/hb/proj/gather/scheduler/GatherTask.java
  20. 4 2
      src/main/java/com/hb/proj/gather/server/NettyGatherServer.java
  21. 0 20
      src/main/java/com/hb/proj/gather/utils/ByteUtils.java

+ 106 - 0
src/main/java/com/hb/proj/gather/model/LiquidPO.java

@@ -0,0 +1,106 @@
+package com.hb.proj.gather.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class LiquidPO {
+
+	private String wellId; 
+	
+	private String devSerial;  //设备序号
+	
+	private String testTime;  //测量时间
+	
+	private Double soundSpeedDev;  //音速   cm/s--->m/s
+	
+	private Double liquidDepthDev; // 测量深度  0.1米--->m
+	
+	private Double casingPressDev; // 当前套压  kpa--->MPa
+	
+	private List<Short> liquidDatas; //动液面数据
+	
+	private List<Short> hoopDatas;  //接箍数据
+	
+	public LiquidPO() {
+		
+	}
+	
+	public LiquidPO(String wellId,String devSerial) {
+		this.wellId=wellId;
+		this.devSerial=devSerial;
+	}
+	
+	public void setDatas(List<Short> datas) {
+		if(datas==null||datas.size()==0) {
+			return;
+		}
+		liquidDatas=new ArrayList<>(datas.size()/2);
+		hoopDatas=new ArrayList<>(datas.size()/2);
+		for(int i=0,len=datas.size();i<len;i++) {
+			if(i%2==0) {
+				liquidDatas.add(datas.get(i));
+			}
+			else {
+				hoopDatas.add(datas.get(i));
+			}
+		}
+	}
+
+	public String getDevSerial() {
+		return devSerial;
+	}
+
+	public void setDevSerial(String devSerial) {
+		this.devSerial = devSerial;
+	}
+
+	public String getTestTime() {
+		return testTime;
+	}
+
+	public void setTestTime(String testTime) {
+		this.testTime = testTime;
+	}
+
+	public Double getSoundSpeedDev() {
+		return soundSpeedDev;
+	}
+
+	public void setSoundSpeedDev(Double soundSpeedDev) {
+		this.soundSpeedDev = soundSpeedDev;
+	}
+
+	
+
+	public String getWellId() {
+		return wellId;
+	}
+
+	public void setWellId(String wellId) {
+		this.wellId = wellId;
+	}
+
+	public Double getLiquidDepthDev() {
+		return liquidDepthDev;
+	}
+
+	public void setLiquidDepthDev(Double liquidDepthDev) {
+		this.liquidDepthDev = liquidDepthDev;
+	}
+
+	public Double getCasingPressDev() {
+		return casingPressDev;
+	}
+
+	public void setCasingPressDev(Double casingPressDev) {
+		this.casingPressDev = casingPressDev;
+	}
+
+	public List<Short> getLiquidDatas() {
+		return liquidDatas;
+	}
+
+	public List<Short> getHoopDatas() {
+		return hoopDatas;
+	}
+}

+ 45 - 0
src/main/java/com/hb/proj/gather/process/DataTransRepLiquidTask.java

@@ -0,0 +1,45 @@
+package com.hb.proj.gather.process;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.model.LiquidPO;
+import com.hb.proj.gather.rep.GatherDataRepService;
+import com.hb.proj.gather.rep.WellConfigService;
+import com.hb.xframework.util.ApplicationContextUtils;
+
+public class DataTransRepLiquidTask implements Runnable {
+
+	private final static  Logger logger = LoggerFactory.getLogger(DataTransRepLiquidTask.class);
+	
+	private LiquidPO  po;
+	
+	public DataTransRepLiquidTask(LiquidPO  po) {
+		this.po=po;
+	}
+			
+	@Override
+	public void run() {
+		logger.info("开始动液面数据转换处理{}",po.getDevSerial());
+		try {
+			WellConfigService configService=ApplicationContextUtils.getBean("wellConfigService", WellConfigService.class);
+			String wellId=configService.getWellIdByDev(po.getDevSerial());
+			if(StringUtils.isBlank(wellId)) {
+				logger.warn("动液面设备{}未找到对应井,取消入库",po.getDevSerial());
+				return;
+			}
+			po.setWellId(wellId);
+			
+			GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
+			repService.save(po);
+			logger.info("动液面数据入库完成{}",po.getDevSerial());
+		}
+		catch(Exception e) {
+			e.printStackTrace();
+			logger.error("动液面数据转换、入库任务执行出现异常:{}",e.getMessage());
+		}
+
+	}
+
+}

+ 2 - 4
src/main/java/com/hb/proj/gather/process/DataTransRepTask.java

@@ -35,14 +35,12 @@ public class DataTransRepTask implements Runnable{
 			}
 			
 			diagramPO.setWellParam(paramConfig.getParamId());
-			
 			DataTransUtils.transMulti(diagramPO, paramConfig); //数据转换
 			
-			logger.info("数据转换完:{}",diagramPO.getParamCode());
-			
 			GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
-			
 			repService.save(diagramPO);  //入库
+			
+			logger.info("数据转换入库完成:{}",diagramPO.getParamCode());
 		}
 		catch(Exception e) {
 			e.printStackTrace();

+ 1 - 1
src/main/java/com/hb/proj/gather/protocol/ChannelGroupMgr.java

@@ -33,7 +33,7 @@ public class ChannelGroupMgr {
 	
 	public  static final AttributeKey<Long> ATTR_KEY_PRE_TIME=AttributeKey.valueOf("pre_time");  //上一指令时间,检测是否超时发送指令
 	
-	public  static final AttributeKey<Boolean> ATTR_KEY_RECEIVED=AttributeKey.valueOf("received"); //是否完整收到回复数据
+	public  static final AttributeKey<Boolean> ATTR_KEY_STOP_NEXT=AttributeKey.valueOf("stop_next"); //停止后续采集
 	
 	public static void add(Channel channel,String serial) {
 		logger.info("增加客户端通道:{}",serial);

+ 3 - 2
src/main/java/com/hb/proj/gather/protocol/GatherRespParser.java

@@ -17,6 +17,7 @@ import com.hb.proj.gather.scheduler.GatherTaskExecutor;
 
 import io.netty.buffer.ByteBuf;
 
+@Deprecated
 public class GatherRespParser {
 
 	private final static  Logger logger = LoggerFactory.getLogger(GatherRespParser.class);
@@ -31,14 +32,14 @@ public class GatherRespParser {
 	 */
 	public static void parse(ByteBuf byteBuf ,int startIndex,int dataLen,String cmd,String serial) {
 		ZLOpdProtCMDEnum  cmdEum=ZLOpdProtCMDEnum.valueOf(cmd);
-		if(cmdEum.getItemBytCount()==4) {
+		if("single".equals(cmdEum.getType())) {
 			 Map<String,Float> dataMap=parseFloat(byteBuf,startIndex,dataLen,cmdEum.getParamCodes());
 			 SingleCombPO po=DataAssembler.putPieceData(serial, dataMap);
 			 if(po!=null) {
 				 GatherTaskExecutor.execute(new DataTransRepSingleTask(po)); 
 			 }
 		}
-		else if(cmdEum.getItemBytCount()==2) { //默认为功图数据解析
+		else if("diagram".equals(cmdEum.getType())) { //默认为功图数据解析
 			 List<Float> datas=parseShort2Float(byteBuf,startIndex,dataLen);
 			 DiagramPO po=DataAssembler.putPieceData(serial,(cmdEum.getParamCodes())[0], datas);
 			 if(po!=null) {

+ 28 - 73
src/main/java/com/hb/proj/gather/protocol/GatherRespParserFacade.java

@@ -1,15 +1,14 @@
 package com.hb.proj.gather.protocol;
 
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.Map;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.hb.proj.gather.protocol.parser.DiagramParser;
 import com.hb.proj.gather.protocol.parser.LiquidParser;
+import com.hb.proj.gather.protocol.parser.SingleParser;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
 
 /**
  * 协议解析统一类,针对不同指令由其它相关类实现解析
@@ -20,91 +19,47 @@ public class GatherRespParserFacade {
 
 	private final static  Logger logger = LoggerFactory.getLogger(GatherRespParserFacade.class);
 	
-	private final static  Map<String,Method> parserClsMapping=new HashMap<>();
 	
 	/**
 	 * 数据解析入口
-	 * @param byteBuf  原始返回数据
-	 * @param startIndex  解析开始索引
+	 * @param byteBuf  接收的二进制数据流  此处的readIndex已经跳过消息头,可直接读取数据区
 	 * @param dataLen 可以通过数据字节数明确解析器(前提:同一通道内各采集数据长度要不一样)
 	 * @param cmdEnum 可以通过指令对象明确解析器(目前采用方式)
 	 * @param serial   设备号
 	 */
-	public static void parse(ByteBuf byteBuf ,int startIndex,int dataLen,String cmdName,String serial) {
-		logger.info("parse:{}",cmdName);
+	public static void parse(ByteBuf byteBuf,int dataLen,String cmdName,Channel channel) {
 		ZLOpdProtCMDEnum cmdEnum=ZLOpdProtCMDEnum.valueOf(cmdName);
+		String serial=channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
 		
 		if(cmdName.startsWith("LIQUID")) {
-			LiquidParser.parse(byteBuf, startIndex, dataLen, cmdEnum, serial);
+			boolean rst=LiquidParser.parse(byteBuf, dataLen, cmdEnum, serial);
+			if(!rst) { //取消后面的采集,数据已存在
+				channel.attr(ChannelGroupMgr.ATTR_KEY_STOP_NEXT).set(true);
+				return;
+			}
+			
 		}
-		
-		/**
-		Method parseMethd=getParseMethod(cmdName);
-		if(parseMethd==null) {
-			logger.error("无法加载或不存在解析器及解析方法,取消解析");
-			return;
+		else if(cmdName.startsWith("DIAGRAM")) {
+			
+			if(cmdName.equals("DIAGRAM_POINT_COUNT")) { //如果是功图点数检测
+				
+				short count=DiagramParser.parseDiagramPoint(byteBuf);
+				logger.warn("功图点数{}",count);
+				channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(count==250);
+			}
+			else {
+				DiagramParser.parse(byteBuf, dataLen, cmdEnum, serial);
+			}
+			
+		}
+		else if(cmdName.startsWith("SINGLE")) {
+			SingleParser.parse(byteBuf, dataLen, cmdEnum, serial);
 		}
-		
-		try {
-			parseMethd.invoke(null, byteBuf,startIndex,dataLen,cmdEnum,serial);
-		} catch (Exception e) {
-			e.printStackTrace();
-			logger.error("解析器解析数据出错:{}",e.getMessage());
-		} */
 	}
 	
 	
-	/**
-	 * 专为功图点数检测解析
-	 * @param byteBuf
-	 * @param startIndex
-	 * @param dataLen
-	 * @return
-	 */
-	public static short parseDiagramPoint(ByteBuf byteBuf ,int startIndex) {
-		byteBuf.readerIndex(startIndex);
-		return byteBuf.readShort();
-	}
 	
 	
-	/**
-	 * 基于cmdname的格式化加载对应解析器方法(cmdName的前缀+Parser)
-	 * @param cmd
-	 * @return
-	 */
-	private static Method  getParseMethod(String cmd){
-		try {
-			if(!parserClsMapping.containsKey(cmd)) {
-				String clsName=(cmd.split("_"))[0].toLowerCase();
-				clsName=clsName.substring(0,1).toUpperCase()+clsName.substring(1)+"Parser";
-				Class<?> cls=loadParseClass(clsName);
-				if(cls!=null) {
-					parserClsMapping.put(cmd, cls.getDeclaredMethod("parse"));
-				}
-			}
-			
-			return parserClsMapping.get(cmd);
-		}
-		catch(NoSuchMethodException e) {
-			e.printStackTrace();
-			return null;
-		}
-		
-		
-		
-	}
 	
-	/**
-	 * 加载具体解析器class对象
-	 */
-	private static Class<?> loadParseClass(String parserClsName) {
-		try {
-			Class<?> cls=Class.forName("com.hb.proj.gather.protocol.parser."+parserClsName);
-			return cls;
-		} catch (ClassNotFoundException e) {
-			e.printStackTrace();
-			logger.error("未找到协议解析器{}",parserClsName);
-			return null;
-		}
-	}
+	
 }

+ 67 - 67
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtCMDEnum.java

@@ -7,34 +7,34 @@ package com.hb.proj.gather.protocol;
  */
 public enum ZLOpdProtCMDEnum {
 	
-	//20[应返回数据区字节数] 一次读取到油压、套压、回压、井口温度、载荷 01 03 01 2c 00 0a    校验位两字节         应返回字节数     每个数据项字节数  参数编码表
-	PRESS_TEMP_LOAD(
+	//20-10[应返回数据区字节数] 一次读取到油压、套压、回压、井口温度、载荷 01 03 01 2c 00 0a    校验位两字节         应返回字节数     每个数据项字节数  参数编码表
+	SINGLE_PRESS_TEMP_LOAD(
 			new byte[] {0x01, 0x03, 0x01, 0x2c, 0x00, 0x0a,       0x05, (byte)0xf8},   
 			0x14,  
-			4, 
+			"single",
 			new String[]{"oil_press","casing_press","back_press","well_head_temp","load"}),
 	
 	
 	
 	//48 电机电流A、B、C,电压A、B、C,有功功耗,无功功耗,有功功率,无功功率,反向功率,功率因数 01 03 01 5f 00 18
-	CURR_VOL_LOS_PW(
+	SINGLE_CURR_VOL_LOS_PW(
 			new byte[] {0x01, 0x03, 0x01, 0x5f, 0x00, 0x18,    0x74, (byte)0x2e},  
 			0x30,  
-			4,
+			"single",
 			new String[]{"current_a","current_b","current_c","voltage_a","voltage_b","voltage_c","useful_power_loss","unuseful_power_loss","useful_power","unuseful_power","reverse_power","power_factor"}),
 	
 	//8  冲次、冲程 01 03 01 a4 00 04
-	FREQ_STROKE(
+	SINGLE_FREQ_STROKE(
 			new byte[] {0x01, 0x03, 0x01, (byte)0xa4, 0x00, 0x04,        0x04, 0x16},  
 			0x08,  
-			4,
+			"single",
 			new String[]{"freq","stroke"}),
 	
 	//2 功图实际点数 01 03 03 d7 00 01
 	DIAGRAM_POINT_COUNT(
 			new byte[] {0x01, 0x03, 0x03, (byte)0xd7, 0x00, 0x01,        0x34, 0x76},  
 			0x02,  
-			2,
+			"diagram",
 			new String[]{"diagram_point"}),
 	
 	
@@ -44,21 +44,21 @@ public enum ZLOpdProtCMDEnum {
 	DIAGRAM_DISP_1(
 			new byte[] {0x01, 0x03, 0x03, (byte)0xe8, 0x00, 0x62,        (byte)0x44, 0x53},  
 			0xc4,  
-			2,
+			"diagram",
 			new String[]{"disp_1"}),
 	
 	//200-100 功图位移第2部分 01 03 04 4a 00 64
 	DIAGRAM_DISP_2(
 			new byte[] {0x01, 0x03, 0x04, 0x4a, 0x00, 0x64,        0x64, (byte)0xc7},  
 			0xc8,  
-			2,
+			"diagram",
 			new String[]{"disp_2"}),
 	
 	//104-52 功图位移第3部分 01 03 04 ae 00 34
 	DIAGRAM_DISP_3(
 			new byte[] {0x01, 0x03, 0x04, (byte)0xae, 0x00, 0x34,        0x24, (byte)0xcc},  
 			0x68,  
-			2,
+			"diagram",
 			new String[]{"disp_3"}),
 	
 	
@@ -68,21 +68,21 @@ public enum ZLOpdProtCMDEnum {
 	DIAGRAM_LOAD_1(
 			new byte[] {0x01, 0x03, 0x04, (byte)0xe2, 0x00, 0x60,        (byte)0xe4, (byte)0xe4},  
 			0xc0,  
-			2,
+			"diagram",
 			new String[]{"chartload_1"}),
 		
 	//180-90 功图载荷第2部分 01 03 05 42 00 5a
 	DIAGRAM_LOAD_2(
 			new byte[] {0x01, 0x03, 0x05, 0x42, 0x00, 0x5a,       0x65, 0x29},  
 			0xb4,  
-			2,
+			"diagram",
 			new String[]{"chartload_2"}),
 		
 	//128-64 功图载荷第3部分  01 03 05 9c 00 40
 	DIAGRAM_LOAD_3(
 			new byte[] {0x01, 0x03, 0x05, (byte)0x9c, 0x00, 0x40,        (byte)0x84, (byte)0xd8},  
 			0x80,  
-			2,
+			"diagram",
 			new String[]{"chartload_3"}),
 	
 	
@@ -94,21 +94,21 @@ public enum ZLOpdProtCMDEnum {
 	DIAGRAM_CURR_1(
 			new byte[] {0x01, 0x03, 0x05, (byte)0xdc, 0x00, 0x5e,        0x05, 0x04},  
 			0xbc,  
-			2,
+			"diagram",
 			new String[]{"chartcurr_1"}),
 			
 	//176-88 电流图电流第2部分 01 03 06 3a 00 58
 	DIAGRAM_CURR_2(
 			new byte[] {0x01, 0x03, 0x06, 0x3a, 0x00, 0x58,        0x64, (byte)0xb5},  
 			0xb0,  
-			2,
+			"diagram",
 			new String[]{"chartcurr_2"}),
 			
 	//136-68 电流图电流第3部分 01 03 06 92 00 44 
 	DIAGRAM_CURR_3(
 			new byte[] {0x01, 0x03, 0x06, (byte)0x92, 0x00, 0x44,        (byte)0xe4, (byte)0x9c},  
 			0x88,  
-			2,
+			"diagram",
 			new String[]{"chartcurr_3"}),
 	
 	
@@ -119,21 +119,21 @@ public enum ZLOpdProtCMDEnum {
 	DIAGRAM_POWER_1(
 			new byte[] {0x01, 0x03, 0x06, (byte)0xd6, 0x00, 0x5c,        (byte)0xa4, (byte)0x83},  
 			0xb8,  
-			2,
+			"diagram",
 			new String[]{"chartpower_1"}),
 				
 	//172-86 功率图功率第2部分  01 03 07 32 00 56
 	DIAGRAM_POWER_2(
 			new byte[] {0x01, 0x03, 0x07, 0x32, 0x00, 0x56,        0x65, 0x4f},  
 			0xac,  
-			2,
+			"diagram",
 			new String[]{"chartpower_2"}),
 				
 	//144-72 功率图功率第3部分 01 03 07 88 00 48
 	DIAGRAM_POWER_3(
 			new byte[] {0x01, 0x03, 0x07, (byte)0x88, 0x00, 0x48,       (byte)0xc4, (byte)0xa2},  
 			0x90,  
-			2,
+			"diagram",
 			new String[]{"chartpower_3"}),
 	
 	
@@ -141,7 +141,7 @@ public enum ZLOpdProtCMDEnum {
 	LIQUID_OTHER(
 			 new byte[] {0x01, 0x03, 0x18, 0x00, 0x00, 0x09,       (byte)0x83, 0x6c}, 
 			 0x12,
-			 2,
+			 "liquid",
 			 new String[]{"test_time_yy","test_time_mm","test_time_dd","test_time_hr","test_time_mi","test_time_ss","sound_speed","test_depth","casing_pre"}
 			),
 	
@@ -149,241 +149,241 @@ public enum ZLOpdProtCMDEnum {
 	LIQUID_SERIAL_1(
 			 new byte[] {0x01,0x03,0x18,0x10,0x00,0x7d,(byte)0x82,(byte)0x8e}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_1"}
 			),
 	LIQUID_SERIAL_2(
 			 new byte[] {0x01,0x03,0x18,(byte)0x8D,0x00,0x7d,0x13,0x60}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_2"}
 			),
 	LIQUID_SERIAL_3(
 			 new byte[] {0x01,0x03,0x19,0x0A,0x00,0x7d,(byte)0xa2,(byte)0xb5}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_3"}
 			),
 	LIQUID_SERIAL_4(
 			 new byte[] {0x01,0x03,0x19,(byte)0x87,0x00,0x7d,0x32,(byte)0x9e}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_4"}
 			),
 	LIQUID_SERIAL_5(
 			 new byte[] {0x01,0x03,0x1A,0x04,0x00,0x7d,(byte)0xc3,0x32}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_5"}
 			),
 	LIQUID_SERIAL_6(
 			 new byte[] {0x01,0x03,0x1A,(byte)0x81,0x00,0x7d,(byte)0xd2,(byte)0xdb}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_6"}
 			),
 	LIQUID_SERIAL_7(
 			 new byte[] {0x01,0x03,0x1A,(byte)0xFE,0x00,0x7d,(byte)0xe3,0x03}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_7"}
 			),
 	LIQUID_SERIAL_8(
 			 new byte[] {0x01,0x03,0x1B,0x7B,0x00,0x7d,(byte)0xf3,0x16}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_8"}
 			),
 	LIQUID_SERIAL_9(
-			 new byte[] {0x01,0x03,0x1B,(byte)0xF8,0x00,0x7d}, 
+			 new byte[] {0x01,0x03,0x1B,(byte)0xF8,0x00,0x7d,0x02,(byte)0xfe}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_9"}
 			),
 	LIQUID_SERIAL_10(
 			 new byte[] {0x01,0x03,0x1C,0x75,0x00,0x7d,(byte)0x93,(byte)0xa1}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_10"}
 			),
 	LIQUID_SERIAL_11(
 			 new byte[] {0x01,0x03,0x1C,(byte)0xF2,0x00,0x7d,0x23,(byte)0x88}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_11"}
 			),
 	LIQUID_SERIAL_12(
 			 new byte[] {0x01,0x03,0x1D,0x6F,0x00,0x7d,(byte)0xb3,(byte)0x9a}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_12"}
 			),
 	LIQUID_SERIAL_13(
 			 new byte[] {0x01,0x03,0x1D,(byte)0xEC,0x00,0x7d,0x42,0x72}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_13"}
 			),
 	LIQUID_SERIAL_14(
 			 new byte[] {0x01,0x03,0x1E,0x69,0x00,0x7d,0x53,(byte)0xdf}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_14"}
 			),
 	LIQUID_SERIAL_15(
 			 new byte[] {0x01,0x03,0x1E,(byte)0xE6,0x00,0x7d,0x62,0x34}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_15"}
 			),
 	LIQUID_SERIAL_16(
 			 new byte[] {0x01,0x03,0x1F,0x63,0x00,0x7d,0x72,0x21}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_16"}
 			),
 	LIQUID_SERIAL_17(
 			 new byte[] {0x01,0x03,0x1F,(byte)0xE0,0x00,0x7d,(byte)0x83,(byte)0xc9}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_17"}
 			),
 	LIQUID_SERIAL_18(
 			 new byte[] {0x01,0x03,0x20,0x5D,0x00,0x7d,0x1f,(byte)0xf9}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_18"}
 			),
 	LIQUID_SERIAL_19(
 			 new byte[] {0x01,0x03,0x20,(byte)0xDA,0x00,0x7d,(byte)0xaf,(byte)0xd0}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_19"}
 			),
 	LIQUID_SERIAL_20(
 			 new byte[] {0x01,0x03,0x21,0x57,0x00,0x7d,0x3e,0x07}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_20"}
 			),
 	LIQUID_SERIAL_21(
 			 new byte[] {0x01,0x03,0x21,(byte)0xD4,0x00,0x7d,(byte)0xcf,(byte)0xef}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_21"}
 			),
 	LIQUID_SERIAL_22(
 			 new byte[] {0x01,0x03,0x22,0x51,0x00,0x7d,(byte)0xde,0x42}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_22"}
 			),
 	LIQUID_SERIAL_23(
 			 new byte[] {0x01,0x03,0x22,(byte)0xCE,0x00,0x7d,(byte)0xee,0x6c}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_23"}
 			),
 	LIQUID_SERIAL_24(
 			 new byte[] {0x01,0x03,0x23,0x4B,0x00,0x7d,(byte)0xfe,0x79}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_24"}
 			),
 	LIQUID_SERIAL_25(
 			 new byte[] {0x01,0x03,0x23,(byte)0xC8,0x00,0x7d,0x0f,(byte)0x91}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_25"}
 			),
 	LIQUID_SERIAL_26(
 			 new byte[] {0x01,0x03,0x24,0x45,0x00,0x7d,(byte)0x9e,(byte)0xce}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_26"}
 			),
 	LIQUID_SERIAL_27(
 			 new byte[] {0x01,0x03,0x24,(byte)0xC2,0x00,0x7d,0x2e,(byte)0xe7}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_27"}
 			),
 	LIQUID_SERIAL_28(
 			 new byte[] {0x01,0x03,0x25,0x3F,0x00,0x7d,(byte)0xbe,(byte)0xeb}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_28"}
 			),
 	LIQUID_SERIAL_29(
 			 new byte[] {0x01,0x03,0x25,(byte)0xBC,0x00,0x7d,0x4f,0x03}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_29"}
 			),
 	LIQUID_SERIAL_30(
 			 new byte[] {0x01,0x03,0x26,0x39,0x00,0x7d,0x5e,(byte)0xae}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_30"}
 			),
 	LIQUID_SERIAL_31(
 			 new byte[] {0x01,0x03,0x26,(byte)0xB6,0x00,0x7d,0x6f,0x45}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_31"}
 			),
 	LIQUID_SERIAL_32(
 			 new byte[] {0x01,0x03,0x27,0x33,0x00,0x7d,0x7f,0x50}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_32"}
 			),
 	LIQUID_SERIAL_33(
 			 new byte[] {0x01,0x03,0x27,(byte)0xB0,0x00,0x7d,(byte)0x8e,(byte)0xb8}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_33"}
 			),
 	LIQUID_SERIAL_34(
 			 new byte[] {0x01,0x03,0x28,0x2D,0x00,0x7d,0x1c,0x42}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_34"}
 			),
 	LIQUID_SERIAL_35(
 			 new byte[] {0x01,0x03,0x28,(byte)0xAA,0x00,0x7d,(byte)0xac,0x6b}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_35"}
 			),
 	LIQUID_SERIAL_36(
 			 new byte[] {0x01,0x03,0x29,0x27,0x00,0x7d,0x3d,(byte)0xbc}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_36"}
 			),
 	LIQUID_SERIAL_37(
 			 new byte[] {0x01,0x03,0x29,(byte)0xA4,0x00,0x7d,(byte)0xcc,0x54}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_37"}
 			),
 	LIQUID_SERIAL_38(
 			 new byte[] {0x01,0x03,0x2A,0x21,0x00,0x7d,(byte)0xdd,(byte)0xf9}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_38"}
 			),
 	LIQUID_SERIAL_39(
 			 new byte[] {0x01,0x03,0x2A,(byte)0x9E,0x00,0x7d,(byte)0xec,0x1d}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_39"}
 			),
 	LIQUID_SERIAL_40(
 			 new byte[] {0x01,0x03,0x2B,0x1B,0x00,0x7d,(byte)0xfc,0x08}, 
 			 0xfa,
-			 1,
+			 "liquid",
 			 new String[] {"serial_40"}
 			);
 	
@@ -391,14 +391,14 @@ public enum ZLOpdProtCMDEnum {
 	
 	private int totalBytCount; //返回的字节数(以此区分不同指令的返回数据)
 	
-	private int itemBytCount; //每个数据项字节数  目前为4字节或2字节一个数据
+	private String type; //指令类型(single,diagram,liquid)
 	
 	private String[]  paramCodes; //返回数据项对应参数编码
 	
-	private ZLOpdProtCMDEnum(byte[]  cmd,int totalBytCount,int itemBytCount,String[] paramCodes) {
+	private ZLOpdProtCMDEnum(byte[]  cmd,int totalBytCount,String type,String[] paramCodes) {
 		this.cmd=cmd;
 		this.totalBytCount=totalBytCount;
-		this.itemBytCount=itemBytCount;
+		this.type=type;
 		this.paramCodes=paramCodes;
 	}
 
@@ -410,8 +410,8 @@ public enum ZLOpdProtCMDEnum {
 		return totalBytCount;
 	}
 
-	public int getItemBytCount() {
-		return itemBytCount;
+	public String getType() {
+		return type;
 	}
 
 	public String[] getParamCodes() {

+ 3 - 10
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -49,7 +49,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 		
 		String hexmsg=ByteBufUtil.hexDump(byteBuf);
 		
-		logger.debug("接收到数据:{}",hexmsg);
+		//logger.debug("接收到数据:{}",hexmsg);
 		
 		//byte[] temp=ByteBufUtil.getBytes(byteBuf, 0, 2);
 		
@@ -92,6 +92,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 			
 			if(ByteUtils.byte2ToIntHL(crc16)==calCrc16) {  //crc校验通过
 				
+				byteBuf.readerIndex(headBtyCount);  //readIndex设置到数据区起始位置,后面解析直接可读数据
 				String cmd=null;
 				
 				synchronized(ctx.channel()) {  //成功收到消息才通知可发指令,未成功收到消息则等超时
@@ -99,15 +100,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 					cmd=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_CMD).get();
 					logger.info("解析指令{}返回消息",cmd);
 					
-					if("DIAGRAM_POINT_COUNT".equalsIgnoreCase(cmd)) {  //如果是功图点数检测,还需要明确结果
-						short pcount=GatherRespParserFacade.parseDiagramPoint(byteBuf,headBtyCount);
-						logger.warn("功图点数{}",pcount);
-						ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(pcount==250);
-					}
-					else{
-						String serial=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
-						GatherRespParserFacade.parse(byteBuf,headBtyCount,datalen,cmd,serial);
-					}
+					GatherRespParserFacade.parse(byteBuf,datalen,cmd,ctx.channel());
 					
 					ctx.channel().notifyAll(); //已经收到回复消息,通知指令发送进程可以继续,同步块或者同步方法执行完后才释放锁
 					

+ 56 - 23
src/main/java/com/hb/proj/gather/protocol/parser/DataPieceLiquidBox.java

@@ -4,7 +4,15 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.model.LiquidPO;
+
 public class DataPieceLiquidBox {
+	
+	private final static  Logger logger = LoggerFactory.getLogger(DataPieceLiquidBox.class);
 
 	private int gatherNum;  //采集批次号
 	
@@ -12,34 +20,58 @@ public class DataPieceLiquidBox {
 	
 	private String testTime;  //测量时间
 	
-	private Short soundSpeed;  //音速   cm/s
+	private Integer soundSpeed;  //音速   cm/s
 	
-	private Short testDepth; // 测量深度  0.1米
+	private Integer testDepth; // 测量深度  0.1米
 	
-	private Short casingPre; // 当前套压  kpa
+	private Integer casingPress; // 当前套压  kpa
 	
 	
-	private List<Integer>  pieceSerial=new ArrayList<>(10000);  //接箍序列(高字节),动液面序列(低字节) 分别对应奇数位,偶数位
+	private List<Short>  pieceSerial=new ArrayList<>(10000);  //接箍序列(高字节),动液面序列(低字节) 分别对应奇数位,偶数位
 	
 	public DataPieceLiquidBox(String channelSerial) {
 		this.channelSerial=channelSerial;
 	}
 	
-	public void putSerial(List<Integer> serial) {
+	public LiquidPO putSerial(List<Short> serial,boolean canAssemble) {
 		this.pieceSerial.addAll(serial);
+		logger.info("liquid serial size:{}",this.pieceSerial.size());
+		return canAssemble?assemble():null;
 	}
 	
-	public void putOther(Map<String,Short> gatherOther) {
-		this.testTime="20"+String.format("%2d", gatherOther.get("test_time_yy"))+
-				"-"+String.format("%2d", gatherOther.get("test_time_mm"))+
-				"-"+String.format("%2d", gatherOther.get("test_time_dd"))+
-				" "+String.format("%2d", gatherOther.get("test_time_hr"))+
-				":"+String.format("%2d", gatherOther.get("test_time_mi"))+
-				":"+String.format("%2d", gatherOther.get("test_time_ss"));
+	public void putOther(Map<String,Integer> gatherOther) {
+		this.testTime="20"+String.format("%02d", gatherOther.get("test_time_yy"))+
+				"-"+String.format("%02d", gatherOther.get("test_time_mm"))+
+				"-"+String.format("%02d", gatherOther.get("test_time_dd"))+
+				" "+String.format("%02d", gatherOther.get("test_time_hr"))+
+				":"+String.format("%02d", gatherOther.get("test_time_mi"))+
+				":"+String.format("%02d", gatherOther.get("test_time_ss"));
 		
 		this.soundSpeed=gatherOther.get("sound_speed");
 		this.testDepth=gatherOther.get("test_depth");
-		this.casingPre=gatherOther.get("casing_pre");
+		this.casingPress=gatherOther.get("casing_pre");
+		
+		//表明是新的采集,先清除旧数据
+		pieceSerial.clear();
+	}
+	
+	//组装liquid,并进行数据转换
+	public LiquidPO assemble() {
+		logger.info("liquid assemble:{},{}",testTime,pieceSerial.size());
+		if(StringUtils.isBlank(testTime) || pieceSerial.size()<10000) {
+			return null;
+		}
+		LiquidPO  po=new LiquidPO(null,channelSerial);  //此处还无法获得wellId
+		
+		po.setSoundSpeedDev(soundSpeed!=null?(soundSpeed/100.0):null);
+		po.setLiquidDepthDev(testDepth!=null?(testDepth/10.0):null);
+		po.setCasingPressDev(casingPress!=null?(casingPress/1000.0):null);
+		po.setTestTime(testTime);
+		
+		po.setDatas(pieceSerial);
+		
+		clear();
+		return po;
 		
 	}
 	
@@ -49,6 +81,7 @@ public class DataPieceLiquidBox {
 	}
 	
 	public void clear() {
+		testTime=null;
 		pieceSerial.clear();
 	}
 
@@ -80,35 +113,35 @@ public class DataPieceLiquidBox {
 		this.testTime = testTime;
 	}
 
-	public Short getSoundSpeed() {
+	public Integer getSoundSpeed() {
 		return soundSpeed;
 	}
 
-	public void setSoundSpeed(Short soundSpeed) {
+	public void setSoundSpeed(Integer soundSpeed) {
 		this.soundSpeed = soundSpeed;
 	}
 
-	public Short getTestDepth() {
+	public Integer getTestDepth() {
 		return testDepth;
 	}
 
-	public void setTestDepth(Short testDepth) {
+	public void setTestDepth(Integer testDepth) {
 		this.testDepth = testDepth;
 	}
 
-	public Short getCasingPre() {
-		return casingPre;
+	public Integer getCasingPress() {
+		return casingPress;
 	}
 
-	public void setCasingPre(Short casingPre) {
-		this.casingPre = casingPre;
+	public void setCasingPress(Integer casingPress) {
+		this.casingPress = casingPress;
 	}
 
-	public List<Integer> getPieceSerial() {
+	public List<Short> getPieceSerial() {
 		return pieceSerial;
 	}
 
-	public void setPieceSerial(List<Integer> pieceSerial) {
+	public void setPieceSerial(List<Short> pieceSerial) {
 		this.pieceSerial = pieceSerial;
 	}
 

+ 3 - 3
src/main/java/com/hb/proj/gather/protocol/parser/DataPieceSingleBox.java

@@ -38,9 +38,9 @@ public class DataPieceSingleBox {
 	
 	
 	public SingleCombPO  assemble() {
-		boolean b1=checkReady(ZLOpdProtCMDEnum.PRESS_TEMP_LOAD.getParamCodes());
-		boolean b2=checkReady(ZLOpdProtCMDEnum.CURR_VOL_LOS_PW.getParamCodes());
-		boolean b3=checkReady(ZLOpdProtCMDEnum.FREQ_STROKE.getParamCodes());
+		boolean b1=checkReady(ZLOpdProtCMDEnum.SINGLE_PRESS_TEMP_LOAD.getParamCodes());
+		boolean b2=checkReady(ZLOpdProtCMDEnum.SINGLE_CURR_VOL_LOS_PW.getParamCodes());
+		boolean b3=checkReady(ZLOpdProtCMDEnum.SINGLE_FREQ_STROKE.getParamCodes());
 		
 		if(!b1 || !b2 || !b3) {
 			return null;

+ 18 - 10
src/main/java/com/hb/proj/gather/protocol/parser/DiagramParser.java

@@ -29,35 +29,43 @@ public class DiagramParser {
 	
 	/**
 	 * 解析数据并进行后续数据处理
-	 * @param byteBuf
-	 * @param startIndex
+	 * @param byteBuf  此处的readIndex已经跳过消息头,可直接读取数据区
 	 * @param dataLen
 	 * @param cmdEnum
 	 * @param serial
 	 */
-	public static void parse(ByteBuf byteBuf ,int startIndex,int dataLen,ZLOpdProtCMDEnum cmdEnum,String serial) {
-		 List<Float> datas=parseShort2Float(byteBuf,startIndex,dataLen);
+	public static void parse(ByteBuf byteBuf ,int dataLen,ZLOpdProtCMDEnum cmdEnum,String serial) {
+		 List<Float> datas=parseShort2Float(byteBuf,dataLen);
 		 DiagramPO po=putPieceData(serial,(cmdEnum.getParamCodes())[0], datas);
 		 if(po!=null) {
 			 GatherTaskExecutor.execute(new DataTransRepTask(po)); 
 		 }
 	}
 	
+	/**
+	 * 专为功图点数检测解析 2字节
+	 * @param byteBuf
+	 * @param startIndex
+	 * @param dataLen
+	 * @return
+	 */
+	public static short parseDiagramPoint(ByteBuf byteBuf) {
+		return byteBuf.readShort();
+	}
+	
+	
 	/**
 	 * 解析消息中的数据部分 每个数据项 2字节
+	 * 目前是按有符号的short读取,如果数据越界,要换无符号readUnsignedShort方法
 	 * @param byteBuf
 	 * @param startIndex
 	 * @param dataLen
 	 * @return
 	 */
-	public static List<Float> parseShort2Float(ByteBuf byteBuf ,int startIndex,int dataLen) {
-		byteBuf.readerIndex(startIndex);
+	public static List<Float> parseShort2Float(ByteBuf byteBuf,int dataLen) {
 		List<Float> rtns=new ArrayList<Float>();
-		while(true) {
+		while(byteBuf.readableBytes()>3) {  //1+2   2字节的校验码要排除
 			rtns.add(byteBuf.readShort()+0.0f); //顺序读取,readIndex 自动后移
-			if(byteBuf.readerIndex()>=(startIndex+dataLen)) {
-				break;
-			}
 		}
 		logger.info("数据解析完:{}",rtns);
 		return rtns;

+ 42 - 28
src/main/java/com/hb/proj/gather/protocol/parser/LiquidParser.java

@@ -8,7 +8,12 @@ import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.hb.proj.gather.model.LiquidPO;
+import com.hb.proj.gather.process.DataTransRepLiquidTask;
 import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
+import com.hb.proj.gather.rep.GatherDataRepService;
+import com.hb.proj.gather.scheduler.GatherTaskExecutor;
+import com.hb.xframework.util.ApplicationContextUtils;
 
 import io.netty.buffer.ByteBuf;
 
@@ -16,47 +21,57 @@ public class LiquidParser {
 
 	private final static  Logger logger = LoggerFactory.getLogger(LiquidParser.class);
 	
+	private  static GatherDataRepService repService=null;
+	
 	private static Map<String,DataPieceLiquidBox> pkgs=new HashMap<>(100);
 	
 	
 	/**
 	 * 解析数据并进行后续数据处理
 	 * 数据入库前先检测该测量时间是否已经在库里,只有不存在才进行后面的采集
-	 * @param byteBuf
-	 * @param startIndex
+	 * @param byteBuf  此处的readIndex已经跳过消息头,可直接读取数据区
 	 * @param dataLen
 	 * @param cmdEnum
-	 * @param serial
+	 * @param serial 设备号
 	 */
-	public static void parse(ByteBuf byteBuf ,int startIndex,int dataLen,ZLOpdProtCMDEnum cmdEnum,String serial) {
+	public static boolean parse(ByteBuf byteBuf ,int dataLen,ZLOpdProtCMDEnum cmdEnum,String serial) {
 		if(cmdEnum.name().startsWith("LIQUID_OTHER")) {
-			Map<String,Short> dataMap=parseOther(byteBuf,startIndex,dataLen,cmdEnum.getParamCodes());
+			Map<String,Integer> otherData=parseOther(byteBuf,dataLen,cmdEnum.getParamCodes());
+			putPieceOther(serial,otherData);
+			if(repService==null) {
+				repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
+			}
+			if(repService.existsLiquid(serial, pkgs.get(serial).getTestTime())) {
+				logger.warn("动液面数据{}-{}已存在,忽略本次采集",serial,pkgs.get(serial).getTestTime());
+				return false;
+			}
 			
 		}
-		else if(cmdEnum.name().startsWith("LIQUID_CURVE")) {
-			List<Integer>  dataSerial=parseSerial(byteBuf,startIndex,dataLen);
+		else if(cmdEnum.name().startsWith("LIQUID_SERIAL")) {
+			List<Short>  dataSerial=parseSerial(byteBuf,dataLen);
+			LiquidPO po=putPieceSerial(serial,dataSerial,cmdEnum.name().equals("LIQUID_SERIAL_40"));  //最后一包数据收到就组装数据
+			if(po!=null) {
+				GatherTaskExecutor.execute(new DataTransRepLiquidTask(po));
+			}
 		}
 		
+		return true;
 	}
 	
 	
 	/**
 	 * 解析动液面单值数据(测量时间、音速、套压)
+	 * 音速数值越界,需要特殊处理
 	 * @param byteBuf
-	 * @param startIndex
 	 * @param dataLen
 	 * @param paramCodes
 	 * @return
 	 */
-	private static Map<String,Short> parseOther(ByteBuf byteBuf ,int startIndex,int dataLen,String[] paramCodes) {
-		byteBuf.readerIndex(startIndex);
-		Map<String,Short> rtnData=new HashMap<>(paramCodes.length);
+	private static Map<String,Integer> parseOther(ByteBuf byteBuf,int dataLen,String[] paramCodes) {
+		Map<String,Integer> rtnData=new HashMap<>(paramCodes.length);
 		int i=0;
-		while(true) {
-			rtnData.put(paramCodes[i++],byteBuf.readShort() );   //顺序读取,readIndex 自动后移
-			if(i>=paramCodes.length || byteBuf.readerIndex()>=(startIndex+dataLen)) {
-				break;
-			}
+		while(i<paramCodes.length && byteBuf.readableBytes()>3) {    //1+2   2字节的校验码要排除
+			rtnData.put(paramCodes[i++],byteBuf.readUnsignedShort());   //顺序读取,readIndex 自动后移
 		}
 		logger.info("数据解析完:{}",rtnData);
 		return rtnData;
@@ -65,18 +80,13 @@ public class LiquidParser {
 	/**
 	 * 解析动液面测量数据
 	 * @param byteBuf
-	 * @param startIndex
 	 * @param dataLen
 	 * @return
 	 */
-	private static List<Integer> parseSerial(ByteBuf byteBuf ,int startIndex,int dataLen) {
-		byteBuf.readerIndex(startIndex);
-		List<Integer> serial=new ArrayList<>(dataLen);
-		while(true) {
-			serial.add(byteBuf.readByte()+0); //顺序读取,readIndex 自动后移
-			if(byteBuf.readerIndex()>=(startIndex+dataLen)) {
-				break;
-			}
+	private static List<Short> parseSerial(ByteBuf byteBuf,int dataLen) {
+		List<Short> serial=new ArrayList<>(dataLen);
+		while(byteBuf.readableBytes()>2) {  //0+2   2字节的校验码要排除
+			serial.add(byteBuf.readUnsignedByte()); //顺序读取,readIndex 自动后移
 		}
 		logger.info("数据解析完,曲线序列:{}",serial);
 		
@@ -91,7 +101,7 @@ public class LiquidParser {
 	 * @param channelSerial
 	 * @param pieceData
 	 */
-	private static void putPieceOther(String channelSerial,Map<String,Short> pieceData) {
+	private static void putPieceOther(String channelSerial,Map<String,Integer> pieceData) {
 		if(!pkgs.containsKey(channelSerial)) {
 			pkgs.put(channelSerial, new DataPieceLiquidBox(channelSerial));
 		}
@@ -100,11 +110,15 @@ public class LiquidParser {
 	
 	/**
 	 * 放入片段数据,符合组装要求时返回组装好的动液面对象(曲线数据,)
+	 * @param channelSerial
+	 * @param pieceData
+	 * @param canAssemble  是否组装数据
+	 * @return
 	 */
-	private static void putPieceSerial(String channelSerial,List<Integer> pieceData) {
+	private static LiquidPO putPieceSerial(String channelSerial,List<Short> pieceData,boolean canAssemble) {
 		if(!pkgs.containsKey(channelSerial)) {
 			pkgs.put(channelSerial, new DataPieceLiquidBox(channelSerial));
 		}
-		pkgs.get(channelSerial).putSerial(pieceData);
+		return pkgs.get(channelSerial).putSerial(pieceData,canAssemble);
 	}
 }

+ 5 - 10
src/main/java/com/hb/proj/gather/protocol/parser/SingleParser.java

@@ -27,14 +27,13 @@ public class SingleParser {
 
 	/**
 	 * 解析数据并进行后续数据处理
-	 * @param byteBuf
-	 * @param startIndex
+	 * @param byteBuf  此处的readIndex已经跳过消息头,可直接读取数据区
 	 * @param dataLen
 	 * @param cmdEnum
 	 * @param serial
 	 */
-	public static void parse(ByteBuf byteBuf ,int startIndex,int dataLen,ZLOpdProtCMDEnum cmdEnum,String serial) {
-		 Map<String,Float> dataMap=parseFloat(byteBuf,startIndex,dataLen,cmdEnum.getParamCodes());
+	public static void parse(ByteBuf byteBuf ,int dataLen,ZLOpdProtCMDEnum cmdEnum,String serial) {
+		 Map<String,Float> dataMap=parseFloat(byteBuf,dataLen,cmdEnum.getParamCodes());
 		 SingleCombPO po=putPieceData(serial, dataMap);
 		 if(po!=null) {
 			 GatherTaskExecutor.execute(new DataTransRepSingleTask(po)); 
@@ -51,15 +50,11 @@ public class SingleParser {
 	 * @param  dataLen  数据区长度
 	 * @param  paramCodes  数据项编码
 	 */
-	public static Map<String,Float> parseFloat(ByteBuf byteBuf ,int startIndex,int dataLen,String[] paramCodes) {
-		byteBuf.readerIndex(startIndex);
+	public static Map<String,Float> parseFloat(ByteBuf byteBuf ,int dataLen,String[] paramCodes) {
 		Map<String,Float> rtnData=new HashMap<String,Float>(paramCodes.length);
 		int i=0;
-		while(true) {
+		while(i<paramCodes.length && byteBuf.readableBytes()>5) {  //3+2   2字节的校验码要排除
 			rtnData.put(paramCodes[i++],byteBuf.readFloat() );   //顺序读取,readIndex 自动后移
-			if(i>=paramCodes.length || byteBuf.readerIndex()>=(startIndex+dataLen)) {
-				break;
-			}
 		}
 		logger.info("数据解析完:{}",rtnData);
 		return rtnData;

+ 43 - 8
src/main/java/com/hb/proj/gather/rep/GatherDataRepService.java

@@ -9,6 +9,7 @@ import org.springframework.stereotype.Service;
 
 import com.hb.proj.gather.model.AlarmLogVO;
 import com.hb.proj.gather.model.DiagramPO;
+import com.hb.proj.gather.model.LiquidPO;
 import com.hb.proj.gather.model.SingleInsertPO;
 import com.hb.xframework.dao.core.SpringJdbcDAO;
 
@@ -60,16 +61,14 @@ public class GatherDataRepService {
 				insert into tzl_gather_data_multi(well_param,gather_time,data_val1,data_val2) values(?,?,?,?)
 				""";
 		
-		dao.exeUpdate(sql, diagramPO.getWellParam(),diagramPO.getGatherTime(),list2Str(diagramPO.getDisps()),list2Str(diagramPO.getOths()));
+		dao.exeUpdate(sql, diagramPO.getWellParam(),diagramPO.getGatherTime(),listNum2Str(diagramPO.getDisps()),listNum2Str(diagramPO.getOths()));
 	}
 	
-	private String list2Str(List<Float>  datas) {
-		StringBuilder strb=new StringBuilder(datas.size()*10);
-		for(Float d : datas) {
-			strb.append(","+String.valueOf(d));
-		}
-		return strb.substring(1);
-	}
+	/*
+	 * private String list2Str(List<Float> datas) { StringBuilder strb=new
+	 * StringBuilder(datas.size()*10); for(Float d : datas) {
+	 * strb.append(","+String.valueOf(d)); } return strb.substring(1); }
+	 */
 	
 	/**
 	 * 构建单值批量入库参数
@@ -102,4 +101,40 @@ public class GatherDataRepService {
 		
 		return params;
 	}
+	
+	/**
+	 * 检测动液面数据是否已存在
+	 * @param serial
+	 * @param testTime
+	 * @return
+	 */
+	public boolean existsLiquid(String serial,String testTime) {
+		String sql="""
+				select count(1) from tzl_gather_data_liquid liq
+				 where  well_id=(select well_id from tzl_gather_device d where d.device_code=? limit 1) and test_time=?
+				""" ;
+		return dao.queryForObject(sql, Integer.class, serial,testTime)>0;
+	}
+	
+	/**
+	 * 动液面采集数据入库
+	 * @param liquid
+	 */
+	public void save(LiquidPO  liquidPO) {
+		String sql="""
+				insert into tzl_gather_data_liquid(well_id,test_time,liquid_datas,hoop_datas,sound_speed_dev,liquid_depth_dev,casing_press_dev) 
+				values(?,?,?,?,?,?,?)
+				""";
+		dao.exeUpdate(sql, liquidPO.getWellId(),liquidPO.getTestTime(),listNum2Str(liquidPO.getLiquidDatas()),listNum2Str(liquidPO.getHoopDatas()),
+				liquidPO.getSoundSpeedDev(),liquidPO.getLiquidDepthDev(),liquidPO.getCasingPressDev());
+	}
+	
+	private <T extends Number> String listNum2Str(List<T> datas) {
+		StringBuilder strb=new StringBuilder(datas.size()*5);
+		for(Number d : datas) {
+			strb.append(","+String.valueOf(d));
+		}
+		return strb.substring(1);
+	}
+	
 }

+ 32 - 0
src/main/java/com/hb/proj/gather/rep/WellConfigService.java

@@ -22,9 +22,17 @@ public class WellConfigService {
 	@Autowired
 	private SpringJdbcDAO  dao;
 	
+	/**
+	 * 加载所有井的参数配置
+	 * @return
+	 */
 	public Map<String,WellParamVO>  loadWellParams(){
 		return loadWellParams(null);
 	}
+	
+	/**
+	 * 加载指定井的参数配置
+	 */
 	public Map<String,WellParamVO>  loadWellParams(String wellId){
 		
 		Object[] args=null;
@@ -58,10 +66,20 @@ public class WellConfigService {
 		
 	}
 	
+	
+	/**
+	 * 加载所有井的报警设置
+	 * @return
+	 */
 	public Map<String,AlarmDefineVO>  loadAlarmDefines(){
 		return loadAlarmDefines(null);
 	}
 	
+	/**
+	 * 加载指定井的报警设置
+	 * @param wellId
+	 * @return
+	 */
 	public Map<String,AlarmDefineVO>  loadAlarmDefines(String wellId){
 		Object[] args=null;
 		String sql="""
@@ -96,4 +114,18 @@ public class WellConfigService {
 		return mapping;
 		
 	}
+	
+	
+	/**
+	 * 获得指定设备的关联井
+	 * @param serial
+	 * @return
+	 */
+	public String getWellIdByDev(String serial) {
+		String sql="select well_id from tzl_gather_device where device_code=? and del_if=false limit 1";
+		Map<String,Object> rst=dao.queryForMap(sql, serial);
+		
+		return rst!=null?((String)rst.get("wellId")):null;
+		
+	}
 }

+ 40 - 4
src/main/java/com/hb/proj/gather/scheduler/GatherLiquidTask.java

@@ -51,6 +51,9 @@ public class GatherLiquidTask implements Runnable {
 			ByteBuf byteBuf=null;
 			
 			synchronized(channel) {
+				
+				channel.attr(ChannelGroupMgr.ATTR_KEY_STOP_NEXT).set(false); //每次任务开始前重置
+				
 				for(ZLOpdProtCMDEnum cmd : cmds) {
 					
 					if(needCloseChannel()) {
@@ -69,10 +72,16 @@ public class GatherLiquidTask implements Runnable {
 					
 					channel.wait(cmdTimeout);  //等待接收返回数据后继续,最多等待cmdTimeout,此处释放锁,回复还未收到就被多值任务获得锁并发指令,会导致两个指令间隔很短
 				
-					/*
-					 * if(!channel.attr(ChannelGroupMgr.ATTR_KEY_RECEIVED).get()) { //如果是超时等待后执行
-					 * logger.info("动液面采集:{}超时未回复,取消此次任务",cmd.name()); return; }
-					 */
+					if(channel.attr(ChannelGroupMgr.ATTR_KEY_STOP_NEXT).get()) {
+						logger.info("本次动液面采集取消(数据已采集过)");
+						return;
+					}
+					
+					if(checkCancel(cmd.name())) {
+						logger.info("本次动液面采集取消(中途有数据未采到)");
+						return;
+					}
+					
 				}
 				
 			}
@@ -86,6 +95,33 @@ public class GatherLiquidTask implements Runnable {
 		
 		
 
+	}
+	
+	
+	private boolean checkCancel(String cmdName) {
+		
+		Long pre=channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).get();
+		if(pre==null||pre==0) {
+			return false;
+		}
+		boolean isTimeout=((new Date()).getTime()-pre.longValue())>(cmdTimeout-1000);
+		
+		if(!isTimeout) {
+			return false;
+		}
+		//超时后的处理
+		if(cmdName.startsWith("LIQUID_OTHER")) {
+			return true;
+		}
+		else if(cmdName.startsWith("LIQUID_SERIAL")){
+			int num=Integer.parseInt((cmdName.split("_"))[2]);
+			if(num<30) { //只要前30包数据有效就可以继续
+				return true;
+			}
+		}
+		
+		return false;
+		
 	}
 	
 	private boolean needCloseChannel() {

+ 1 - 1
src/main/java/com/hb/proj/gather/scheduler/GatherScheduler.java

@@ -67,7 +67,7 @@ public class GatherScheduler {
 	}
 	
 	
-	@Scheduled(fixedRate = 60 * 60 * 1000,initialDelay= 30000)  
+	@Scheduled(fixedRate = 60 * 60 * 1000,initialDelay= 60000)  
 	public void startLiquidGather() {
 			logger.info("动液面定时采集启动...");
 			Iterator<Channel> iterator=ChannelGroupMgr.iterator();

+ 3 - 1
src/main/java/com/hb/proj/gather/scheduler/GatherSingleTask.java

@@ -32,7 +32,9 @@ public class GatherSingleTask implements Runnable {
 	public void run() {
 		logger.info("单值采集开始...");
 		
-		ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.PRESS_TEMP_LOAD,ZLOpdProtCMDEnum.CURR_VOL_LOS_PW,ZLOpdProtCMDEnum.FREQ_STROKE};
+		ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.SINGLE_PRESS_TEMP_LOAD,
+				ZLOpdProtCMDEnum.SINGLE_CURR_VOL_LOS_PW,
+				ZLOpdProtCMDEnum.SINGLE_FREQ_STROKE};
 		ByteBufAllocator alloc=channel.alloc(); 
 		ByteBuf byteBuf=null;
 		

+ 3 - 1
src/main/java/com/hb/proj/gather/scheduler/GatherTask.java

@@ -47,7 +47,9 @@ public class GatherTask implements Runnable{
 	
 	private void singleGahter(ByteBufAllocator alloc) {
 		try {
-			ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.PRESS_TEMP_LOAD,ZLOpdProtCMDEnum.CURR_VOL_LOS_PW,ZLOpdProtCMDEnum.FREQ_STROKE};
+			ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.SINGLE_PRESS_TEMP_LOAD,
+					ZLOpdProtCMDEnum.SINGLE_CURR_VOL_LOS_PW,
+					ZLOpdProtCMDEnum.SINGLE_FREQ_STROKE};
 			ByteBuf byteBuf=null;
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 				

+ 4 - 2
src/main/java/com/hb/proj/gather/server/NettyGatherServer.java

@@ -1,5 +1,7 @@
 package com.hb.proj.gather.server;
 
+import java.util.concurrent.TimeUnit;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
@@ -55,8 +57,8 @@ public class NettyGatherServer {
 	     }
 	     finally {
 	    	 logger.info("采集服务停止前,关闭处理池...");
-	    	 bossGroup.shutdownGracefully();  //防止线程泄漏
-	    	 workerGroup.shutdownGracefully();
+	    	 bossGroup.shutdownGracefully(0,10,TimeUnit.SECONDS);  //防止线程泄漏  0:静默时间,10:截止时间
+	    	 workerGroup.shutdownGracefully(0,10,TimeUnit.SECONDS);
 	    	 logger.info("采集服务即将停止");
 	     }
 	     		  

+ 0 - 20
src/main/java/com/hb/proj/gather/utils/ByteUtils.java

@@ -98,27 +98,7 @@ public class ByteUtils {
     }
     
     public static void main(String[] args) {
-    	byte[] bytes= {0x40, (byte)0x9B, (byte)0x85, 0x1F};
     	
-    	byte[] bytes2= {0x40,  0x3B,  0x00,  0x00};
-    	
-    	byte[] bytes3= {0x40, (byte)0xa9, (byte)0xa3, (byte)0xd7};
-    	
-    	byte[] bytes4= {0x40,  0x28,  0x00,  0x00};
-    	
-    	byte[] bytes5= {0x40, (byte)0xA9, (byte)0xE5, 0x60};
-    	//System.out.println(ByteUtils.toIntStr(bytes));
-    	//System.out.println(ByteUtils.toIntHL(bytes));
-    	//System.out.println(ByteUtils.toIntLH(bytes));
-    	//System.out.println(ByteUtils.toIntLH(bytes2));
-    	//System.out.println(ByteUtils.toIntLH(bytes3));
-    	//System.out.println(ByteUtils.toIntLH(bytes4));
-    	
-    	//System.out.println(ByteUtils.toIntLH(bytes2));
-    	System.out.println(ByteUtils.toIntLH4(bytes5));
-    	System.out.println(ByteUtils.toIntLH(bytes5));
-    	System.out.println(ByteUtils.toIntHL(bytes5));
-    	//System.out.println(ByteUtils.toHexString(bytes2));
     }
 
 }