Sfoglia il codice sorgente

采集核心处理完善、定时采集处理

chenwen 2 anni fa
parent
commit
8bdb9d4f6d

+ 3 - 14
src/main/java/com/hb/proj/api/controller/APIController.java

@@ -30,23 +30,10 @@ public class APIController {
 			return RespVOBuilder.error("未找到客户端");
 		}
 		
-		/**
-		System.out.println("找到客户端"+channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get());
-		
-		//byte[] cmd={0x01, 0x03, 0x01, 0x2c, 0x00, 0x0a} ; 
-		
-		byte[] cmd={0x01, 0x03, 0x01, (byte)0xa4, 0x00, 0x04} ; 
-		
-		int crcInt=Crc16Utils.getCRC(cmd);
-		byte[] crcBytes=ByteUtils.int2Bytes(crcInt, 2);
-		
-		byte[] fullCmd=new byte[cmd.length+crcBytes.length];
-		System.arraycopy(cmd,0,fullCmd,0,cmd.length);
-		System.arraycopy(crcBytes,0,fullCmd,cmd.length,crcBytes.length);
-		**/
 		
 		byte[] cmdBytes=ZLOpdProtCMDEnum.valueOf(cmd).getCmd();
 		
+		//ZLOpdProtCMDEnum.PRESS_TEMP_LOAD.name();
 		
 		
 		ByteBufAllocator alloc=channel.alloc();  //类型为:PooledByteBufAllocator  netty默认内存分配器
@@ -60,6 +47,8 @@ public class APIController {
 		
 		System.out.println("指令发送时间:"+LocalDateTime.now());
 		
+		//channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd);
+		
 		return RespVOBuilder.ok(ByteUtils.toHexString(cmdBytes));
 	}
 }

+ 41 - 0
src/main/java/com/hb/proj/gather/business/GatherScheduler.java

@@ -0,0 +1,41 @@
+package com.hb.proj.gather.business;
+
+import java.util.Iterator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import com.hb.proj.gather.protocol.ChannelGroupMgr;
+
+import io.netty.channel.Channel;
+
+/**
+ * 采集定时任务 
+ * 基于@EnableScheduling, @Scheduled的定时任务是单线程执行的
+ * @author cwen
+ *
+ */
+
+@Component
+@EnableScheduling
+public class GatherScheduler {
+	
+	private final static  Logger logger = LoggerFactory.getLogger(GatherScheduler.class);
+
+	/**
+	 * 单值1分钟执行一次采集
+	 */
+	@Scheduled(fixedRate = 60 * 1000)  //每分钟执行一次
+	public void startSingleGather() {
+		logger.info("定时采集计划启动...");
+		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
+		Channel channel=null;
+		while(iterator.hasNext()) {
+			channel=iterator.next();
+			GatherTaskExecutor.execute(new GatherSingleTask(channel));
+		}
+	}
+}

+ 66 - 0
src/main/java/com/hb/proj/gather/business/GatherSingleTask.java

@@ -0,0 +1,66 @@
+package com.hb.proj.gather.business;
+
+import java.time.LocalDateTime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.protocol.ChannelGroupMgr;
+import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+
+/**
+ * 单值采集任务  具体执行采集任务,下发采集指令   
+ * @author cwen
+ *
+ */
+public class GatherSingleTask implements Runnable {
+	
+	private final static  Logger logger = LoggerFactory.getLogger(GatherSingleTask.class);
+
+	private Channel  channel;
+	
+	public GatherSingleTask(Channel channel) {
+		this.channel=channel;
+	}
+	
+	@Override
+	public void run() {
+		logger.info("单值采集开始...");
+		ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.PRESS_TEMP_LOAD,ZLOpdProtCMDEnum.CURR_VOL_LOS_PW,ZLOpdProtCMDEnum.FREQ_STROKE};
+		ByteBufAllocator alloc=channel.alloc(); 
+		ByteBuf byteBuf=alloc.directBuffer(); //
+		//ChannelFuture future=null;
+		for(ZLOpdProtCMDEnum cmd : cmds) {
+			byteBuf=alloc.directBuffer();
+			
+			byteBuf.writeBytes(cmd.getCmd());
+			try {
+				//logger.info("准备发送指令:{},{}",ByteBufUtil.hexDump(byteBuf),LocalDateTime.now());
+				channel.writeAndFlush(byteBuf).sync();
+				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+				
+				logger.info("发送完后:{}",LocalDateTime.now());
+				//byteBuf.readerIndex(byteBuf.writerIndex());  //已写入的数据标记为已读,成为可丢弃数据
+				//byteBuf.discardReadBytes();
+				Thread.sleep(1000);  //等待0.1秒再发下条指令  时间过短 多条指令的返回消息会粘包半包
+			} catch (InterruptedException e) {
+				e.printStackTrace();
+			} 
+			//byteBuf.readerIndex(byteBuf.writerIndex());  //已写入的数据标记为已读,成为可丢弃数据
+			//byteBuf.discardReadBytes(); //丢弃已读数据,使原本的空间可以重新算作可写
+			//byteBuf.clear();  //清空下次复用
+			//byteBuf.resetWriterIndex();
+			//byteBuf.release();
+		}
+		
+		//byteBuf.release();  //手动释放
+
+	}
+
+}

+ 30 - 0
src/main/java/com/hb/proj/gather/business/GatherTaskExecutor.java

