Browse Source

增加设备连接无效检测

chenwen 1 năm trước cách đây
mục cha
commit
49d26334c5

+ 47 - 4
src/main/java/com/hb/proj/gather/business/GatherTask.java

@@ -1,5 +1,7 @@
 package com.hb.proj.gather.business;
 package com.hb.proj.gather.business;
 
 
+import java.util.Date;
+
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
@@ -16,6 +18,10 @@ public class GatherTask implements Runnable{
 
 
 	private Channel  channel;
 	private Channel  channel;
 	
 	
+	private long cmdTimeout=35*1000; //指令等待回复超时时间 30s
+	
+	private int timeoutCount=0;  //超时发送指令次数
+	
 	public GatherTask(Channel channel) {
 	public GatherTask(Channel channel) {
 		this.channel=channel;
 		this.channel=channel;
 	}
 	}
@@ -41,13 +47,22 @@ public class GatherTask implements Runnable{
 			ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.PRESS_TEMP_LOAD,ZLOpdProtCMDEnum.CURR_VOL_LOS_PW,ZLOpdProtCMDEnum.FREQ_STROKE};
 			ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.PRESS_TEMP_LOAD,ZLOpdProtCMDEnum.CURR_VOL_LOS_PW,ZLOpdProtCMDEnum.FREQ_STROKE};
 			ByteBuf byteBuf=null;
 			ByteBuf byteBuf=null;
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 			for(ZLOpdProtCMDEnum cmd : cmds) {
-				byteBuf=alloc.directBuffer();
 				
 				
+				if(needCloseChannel()) {
+					ChannelGroupMgr.disconnect(channel);
+					logger.error("设备多次未对指令及时响应,准备关闭连接,等待重连");
+					return;
+				}
+				
+				byteBuf=alloc.directBuffer();
 				byteBuf.writeBytes(cmd.getCmd());
 				byteBuf.writeBytes(cmd.getCmd());
 				channel.writeAndFlush(byteBuf);
 				channel.writeAndFlush(byteBuf);
+				
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+				channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
 				logger.info("发送完指令:{}",cmd.name());
 				logger.info("发送完指令:{}",cmd.name());
-				channel.wait(30*1000);  //等待接收返回数据后继续,此处释放锁,回复还未收到因超时释放锁,就被多值任务获得锁并发指令,会导致两个指令间隔很短
+				
+				channel.wait(cmdTimeout);  //等待接收返回数据后继续,此处释放锁,回复还未收到因超时释放锁,就被多值任务获得锁并发指令,会导致两个指令间隔很短
 			}
 			}
 		}
 		}
 		catch (InterruptedException e) {
 		catch (InterruptedException e) {
@@ -91,12 +106,22 @@ public class GatherTask implements Runnable{
 			};
 			};
 			ByteBuf byteBuf=null;
 			ByteBuf byteBuf=null;
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 			for(ZLOpdProtCMDEnum cmd : cmds) {
+				
+				if(needCloseChannel()) {
+					ChannelGroupMgr.disconnect(channel);
+					logger.error("设备多次未对指令及时响应,准备关闭连接,等待重连");
+					return;
+				}
+				
 				byteBuf=alloc.directBuffer();
 				byteBuf=alloc.directBuffer();
 				byteBuf.writeBytes(cmd.getCmd());
 				byteBuf.writeBytes(cmd.getCmd());
 				channel.writeAndFlush(byteBuf);
 				channel.writeAndFlush(byteBuf);
+				
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+				channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
 				logger.info("发送完后指令:{}",cmd.name());
 				logger.info("发送完后指令:{}",cmd.name());
-				channel.wait(30*1000);  //等待接收返回数据后继续,超时后自动释放锁,继续后面执行
+				
+				channel.wait(cmdTimeout);  //等待接收返回数据后继续,超时后自动释放锁,继续后面执行
 			}
 			}
 		}
 		}
 		catch (InterruptedException e) {
 		catch (InterruptedException e) {
@@ -115,6 +140,24 @@ public class GatherTask implements Runnable{
 		channel.writeAndFlush(byteBuf);
 		channel.writeAndFlush(byteBuf);
 		channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
 		channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
 		logger.info("发送完后指令:{}",preCmd.name());
 		logger.info("发送完后指令:{}",preCmd.name());
-		channel.wait(30*1000);
+		channel.wait(cmdTimeout);
+	}
+	
+	private boolean needCloseChannel() {
+		Long pre=channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).get();
+		if(pre==null||pre==0) {
+			return false;
+		}
+		boolean isTimeout= ((new Date()).getTime()-pre.longValue())>(cmdTimeout-5000);
+		
+		if(!isTimeout) {
+			timeoutCount=0;  //有一次不超时就清空超时次数
+		}
+		else {
+			timeoutCount+=1;  //记录一次任务中连续超时次数
+		}
+		
+		return timeoutCount>1;  //多次超时(2次及以上)需要关闭通道,等待重连
+		
 	}
 	}
 }
 }

