chenwen 1 год назад
Родитель
Сommit
f932f8434e

+ 1 - 0
src/main/java/com/hb/proj/gather/protocol/GatherRespParser.java

@@ -44,6 +44,7 @@ public class GatherRespParser {
 			 if(po!=null) {
 				 GatherTaskExecutor.execute(new DataTransRepTask(po)); 
 			 }
+			 logger.info("解析完数据{}",cmd);
 		}
 	}
 			

+ 1 - 0
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -76,6 +76,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 			//logger.info("数据字节长度:{}",datalen);
 			
 			if(byteCount<(datalen+headBtyCount+crc16BtyCount)) {  // 读取的字节数量不够---拆包了,目前处理:舍弃
+				logger.info("数据包被拆分,取消后续");
 				return;
 			}
 			

+ 70 - 0
src/main/java/com/hb/proj/gather/protocol/ZlA11MsgDecoder.java

@@ -0,0 +1,70 @@
+package com.hb.proj.gather.protocol;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+/**
+ * 自定义解码器
+ * @author cwen
+ *
+ */
+public class ZlA11MsgDecoder extends ByteToMessageDecoder {
+	
+	private final static  Logger logger = LoggerFactory.getLogger(ZlA11MsgDecoder.class);
+	
+	private static int count = 0;
+
+	@Override
+	protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
+		
+		String hexmsg=ByteBufUtil.hexDump(byteBuf);
+		
+		logger.debug("ZlA11MsgDecoder call count={},解码前数据:{}", ++count,hexmsg);
+		
+		if(byteBuf.readableBytes()<2) {
+			return;
+		}
+		
+		int beginIndex = byteBuf.readerIndex();
+		
+		int byteCount=byteBuf.readableBytes();
+		
+		
+		ByteBuf outByteBuf=null;
+		
+		if(byteCount==2&&(!hexmsg.startsWith("0103"))) { //心跳
+			byteBuf.readerIndex(beginIndex+byteCount);
+			outByteBuf=byteBuf.slice(beginIndex, byteCount);
+		}
+		else if(byteCount>2&&hexmsg.startsWith("0103")) {  //数据消息
+			int headBtyCount=3,crc16BtyCount=2; //头部字节数,校验位字节数
+			int datalen=byteBuf.getByte(2)&0xff; //数据区字节数 byteBuf.get方法不改变readIndex,writeIndex,readXX方法会
+			if(byteCount<(datalen+headBtyCount+crc16BtyCount)) {  // 读取的字节数量不够---拆包了
+				logger.info("数据包被拆分,取消后续");
+				byteBuf.readerIndex(beginIndex);  //半包数据,重置index至上次位置
+				return;
+			}
+			else {
+				byteBuf.readerIndex(beginIndex+datalen+headBtyCount+crc16BtyCount);
+				outByteBuf=byteBuf.slice(beginIndex, datalen+headBtyCount+crc16BtyCount);
+			}
+			
+			
+		}
+		
+		if(outByteBuf!=null) {
+			outByteBuf.retain();
+			out.add(outByteBuf);
+		}
+	
+		
+	}
+
+}

+ 2 - 0
src/main/java/com/hb/proj/gather/server/MyChannelInitializer.java

@@ -3,6 +3,7 @@ package com.hb.proj.gather.server;
 import java.util.concurrent.TimeUnit;
 
 import com.hb.proj.gather.protocol.ZLOpdProtHandler;
+import com.hb.proj.gather.protocol.ZlA11MsgDecoder;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
@@ -24,6 +25,7 @@ public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
 	protected void initChannel(SocketChannel channel) throws Exception {
 		channel.pipeline()
 				.addLast(new IdleStateHandler(100,0,0,TimeUnit.SECONDS))  //100秒无读操作约三次心跳时间
+				.addLast(new ZlA11MsgDecoder())
 				.addLast(new ZLOpdProtHandler());
 				//发送的数据时行文字编码
                 //.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8))

+ 3 - 0
src/main/java/com/hb/proj/gather/server/NettyGatherRunner.java

@@ -8,6 +8,8 @@ import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 
+import com.hb.proj.gather.business.DataTransConfig;
+
 @Component
 @Order(1)
 public class NettyGatherRunner implements ApplicationRunner {
@@ -23,6 +25,7 @@ public class NettyGatherRunner implements ApplicationRunner {
 		new Thread(()-> {
 			
 			//(new DataAssembler()).start();
+			DataTransConfig.init();
 			nettyGatherServer.start(9610);
 			
 		}).start();