package com.hb.proj.gather.scheduler; import java.util.ArrayList; import java.util.Date; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.hb.proj.gather.protocol.ChannelGroupMgr; import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; /** * 动液面采集任务 具体执行采集任务,下发采集指令 * @author cwen * */ public class GatherLiquidTask implements Runnable { private final static Logger logger = LoggerFactory.getLogger(GatherLiquidTask.class); private Channel channel; private long cmdTimeout=25*1000; //指令等待回复超时时间 private int timeoutCount=0; //超时发送指令次数 public GatherLiquidTask(Channel channel) { this.channel=channel; } /** * 约2-5s采集完 */ @Override public void run() { logger.info("动液面采集开始..."); List cmds=new ArrayList<>(41); cmds.add(ZLOpdProtCMDEnum.LIQUID_OTHER); for(int i=1;i<41;i++) { cmds.add(ZLOpdProtCMDEnum.valueOf("LIQUID_SERIAL_"+i)); } try { ByteBufAllocator alloc=channel.alloc(); ByteBuf byteBuf=null; synchronized(channel) { channel.attr(ChannelGroupMgr.ATTR_KEY_STOP_NEXT).set(false); //每次任务开始前重置 for(ZLOpdProtCMDEnum cmd : cmds) { if(needCloseChannel()) { ChannelGroupMgr.disconnect(channel); logger.error("设备多次未对指令及时响应,准备关闭连接,等待重连"); return; } byteBuf=alloc.directBuffer(); byteBuf.writeBytes(cmd.getCmd()); channel.writeAndFlush(byteBuf); channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name()); channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime()); logger.info("发送完指令:{}",cmd.name()); channel.wait(cmdTimeout); //等待接收返回数据后继续,最多等待cmdTimeout,此处释放锁,回复还未收到就被多值任务获得锁并发指令,会导致两个指令间隔很短 if(channel.attr(ChannelGroupMgr.ATTR_KEY_STOP_NEXT).get()) { logger.info("本次动液面采集取消(数据已采集过)"); return; } if(checkCancel(cmd.name())) { logger.info("本次动液面采集取消(中途有数据未采到)"); return; } } } } catch (InterruptedException e) { e.printStackTrace(); logger.error("定时采集出现异常:{}",e.getMessage()); } } private boolean checkCancel(String cmdName) { Long pre=channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).get(); if(pre==null||pre==0) { return false; } boolean isTimeout=((new Date()).getTime()-pre.longValue())>(cmdTimeout-1000); if(!isTimeout) { return false; } //超时后的处理 if(cmdName.startsWith("LIQUID_OTHER")) { return true; } else if(cmdName.startsWith("LIQUID_SERIAL")){ int num=Integer.parseInt((cmdName.split("_"))[2]); if(num<30) { //只要前30包数据有效就可以继续 return true; } } return false; } private boolean needCloseChannel() { Long pre=channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).get(); if(pre==null||pre==0) { return false; } boolean isTimeout= ((new Date()).getTime()-pre.longValue())>(cmdTimeout-1000); if(!isTimeout) { timeoutCount=0; //有一次不超时就清空超时次数 } else { timeoutCount+=1; //记录一次任务中连续超时次数 } return timeoutCount>1; //多次超时(2次及以上)需要关闭通道,等待重连 } }