123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- package com.hb.proj.gather.scheduler;
- import java.util.Date;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.hb.proj.gather.protocol.ChannelGroupMgr;
- import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
- import com.hb.proj.gather.utils.ByteUtils;
- import io.netty.buffer.ByteBuf;
- import io.netty.buffer.ByteBufAllocator;
- import io.netty.channel.Channel;
- public class GatherTask implements Runnable{
- private final static Logger logger = LoggerFactory.getLogger(GatherTask.class);
- private Channel channel;
-
- //设备地址号,默认为1(DTU模式:一dtu一采集设备地址号默认1,LORA模式一网关多采集设备)
- private int addrNum=1;
-
- private Boolean isMulti=false; //是否为多值采集
-
- private long cmdTimeout=25*1000; //指令等待回复超时时间 2次超时=50s 不能超过采集周期60s
-
- private int timeoutCount=0; //超时发送指令次数
-
- public GatherTask(Channel channel,int addrNum,Boolean isMulti) {
- this.channel=channel;
- this.isMulti=isMulti;
- this.addrNum=addrNum;
- }
-
- @Override
- public void run() {
- //新的采集开始先复位 ATTR_KEY_PRE_TIME,timeoutCount
- channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set(null);
- timeoutCount=0;
-
- ByteBufAllocator alloc=channel.alloc();
- synchronized(channel) {
-
- if(isMulti) {
- multiGather(alloc);
- }
- else {
- singleGahter(alloc);
- }
- }
- }
-
- private void singleGahter(ByteBufAllocator alloc) {
- try {
- ZLOpdProtCMDEnum[] cmds= {ZLOpdProtCMDEnum.SINGLE_PRESS_TEMP_LOAD,
- ZLOpdProtCMDEnum.SINGLE_CURR_VOL_LOS_PW,
- ZLOpdProtCMDEnum.SINGLE_FREQ_STROKE};
- ByteBuf byteBuf=null;
- byte[] cmdbtyes=null;
- for(ZLOpdProtCMDEnum cmd : cmds) {
-
- if(needCloseChannel()) {
- ChannelGroupMgr.disconnect(channel);
- logger.error("设备多次未对指令及时响应,准备关闭连接,等待重连");
- return;
- }
-
- byteBuf=alloc.directBuffer();
- cmdbtyes=cmd.getCmd();
- cmdbtyes[0]=(byte)addrNum;
- byteBuf.writeBytes(cmdbtyes);
- channel.writeAndFlush(byteBuf);
-
- channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
- channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
- logger.info("发送完指令:{},{}",cmd.name(),ByteUtils.toHexString(cmdbtyes));
-
- channel.wait(cmdTimeout); //等待接收返回数据后继续,此处释放锁,回复还未收到因超时释放锁,就被多值任务获得锁并发指令,会导致两个指令间隔很短
- }
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- logger.error("定时采集出现异常:{}",e.getMessage());
- }
- }
-
-
-
-
- private void multiGather(ByteBufAllocator alloc) {
- try {
- checkDiagramPoint(alloc);
-
- if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) {
- logger.info("功图数据还未准备就绪,准备重试一次");
- Thread.sleep(1000); //重试一次
- checkDiagramPoint(alloc);
- }
-
- if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) {
- logger.warn("功图数据还未准备就绪");
- return;
- }
-
- ZLOpdProtCMDEnum[] cmds= {
- ZLOpdProtCMDEnum.DIAGRAM_DISP_1,
- ZLOpdProtCMDEnum.DIAGRAM_DISP_2,
- ZLOpdProtCMDEnum.DIAGRAM_DISP_3,
- ZLOpdProtCMDEnum.DIAGRAM_LOAD_1,
- ZLOpdProtCMDEnum.DIAGRAM_LOAD_2,
- ZLOpdProtCMDEnum.DIAGRAM_LOAD_3,
- ZLOpdProtCMDEnum.DIAGRAM_CURR_1,
- ZLOpdProtCMDEnum.DIAGRAM_CURR_2,
- ZLOpdProtCMDEnum.DIAGRAM_CURR_3,
- ZLOpdProtCMDEnum.DIAGRAM_POWER_1,
- ZLOpdProtCMDEnum.DIAGRAM_POWER_2,
- ZLOpdProtCMDEnum.DIAGRAM_POWER_3
-
- };
-
- ByteBuf byteBuf=null;
- timeoutCount=0 ; //多值采集开始前复位该值(因为单值与多值有5分钟间隔,该间隔不能算作超时)
- byte[] cmdbtyes=null;
- for(ZLOpdProtCMDEnum cmd : cmds) {
-
- if(needCloseChannel()) {
- ChannelGroupMgr.disconnect(channel);
- logger.error("设备多次未对指令及时响应,准备关闭连接,等待重连");
- return;
- }
-
- byteBuf=alloc.directBuffer();
- cmdbtyes=cmd.getCmd();
- cmdbtyes[0]=(byte)addrNum;
- byteBuf.writeBytes(cmdbtyes);
- channel.writeAndFlush(byteBuf);
-
- channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
- channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
- logger.info("发送完后指令:{}",cmd.name());
-
- channel.wait(cmdTimeout); //等待接收返回数据后继续,超时后自动释放锁,继续后面执行
- }
- }
- catch (InterruptedException e) {
- e.printStackTrace();
- logger.error("定时采集出现异常:{}",e.getMessage());
- }
-
- }
-
- private void checkDiagramPoint(ByteBufAllocator alloc) throws InterruptedException {
- channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(false); //避免之前的功图检测状态还在,且此处因超时返回继续后面的判断
-
- ZLOpdProtCMDEnum preCmd=ZLOpdProtCMDEnum.DIAGRAM_POINT_COUNT;
- ByteBuf byteBuf=alloc.directBuffer();
- byte[] cmdbytes=preCmd.getCmd();
- cmdbytes[0]=(byte)addrNum;
- byteBuf.writeBytes(cmdbytes);
- channel.writeAndFlush(byteBuf);
- channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
- logger.info("发送完后指令:{}",preCmd.name());
- channel.wait(cmdTimeout);
- }
-
- private boolean needCloseChannel() {
- Long pre=channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).get();
- if(pre==null||pre==0) {
- return false;
- }
- boolean isTimeout= ((new Date()).getTime()-pre.longValue())>(cmdTimeout-1000);
-
-
- if(!isTimeout) {
- timeoutCount=0; //有一次不超时就清空超时次数
- }
- else {
- timeoutCount+=1; //记录一次任务中连续超时次数
- }
-
- return timeoutCount>1; //多次超时(2次及以上)需要关闭通道,等待重连
-
-
-
- }
- }
|