package com.hb.proj.gather.protocol; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.hb.proj.gather.utils.ByteUtils; import com.hb.proj.gather.utils.Crc16Utils; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * 按协议解析数据 zl-opd mudbus 参考 A11标准 * @author cwen * */ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter { private final static Logger logger = LoggerFactory.getLogger(ZLOpdProtHandler.class); //整个处理链路中只执行一次,顺序靠前的执行 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { logger.info("有设备连接上:{}",ctx.channel().remoteAddress()); } /** * 协议标准:0103[数据区字节数 1字节][数据区 若干字节][CRC16校验 2字节] * 01:dtu上位地址 03:表示读取 * * 心跳2字节,间隔约30s */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //msg:如果设置了decoder编码器,则msg为编码后的类型,可强制转换 ByteBuf byteBuf=(ByteBuf)msg; if(!byteBuf.isReadable()) { logger.info("没有数据可接收"); return; } String hexmsg=ByteBufUtil.hexDump(byteBuf); logger.debug("接收到数据:{}",hexmsg); //byte[] temp=ByteBufUtil.getBytes(byteBuf, 0, 2); //两字节都没有既不是心跳也不是采集数据,忽略 int byteCount=byteBuf.readableBytes(); if(byteCount<2) { return ; } //byte[] allBytes=new byte[byteBuf.readableBytes()]; //byteBuf.readBytes(allBytes); //开头两字节且不以0103开头,就认为是心跳数据-作为设备号,该方法并不可靠,有可能把采集的残包数据当作心跳 if(byteCount==2&&(!hexmsg.startsWith("0103"))) { if(!ChannelGroupMgr.contains(ctx.channel())) { ChannelGroupMgr.add(ctx.channel(),ByteUtils.toIntStr(ByteBufUtil.getBytes(byteBuf,0,byteCount))); return; } } else if(byteCount>2&&hexmsg.startsWith("0103")){ int headBtyCount=3,crc16BtyCount=2; //头部字节数,校验位字节数 int datalen=byteBuf.getByte(2)&0xff; //数据区字节数 byteBuf.get方法不改变readIndex,writeIndex,readXX方法会 logger.info("数据字节长度:{}",datalen); if(byteCount<(datalen+headBtyCount+crc16BtyCount)) { // 读取的字节数量不够---拆包了,目前处理:舍弃 return; } //读取头部+数据区 /* byte[] headAndDatas=new byte[datalen+headBtyCount]; byteBuf.readerIndex(0); //重置读索引至起始 byteBuf.readBytes(headAndDatas); */ byte[] headAndDatas=ByteBufUtil.getBytes(byteBuf,0,headBtyCount+datalen); //读取校验位 /* byte[] crc16=new byte[crc16BtyCount]; byteBuf.readerIndex(datalen+headBtyCount); byteBuf.readBytes(crc16);*/ byte[] crc16=ByteBufUtil.getBytes(byteBuf,headBtyCount+datalen,2); int calCrc16=Crc16Utils.getCRC(headAndDatas); logger.info("接收CRC:{}:{},计算CRC:{}",ByteUtils.toHexString(crc16),ByteUtils.byte2ToIntHL(crc16),calCrc16); if(ByteUtils.byte2ToIntHL(crc16)==calCrc16) { //crc校验通过 List smpdatas=GatherRespParser.parseFloat(byteBuf,headBtyCount,datalen); } } /** * 继续流转到下个handler 直到最后默认的tail handler 由它来释放 * 关键点:数据能流转到最后、数据类型为byteBuf 中途没有被改变 * * 也可创建继承SimpleChanneInboundHandler的自定义handler,实现channelRead0方法。 * SimpleChannelInboundHandler负责释放 */ ctx.fireChannelRead(msg); } }