ソースを参照

采集相关组件参数微调

chenwen 2 年 前
コミット
f57969f6bd

+ 3 - 12
src/main/java/com/hb/proj/gather/business/GatherMultiTask.java

@@ -73,18 +73,6 @@ public class GatherMultiTask implements Runnable {
 					return;
 				}
 				
-				/**
-				byteBuf.writeBytes(preCmd.getCmd());
-				channel.writeAndFlush(byteBuf);
-				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
-				channel.wait(10*1000);  //等待接收返回数据后继续,最多等待10s
-				if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { 
-					logger.info("功图数据还未准备就绪,准备重试一次");
-					
-					return;
-				}
-				**/
-				
 			}	
 			catch (InterruptedException e) {
 				e.printStackTrace();
@@ -116,5 +104,8 @@ public class GatherMultiTask implements Runnable {
 		channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
 		channel.wait(10*1000);
 	}
+	
+	
+	
 
 }

+ 2 - 2
src/main/java/com/hb/proj/gather/business/GatherScheduler.java

@@ -42,9 +42,9 @@ public class GatherScheduler {
 	
 	
 	/**
-	 * 多值3分钟执行一次采集,首次延时3s
+	 * 多值3分钟执行一次采集,首次延时15s
 	 */
-	@Scheduled(fixedRate = 180 * 1000,initialDelay= 5000)  
+	@Scheduled(fixedRate = 300 * 1000,initialDelay= 15000)  
 	public void startMultiGather() {
 		logger.info("多值定时采集启动...");
 		Iterator<Channel> iterator=ChannelGroupMgr.iterator();

+ 4 - 1
src/main/java/com/hb/proj/gather/business/GatherSingleTask.java

@@ -26,7 +26,7 @@ public class GatherSingleTask implements Runnable {
 	}
 	
 	/**
-	 * 约2s采集完
+	 * 约2-5s采集完
 	 */
 	@Override
 	public void run() {
@@ -59,5 +59,8 @@ public class GatherSingleTask implements Runnable {
 		
 
 	}
+	
+	
+	
 
 }

+ 113 - 0
src/main/java/com/hb/proj/gather/business/GatherTask.java

@@ -0,0 +1,113 @@
+package com.hb.proj.gather.business;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.protocol.ChannelGroupMgr;
+import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+
+public class GatherTask implements Runnable{
+
+	private final static  Logger logger = LoggerFactory.getLogger(GatherTask.class);
+
+	private Channel  channel;
+	
+	public GatherTask(Channel channel) {
+		this.channel=channel;
+	}
+	
+	@Override
+	public void run() {
+		
+		ByteBufAllocator alloc=channel.alloc(); 
+		synchronized(channel) {
+			logger.info("单值采集开始...");
+			singleGahter(alloc);
+			logger.info("多值采集开始...");
+			multiGather(alloc,channel);
+		}
+	}
+	
+	private void singleGahter(ByteBufAllocator alloc) {
+		try {
+			ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.PRESS_TEMP_LOAD,ZLOpdProtCMDEnum.CURR_VOL_LOS_PW,ZLOpdProtCMDEnum.FREQ_STROKE};
+			ByteBuf byteBuf=null;
+			for(ZLOpdProtCMDEnum cmd : cmds) {
+				byteBuf=alloc.directBuffer();
+				
+				byteBuf.writeBytes(cmd.getCmd());
+				channel.writeAndFlush(byteBuf);
+				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+				logger.info("发送完指令:{}",cmd.name());
+				channel.wait(5*1000);  //等待接收返回数据后继续,最多等待5s,此处释放锁,回复还未收到就被多值任务获得锁并发指令,会导致两个指令间隔很短
+			}
+		}
+		catch (InterruptedException e) {
+			e.printStackTrace();
+			logger.error("定时采集出现异常:{}",e.getMessage());
+		} 
+	}
+	
+	
+	
+	
+	private void multiGather(ByteBufAllocator alloc,Channel  channel) {
+		try {
+			checkDiagramPoint(alloc);
+			
+			if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { 
+				logger.info("功图数据还未准备就绪,准备重试一次");
+				Thread.sleep(500); //重试一次
+				checkDiagramPoint(alloc);
+			}
+			
+			if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { 
+				logger.info("功图数据还未准备就绪");
+				return;
+			}
+			
+			ZLOpdProtCMDEnum[]  cmds=	{
+					ZLOpdProtCMDEnum.DIAGRAM_DISP_1,
+					ZLOpdProtCMDEnum.DIAGRAM_DISP_2,
+					ZLOpdProtCMDEnum.DIAGRAM_DISP_3,
+					ZLOpdProtCMDEnum.DIAGRAM_LOAD_1,
+					ZLOpdProtCMDEnum.DIAGRAM_LOAD_2,	
+					ZLOpdProtCMDEnum.DIAGRAM_LOAD_3,
+					ZLOpdProtCMDEnum.DIAGRAM_CURR_1,
+					ZLOpdProtCMDEnum.DIAGRAM_CURR_2,	
+					ZLOpdProtCMDEnum.DIAGRAM_CURR_3,	
+					ZLOpdProtCMDEnum.DIAGRAM_POWER_1,
+					ZLOpdProtCMDEnum.DIAGRAM_POWER_2,	
+					ZLOpdProtCMDEnum.DIAGRAM_POWER_3	
+					
+			};
+			ByteBuf byteBuf=null;
+			for(ZLOpdProtCMDEnum cmd : cmds) {
+				byteBuf=alloc.directBuffer();
+				byteBuf.writeBytes(cmd.getCmd());
+				channel.writeAndFlush(byteBuf);
+				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+				logger.info("发送完后指令:{}",cmd.name());
+				channel.wait(10*1000);  //等待接收返回数据后继续,最多等待10s
+			}
+		}
+		catch (InterruptedException e) {
+			e.printStackTrace();
+			logger.error("定时采集出现异常:{}",e.getMessage());
+		} 
+		
+	}
+	
+	private void checkDiagramPoint(ByteBufAllocator alloc) throws InterruptedException {
+		ZLOpdProtCMDEnum preCmd=ZLOpdProtCMDEnum.DIAGRAM_POINT_COUNT;  
+		ByteBuf byteBuf=alloc.directBuffer();
+		byteBuf.writeBytes(preCmd.getCmd());
+		channel.writeAndFlush(byteBuf);
+		channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
+		channel.wait(10*1000);
+	}
+}

+ 1 - 1
src/main/java/com/hb/proj/gather/business/GatherTaskExecutor.java

@@ -17,7 +17,7 @@ public class GatherTaskExecutor {
 
 	private final static  Logger logger = LoggerFactory.getLogger(GatherTaskExecutor.class);
 			
-	private static final ExecutorService  executor=new ThreadPoolExecutor(5,10,30,TimeUnit.MINUTES,new LinkedBlockingQueue<Runnable>(20),new ThreadPoolExecutor.CallerRunsPolicy());
+	private static final ExecutorService  executor=new ThreadPoolExecutor(5,30,30,TimeUnit.MINUTES,new LinkedBlockingQueue<Runnable>(20),new ThreadPoolExecutor.CallerRunsPolicy());
 	
 	public static void execute(Runnable task){
 		executor.execute(task);