package com.hb.proj.gather.scheduler; import java.util.Date; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.hb.proj.gather.protocol.ChannelGroupMgr; import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum; import com.hb.proj.gather.utils.ByteUtils; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.Channel; public class GatherTask implements Runnable{ private final static Logger logger = LoggerFactory.getLogger(GatherTask.class); private Channel channel; //设备地址号,默认为1(DTU模式:一dtu一采集设备地址号默认1,LORA模式一网关多采集设备) private int addrNum=1; private Boolean isMulti=false; //是否为多值采集 private long cmdTimeout=25*1000; //指令等待回复超时时间 2次超时=50s 不能超过采集周期60s private int timeoutCount=0; //超时发送指令次数 public GatherTask(Channel channel,int addrNum,Boolean isMulti) { this.channel=channel; this.isMulti=isMulti; this.addrNum=addrNum; } @Override public void run() { //新的采集开始先复位 ATTR_KEY_PRE_TIME,timeoutCount channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set(null); timeoutCount=0; ByteBufAllocator alloc=channel.alloc(); synchronized(channel) { if(isMulti) { multiGather(alloc); } else { singleGahter(alloc); } } } private void singleGahter(ByteBufAllocator alloc) { try { ZLOpdProtCMDEnum[] cmds= {ZLOpdProtCMDEnum.SINGLE_PRESS_TEMP_LOAD, ZLOpdProtCMDEnum.SINGLE_CURR_VOL_LOS_PW, ZLOpdProtCMDEnum.SINGLE_FREQ_STROKE}; ByteBuf byteBuf=null; byte[] cmdbtyes=null; for(ZLOpdProtCMDEnum cmd : cmds) { if(needCloseChannel()) { ChannelGroupMgr.disconnect(channel); logger.error("设备多次未对指令及时响应,准备关闭连接,等待重连"); return; } byteBuf=alloc.directBuffer(); cmdbtyes=cmd.getCmd(); cmdbtyes[0]=(byte)addrNum; byteBuf.writeBytes(cmdbtyes); channel.writeAndFlush(byteBuf); channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name()); channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime()); logger.info("发送完指令:{},{}",cmd.name(),ByteUtils.toHexString(cmdbtyes)); channel.wait(cmdTimeout); //等待接收返回数据后继续,此处释放锁,回复还未收到因超时释放锁,就被多值任务获得锁并发指令,会导致两个指令间隔很短 } } catch (InterruptedException e) { e.printStackTrace(); logger.error("定时采集出现异常:{}",e.getMessage()); } } private void multiGather(ByteBufAllocator alloc) { try { checkDiagramPoint(alloc); if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { logger.info("功图数据还未准备就绪,准备重试一次"); Thread.sleep(1000); //重试一次 checkDiagramPoint(alloc); } if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { logger.warn("功图数据还未准备就绪"); return; } ZLOpdProtCMDEnum[] cmds= { ZLOpdProtCMDEnum.DIAGRAM_DISP_1, ZLOpdProtCMDEnum.DIAGRAM_DISP_2, ZLOpdProtCMDEnum.DIAGRAM_DISP_3, ZLOpdProtCMDEnum.DIAGRAM_LOAD_1, ZLOpdProtCMDEnum.DIAGRAM_LOAD_2, ZLOpdProtCMDEnum.DIAGRAM_LOAD_3, ZLOpdProtCMDEnum.DIAGRAM_CURR_1, ZLOpdProtCMDEnum.DIAGRAM_CURR_2, ZLOpdProtCMDEnum.DIAGRAM_CURR_3, ZLOpdProtCMDEnum.DIAGRAM_POWER_1, ZLOpdProtCMDEnum.DIAGRAM_POWER_2, ZLOpdProtCMDEnum.DIAGRAM_POWER_3 }; ByteBuf byteBuf=null; timeoutCount=0 ; //多值采集开始前复位该值(因为单值与多值有5分钟间隔,该间隔不能算作超时) byte[] cmdbtyes=null; for(ZLOpdProtCMDEnum cmd : cmds) { if(needCloseChannel()) { ChannelGroupMgr.disconnect(channel); logger.error("设备多次未对指令及时响应,准备关闭连接,等待重连"); return; } byteBuf=alloc.directBuffer(); cmdbtyes=cmd.getCmd(); cmdbtyes[0]=(byte)addrNum; byteBuf.writeBytes(cmdbtyes); channel.writeAndFlush(byteBuf); channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name()); channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime()); logger.info("发送完后指令:{}",cmd.name()); channel.wait(cmdTimeout); //等待接收返回数据后继续,超时后自动释放锁,继续后面执行 } } catch (InterruptedException e) { e.printStackTrace(); logger.error("定时采集出现异常:{}",e.getMessage()); } } private void checkDiagramPoint(ByteBufAllocator alloc) throws InterruptedException { channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(false); //避免之前的功图检测状态还在,且此处因超时返回继续后面的判断 ZLOpdProtCMDEnum preCmd=ZLOpdProtCMDEnum.DIAGRAM_POINT_COUNT; ByteBuf byteBuf=alloc.directBuffer(); byte[] cmdbytes=preCmd.getCmd(); cmdbytes[0]=(byte)addrNum; byteBuf.writeBytes(cmdbytes); channel.writeAndFlush(byteBuf); channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name()); logger.info("发送完后指令:{}",preCmd.name()); channel.wait(cmdTimeout); } private boolean needCloseChannel() { Long pre=channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).get(); if(pre==null||pre==0) { return false; } boolean isTimeout= ((new Date()).getTime()-pre.longValue())>(cmdTimeout-1000); if(!isTimeout) { timeoutCount=0; //有一次不超时就清空超时次数 } else { timeoutCount+=1; //记录一次任务中连续超时次数 } return timeoutCount>1; //多次超时(2次及以上)需要关闭通道,等待重连 } }