Selaa lähdekoodia

单值采集数据入库时,强制保留3位小数(避免异常小数位数超过数据库设定);动液面采集指令发送调整(增加补采,中途失败的从失败位置补采一次);动液面采集接收数据处理调整

chenwen 1 viikko sitten
vanhempi
commit
ea1643f75f

+ 18 - 3
src/main/java/com/hb/proj/gather/protocol/parser/DataPieceLiquidBox.java

@@ -35,10 +35,24 @@ public class DataPieceLiquidBox {
 		this.channelSerial=channelSerial;
 	}
 	
-	public LiquidPO putSerial(List<Short> serial,boolean canAssemble) {
+	/**
+	 * 加入采集到的单包数据
+	 * @param serial
+	 * @param pieceIdx 数据包索引  pieceIdx==40  表示采集完,进行组装   一包数据250个
+	 * @return
+	 */
+	public LiquidPO putSerial(List<Short> serial,int pieceIdx) {
+		if(pieceIdx==1) { // 为1表示是新的采集,需要重置数据容器,避免上次没有组装成功,有残留数据
+			pieceSerial.clear();
+		}
+		if(this.pieceSerial.size()>((pieceIdx-1)*250)) { //该包数据已存在,表示是补采(这种情况很少:超时失败,准备补采时又收到之前失败的数据)
+			logger.info("补采数据替换");
+			this.pieceSerial.removeAll(this.pieceSerial.subList(this.pieceSerial.size()-250, this.pieceSerial.size()));
+			
+		}
 		this.pieceSerial.addAll(serial);
 		logger.info("liquid serial size:{}",this.pieceSerial.size());
-		return canAssemble?assemble():null;
+		return pieceIdx==40?assemble():null;
 	}
 	
 	public void putOther(Map<String,Integer> gatherOther) {
@@ -82,7 +96,8 @@ public class DataPieceLiquidBox {
 	//组装liquid,并进行数据转换
 	public LiquidPO assemble() {
 		logger.info("liquid assemble:{},{}",testTime,pieceSerial.size());
-		if(StringUtils.isBlank(testTime) || pieceSerial.size()<10000) {
+		if(StringUtils.isBlank(testTime) || pieceSerial.size()<7500) {
+			logger.error("动液面数据组装时,缺少测试时间或数据少于7500,取消组装。 ");
 			return null;
 		}
 		LiquidPO  po=new LiquidPO(null,channelSerial);  //此处还无法获得wellId

+ 6 - 4
src/main/java/com/hb/proj/gather/protocol/parser/LiquidParser.java

@@ -50,7 +50,8 @@ public class LiquidParser {
 		}
 		else if(cmdEnum.name().startsWith("LIQUID_SERIAL")) {
 			List<Short>  dataSerial=parseSerial(byteBuf,dataLen);
-			LiquidPO po=putPieceSerial(serial,dataSerial,cmdEnum.name().equals("LIQUID_SERIAL_40"));  //最后一包数据收到就组装数据
+			int idx=Integer.parseInt((cmdEnum.name().split("_"))[2]);  //获取是第几包数据
+			LiquidPO po=putPieceSerial(serial,dataSerial,idx);  //最后一包数据收到就组装数据
 			if(po!=null) {
 				GatherTaskExecutor.execute(new DataTransRepLiquidTask(po));
 			}
@@ -153,13 +154,14 @@ public class LiquidParser {
 	 * 放入片段数据,符合组装要求时返回组装好的动液面对象(曲线数据,)
 	 * @param channelSerial
 	 * @param pieceData
-	 * @param canAssemble  是否组装数据
+	 * @param pieceIdx  数据包索引   pieceIdx==40  表示采集完,进行组装
 	 * @return
 	 */
-	private static LiquidPO putPieceSerial(String channelSerial,List<Short> pieceData,boolean canAssemble) {
+	private static LiquidPO putPieceSerial(String channelSerial,List<Short> pieceData,int pieceIdx) {
+		
 		if(!pkgs.containsKey(channelSerial)) {
 			pkgs.put(channelSerial, new DataPieceLiquidBox(channelSerial));
 		}
-		return pkgs.get(channelSerial).putSerial(pieceData,canAssemble);
+		return pkgs.get(channelSerial).putSerial(pieceData,pieceIdx);
 	}
 }

+ 8 - 3
src/main/java/com/hb/proj/gather/rep/GatherDataRepService.java

@@ -81,16 +81,21 @@ public class GatherDataRepService {
 	
 	
 	/**
-	 * 构建单值批量入库参数
+	 * 构建单值批量入库参数(强制保留3位小数,避免数据库:Data truncation)
 	 * @param singleInPOs
 	 * @return
 	 */
 	private List<Object[]>  buildBatchParams(Collection<SingleInsertPO>  singleInPOs){
 		
 		List<Object[]>  params=new ArrayList<>(singleInPOs.size());
-		
+		Float  val=null;
 		for(SingleInsertPO po : singleInPOs) {
-			params.add(new Object[] {po.getWellParam(),po.getGatherTimeStr(),po.getDataVal()});
+			val=po.getDataVal();
+			if(val!=null) {
+				val=Float.parseFloat(String.format("%.3f", val));
+			}
+			
+			params.add(new Object[] {po.getWellParam(),po.getGatherTimeStr(),val});
 		}
 		
 		return params;

+ 2 - 2
src/main/java/com/hb/proj/gather/scheduler/GatherDistributeTask.java

@@ -47,12 +47,12 @@ public class GatherDistributeTask implements Callable<Integer> {
 			
 		} 
 		catch (TimeoutException e) {  //等待超过1个周期,不再执行该次任务,直接等新的周期任务
-			logger.error("等待通道{},超时{}s",chnSerial,timeoutSec);
+			logger.error("等待通道{},提交采集任务时,超时{}s",chnSerial,timeoutSec);
 			return TaskFutureCode.FAILED;
 		}
 		catch(Exception e) {
 			e.printStackTrace();
-			logger.error("通道{},等待/准备执行任务时,出现异常:{}",chnSerial,e.getMessage());
+			logger.error("等待通道{},提交采集任务时,出现异常:{}",chnSerial,e.getMessage());
 			return TaskFutureCode.FAILED;
 		}
 		

+ 65 - 4
src/main/java/com/hb/proj/gather/scheduler/GatherLiquidTask.java

@@ -2,7 +2,9 @@ package com.hb.proj.gather.scheduler;
 
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 
 import org.slf4j.Logger;
@@ -85,9 +87,10 @@ public class GatherLiquidTask implements Callable<Integer> {
 			
 			channel.attr(ChannelGroupMgr.ATTR_KEY_STOP_NEXT).set(false); //每次任务开始前重置
 			byteBuf=alloc.directBuffer();
-				
+			Integer rst=null;	
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 					
+				/*
 					if(needCloseChannel()) {
 						//ChannelGroupMgr.disconnect(channel);
 						channel.close();
@@ -114,11 +117,27 @@ public class GatherLiquidTask implements Callable<Integer> {
 					}
 					
 					if(checkCancel(cmd.name())) {
-						logger.info("{}本次动液面采集取消(中途有数据未采到)",serialAddrNum);
+						channel.close();
+						logger.info("{}本次动液面采集取消(中途有数据未采到),准备关闭连接,等待重连",serialAddrNum);
+						return TaskFutureCode.CLOSED;
 						//regather();  //此处补采造成,future.get() 一直锁定
-						return TaskFutureCode.RETRY;
+						//return TaskFutureCode.RETRY;
 					}
-					
+				*/
+				
+				rst=writeCMD(byteBuf,cmd,addrNum,1);  //目前限制只重试1次
+				if(rst==TaskFutureCode.SUCCESS) {
+					continue;
+				}
+				else if(rst==TaskFutureCode.CANCELED) {
+					return TaskFutureCode.CANCELED;
+				}
+				else if(rst==TaskFutureCode.FAILED) {
+					channel.close();
+					logger.info("动液面设备:{},本次动液面采集取消(中途有数据未采到:{}),准备关闭连接,等待重连",serialAddrNum,cmd.name());
+					return TaskFutureCode.CLOSED;
+				}
+				
 			}	
 				
 			return TaskFutureCode.SUCCESS;
@@ -141,6 +160,48 @@ public class GatherLiquidTask implements Callable<Integer> {
 	}
 	
 	
+	private Integer writeCMD(ByteBuf byteBuf,ZLOpdProtCMDEnum cmd,Integer addrNum,int retryCount) throws InterruptedException {
+		
+		String serialAddrNum=channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get()+"-"+String.valueOf(addrNum);
+		
+		byteBuf.readerIndex(byteBuf.writerIndex());
+		
+		byteBuf.writeBytes(ByteUtils.buildExeCMD(cmd.getCmd(),addrNum));
+		channel.writeAndFlush(byteBuf.retainedSlice());
+		
+		channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+		channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
+		
+		logger.info("动液面设备【{}】发送完指令:{},{}",serialAddrNum,cmd.name(),ByteUtils.toHexString(cmd.getCmd()));
+		
+		channel.wait(cmdTimeout);  //等待接收返回数据后继续,最多等待cmdTimeout,此处释放锁,回复还未收到就被多值任务获得锁并发指令,会导致两个指令间隔很短
+	
+		if(channel.attr(ChannelGroupMgr.ATTR_KEY_STOP_NEXT).get()) {
+			logger.info("本次动液面采集取消(数据已采集过)");
+			return TaskFutureCode.CANCELED;
+		}
+		
+		if(isTimeout()) {
+			if(retryCount>0) {
+				logger.info("动液面设备【{}】,采集超时失败,准备300ms后重新采集{},重试序号{}",serialAddrNum,cmd.name(),retryCount);
+				Thread.sleep(300);
+				return writeCMD(byteBuf,cmd,addrNum,--retryCount);
+			}
+			return TaskFutureCode.FAILED;
+		}
+		
+		return TaskFutureCode.SUCCESS;
+	}
+	
+	private boolean isTimeout() {
+		Long pre=channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).get();
+		if(pre==null||pre==0) {
+			return false;
+		}
+		return ((new Date()).getTime()-pre.longValue())>(cmdTimeout-1000);
+	}
+	
+	
 	private boolean checkCancel(String cmdName) {
 		
 		Long pre=channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).get();