GatherTask.java 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. package com.hb.proj.gather.scheduler;
  2. import java.util.Date;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import com.hb.proj.gather.protocol.ChannelGroupMgr;
  6. import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
  7. import com.hb.proj.gather.utils.ByteUtils;
  8. import io.netty.buffer.ByteBuf;
  9. import io.netty.buffer.ByteBufAllocator;
  10. import io.netty.channel.Channel;
  11. public class GatherTask implements Runnable{
  12. private final static Logger logger = LoggerFactory.getLogger(GatherTask.class);
  13. private Channel channel;
  14. //设备地址号,默认为1(DTU模式:一dtu一采集设备地址号默认1,LORA模式一网关多采集设备)
  15. private int addrNum=1;
  16. private Boolean isMulti=false; //是否为多值采集
  17. private long cmdTimeout=25*1000; //指令等待回复超时时间 2次超时=50s 不能超过采集周期60s
  18. private int timeoutCount=0; //超时发送指令次数
  19. public GatherTask(Channel channel,int addrNum,Boolean isMulti) {
  20. this.channel=channel;
  21. this.isMulti=isMulti;
  22. this.addrNum=addrNum;
  23. }
  24. @Override
  25. public void run() {
  26. //新的采集开始先复位 ATTR_KEY_PRE_TIME,timeoutCount
  27. channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set(null);
  28. timeoutCount=0;
  29. ByteBufAllocator alloc=channel.alloc();
  30. synchronized(channel) {
  31. if(isMulti) {
  32. multiGather(alloc);
  33. }
  34. else {
  35. singleGahter(alloc);
  36. }
  37. }
  38. }
  39. private void singleGahter(ByteBufAllocator alloc) {
  40. try {
  41. ZLOpdProtCMDEnum[] cmds= {ZLOpdProtCMDEnum.SINGLE_PRESS_TEMP_LOAD,
  42. ZLOpdProtCMDEnum.SINGLE_CURR_VOL_LOS_PW,
  43. ZLOpdProtCMDEnum.SINGLE_FREQ_STROKE};
  44. ByteBuf byteBuf=null;
  45. byte[] cmdbtyes=null;
  46. for(ZLOpdProtCMDEnum cmd : cmds) {
  47. if(needCloseChannel()) {
  48. ChannelGroupMgr.disconnect(channel);
  49. logger.error("设备多次未对指令及时响应,准备关闭连接,等待重连");
  50. return;
  51. }
  52. byteBuf=alloc.directBuffer();
  53. cmdbtyes=cmd.getCmd();
  54. cmdbtyes[0]=(byte)addrNum;
  55. byteBuf.writeBytes(cmdbtyes);
  56. channel.writeAndFlush(byteBuf);
  57. channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
  58. channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
  59. logger.info("发送完指令:{},{}",cmd.name(),ByteUtils.toHexString(cmdbtyes));
  60. channel.wait(cmdTimeout); //等待接收返回数据后继续,此处释放锁,回复还未收到因超时释放锁,就被多值任务获得锁并发指令,会导致两个指令间隔很短
  61. }
  62. }
  63. catch (InterruptedException e) {
  64. e.printStackTrace();
  65. logger.error("定时采集出现异常:{}",e.getMessage());
  66. }
  67. }
  68. private void multiGather(ByteBufAllocator alloc) {
  69. try {
  70. checkDiagramPoint(alloc);
  71. if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) {
  72. logger.info("功图数据还未准备就绪,准备重试一次");
  73. Thread.sleep(1000); //重试一次
  74. checkDiagramPoint(alloc);
  75. }
  76. if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) {
  77. logger.warn("功图数据还未准备就绪");
  78. return;
  79. }
  80. ZLOpdProtCMDEnum[] cmds= {
  81. ZLOpdProtCMDEnum.DIAGRAM_DISP_1,
  82. ZLOpdProtCMDEnum.DIAGRAM_DISP_2,
  83. ZLOpdProtCMDEnum.DIAGRAM_DISP_3,
  84. ZLOpdProtCMDEnum.DIAGRAM_LOAD_1,
  85. ZLOpdProtCMDEnum.DIAGRAM_LOAD_2,
  86. ZLOpdProtCMDEnum.DIAGRAM_LOAD_3,
  87. ZLOpdProtCMDEnum.DIAGRAM_CURR_1,
  88. ZLOpdProtCMDEnum.DIAGRAM_CURR_2,
  89. ZLOpdProtCMDEnum.DIAGRAM_CURR_3,
  90. ZLOpdProtCMDEnum.DIAGRAM_POWER_1,
  91. ZLOpdProtCMDEnum.DIAGRAM_POWER_2,
  92. ZLOpdProtCMDEnum.DIAGRAM_POWER_3
  93. };
  94. ByteBuf byteBuf=null;
  95. timeoutCount=0 ; //多值采集开始前复位该值(因为单值与多值有5分钟间隔,该间隔不能算作超时)
  96. byte[] cmdbtyes=null;
  97. for(ZLOpdProtCMDEnum cmd : cmds) {
  98. if(needCloseChannel()) {
  99. ChannelGroupMgr.disconnect(channel);
  100. logger.error("设备多次未对指令及时响应,准备关闭连接,等待重连");
  101. return;
  102. }
  103. byteBuf=alloc.directBuffer();
  104. cmdbtyes=cmd.getCmd();
  105. cmdbtyes[0]=(byte)addrNum;
  106. byteBuf.writeBytes(cmdbtyes);
  107. channel.writeAndFlush(byteBuf);
  108. channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
  109. channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
  110. logger.info("发送完后指令:{}",cmd.name());
  111. channel.wait(cmdTimeout); //等待接收返回数据后继续,超时后自动释放锁,继续后面执行
  112. }
  113. }
  114. catch (InterruptedException e) {
  115. e.printStackTrace();
  116. logger.error("定时采集出现异常:{}",e.getMessage());
  117. }
  118. }
  119. private void checkDiagramPoint(ByteBufAllocator alloc) throws InterruptedException {
  120. channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(false); //避免之前的功图检测状态还在,且此处因超时返回继续后面的判断
  121. ZLOpdProtCMDEnum preCmd=ZLOpdProtCMDEnum.DIAGRAM_POINT_COUNT;
  122. ByteBuf byteBuf=alloc.directBuffer();
  123. byte[] cmdbytes=preCmd.getCmd();
  124. cmdbytes[0]=(byte)addrNum;
  125. byteBuf.writeBytes(cmdbytes);
  126. channel.writeAndFlush(byteBuf);
  127. channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
  128. logger.info("发送完后指令:{}",preCmd.name());
  129. channel.wait(cmdTimeout);
  130. }
  131. private boolean needCloseChannel() {
  132. Long pre=channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).get();
  133. if(pre==null||pre==0) {
  134. return false;
  135. }
  136. boolean isTimeout= ((new Date()).getTime()-pre.longValue())>(cmdTimeout-1000);
  137. if(!isTimeout) {
  138. timeoutCount=0; //有一次不超时就清空超时次数
  139. }
  140. else {
  141. timeoutCount+=1; //记录一次任务中连续超时次数
  142. }
  143. return timeoutCount>1; //多次超时(2次及以上)需要关闭通道,等待重连
  144. }
  145. }