ZLOpdProtHandler.java 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package com.hb.proj.gather.protocol;
  2. import java.util.List;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import com.hb.proj.gather.utils.ByteUtils;
  6. import com.hb.proj.gather.utils.Crc16Utils;
  7. import io.netty.buffer.ByteBuf;
  8. import io.netty.buffer.ByteBufUtil;
  9. import io.netty.channel.ChannelHandlerContext;
  10. import io.netty.channel.ChannelInboundHandlerAdapter;
  11. /**
  12. * 按协议解析数据 zl-opd mudbus 参考 A11标准
  13. * @author cwen
  14. *
  15. */
  16. public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
  17. private final static Logger logger = LoggerFactory.getLogger(ZLOpdProtHandler.class);
  18. //整个处理链路中只执行一次,顺序靠前的执行
  19. @Override
  20. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  21. logger.info("有设备连接上:{}",ctx.channel().remoteAddress());
  22. }
  23. /**
  24. * 协议标准:0103[数据区字节数 1字节][数据区 若干字节][CRC16校验 2字节]
  25. * 01:dtu上位地址 03:表示读取
  26. *
  27. * 心跳2字节,间隔约30s
  28. */
  29. @Override
  30. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  31. //msg:如果设置了decoder编码器,则msg为编码后的类型,可强制转换
  32. ByteBuf byteBuf=(ByteBuf)msg;
  33. if(!byteBuf.isReadable()) {
  34. logger.info("没有数据可接收");
  35. return;
  36. }
  37. String hexmsg=ByteBufUtil.hexDump(byteBuf);
  38. logger.debug("接收到数据:{}",hexmsg);
  39. //byte[] temp=ByteBufUtil.getBytes(byteBuf, 0, 2);
  40. //两字节都没有既不是心跳也不是采集数据,忽略
  41. int byteCount=byteBuf.readableBytes();
  42. if(byteCount<2) {
  43. return ;
  44. }
  45. //byte[] allBytes=new byte[byteBuf.readableBytes()];
  46. //byteBuf.readBytes(allBytes);
  47. //开头两字节且不以0103开头,就认为是心跳数据-作为设备号,该方法并不可靠,有可能把采集的残包数据当作心跳
  48. if(byteCount==2&&(!hexmsg.startsWith("0103"))) {
  49. if(!ChannelGroupMgr.contains(ctx.channel())) {
  50. ChannelGroupMgr.add(ctx.channel(),ByteUtils.toIntStr(ByteBufUtil.getBytes(byteBuf,0,byteCount)));
  51. return;
  52. }
  53. }
  54. else if(byteCount>2&&hexmsg.startsWith("0103")){
  55. int headBtyCount=3,crc16BtyCount=2; //头部字节数,校验位字节数
  56. int datalen=byteBuf.getByte(2)&0xff; //数据区字节数 byteBuf.get方法不改变readIndex,writeIndex,readXX方法会
  57. logger.info("数据字节长度:{}",datalen);
  58. if(byteCount<(datalen+headBtyCount+crc16BtyCount)) { // 读取的字节数量不够---拆包了,目前处理:舍弃
  59. return;
  60. }
  61. //读取头部+数据区
  62. /*
  63. byte[] headAndDatas=new byte[datalen+headBtyCount];
  64. byteBuf.readerIndex(0); //重置读索引至起始
  65. byteBuf.readBytes(headAndDatas);
  66. */
  67. byte[] headAndDatas=ByteBufUtil.getBytes(byteBuf,0,headBtyCount+datalen);
  68. //读取校验位
  69. /*
  70. byte[] crc16=new byte[crc16BtyCount];
  71. byteBuf.readerIndex(datalen+headBtyCount);
  72. byteBuf.readBytes(crc16);*/
  73. byte[] crc16=ByteBufUtil.getBytes(byteBuf,headBtyCount+datalen,2);
  74. int calCrc16=Crc16Utils.getCRC(headAndDatas);
  75. logger.info("接收CRC:{}:{},计算CRC:{}",ByteUtils.toHexString(crc16),ByteUtils.byte2ToIntHL(crc16),calCrc16);
  76. if(ByteUtils.byte2ToIntHL(crc16)==calCrc16) { //crc校验通过
  77. List<Float> smpdatas=GatherRespParser.parseFloat(byteBuf,headBtyCount,datalen);
  78. }
  79. }
  80. /**
  81. * 继续流转到下个handler 直到最后默认的tail handler 由它来释放
  82. * 关键点:数据能流转到最后、数据类型为byteBuf 中途没有被改变
  83. *
  84. * 也可创建继承SimpleChanneInboundHandler的自定义handler,实现channelRead0方法。
  85. * SimpleChannelInboundHandler负责释放
  86. */
  87. ctx.fireChannelRead(msg);
  88. }
  89. }