ソースを参照

增加多值定时采集任务

chenwen 2 年 前
コミット
fce681d761

+ 1 - 3
src/main/java/com/hb/proj/allconfig/SpringMvcConfigurer.java

@@ -2,7 +2,6 @@ package com.hb.proj.allconfig;
 
 import java.util.Arrays;
 
-import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.web.servlet.FilterRegistrationBean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -17,8 +16,7 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
 @Configuration
 public class SpringMvcConfigurer implements WebMvcConfigurer {
 
-	@Value("${api.filter.exclude}") 
-	private String excludePath;
+	
 	
 	private static final long MAX_AGE=24*60*60;  //跨域预请求最大有效期(有效期内【只对同一请求?】不再预请求)
 	

+ 120 - 0
src/main/java/com/hb/proj/gather/business/GatherMultiTask.java

@@ -0,0 +1,120 @@
+package com.hb.proj.gather.business;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.protocol.ChannelGroupMgr;
+import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.channel.Channel;
+
+/**
+ * 功图采集任务
+ * 关键点:1、先检测功图点数是否为255,是:功图就绪,否:不可采集功图;
+ * 2、功图中的位移、载荷、电流、功率 每项是分3次获取。
+ * @author cwen
+ *
+ */
+
+public class GatherMultiTask implements Runnable {
+
+	private final static  Logger logger = LoggerFactory.getLogger(GatherMultiTask.class);
+
+	private Channel  channel;
+	
+	public GatherMultiTask(Channel channel) {
+		this.channel=channel;
+	}
+	
+	
+	/**
+	 * 约8s采集完
+	 */
+	@Override
+	public void run() {
+		logger.info("多值采集开始...");
+		ZLOpdProtCMDEnum preCmd=ZLOpdProtCMDEnum.DIAGRAM_POINT_COUNT;  //前置命令,返回数据为250 才继续后面的
+		ZLOpdProtCMDEnum[]  cmds=	{
+				ZLOpdProtCMDEnum.DIAGRAM_DISP_1,
+				ZLOpdProtCMDEnum.DIAGRAM_DISP_2,
+				ZLOpdProtCMDEnum.DIAGRAM_DISP_3,
+				ZLOpdProtCMDEnum.DIAGRAM_LOAD_1,
+				ZLOpdProtCMDEnum.DIAGRAM_LOAD_2,	
+				ZLOpdProtCMDEnum.DIAGRAM_LOAD_3,
+				ZLOpdProtCMDEnum.DIAGRAM_CURR_1,
+				ZLOpdProtCMDEnum.DIAGRAM_CURR_2,	
+				ZLOpdProtCMDEnum.DIAGRAM_CURR_3,	
+				ZLOpdProtCMDEnum.DIAGRAM_POWER_1,
+				ZLOpdProtCMDEnum.DIAGRAM_POWER_2,	
+				ZLOpdProtCMDEnum.DIAGRAM_POWER_3	
+				
+		};
+		ByteBufAllocator alloc=channel.alloc(); 
+		ByteBuf byteBuf=null;
+		
+		synchronized(channel) {
+			try {
+				
+				byteBuf=alloc.directBuffer();
+				
+				checkDiagramPoint(byteBuf,preCmd);
+				
+				if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { 
+					logger.info("功图数据还未准备就绪,准备重试一次");
+					Thread.sleep(500); //重试一次
+					byteBuf=alloc.directBuffer();
+					checkDiagramPoint(byteBuf,preCmd);
+				}
+				
+				if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { 
+					logger.info("功图数据还未准备就绪");
+					return;
+				}
+				
+				/**
+				byteBuf.writeBytes(preCmd.getCmd());
+				channel.writeAndFlush(byteBuf);
+				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
+				channel.wait(10*1000);  //等待接收返回数据后继续,最多等待10s
+				if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { 
+					logger.info("功图数据还未准备就绪,准备重试一次");
+					
+					return;
+				}
+				**/
+				
+			}	
+			catch (InterruptedException e) {
+				e.printStackTrace();
+			} 
+				
+			//开始功图采集
+			for(ZLOpdProtCMDEnum cmd : cmds) {
+				byteBuf=alloc.directBuffer();
+				byteBuf.writeBytes(cmd.getCmd());
+				try {
+					channel.writeAndFlush(byteBuf);
+					channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+					logger.info("发送完后指令:{}",cmd.name());
+					channel.wait(10*1000);  //等待接收返回数据后继续,最多等待10s
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				} 
+				
+			}
+			
+		}
+
+	}
+	
+	
+	private void checkDiagramPoint(ByteBuf byteBuf,ZLOpdProtCMDEnum preCmd) throws InterruptedException {
+		byteBuf.writeBytes(preCmd.getCmd());
+		channel.writeAndFlush(byteBuf);
+		channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
+		channel.wait(10*1000);
+	}
+
+}

+ 18 - 2
src/main/java/com/hb/proj/gather/business/GatherScheduler.java

@@ -24,13 +24,14 @@ import io.netty.channel.Channel;
 public class GatherScheduler {
 	
 	private final static  Logger logger = LoggerFactory.getLogger(GatherScheduler.class);
+	
 
 	/**
 	 * 单值1分钟执行一次采集
 	 */
-	@Scheduled(fixedRate = 60 * 1000)  //每分钟执行一次
+	@Scheduled(fixedRate=60*1000)  //每分钟执行一次
 	public void startSingleGather() {
-		logger.info("定时采集计划启动...");
+		logger.info("单值定时采集启动...");
 		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
 		Channel channel=null;
 		while(iterator.hasNext()) {
@@ -38,4 +39,19 @@ public class GatherScheduler {
 			GatherTaskExecutor.execute(new GatherSingleTask(channel));
 		}
 	}
+	
+	
+	/**
+	 * 多值3分钟执行一次采集,首次延时3s
+	 */
+	@Scheduled(fixedRate = 180 * 1000,initialDelay= 5000)  
+	public void startMultiGather() {
+		logger.info("多值定时采集启动...");
+		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
+		Channel channel=null;
+		while(iterator.hasNext()) {
+			channel=iterator.next();
+			GatherTaskExecutor.execute(new GatherMultiTask(channel));
+		}
+	}
 }

+ 24 - 27
src/main/java/com/hb/proj/gather/business/GatherSingleTask.java

@@ -1,7 +1,5 @@
 package com.hb.proj.gather.business;
 
-import java.time.LocalDateTime;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -10,9 +8,7 @@ import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.ByteBufUtil;
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 
 /**
  * 单值采集任务  具体执行采集任务,下发采集指令   
@@ -29,37 +25,38 @@ public class GatherSingleTask implements Runnable {
 		this.channel=channel;
 	}
 	
+	/**
+	 * 约2s采集完
+	 */
 	@Override
 	public void run() {
 		logger.info("单值采集开始...");
+		
 		ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.PRESS_TEMP_LOAD,ZLOpdProtCMDEnum.CURR_VOL_LOS_PW,ZLOpdProtCMDEnum.FREQ_STROKE};
 		ByteBufAllocator alloc=channel.alloc(); 
-		ByteBuf byteBuf=alloc.directBuffer(); //
-		//ChannelFuture future=null;
-		for(ZLOpdProtCMDEnum cmd : cmds) {
-			byteBuf=alloc.directBuffer();
-			
-			byteBuf.writeBytes(cmd.getCmd());
-			try {
-				//logger.info("准备发送指令:{},{}",ByteBufUtil.hexDump(byteBuf),LocalDateTime.now());
-				channel.writeAndFlush(byteBuf).sync();
-				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+		ByteBuf byteBuf=null;
+		
+		synchronized(channel) {
+			for(ZLOpdProtCMDEnum cmd : cmds) {
+				byteBuf=alloc.directBuffer();
 				
-				logger.info("发送完后:{}",LocalDateTime.now());
-				//byteBuf.readerIndex(byteBuf.writerIndex());  //已写入的数据标记为已读,成为可丢弃数据
-				//byteBuf.discardReadBytes();
-				Thread.sleep(1000);  //等待0.1秒再发下条指令  时间过短 多条指令的返回消息会粘包半包
-			} catch (InterruptedException e) {
-				e.printStackTrace();
-			} 
-			//byteBuf.readerIndex(byteBuf.writerIndex());  //已写入的数据标记为已读,成为可丢弃数据
-			//byteBuf.discardReadBytes(); //丢弃已读数据,使原本的空间可以重新算作可写
-			//byteBuf.clear();  //清空下次复用
-			//byteBuf.resetWriterIndex();
-			//byteBuf.release();
+				byteBuf.writeBytes(cmd.getCmd());
+				try {
+					channel.writeAndFlush(byteBuf);
+					channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
+					logger.info("发送完指令:{}",cmd.name());
+					
+					channel.wait(5*1000);  //等待接收返回数据后继续,最多等待5s,此处释放锁,回复还未收到就被多值任务获得锁并发指令,会导致两个指令间隔很短
+					
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				} 
+				
+			}
+			
 		}
 		
-		//byteBuf.release();  //手动释放
+		
 
 	}
 

+ 2 - 0
src/main/java/com/hb/proj/gather/protocol/ChannelGroupMgr.java

@@ -28,6 +28,8 @@ public class ChannelGroupMgr {
 	
 	public  static final AttributeKey<String> ATTR_KEY_CMD=AttributeKey.valueOf("cmd");  //当前正执行的指令 指令枚举对象名称
 	
+	public  static final AttributeKey<Boolean> ATTR_KEY_DIAGRAM_READY=AttributeKey.valueOf("false");  //是否可以采集功图数据(是否已准备好)
+	
 	public static void add(Channel channel,String serial) {
 		logger.info("增加客户端通道:{}",serial);
 		channel.attr(ATTR_KEY_SERIAL).set(serial);

+ 23 - 0
src/main/java/com/hb/proj/gather/protocol/GatherRespParser.java

@@ -11,6 +11,17 @@ import io.netty.buffer.ByteBuf;
 public class GatherRespParser {
 
 	private final static  Logger logger = LoggerFactory.getLogger(GatherRespParser.class);
+	
+	
+	public static void parse(ByteBuf byteBuf ,int startIndex,int dataLen,String cmd) {
+		ZLOpdProtCMDEnum  cmdEum=ZLOpdProtCMDEnum.valueOf(cmd);
+		if(cmdEum.getItemBytCount()==4) {
+			parseFloat(byteBuf,startIndex,dataLen);
+		}
+		else if(cmdEum.getItemBytCount()==2) {
+			parseShort(byteBuf,startIndex,dataLen);
+		}
+	}
 			
 	/**
 	 * 解析消息中的数据部分 每个数据项 4字节
@@ -55,4 +66,16 @@ public class GatherRespParser {
 		
 		
 	}
+	
+	/**
+	 * 专为功图点数检测解析
+	 * @param byteBuf
+	 * @param startIndex
+	 * @param dataLen
+	 * @return
+	 */
+	public static short parseDiagramPoint(ByteBuf byteBuf ,int startIndex) {
+		byteBuf.readerIndex(startIndex);
+		return byteBuf.readShort();
+	}
 }

+ 27 - 6
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -46,6 +46,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 			return;
 		}
 		
+		
 		String hexmsg=ByteBufUtil.hexDump(byteBuf);
 		
 		logger.debug("接收到数据:{}",hexmsg);
@@ -72,7 +73,7 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 			int headBtyCount=3,crc16BtyCount=2; //头部字节数,校验位字节数
 			
 			int datalen=byteBuf.getByte(2)&0xff; //数据区字节数   byteBuf.get方法不改变readIndex,writeIndex,readXX方法会
-			logger.info("数据字节长度:{}",datalen);
+			//logger.info("数据字节长度:{}",datalen);
 			
 			if(byteCount<(datalen+headBtyCount+crc16BtyCount)) {  // 读取的字节数量不够---拆包了,目前处理:舍弃
 				return;
@@ -86,12 +87,31 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 			
 			int calCrc16=Crc16Utils.getCRC(headAndDatas);
 			
-			logger.info("接收CRC:{}:{},计算CRC:{}",ByteUtils.toHexString(crc16),ByteUtils.byte2ToIntHL(crc16),calCrc16);
+			//logger.info("接收CRC:{}:{},计算CRC:{}",ByteUtils.toHexString(crc16),ByteUtils.byte2ToIntHL(crc16),calCrc16);
 			
 			if(ByteUtils.byte2ToIntHL(crc16)==calCrc16) {  //crc校验通过
-				String cmd=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_CMD).get();
-				logger.info("解析指令{}返回消息",cmd);
-				GatherRespParser.parseFloat(byteBuf,headBtyCount,datalen);
+				
+				String cmd=null;
+				
+				synchronized(ctx.channel()) {  //成功收到消息才通知可发指令,未成功收到消息则等超时
+					
+					cmd=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_CMD).get();
+					logger.info("解析指令{}返回消息",cmd);
+					
+					if("DIAGRAM_POINT_COUNT".equalsIgnoreCase(cmd)) {  //如果是功图点数检测,还需要明确结果
+						short pcount=GatherRespParser.parseDiagramPoint(byteBuf,headBtyCount);
+						logger.debug("功图点数{}",pcount);
+						ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(pcount==250);
+					}
+					else{
+						GatherRespParser.parse(byteBuf,headBtyCount,datalen,cmd);
+					}
+					
+					ctx.channel().notifyAll(); //已经收到回复消息,通知指令发送进程可以继续,同步块或者同步方法执行完后才释放锁
+					
+				}
+				
+				
 				
 			}
 		}
@@ -127,7 +147,8 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 	@Override
 	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 		cause.printStackTrace();
-		logger.info("出现异常准备关闭连接");
+		String serial=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
+		logger.error("连接【{}】出现异常准备关闭",serial);
 		ctx.close(); 
 	}
 