@@ -0,0 +1,30 @@
+package com.hb.proj.gather.business;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * 采集任务执行器(异步执行)
+ * @author cwen
+ *
+ */
+public class GatherTaskExecutor {
+
+	private final static  Logger logger = LoggerFactory.getLogger(GatherTaskExecutor.class);
+			
+	private static final ExecutorService  executor=new ThreadPoolExecutor(5,10,30,TimeUnit.MINUTES,new LinkedBlockingQueue<Runnable>(20),new ThreadPoolExecutor.CallerRunsPolicy());
+	
+	public static void execute(Runnable task){
+		executor.execute(task);
+	}
+	
+	public static void shutdown() {
+		logger.info("关闭定时采集执行器");
+		executor.shutdownNow();
+	}
+}

+ 6 - 0
src/main/java/com/hb/proj/gather/protocol/ChannelGroupMgr.java

@@ -26,6 +26,8 @@ public class ChannelGroupMgr {
 	
 	public  static final AttributeKey<String> ATTR_KEY_SERIAL=AttributeKey.valueOf("serial");  //通道自定义属性key,用于保存属性数据,便于后面通过该属性查找对应channel
 	
+	public  static final AttributeKey<String> ATTR_KEY_CMD=AttributeKey.valueOf("cmd");  //当前正执行的指令 指令枚举对象名称
+	
 	public static void add(Channel channel,String serial) {
 		logger.info("增加客户端通道:{}",serial);
 		channel.attr(ATTR_KEY_SERIAL).set(serial);
@@ -53,6 +55,10 @@ public class ChannelGroupMgr {
 		return CHANNEL_GROUP.size();
 	}
 	
+	public static Iterator<Channel> iterator() {
+		return CHANNEL_GROUP.iterator();
+	}
+	
 	public static  Channel  get(String serial) {
 		if(StringUtils.isBlank(serial)) {
 			return null;

+ 31 - 2
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -10,6 +10,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
 
 /**
  * 按协议解析数据    zl-opd mudbus 参考 A11标准
@@ -87,8 +89,9 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 			logger.info("接收CRC:{}:{},计算CRC:{}",ByteUtils.toHexString(crc16),ByteUtils.byte2ToIntHL(crc16),calCrc16);
 			
 			if(ByteUtils.byte2ToIntHL(crc16)==calCrc16) {  //crc校验通过
-				
-				GatherRespParser.parseShort(byteBuf,headBtyCount,datalen);
+				String cmd=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_CMD).get();
+				logger.info("解析指令{}返回消息",cmd);
+				GatherRespParser.parseFloat(byteBuf,headBtyCount,datalen);
 				
 			}
 		}
@@ -103,4 +106,30 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 		ctx.fireChannelRead(msg); 
 	}
 
+
+	@Override
+	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+		if(evt instanceof IdleStateEvent) {
+			IdleState state = ((IdleStateEvent) evt).state();
+			if (state == IdleState.READER_IDLE) {
+				String serial= ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
+				logger.info("设备超时未发数据,准备关闭连接:{}",serial);
+                ctx.channel().close();
+            }
+
+		}
+		else {
+			super.userEventTriggered(ctx, evt);
+		}
+	}
+
+
+	@Override
+	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+		cause.printStackTrace();
+		logger.info("出现异常准备关闭连接");
+		ctx.close(); 
+	}
+
+	
 }

+ 4 - 0
src/main/java/com/hb/proj/gather/server/MyChannelInitializer.java

@@ -1,9 +1,12 @@
 package com.hb.proj.gather.server;
 
+import java.util.concurrent.TimeUnit;
+
 import com.hb.proj.gather.protocol.ZLOpdProtHandler;
 
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
 
 /**
  * netty 通道消息处理器配置
@@ -20,6 +23,7 @@ public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
 	@Override
 	protected void initChannel(SocketChannel channel) throws Exception {
 		channel.pipeline()
+				.addLast(new IdleStateHandler(100,0,0,TimeUnit.SECONDS))  //100秒无读操作约三次心跳时间
 				.addLast(new ZLOpdProtHandler());
 				//发送的数据时行文字编码
                 //.addLast("encoder", new StringEncoder(StandardCharsets.UTF_8))

+ 4 - 0
src/main/java/com/hb/proj/gather/server/NettyGatherServer.java

@@ -4,6 +4,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
+import com.hb.proj.gather.business.GatherTaskExecutor;
+
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -66,5 +68,7 @@ public class NettyGatherServer {
         	logger.info("即将关闭服务 channel.close");
             serverChannel.close();
         }
+        
+        GatherTaskExecutor.shutdown();
     }
 }

+ 15 - 0
src/main/java/com/hb/proj/gather/test/GatherDataDecoder.java

@@ -8,6 +8,8 @@ import org.slf4j.LoggerFactory;
 import com.hb.proj.gather.utils.ByteUtils;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 
@@ -34,5 +36,18 @@ public class GatherDataDecoder extends ByteToMessageDecoder {
 	public void channelActive(ChannelHandlerContext ctx) throws Exception {
 		logger.info("有设备连接上:{}",ctx.channel().remoteAddress());
 	}
+	
+	public static void main(String[] args) {
+		ByteBuf buffer = Unpooled.buffer(10);
+		buffer.writeBytes(new byte[] {0x01,0x02,0x03,0x09});
+		System.out.println(ByteBufUtil.hexDump(buffer));
+		//buffer.readerIndex(buffer.writerIndex());  //已写入的数据标记为已读,成为可丢弃数据
+		//buffer.discardReadBytes();
+		buffer.clear();
+		
+		buffer.writeBytes(new byte[] {0x04,0x05,0x06});
+		System.out.println(ByteBufUtil.hexDump(buffer));
+		
+	}
 
 }