+ 15 - 1
src/main/java/com/hb/proj/gather/protocol/ChannelGroupMgr.java

@@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
 import io.netty.channel.Channel;
 import io.netty.channel.Channel;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.channel.group.ChannelGroupFuture;
+import io.netty.channel.group.ChannelMatcher;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.util.AttributeKey;
 import io.netty.util.AttributeKey;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.GlobalEventExecutor;
@@ -28,7 +29,9 @@ public class ChannelGroupMgr {
 	
 	
 	public  static final AttributeKey<String> ATTR_KEY_CMD=AttributeKey.valueOf("cmd");  //当前正执行的指令 指令枚举对象名称
 	public  static final AttributeKey<String> ATTR_KEY_CMD=AttributeKey.valueOf("cmd");  //当前正执行的指令 指令枚举对象名称
 	
 	
-	public  static final AttributeKey<Boolean> ATTR_KEY_DIAGRAM_READY=AttributeKey.valueOf("false");  //是否可以采集功图数据(是否已准备好)
+	public  static final AttributeKey<Boolean> ATTR_KEY_DIAGRAM_READY=AttributeKey.valueOf("diagram_ready");  //是否可以采集功图数据(是否已准备好)
+	
+	public  static final AttributeKey<Long> ATTR_KEY_PRE_TIME=AttributeKey.valueOf("pre_time");  //上一指令时间,检测是否超时发送指令
 	
 	
 	public static void add(Channel channel,String serial) {
 	public static void add(Channel channel,String serial) {
 		logger.info("增加客户端通道:{}",serial);
 		logger.info("增加客户端通道:{}",serial);
@@ -76,4 +79,15 @@ public class ChannelGroupMgr {
 		return null;
 		return null;
 	}
 	}
 	
 	
+	
+	public static void disconnect(Channel targetCha) {
+		CHANNEL_GROUP.disconnect(new ChannelMatcher() {
+
+			@Override
+			public boolean matches(Channel channel) {
+				return targetCha.id().equals(channel.id());
+			}
+		});
+	}
+	
 }
 }

+ 1 - 1
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -101,7 +101,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 					
 					
 					if("DIAGRAM_POINT_COUNT".equalsIgnoreCase(cmd)) {  //如果是功图点数检测,还需要明确结果
 					if("DIAGRAM_POINT_COUNT".equalsIgnoreCase(cmd)) {  //如果是功图点数检测,还需要明确结果
 						short pcount=GatherRespParser.parseDiagramPoint(byteBuf,headBtyCount);
 						short pcount=GatherRespParser.parseDiagramPoint(byteBuf,headBtyCount);
-						logger.debug("功图点数{}",pcount);
+						logger.warn("功图点数{}",pcount);
 						ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(pcount==250);
 						ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(pcount==250);
 					}
 					}
 					else{
 					else{