+ 8 - 23
src/main/resources/application-dev.properties

@@ -1,27 +1,21 @@
 # 应用名称
-spring.application.name=智能油田
+spring.application.name=智能油田-采集程序
 
 # 应用服务 WEB 访问端口
 server.port=8080
 
-server.servlet.context-path=/zl
+server.servlet.context-path=/zl-opd-gather
 server.tomcat.uri-encoding=UTF-8
 server.servlet.encoding.charset=UTF-8
 server.servlet.encoding.enabled=true
 server.servlet.encoding.force=true
 
-#语言国际化配置
-spring.messages.active=false
-spring.messages.encoding=UTF-8
-spring.messages.basename: static/i18n/messages
+
 
 #全局时间输出格式化(对map中的date无效),优先级低于实体属性上的格式化注解
 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
 spring.jackson.time-zone=GMT+8
 
-#日志配置
-#业务日志配置
-spring.syslog.aspect.active=true
 
 #数据库连接池配置
 spring.datasource.url=jdbc:mysql://127.0.0.1:3306/zl_opd?useOldAliasMetadataBehavior=true
@@ -36,8 +30,8 @@ hikari.validation-timeout: 3000
 hikari.idle-timeout: 60000
 hikari.login-timeout: 5
 hikari.max-lifetime: 60000
-hikari.maximum-pool-size: 10
-hikari.minimum-idle: 5
+hikari.maximum-pool-size: 50
+hikari.minimum-idle: 10
 hikari.read-only: false
 
 #redis 连接配置
@@ -46,15 +40,6 @@ spring.redis.host=42.56.120.92
 spring.redis.port=9608
 spring.redis.password=redis7.0
 
-#缓存配置  登录信息过期时间,单位分钟
-cache.token.expire=30
-
-#token 在请求header中的name,默认为token
-token.header.name=token
-
-#api调用权限控制filter配置
-api.filter.exclude=/login
-
-#系统管理员
-sys.admin.account=admin
-sys.admin.pwd=2f459bf76f471ff9753caffd1be02bac
+#gather-scheduler 定时采集周期配置 毫秒  单值1分钟,多值3分钟
+scheduler.single.fixedrate=60000
+scheduler.multi.fixedrate=60000