فهرست منبع

调整采集定时任务,返回数据处理调整,兼容新传输模式

chenwen 8 ماه پیش
والد
کامیت
bc7d4c4bf4

+ 22 - 7
src/main/java/com/hb/proj/api/controller/APIController.java

@@ -153,15 +153,17 @@ public class APIController {
 	 */
 	@RequestMapping("/liquid/gather")
 	public RespVO<Object> gatherLiquidData(@NotBlank(message="设备编号不能为空") String serial){
-		Channel channel=ChannelGroupMgr.get(serial);
+		String[] chnAddr=parseSerial(serial);
+		String chnSerail=chnAddr[0];
+		Channel channel=ChannelGroupMgr.get(chnSerail);
 		if(channel==null) {
 			return RespVOBuilder.error("该设备已不在线,请稍后再试");
 		}
-		if(!ChannelGroupMgr.isDone(serial)) {
+		if(!ChannelGroupMgr.isDone(chnSerail)) {
 			return RespVOBuilder.error("该设备正在执行其它操作,请稍后再试");
 		}
 		
-		ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(new GatherLiquidTask(channel)));
+		ChannelGroupMgr.addFuture(chnSerail,GatherTaskExecutor.submit(new GatherLiquidTask(channel,Integer.parseInt(chnAddr[1]))));
 		repRedis.put("liquid_"+serial, MapUtils.build("status","complete","action","gather"),true);
 		return RespVOBuilder.ok();
 	}
@@ -178,18 +180,31 @@ public class APIController {
 	 * @return
 	 */
 	private RespVO<Object> liquidCtr(String serial,int action,LiquidParam param) {
-		Channel channel=ChannelGroupMgr.get(serial);
+		String[] chnAddr=parseSerial(serial);
+		String chnSerail=chnAddr[0];
+		Channel channel=ChannelGroupMgr.get(chnSerail);
 		if(channel==null) {
 			return RespVOBuilder.error("该设备已不在线,请稍后再试");
 		}
-		if(!ChannelGroupMgr.isDone(serial)) {
+		if(!ChannelGroupMgr.isDone(chnSerail)) {
 			return RespVOBuilder.error("该设备正在执行其它操作,请稍后再试");
 		}
-		ManualLiquidTask task=new ManualLiquidTask(channel,action,param);
-		ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(task));
+		ManualLiquidTask task=new ManualLiquidTask(channel,Integer.parseInt(chnAddr[1]),action,param);
+		ChannelGroupMgr.addFuture(chnSerail,GatherTaskExecutor.submit(task));
 		return RespVOBuilder.ok();
 	}
 	
+	/**
+	 * 解析设备编号
+	 * @param serial 原设备编号就是通道编号(心跳值),现改为:通道编号-设置地址号
+	 * @return
+	 */
+	private String[] parseSerial(String serial) {
+		serial=serial.replaceAll("\\s+", "");
+		String[] rst=serial.split("-");
+		return rst.length>1?rst:(new String[] {rst[0],"1"});
+	}
+	
 	
 	/**
 	 * 采集周期设置 单位秒

+ 26 - 0
src/main/java/com/hb/proj/gather/model/GatherDevicePO.java

@@ -0,0 +1,26 @@
+package com.hb.proj.gather.model;
+
+import lombok.Data;
+
+@Data
+public class GatherDevicePO {
+
+	//设备编号, 网络连接识别号(心跳包)+  设备地址号
+	private String deviceCode;
+	
+	//设备类型   liquid:动液面设备;gather:单井采集设备
+	private String deviceType;
+	
+	//网络连接识别号
+	private String channelSerial;
+	
+	//设备地址号(默认为1)
+	private Integer addrNum;
+	
+	public void setDeviceCode(String deviceCode) {
+		this.deviceCode=deviceCode.replaceAll("\\s+", "");
+		String[] codes=this.deviceCode.split("-");
+		this.channelSerial=codes[0];
+		this.addrNum=codes.length>1?Integer.parseInt(codes[1]):1;
+	}
+}

+ 13 - 5
src/main/java/com/hb/proj/gather/process/DataTransConfig.java

@@ -8,6 +8,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.hb.proj.gather.model.AlarmDefineVO;
+import com.hb.proj.gather.model.GatherDevicePO;
 import com.hb.proj.gather.model.WellParamVO;
 import com.hb.proj.gather.rep.WellConfigService;
 import com.hb.xframework.util.ApplicationContextUtils;
@@ -25,7 +26,7 @@ public class DataTransConfig {
 	
 	private static Map<String,AlarmDefineVO> configerAlarm=null;
 	
-	private static Map<String,Object> configerDevType=null;
+	private static Map<String,List<GatherDevicePO>> configerDevType=null;
 	
 	private static WellConfigService configService=null;
 	
@@ -34,14 +35,14 @@ public class DataTransConfig {
 		configService=ApplicationContextUtils.getBean("wellConfigService", WellConfigService.class);
 		configerParam=configService.loadWellParams();
 		configerAlarm=configService.loadAlarmDefines();
-		configerDevType=configService.loadDevTypeMapping();
+		configerDevType=configService.loadDevices();
 		logger.info("完成加载参数配置、报警设置、设备类型配置");
 	}
 	
 	public static void reloadAll() {
 		configerParam=configService.loadWellParams();
 		configerAlarm=configService.loadAlarmDefines();
-		configerDevType=configService.loadDevTypeMapping();
+		configerDevType=configService.loadDevices();
 	}
 	
 	
@@ -60,7 +61,7 @@ public class DataTransConfig {
 	}
 	
 	public static void reloadDevs() {
-		configerDevType=configService.loadDevTypeMapping();
+		configerDevType=configService.loadDevices();
 	}
 	
 	
@@ -100,16 +101,23 @@ public class DataTransConfig {
 	 * @param devSerial
 	 * @return
 	 */
+	
+	/*
 	public static boolean isGatherDev(String devSerial) {
 		return configerDevType.containsKey(devSerial)  &&  configerDevType.get(devSerial).equals("gather") ;
-	}
+	}*/
 	
 	/**
 	 * 判断是否为动液面设备
 	 * @param devSerial
 	 * @return
 	 */
+	/*
 	public static boolean isLiquidDev(String devSerial) {
 		return configerDevType.containsKey(devSerial)  &&  configerDevType.get(devSerial).equals("liquid") ;
+	}*/
+	
+	public static List<GatherDevicePO>  getDevsByChnAndType(String channel,String type){
+		return configerDevType.get(channel+"_"+type);
 	}
 }

+ 1 - 1
src/main/java/com/hb/proj/gather/process/DataTransRepSingleTask.java

@@ -35,7 +35,7 @@ public class DataTransRepSingleTask implements Runnable{
 	@Override
 	public void run() {
 		try {
-			logger.info("开始单值数据转换处理{}",singleCombPO.getDevSerial());
+			logger.info("开始单值数据转换处理{}",singleCombPO.getDevSerial()); //getDevSerial()实际值已改为 通道号-地址号2024.11.5
 			Map<String,Float>  gatherDatas=singleCombPO.getGatherDatas();
 			WellParamVO paramConfig=null;
 			SingleInsertPO insPOItm=null;

+ 4 - 0
src/main/java/com/hb/proj/gather/protocol/ChannelGroupMgr.java

@@ -102,6 +102,10 @@ public class ChannelGroupMgr {
 		CHANNEL_FUTURES.put(serial, future);
 	}
 	
+	public static Future<Boolean> getFuture(String serial){
+		return CHANNEL_FUTURES.get(serial);
+	}
+	
 	
 	public static boolean isDone(String serial) {
 		if(!CHANNEL_FUTURES.containsKey(serial)) {

+ 4 - 0
src/main/java/com/hb/proj/gather/protocol/GatherRespParserFacade.java

@@ -30,6 +30,10 @@ public class GatherRespParserFacade {
 	public static void parse(ByteBuf byteBuf,int dataLen,String cmdName,Channel channel) {
 		ZLOpdProtCMDEnum cmdEnum=ZLOpdProtCMDEnum.valueOf(cmdName);
 		String serial=channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
+		int addrNum=byteBuf.getByte(0)&0xff;  //设备地址号
+		
+		serial+="-"+String.valueOf(addrNum); // 后续的serial,实际存储将改为:通道号-地址号
+		
 		
 		if(cmdName.startsWith("LIQUID")) {
 			boolean rst=LiquidParser.parse(byteBuf, dataLen, cmdEnum, serial);

+ 2 - 2
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -46,8 +46,8 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 		
 		int byteCount=byteBuf.readableBytes();
 		
-		boolean isWriteBackMsg=hexmsg.startsWith("0106"); //写指令返回消息
-		boolean isReadBackMsg=hexmsg.startsWith("0103");  //读指令返回消息
+		boolean isWriteBackMsg=ByteUtils.isWriteBackMsg(hexmsg); //hexmsg.startsWith("0106"); //写指令返回消息
+		boolean isReadBackMsg=ByteUtils.isReadBackMsg(hexmsg); //hexmsg.startsWith("0103");  //读指令返回消息
 		boolean valiHead=isReadBackMsg || isWriteBackMsg ;
 		
 		//开头两字节且不以0103开头,就认为是心跳数据-作为设备号,该方法并不可靠,有可能把采集的残包数据当作心跳

+ 5 - 2
src/main/java/com/hb/proj/gather/protocol/ZlA11MsgDecoder.java

@@ -5,6 +5,8 @@ import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.hb.proj.gather.utils.ByteUtils;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
 import io.netty.channel.ChannelHandlerContext;
@@ -20,6 +22,7 @@ public class ZlA11MsgDecoder extends ByteToMessageDecoder {
 	private final static  Logger logger = LoggerFactory.getLogger(ZlA11MsgDecoder.class);
 	
 	
+	
 	@Override
 	protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> out) throws Exception {
 		
@@ -28,8 +31,8 @@ public class ZlA11MsgDecoder extends ByteToMessageDecoder {
 		int byteCount=byteBuf.readableBytes();
 		ByteBuf outByteBuf=null;
 		
-		boolean isWriteBackMsg=hexmsg.startsWith("0106"); //写指令返回消息(动液面设备返回消息不规范,不做crc校验)
-		boolean isReadBackMsg=hexmsg.startsWith("0103");  //读指令返回消息
+		boolean isWriteBackMsg=ByteUtils.isWriteBackMsg(hexmsg); //  hexmsg.startsWith("0106"); //写指令返回消息(动液面设备返回消息不规范,不做crc校验)
+		boolean isReadBackMsg=ByteUtils.isReadBackMsg(hexmsg);   //hexmsg.startsWith("0103");  //读指令返回消息
 		boolean valiHead=isReadBackMsg || isWriteBackMsg ;
 		
 		logger.debug("ZlA11MsgDecoder 解码前数据:{}", hexmsg);

+ 1 - 1
src/main/java/com/hb/proj/gather/protocol/parser/LiquidParser.java

@@ -71,7 +71,7 @@ public class LiquidParser {
 			}
 			else if(cmdEnum.name().startsWith("LIQUID_SET") || cmdEnum.name().startsWith("LIQUID_BOOT_TEST")) {
 				//设备参数设置结果不用入库,不用等待组装,直接进入redis
-				repRedis.put("liquid_"+serial, cmdEnum.getParamCodes()[0],"1"); //0:超时或失败;1:成,指令发送时重置为0
+				repRedis.put("liquid_"+serial, cmdEnum.getParamCodes()[0],"1"); //0:超时或失败;1:成,指令发送时重置为0
 			}
 			
 		}

+ 29 - 0
src/main/java/com/hb/proj/gather/rep/WellConfigService.java

@@ -1,5 +1,6 @@
 package com.hb.proj.gather.rep;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -8,6 +9,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import com.hb.proj.gather.model.AlarmDefineVO;
+import com.hb.proj.gather.model.GatherDevicePO;
 import com.hb.proj.gather.model.WellParamVO;
 import com.hb.xframework.dao.core.SpringJdbcDAO;
 
@@ -22,6 +24,33 @@ public class WellConfigService {
 	@Autowired
 	private SpringJdbcDAO  dao;
 	
+	public Map<String,List<GatherDevicePO>> loadDevices() {
+		String sql="""
+				select device_code,device_type
+				from tzl_gather_device dev
+				where dev.del_if=false
+				""";
+		List<GatherDevicePO> devices=dao.queryForList(sql, GatherDevicePO.class);
+		if(devices==null || devices.size()==0) {
+			return null;
+		}
+		
+		Map<String,List<GatherDevicePO>> mapping=new HashMap<>(devices.size());
+		List<GatherDevicePO> devList=null;
+		String key=null;
+		for(GatherDevicePO  dev : devices) {
+			key=dev.getChannelSerial()+"_"+dev.getDeviceType();
+			if(!mapping.containsKey(key)) {
+				mapping.put(key, new ArrayList<>());
+			}
+			devList=mapping.get(key);
+			devList.add(dev);
+		}
+		
+		
+		return mapping;
+	}
+	
 	/**
 	 * 加载所有设备的类型配置
 	 * @return

+ 0 - 110
src/main/java/com/hb/proj/gather/scheduler/DataAssembler.java

@@ -1,110 +0,0 @@
-package com.hb.proj.gather.scheduler;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.hb.proj.gather.model.DiagramPO;
-import com.hb.proj.gather.model.SingleCombPO;
-import com.hb.proj.gather.process.DataTransRepSingleTask;
-import com.hb.proj.gather.process.DataTransRepTask;
-import com.hb.proj.gather.protocol.parser.DataPieceDiagramBox;
-import com.hb.proj.gather.protocol.parser.DataPieceSingleBox;
-
-/**
- * 采集数据组装器(将分批采集的数据组装成完整数据,特别是功图数据)
- * 目前不使用线程定时检测
- * @author cwen
- *
- */
-public class DataAssembler extends Thread{
-
-	private final static  Logger logger = LoggerFactory.getLogger(DataAssembler.class);
-	
-	private static boolean stopTag=false;
-			
-	private static Map<String,DataPieceDiagramBox> diagramPkgs=new HashMap<String,DataPieceDiagramBox>(100);
-	
-	private static Map<String,DataPieceSingleBox> singlePkgs=new HashMap<String,DataPieceSingleBox>(100);
-	
-	public  static SingleCombPO putPieceData(String channelSerial,Map<String,Float> pieceData) {
-		//logger.info("接收单值片段数据:{},{}",channelSerial,pieceData);
-		if(!singlePkgs.containsKey(channelSerial)) {
-			singlePkgs.put(channelSerial, new DataPieceSingleBox(channelSerial));
-		}
-		return singlePkgs.get(channelSerial).putAll(pieceData);
-	}
-	
-	public  static DiagramPO putPieceData(String channelSerial,String pieceName,List<Float> pieceDatas) {
-		//logger.info("接收功图片段数据:{},{}",channelSerial,pieceName);
-		if(!diagramPkgs.containsKey(channelSerial)) {
-			diagramPkgs.put(channelSerial, new DataPieceDiagramBox(channelSerial));
-		}
-		return diagramPkgs.get(channelSerial).put(pieceName,pieceDatas);
-	}
-
-	@Override
-	@Deprecated
-	public void run() {
-		logger.info("开始数据组装检测");
-		DataPieceDiagramBox  diaPiece=null;
-		DiagramPO  diagramPO=null;
-		
-		DataPieceSingleBox  singPiece=null;
-		SingleCombPO  singlePO=null;
-		
-		try {
-			while(!stopTag) {
-				
-				for(String channelSerial : singlePkgs.keySet()) {
-					singPiece=singlePkgs.get(channelSerial);
-					singlePO=singPiece.assemble();
-					if(singlePO!=null) {
-						GatherTaskExecutor.execute(new DataTransRepSingleTask(singlePO)); 
-					}
-				}
-				
-				for(String channelSerial : diagramPkgs.keySet()) {
-					/*
-					 * if(ChannelGroupMgr.get(channelSerial)== null) { //通道已不存在,释放片段数据缓存
-					 * diagramPkgs.remove(channelSerial); continue; }
-					 */
-					diaPiece=diagramPkgs.get(channelSerial);
-					
-					diagramPO=diaPiece.assembleDiagramLoad();
-					
-					if(diagramPO!=null) {
-						GatherTaskExecutor.execute(new DataTransRepTask(diagramPO)); 
-					}
-					
-					diagramPO=diaPiece.assembleDiagramCurr();
-					
-					if(diagramPO!=null) {
-						GatherTaskExecutor.execute(new DataTransRepTask(diagramPO)); 
-					}
-					
-					diagramPO=diaPiece.assembleDiagramPower();
-					
-					if(diagramPO!=null) {
-						GatherTaskExecutor.execute(new DataTransRepTask(diagramPO)); 
-					}
-				}
-				
-				Thread.sleep(500);
-			}
-		}
-		catch (InterruptedException e) {
-			e.printStackTrace();
-		}
-		
-	}
-	
-	public static void stopRun() {
-		logger.info("即将停止数据组装器线程");
-		stopTag=true;
-	}
-	
-}

+ 10 - 4
src/main/java/com/hb/proj/gather/scheduler/GatherLiquidTask.java

@@ -25,12 +25,16 @@ public class GatherLiquidTask implements Runnable {
 
 	private Channel  channel;
 	
+	//设备地址号,默认为1(DTU模式:一dtu一采集设备地址号默认1,LORA模式一网关多采集设备)
+	private int addrNum=1;
+	
 	private long cmdTimeout=25*1000; //指令等待回复超时时间 
 	
 	private int timeoutCount=0;  //超时发送指令次数
 	
-	public GatherLiquidTask(Channel channel) {
+	public GatherLiquidTask(Channel channel,int addrNum) {
 		this.channel=channel;
+		this.addrNum=addrNum;
 	}
 	
 	/**
@@ -57,7 +61,7 @@ public class GatherLiquidTask implements Runnable {
 			synchronized(channel) {
 				
 				channel.attr(ChannelGroupMgr.ATTR_KEY_STOP_NEXT).set(false); //每次任务开始前重置
-				
+				byte[]  cmdbtyes=null;
 				for(ZLOpdProtCMDEnum cmd : cmds) {
 					
 					if(needCloseChannel()) {
@@ -67,7 +71,9 @@ public class GatherLiquidTask implements Runnable {
 					}
 					
 					byteBuf=alloc.directBuffer();
-					byteBuf.writeBytes(cmd.getCmd());
+					cmdbtyes=cmd.getCmd();
+					cmdbtyes[0]=(byte)addrNum;
+					byteBuf.writeBytes(cmdbtyes);
 					channel.writeAndFlush(byteBuf);
 					
 					channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
@@ -157,7 +163,7 @@ public class GatherLiquidTask implements Runnable {
 		String serial=channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
 		if(ChannelGroupMgr.isDone(serial)) {
 			logger.info("设备【{}】将进行补采",serial);
-			ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(new GatherLiquidTask(channel)));
+			ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(new GatherLiquidTask(channel,addrNum)));
 		}
 	}
 	

+ 0 - 112
src/main/java/com/hb/proj/gather/scheduler/GatherMultiTask.java

@@ -1,112 +0,0 @@
-package com.hb.proj.gather.scheduler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.hb.proj.gather.protocol.ChannelGroupMgr;
-import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-
-/**
- * 功图采集任务
- * 关键点:1、先检测功图点数是否为255,是:功图就绪,否:不可采集功图;
- * 2、功图中的位移、载荷、电流、功率 每项是分3次获取。
- * @author cwen
- *
- */
-@Deprecated
-public class GatherMultiTask implements Runnable {
-
-	private final static  Logger logger = LoggerFactory.getLogger(GatherMultiTask.class);
-
-	private Channel  channel;
-	
-	public GatherMultiTask(Channel channel) {
-		this.channel=channel;
-	}
-	
-	
-	/**
-	 * 约8s采集完  用时不稳定,有时很慢
-	 */
-	@Override
-	public void run() {
-		logger.info("多值采集开始...");
-		ZLOpdProtCMDEnum preCmd=ZLOpdProtCMDEnum.DIAGRAM_POINT_COUNT;  //前置命令,返回数据为250 才继续后面的
-		ZLOpdProtCMDEnum[]  cmds=	{
-				ZLOpdProtCMDEnum.DIAGRAM_DISP_1,
-				ZLOpdProtCMDEnum.DIAGRAM_DISP_2,
-				ZLOpdProtCMDEnum.DIAGRAM_DISP_3,
-				ZLOpdProtCMDEnum.DIAGRAM_LOAD_1,
-				ZLOpdProtCMDEnum.DIAGRAM_LOAD_2,	
-				ZLOpdProtCMDEnum.DIAGRAM_LOAD_3,
-				ZLOpdProtCMDEnum.DIAGRAM_CURR_1,
-				ZLOpdProtCMDEnum.DIAGRAM_CURR_2,	
-				ZLOpdProtCMDEnum.DIAGRAM_CURR_3,	
-				ZLOpdProtCMDEnum.DIAGRAM_POWER_1,
-				ZLOpdProtCMDEnum.DIAGRAM_POWER_2,	
-				ZLOpdProtCMDEnum.DIAGRAM_POWER_3	
-				
-		};
-		ByteBufAllocator alloc=channel.alloc(); 
-		ByteBuf byteBuf=null;
-		
-		synchronized(channel) {
-			try {
-				
-				byteBuf=alloc.directBuffer();
-				
-				checkDiagramPoint(byteBuf,preCmd);
-				
-				if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { 
-					logger.info("功图数据还未准备就绪,准备重试一次");
-					Thread.sleep(500); //重试一次
-					byteBuf=alloc.directBuffer();
-					checkDiagramPoint(byteBuf,preCmd);
-				}
-				
-				if(!channel.attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).get()) { 
-					logger.info("功图数据还未准备就绪");
-					return;
-				}
-				
-			}	
-			catch (InterruptedException e) {
-				e.printStackTrace();
-			} 
-				
-			//开始功图采集
-			for(ZLOpdProtCMDEnum cmd : cmds) {
-				
-				byteBuf=alloc.directBuffer();
-				byteBuf.writeBytes(cmd.getCmd());
-				try {
-					channel.writeAndFlush(byteBuf);
-					channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
-					logger.info("发送完后指令:{}",cmd.name());
-					channel.wait(10*1000);  //等待接收返回数据后继续,最多等待10s
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				} 
-				
-			}
-			
-		}
-
-	}
-	
-	
-	private void checkDiagramPoint(ByteBuf byteBuf,ZLOpdProtCMDEnum preCmd) throws InterruptedException {
-		byteBuf.writeBytes(preCmd.getCmd());
-		channel.writeAndFlush(byteBuf);
-		channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
-		channel.wait(10*1000);
-	}
-	
-	
-	
-
-}

+ 47 - 34
src/main/java/com/hb/proj/gather/scheduler/GatherScheduler.java

@@ -1,13 +1,16 @@
 package com.hb.proj.gather.scheduler;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Timer;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 
 import com.hb.proj.gather.model.GatherConfig;
+import com.hb.proj.gather.model.GatherDevicePO;
 import com.hb.proj.gather.process.DataTransConfig;
 import com.hb.proj.gather.protocol.ChannelGroupMgr;
 import com.hb.proj.utils.ConfigUtils;
@@ -67,7 +70,7 @@ public class GatherScheduler {
 
 			@Override
 			public void run() {
-				startGather(false);
+				startGather("gather",false);
 			}
 			
 		};
@@ -76,7 +79,7 @@ public class GatherScheduler {
 
 			@Override
 			public void run() {
-				startGather(true);
+				startGather("gather",true);
 			}
 			
 		};
@@ -85,7 +88,7 @@ public class GatherScheduler {
 
 			@Override
 			public void run() {
-				startLiquidGather();
+				startGather("liquid",true);
 			}
 			
 		};
@@ -105,57 +108,67 @@ public class GatherScheduler {
 	
 	/**
 	 * isMulti:true 多值采集,false:单值采集
-	 * @param isMulti
+	 * [2024.11.4]调整为2级循环:1级:网络通道;2级:通道所挂载的采集设备 
+	 * @param  deviceType:[liquid、gather]
+	 * @param  isMulti  单值/图形采集,deviceType=gather 才有效
 	 */
-	public void startGather(boolean isMulti) {
+	public void startGather(String deviceType,boolean isMulti) {
 		logger.info("{}定时采集启动...",isMulti?"多值":"单值");
+		
 		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
 		Channel channel=null;
 		String serial=null;
+		List<GatherDevicePO> devices=null;
+		
 		while(iterator.hasNext()) {
 			channel=iterator.next();
 			serial=channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
-			if(DataTransConfig.isGatherDev(serial) && ChannelGroupMgr.isDone(serial)) {  //isDone 保证同一设备同一时间只有一个任务在执行
-				ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(new GatherTask(channel,isMulti)));  
+			
+			devices=DataTransConfig.getDevsByChnAndType(serial, deviceType);
+			if(devices==null) {
+				continue;
 			}
-			else if(!ChannelGroupMgr.isDone(serial)) {
-				logger.warn("设备{}有任务未结束,本次{}采集忽略",serial,isMulti?"多值":"单值");
+			
+			for(GatherDevicePO dev : devices) {
+				if(ChannelGroupMgr.isDone(serial)) { //isDone 保证同一通道同一时间只有一个任务在执行
+					submitTask(channel,dev.getAddrNum(),deviceType,isMulti);
+				}
+				else {
+					logger.warn("通道{}有任务未结束,本次设备序号:{},开始检测是否可延后补偿采集{}",serial,dev.getAddrNum(),deviceType);
+					try {
+						ChannelGroupMgr.getFuture(serial).get(20, TimeUnit.SECONDS);  //最多等20s在执行任务
+					} catch (Exception e) {
+						logger.error(e.getMessage());
+					} 
+					if(ChannelGroupMgr.isDone(serial)) {  //等待一定时间后,及时加入周期任务(前提等待后前一任务执行完),不用等到下个周期才执行
+						submitTask(channel,dev.getAddrNum(),deviceType,isMulti);
+						logger.warn("延后补偿采集,通道:{},设备地址号:{},采集类型:{}",serial,dev.getAddrNum(),deviceType);
+					}
+				}
 			}
 			
+			
+			
 		}
 	}
 	
-	
-	
-	
-
-	/**
-	 * 单值1分钟执行一次采集,首次等待1分钟,等待服务启动,设备连接上
-	 */
-	//@Scheduled(fixedRate=60*1000,initialDelay= 60000)  //每分钟执行一次
-	/*
-	public void startGather() {
-		logger.info("定时采集启动...");
-		scheNum+=1;
-		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
-		Channel channel=null;
-		while(iterator.hasNext()) {
-			channel=iterator.next();
-			if(DataTransConfig.isGatherDev(channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get())) {
-				GatherTaskExecutor.execute(new GatherTask(channel));  //复合任务包含单值、多值
-			}
-			
+	private void submitTask(Channel channel,int addrNum,String deviceType,boolean isMulti) {
+		String serial=channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
+		Runnable task=new GatherTask(channel,addrNum,isMulti);
+		if("liquid".equalsIgnoreCase(deviceType)) {
+			task=new GatherLiquidTask(channel,addrNum);
 		}
-		
-		scheNum=scheNum%5==0?0:scheNum;
+		ChannelGroupMgr.addFuture(serial,GatherTaskExecutor.submit(task)); 
 	}
-	*/
+	
 	
 	
 	/**
 	 * 动液面定时采集
 	 */
-	//@Scheduled(fixedRate = 20 * 60 * 1000,initialDelay= 300000)  
+	//@Scheduled(fixedRate = 20 * 60 * 1000,initialDelay= 300000) 
+	
+	/*
 	public void startLiquidGather() {
 			logger.info("动液面定时采集启动...");
 			Iterator<Channel> iterator=ChannelGroupMgr.iterator();
@@ -169,7 +182,7 @@ public class GatherScheduler {
 				}
 				
 			}
-	}
+	}*/
 	
 	
 	@PreDestroy

+ 0 - 70
src/main/java/com/hb/proj/gather/scheduler/GatherSingleTask.java

@@ -1,70 +0,0 @@
-package com.hb.proj.gather.scheduler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.hb.proj.gather.protocol.ChannelGroupMgr;
-import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.channel.Channel;
-
-/**
- * 单值采集任务  具体执行采集任务,下发采集指令   
- * @author cwen
- *
- */
-@Deprecated
-public class GatherSingleTask implements Runnable {
-	
-	private final static  Logger logger = LoggerFactory.getLogger(GatherSingleTask.class);
-
-	private Channel  channel;
-	
-	public GatherSingleTask(Channel channel) {
-		this.channel=channel;
-	}
-	
-	/**
-	 * 约2-5s采集完
-	 */
-	@Override
-	public void run() {
-		logger.info("单值采集开始...");
-		
-		ZLOpdProtCMDEnum[]  cmds=	{ZLOpdProtCMDEnum.SINGLE_PRESS_TEMP_LOAD,
-				ZLOpdProtCMDEnum.SINGLE_CURR_VOL_LOS_PW,
-				ZLOpdProtCMDEnum.SINGLE_FREQ_STROKE};
-		ByteBufAllocator alloc=channel.alloc(); 
-		ByteBuf byteBuf=null;
-		
-		synchronized(channel) {
-			for(ZLOpdProtCMDEnum cmd : cmds) {
-				
-				byteBuf=alloc.directBuffer();
-				
-				byteBuf.writeBytes(cmd.getCmd());
-				try {
-					channel.writeAndFlush(byteBuf);
-					channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
-					logger.info("发送完指令:{}",cmd.name());
-					
-					channel.wait(5*1000);  //等待接收返回数据后继续,最多等待5s,此处释放锁,回复还未收到就被多值任务获得锁并发指令,会导致两个指令间隔很短
-					
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				} 
-				
-			}
-			
-		}
-		
-		
-
-	}
-	
-	
-	
-
-}

+ 18 - 15
src/main/java/com/hb/proj/gather/scheduler/GatherTask.java

@@ -7,6 +7,7 @@ import org.slf4j.LoggerFactory;
 
 import com.hb.proj.gather.protocol.ChannelGroupMgr;
 import com.hb.proj.gather.protocol.ZLOpdProtCMDEnum;
+import com.hb.proj.gather.utils.ByteUtils;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -18,15 +19,19 @@ public class GatherTask implements Runnable{
 
 	private Channel  channel;
 	
+	//设备地址号,默认为1(DTU模式:一dtu一采集设备地址号默认1,LORA模式一网关多采集设备)
+	private int addrNum=1;
+	
 	private Boolean isMulti=false; //是否为多值采集
 	
 	private long cmdTimeout=25*1000; //指令等待回复超时时间 2次超时=50s  不能超过采集周期60s
 	
 	private int timeoutCount=0;  //超时发送指令次数
 	
-	public GatherTask(Channel channel,Boolean isMulti) {
+	public GatherTask(Channel channel,int addrNum,Boolean isMulti) {
 		this.channel=channel;
 		this.isMulti=isMulti;
+		this.addrNum=addrNum;
 	}
 	
 	@Override
@@ -37,15 +42,6 @@ public class GatherTask implements Runnable{
 		
 		ByteBufAllocator alloc=channel.alloc(); 
 		synchronized(channel) {
-			/**
-			logger.info("单值采集开始...");
-			singleGahter(alloc);
-			if((GatherScheduler.getScheNum())%5!=0) {  //5次单值进行一次多值
-				return;
-			}
-			logger.info("多值采集开始...");
-			multiGather(alloc,channel);
-			**/
 			
 			if(isMulti) {
 				multiGather(alloc);
@@ -62,6 +58,7 @@ public class GatherTask implements Runnable{
 					ZLOpdProtCMDEnum.SINGLE_CURR_VOL_LOS_PW,
 					ZLOpdProtCMDEnum.SINGLE_FREQ_STROKE};
 			ByteBuf byteBuf=null;
+			byte[]  cmdbtyes=null;
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 				
 				if(needCloseChannel()) {
@@ -71,12 +68,14 @@ public class GatherTask implements Runnable{
 				}
 				
 				byteBuf=alloc.directBuffer();
-				byteBuf.writeBytes(cmd.getCmd());
+				cmdbtyes=cmd.getCmd();
+				cmdbtyes[0]=(byte)addrNum;
+				byteBuf.writeBytes(cmdbtyes);
 				channel.writeAndFlush(byteBuf);
 				
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
 				channel.attr(ChannelGroupMgr.ATTR_KEY_PRE_TIME).set((new Date()).getTime());
-				logger.info("发送完指令:{}",cmd.name());
+				logger.info("发送完指令:{},{}",cmd.name(),ByteUtils.toHexString(cmdbtyes));
 				
 				channel.wait(cmdTimeout);  //等待接收返回数据后继续,此处释放锁,回复还未收到因超时释放锁,就被多值任务获得锁并发指令,会导致两个指令间隔很短
 			}
@@ -123,7 +122,7 @@ public class GatherTask implements Runnable{
 			
 			ByteBuf byteBuf=null;
 			timeoutCount=0 ;  //多值采集开始前复位该值(因为单值与多值有5分钟间隔,该间隔不能算作超时)
-			
+			byte[] cmdbtyes=null;
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 				
 				if(needCloseChannel()) {
@@ -133,7 +132,9 @@ public class GatherTask implements Runnable{
 				}
 				
 				byteBuf=alloc.directBuffer();
-				byteBuf.writeBytes(cmd.getCmd());
+				cmdbtyes=cmd.getCmd();
+				cmdbtyes[0]=(byte)addrNum;
+				byteBuf.writeBytes(cmdbtyes);
 				channel.writeAndFlush(byteBuf);
 				
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
@@ -155,7 +156,9 @@ public class GatherTask implements Runnable{
 		
 		ZLOpdProtCMDEnum preCmd=ZLOpdProtCMDEnum.DIAGRAM_POINT_COUNT;  
 		ByteBuf byteBuf=alloc.directBuffer();
-		byteBuf.writeBytes(preCmd.getCmd());
+		byte[] cmdbytes=preCmd.getCmd();
+		cmdbytes[0]=(byte)addrNum;
+		byteBuf.writeBytes(cmdbytes);
 		channel.writeAndFlush(byteBuf);
 		channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(preCmd.name());
 		logger.info("发送完后指令:{}",preCmd.name());

+ 25 - 7
src/main/java/com/hb/proj/gather/scheduler/ManualLiquidTask.java

@@ -34,6 +34,9 @@ public class ManualLiquidTask implements Runnable {
 	
 	private Channel  channel;
 	
+	//设备地址号,默认为1(DTU模式:一dtu一采集设备地址号默认1,LORA模式一网关多采集设备)
+    private int addrNum=1;
+	
 	private int action=ACTION_READ;
 	
 	private LiquidParam  liquidParam;
@@ -44,15 +47,16 @@ public class ManualLiquidTask implements Runnable {
 	
 	private long cmdTimeout=25*1000; 
 	
-	public ManualLiquidTask(Channel  channel,int action) {
+	public ManualLiquidTask(Channel  channel,int addrNum,int action) {
 		this.channel=channel;
+		this.addrNum=addrNum;
 		this.action=action;
 		this.repRedis=ApplicationContextUtils.getBean("redisRepComponent", RedisRepComponent.class);
 		this.redisKey="liquid_"+channel.attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
 	}
 	
-	public ManualLiquidTask(Channel  channel,int action,LiquidParam liquidParam) {
-		this(channel,action);
+	public ManualLiquidTask(Channel  channel,int addrNum,int action,LiquidParam liquidParam) {
+		this(channel,addrNum,action);
 		this.liquidParam=liquidParam;
 	}
 			
@@ -93,10 +97,14 @@ public class ManualLiquidTask implements Runnable {
 			
 			repRedis.put(redisKey, MapUtils.build("status","start","action","read"),true);
 			
+			byte[]  cmdbtyes=null;
+			
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 				
 				byteBuf=alloc.directBuffer();
-				byteBuf.writeBytes(cmd.getCmd());
+				cmdbtyes=cmd.getCmd();
+				cmdbtyes[0]=(byte)addrNum;
+				byteBuf.writeBytes(cmdbtyes);
 				channel.writeAndFlush(byteBuf);
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
 				repRedis.put(redisKey, MapUtils.build("status","running"));
@@ -137,10 +145,15 @@ public class ManualLiquidTask implements Runnable {
 			ByteBuf byteBuf=null;
 			
 			repRedis.put(redisKey, MapUtils.build("status","start","action","set"),true);
-			
+			byte[] cmdbtyes=null;
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 				byteBuf=alloc.directBuffer();
-				byteBuf.writeBytes(bindCMDVal(cmd));
+				
+				cmdbtyes=bindCMDVal(cmd);
+				cmdbtyes[0]=(byte)addrNum;
+				byteBuf.writeBytes(cmdbtyes);
+				
+				//byteBuf.writeBytes(bindCMDVal(cmd));
 				channel.writeAndFlush(byteBuf);
 				channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
 				repRedis.put(redisKey, MapUtils.build("status","running"));
@@ -167,7 +180,12 @@ public class ManualLiquidTask implements Runnable {
 			repRedis.put(redisKey, MapUtils.build("status","start","action","boot"),true);
 			
 			ByteBuf byteBuf=alloc.directBuffer();
-			byteBuf.writeBytes(bindCMDVal(cmd));
+			
+			byte[] cmdbtyes=bindCMDVal(cmd);
+			cmdbtyes[0]=(byte)addrNum;
+			byteBuf.writeBytes(cmdbtyes);
+			
+			//byteBuf.writeBytes();
 			channel.writeAndFlush(byteBuf);
 			channel.attr(ChannelGroupMgr.ATTR_KEY_CMD).set(cmd.name());
 			repRedis.put(redisKey, MapUtils.build("status","running"));

+ 14 - 0
src/main/java/com/hb/proj/gather/utils/ByteUtils.java

@@ -2,8 +2,22 @@ package com.hb.proj.gather.utils;
 
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
+import java.util.regex.Pattern;
 
 public class ByteUtils {
+	
+	private final static Pattern writePtn=Pattern.compile("^[01][1-9]06$");
+	
+	private final static Pattern readPtn=Pattern.compile("^[01][1-9]03$");
+	
+	public static boolean  isWriteBackMsg(String hexmsg) {
+		return writePtn.matcher(hexmsg.substring(0, 4)).matches();
+	}
+	
+	public static boolean  isReadBackMsg(String hexmsg) {
+		return readPtn.matcher(hexmsg.substring(0, 4)).matches();
+	}
+	
 
 	public static String toHexString(byte[] data) {