Sfoglia il codice sorgente

采集程序调整,支持单通道多模块,多通道多模块

chenwen 7 mesi fa
parent
commit
0397b55dbc

+ 3 - 0
pom.xml

@@ -125,6 +125,9 @@
                     <source>${java.version}</source>
                     <target>${java.version}</target>
                     <encoding>${project.build.sourceEncoding}</encoding>
+                    <compilerArgs>
+						<compilerArg>-parameters</compilerArg>
+					</compilerArgs>
                 </configuration>
             </plugin>
 			<plugin>

+ 2 - 2
src/main/java/com/hb/proj/gather/process/DataTransRepTask.java

@@ -29,7 +29,7 @@ public class DataTransRepTask implements Runnable{
 	@Override
 	public void run() {
 		try {
-			logger.info("开始数据转换处理{}",diagramPO.getDevSerial());
+			logger.info("开始多值数据转换处理{}",diagramPO.getDevSerial());
 			WellParamVO paramConfig=DataTransConfig.get(diagramPO.getDevSerial()+"_"+diagramPO.getParamCode());
 			if(paramConfig==null) {
 				logger.info("未找到参数配置{}_{}",diagramPO.getDevSerial(),diagramPO.getParamCode());
@@ -49,7 +49,7 @@ public class DataTransRepTask implements Runnable{
 			GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
 			repService.save(diagramPO);
 			
-			logger.info("数据转换入库完成:{},{}",diagramPO.getParamCode(),diagramPO.getDisps().size());
+			logger.info("多值数据转换入库完成:{},{}",diagramPO.getParamCode(),diagramPO.getDisps().size());
 			
 			//功图诊断及量液
 			if("diagram_load".equalsIgnoreCase(diagramPO.getParamCode())) {

+ 10 - 7
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -48,17 +48,20 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 		
 		boolean isWriteBackMsg=ByteUtils.isWriteBackMsg(hexmsg); //hexmsg.startsWith("0106"); //写指令返回消息
 		boolean isReadBackMsg=ByteUtils.isReadBackMsg(hexmsg); //hexmsg.startsWith("0103");  //读指令返回消息
-		boolean valiHead=isReadBackMsg || isWriteBackMsg ;
+		//boolean valiHead=isReadBackMsg || isWriteBackMsg ;
 		
-		//开头两字节且不以0103开头,就认为是心跳数据-作为设备号,该方法并不可靠,有可能把采集的残包数据当作心跳
-		if(byteCount==2 && !valiHead) { 
+		//心跳数据-作为通道号(16进制)
+		if(ByteUtils.isHeat(hexmsg)) {
+			//String heatVal=ByteUtils.toIntStr(ByteBufUtil.getBytes(byteBuf,0,byteCount));
 			if(!ChannelGroupMgr.contains(ctx.channel())) {
-				ChannelGroupMgr.add(ctx.channel(),ByteUtils.toIntStr(ByteBufUtil.getBytes(byteBuf,0,byteCount)));
+				ChannelGroupMgr.add(ctx.channel(),hexmsg);
 				return;
 			}
 			
+			logger.info("收到心跳:{}",hexmsg);
+			
 		}
-		else if(byteCount>2 && isWriteBackMsg){ //(动液面设备返回消息不规范,不做crc校验)
+		else if(byteCount>2 && isWriteBackMsg){ //(动液面设备设置指令返回消息不规范(按原指令返回),不做crc校验)
 			
 			synchronized(ctx.channel()) {  //成功收到消息才通知可发指令,未成功收到消息则等超时
 				
@@ -101,7 +104,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 				synchronized(ctx.channel()) {  //成功收到消息才通知可发指令,未成功收到消息则等超时
 					
 					cmd=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_CMD).get();
-					logger.info("解析指令{}返回消息",cmd);
+					logger.info("解析指令{}返回消息:{}",cmd,hexmsg);
 					
 					GatherRespParserFacade.parse(byteBuf,datalen,cmd,ctx.channel());
 					
@@ -146,7 +149,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 		cause.printStackTrace();
 		String serial=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
-		logger.error("连接【{}】出现异常准备关闭",serial);
+		logger.error("连接【{}】出现异常准备关闭:{}",serial,cause.getMessage());
 		ctx.close(); 
 	}
 

+ 15 - 3
src/main/java/com/hb/proj/gather/protocol/ZlA11MsgDecoder.java

@@ -33,10 +33,16 @@ public class ZlA11MsgDecoder extends ByteToMessageDecoder {
 		
 		boolean isWriteBackMsg=ByteUtils.isWriteBackMsg(hexmsg); //  hexmsg.startsWith("0106"); //写指令返回消息(动液面设备返回消息不规范,不做crc校验)
 		boolean isReadBackMsg=ByteUtils.isReadBackMsg(hexmsg);   //hexmsg.startsWith("0103");  //读指令返回消息
-		boolean valiHead=isReadBackMsg || isWriteBackMsg ;
+		//boolean valiHead=isReadBackMsg || isWriteBackMsg ;
 		
 		logger.debug("ZlA11MsgDecoder 解码前数据:{}", hexmsg);
 		
+		if(ByteUtils.isHeat(hexmsg)) { //心跳包
+			byteBuf.readerIndex(beginIndex+byteCount);
+			outByteBuf=byteBuf.slice(beginIndex, byteCount);
+		}
+		
+		/**
 		if(byteCount<2) {
 			return;
 		}
@@ -48,7 +54,8 @@ public class ZlA11MsgDecoder extends ByteToMessageDecoder {
 		if(byteCount==2 && !valiHead) { //心跳
 			byteBuf.readerIndex(beginIndex+byteCount);
 			outByteBuf=byteBuf.slice(beginIndex, byteCount);
-		}
+		}**/
+		
 		else if(byteCount>2 && isReadBackMsg) {  //数据消息
 			
 			int headBtyCount=3,crc16BtyCount=2; //头部字节数,校验位字节数
@@ -64,10 +71,15 @@ public class ZlA11MsgDecoder extends ByteToMessageDecoder {
 			}
 			
 		}
-		else if(byteCount>2 && isWriteBackMsg) {   //(动液面设备返回消息不规范,不做格式、crc校验)
+		else if(byteCount>2 && isWriteBackMsg) {   //(动液面设备设置指令返回消息不规范,不做格式、crc校验)
 			byteBuf.readerIndex(beginIndex+byteCount);
 			outByteBuf=byteBuf.slice(beginIndex, byteCount);
 		}
+		else {  //其它定义为乱流,舍弃
+			byteBuf.readerIndex(beginIndex+byteCount);
+			return;
+		}
+		
 		
 		if(outByteBuf!=null) {
 			outByteBuf.retain();  //避免引用计数为0,后面的处理器无法使用解码后的数据

+ 21 - 2
src/main/java/com/hb/proj/gather/scheduler/GatherScheduler.java

@@ -3,6 +3,7 @@ package com.hb.proj.gather.scheduler;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Timer;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
@@ -117,6 +118,7 @@ public class GatherScheduler {
 		
 		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
 		Channel channel=null;
+		Future<Boolean> chnTskFuture=null;
 		String serial=null;
 		List<GatherDevicePO> devices=null;
 		
@@ -130,6 +132,22 @@ public class GatherScheduler {
 			}
 			
 			for(GatherDevicePO dev : devices) {
+				chnTskFuture=ChannelGroupMgr.getFuture(serial);
+				try {
+					if(chnTskFuture!=null) {
+						chnTskFuture.get(80, TimeUnit.SECONDS);  //最多等25s在执行任务(心跳20s),在25s内返回的可以继续下个任务
+					}
+					submitTask(channel,dev.getAddrNum(),deviceType,isMulti);
+				} catch (Exception e) {
+					//e.printStackTrace();
+					chnTskFuture.cancel(true);  //任务线程应该有可被中断的处理在能结束任务
+					logger.error("等待通道未执行任务25s超时,取消该通道其它任务");
+					//超过25s通道任务还未执行完,大概率通道有问题,没有必要进行该通道的其它任务
+					break;
+				} 
+				
+				
+				/*
 				if(ChannelGroupMgr.isDone(serial)) { //isDone 保证同一通道同一时间只有一个任务在执行
 					submitTask(channel,dev.getAddrNum(),deviceType,isMulti);
 				}
@@ -138,13 +156,14 @@ public class GatherScheduler {
 					try {
 						ChannelGroupMgr.getFuture(serial).get(20, TimeUnit.SECONDS);  //最多等20s在执行任务
 					} catch (Exception e) {
-						logger.error(e.getMessage());
+						e.printStackTrace();
+						logger.error("通道有任务未结束,等待20s异常:{},{}",e.getMessage());
 					} 
 					if(ChannelGroupMgr.isDone(serial)) {  //等待一定时间后,及时加入周期任务(前提等待后前一任务执行完),不用等到下个周期才执行
 						submitTask(channel,dev.getAddrNum(),deviceType,isMulti);
 						logger.warn("延后补偿采集,通道:{},设备地址号:{},采集类型:{}",serial,dev.getAddrNum(),deviceType);
 					}
-				}
+				}*/
 			}
 			
 			

+ 2 - 2
src/main/java/com/hb/proj/gather/scheduler/GatherTask.java

@@ -139,7 +139,7 @@ public class GatherTask implements Runnable{
 				
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
 				channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
-				logger.info("发送完指令:{}",cmd.name());
+				logger.info("发送完指令:{},{}",cmd.name(),ByteUtils.toHexString(cmdbtyes));
 				
 				channel.wait(cmdTimeout);  //等待接收返回数据后继续,超时后自动释放锁,继续后面执行
 			}
@@ -161,7 +161,7 @@ public class GatherTask implements Runnable{
 		byteBuf.writeBytes(cmdbytes);
 		channel.writeAndFlush(byteBuf);
 		channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
-		logger.info("发送完指令:{}",preCmd.name());
+		logger.info("发送完指令:{}",preCmd.name());
 		channel.wait(cmdTimeout);
 	}
 	

+ 17 - 4
src/main/java/com/hb/proj/gather/utils/ByteUtils.java

@@ -6,16 +6,29 @@ import java.util.regex.Pattern;
 
 public class ByteUtils {
 	
-	private final static Pattern writePtn=Pattern.compile("^[01][1-9]06$");
+	//private final static  Logger logger = LoggerFactory.getLogger(ByteUtils.class);
+			
+	private final static Pattern writePtn=Pattern.compile("^0[1-9]06$");
 	
-	private final static Pattern readPtn=Pattern.compile("^[01][1-9]03$");
+	private final static Pattern readPtn=Pattern.compile("^0[1-9]03$");
 	
 	public static boolean  isWriteBackMsg(String hexmsg) {
-		return writePtn.matcher(hexmsg.substring(0, 4)).matches();
+		return hexmsg.length()>=4 && writePtn.matcher(hexmsg.substring(0, 4)).matches();
 	}
 	
 	public static boolean  isReadBackMsg(String hexmsg) {
-		return readPtn.matcher(hexmsg.substring(0, 4)).matches();
+		return hexmsg.length()>=4 && readPtn.matcher(hexmsg.substring(0, 4)).matches();
+	}
+	
+	/**
+	 * 心跳包格式
+	 * LORA 8位4字节,18开头
+	 * DTU  4位2字节,17开头
+	 * @param hexmsg
+	 * @return
+	 */
+	public static boolean isHeat(String hexmsg) {
+		return (hexmsg.length()==8 && hexmsg.startsWith("18")) || (hexmsg.length()==4 && hexmsg.startsWith("17"));
 	}