Преглед изворни кода

多值定时采集任务、数据组装、转换标定、入库处理

chenwen пре 1 година
родитељ
комит
7e240967b2

+ 6 - 0
src/main/java/com/hb/proj/allconfig/SpringMvcConfigurer.java

@@ -13,6 +13,8 @@ import org.springframework.web.filter.CorsFilter;
 import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
 import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry;
 import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
 import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
 
 
+import com.hb.xframework.util.ApplicationContextUtils;
+
 @Configuration
 @Configuration
 public class SpringMvcConfigurer implements WebMvcConfigurer {
 public class SpringMvcConfigurer implements WebMvcConfigurer {
 
 
@@ -63,6 +65,10 @@ public class SpringMvcConfigurer implements WebMvcConfigurer {
         return bean;
         return bean;
     }
     }
     
     
+    @Bean
+    public ApplicationContextUtils applicationContextUtils() {
+    	return new ApplicationContextUtils();
+    }
     
     
   
   
     
     

+ 85 - 0
src/main/java/com/hb/proj/gather/business/DataAssembler.java

@@ -0,0 +1,85 @@
+package com.hb.proj.gather.business;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.model.DiagramPO;
+
+/**
+ * 采集数据组装器(将分批采集的数据组装成完整数据,特别是功图数据)
+ * @author cwen
+ *
+ */
+public class DataAssembler extends Thread{
+
+	private final static  Logger logger = LoggerFactory.getLogger(DataAssembler.class);
+	
+	private static boolean stopTag=false;
+			
+	private static Map<String,DiagramPieceBox> diagramPkgs=new HashMap<String,DiagramPieceBox>(100);
+	
+	public  static void putPieceData(String channelSerial,Map<String,Float> pieceData) {
+		logger.info("接收单值片段数据:{},{}",channelSerial,pieceData);
+	}
+	
+	public  static void putPieceData(String channelSerial,String pieceName,List<Float> pieceDatas) {
+		logger.info("接收功图片段数据:{},{}",channelSerial,pieceName);
+		if(!diagramPkgs.containsKey(channelSerial)) {
+			diagramPkgs.put(channelSerial, new DiagramPieceBox(channelSerial));
+		}
+		diagramPkgs.get(channelSerial).put(pieceName,pieceDatas);
+	}
+
+	@Override
+	public void run() {
+		logger.info("开始数据组装检测");
+		DiagramPieceBox  diaPiece=null;
+		DiagramPO  diagramPO=null;
+		
+		try {
+			while(!stopTag) {
+				for(String channelSerial : diagramPkgs.keySet()) {
+					/*
+					 * if(ChannelGroupMgr.get(channelSerial)== null) { //通道已不存在,释放片段数据缓存
+					 * diagramPkgs.remove(channelSerial); continue; }
+					 */
+					diaPiece=diagramPkgs.get(channelSerial);
+					
+					diagramPO=diaPiece.assembleDiagramLoad();
+					
+					if(diagramPO!=null) {
+						GatherTaskExecutor.execute(new DataTransRepTask(diagramPO)); 
+					}
+					
+					diagramPO=diaPiece.assembleDiagramCurr();
+					
+					if(diagramPO!=null) {
+						GatherTaskExecutor.execute(new DataTransRepTask(diagramPO)); 
+					}
+					
+					diagramPO=diaPiece.assembleDiagramPower();
+					
+					if(diagramPO!=null) {
+						GatherTaskExecutor.execute(new DataTransRepTask(diagramPO)); 
+					}
+				}
+				
+				Thread.sleep(500);
+			}
+		}
+		catch (InterruptedException e) {
+			e.printStackTrace();
+		}
+		
+	}
+	
+	public static void stopRun() {
+		logger.info("即将停止数据组装器线程");
+		stopTag=true;
+	}
+	
+}

+ 31 - 0
src/main/java/com/hb/proj/gather/business/DataTransConfig.java

@@ -0,0 +1,31 @@
+package com.hb.proj.gather.business;
+
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.model.WellParamVO;
+import com.hb.proj.gather.rep.WellParamService;
+import com.hb.xframework.util.ApplicationContextUtils;
+
+/**
+ * 采集数据转换配置数据
+ * @author cwen
+ *
+ */
+public class DataTransConfig {
+	
+	private final static  Logger logger = LoggerFactory.getLogger(DataTransConfig.class);
+
+	private static Map<String,WellParamVO> configer=null;
+	
+	public static WellParamVO get(String serialParamCode) {
+		if(configer==null) {
+			logger.info("加载参数配置...");
+			WellParamService  repService=ApplicationContextUtils.getBean("wellParamService", WellParamService.class);
+			configer=repService.loadWellParams();
+		}
+		return configer.get(serialParamCode);
+	}
+}

+ 42 - 0
src/main/java/com/hb/proj/gather/business/DataTransRepTask.java

@@ -0,0 +1,42 @@
+package com.hb.proj.gather.business;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.hb.proj.gather.model.DiagramPO;
+import com.hb.proj.gather.model.WellParamVO;
+import com.hb.proj.gather.rep.GatherDataRepService;
+import com.hb.xframework.util.ApplicationContextUtils;
+
+public class DataTransRepTask implements Runnable{
+	
+	private final static  Logger logger = LoggerFactory.getLogger(DataTransRepTask.class);
+
+	private DiagramPO  diagramPO;
+	
+	public DataTransRepTask(DiagramPO  diagramPO) {
+		this.diagramPO=diagramPO;
+	}
+	
+	
+	@Override
+	public void run() {
+		logger.info("开始数据转换处理{}",diagramPO.getDevSerial());
+		WellParamVO paramConfig=DataTransConfig.get(diagramPO.getDevSerial()+"_"+diagramPO.getParamCode());
+		if(paramConfig==null) {
+			logger.info("未找到参数配置{}_{}",diagramPO.getDevSerial(),diagramPO.getParamCode());
+			return;
+		}
+		
+		
+		
+		DataTransUtils.transMulti(diagramPO, paramConfig); //数据转换
+		
+		logger.info("数据转换完:{}",diagramPO.getParamCode());
+		
+		GatherDataRepService repService=ApplicationContextUtils.getBean("gatherDataRepService", GatherDataRepService.class);
+		
+		repService.save(diagramPO);  //入库
+	}
+
+}

+ 42 - 0
src/main/java/com/hb/proj/gather/business/DataTransUtils.java

@@ -0,0 +1,42 @@
+package com.hb.proj.gather.business;
+
+import java.util.List;
+
+import com.hb.proj.gather.model.DiagramPO;
+import com.hb.proj.gather.model.WellParamVO;
+
+/**
+ * 通用数据转换方法(标定计算,转为入库单位)
+ * @author cwen
+ *
+ */
+public class DataTransUtils {
+
+	public static void transMulti(DiagramPO diaramData,WellParamVO  paramConfig) {
+		
+		diaramData.setWellParam(paramConfig.getParamId());
+		
+		transMulti(diaramData.getOths(),paramConfig.getCalibrateA(),paramConfig.getCalibrateB(),paramConfig.getCalibrateC(),paramConfig.getGatInsScale());
+		
+		transMulti(diaramData.getDisps(),paramConfig.getCalibrateA2(),paramConfig.getCalibrateB2(),paramConfig.getCalibrateC2(),paramConfig.getGatInsScale2());
+	}
+	
+	public static void transMulti(List<Float> datas,Double calA,Double calB,Double calC,Double unitScale) {
+		if(calA==null) {
+			calA=0.0;
+		}
+		if(calB==null) {
+			calB=1.0;
+		}
+		if(calC==null) {
+			calC=0.0;
+		}
+		
+		unitScale=unitScale!=null?unitScale:1;
+		
+		for(Float data : datas) {
+			data=(float)((calA*data*data+calB*data+calC)*unitScale);
+		}
+	}
+	
+}

+ 100 - 0
src/main/java/com/hb/proj/gather/business/DiagramPieceBox.java

@@ -0,0 +1,100 @@
+package com.hb.proj.gather.business;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.hb.proj.gather.model.DiagramPO;
+
+/**
+ * 功图片段数据包
+ * @author cwen
+ *
+ */
+public class DiagramPieceBox {
+	
+	private int gatherNum;  //采集批次号
+	
+	private String channelSerial; //通道编号
+	
+	
+	public DiagramPieceBox(String channelSerial) {
+		this.channelSerial=channelSerial;
+	}
+	
+	
+	private Map<String,List<Float>>  pieceMap=new HashMap<String,List<Float>>(12);
+	
+	public void put(String pieceName,List<Float> pieceDatas) {
+		pieceMap.put(pieceName, pieceDatas);
+		//this.canBeAssemble();
+	}
+	
+	public DiagramPO  assembleDiagramLoad() {
+		return assembleDiagram(DiagramPO.DIAGRAM_LOAD_CODE,"chartload_1","chartload_2","chartload_3"); //与指令ZLOpdProtCMDEnum中配置的参数名一致
+	}
+	
+	public DiagramPO  assembleDiagramCurr() {
+		return assembleDiagram(DiagramPO.DIAGRAM_CURR_CODE,"chartcurr_1","chartcurr_2","chartcurr_3");
+	}
+	
+	public DiagramPO  assembleDiagramPower() {
+		return assembleDiagram(DiagramPO.DIAGRAM_POWER_CODE,"chartpower_1","chartpower_2","chartpower_3");
+	}
+	
+	public DiagramPO  assembleDiagram(String diagramCode,String othkey1,String othkey2,String othkey3) {
+		boolean dispOk=pieceMap.get("disp_1")!=null&&pieceMap.get("disp_2")!=null&&pieceMap.get("disp_3")!=null;
+		boolean othOk=pieceMap.get(othkey1)!=null&&pieceMap.get(othkey2)!=null&&pieceMap.get(othkey3)!=null;
+		
+		if(!dispOk||!othOk) {
+			return null;
+		}
+		
+		List<Float> disps=pieceMap.get("disp_1");
+		disps.addAll(pieceMap.get("disp_2"));
+		disps.addAll(pieceMap.get("disp_3"));
+		
+		List<Float> oths=pieceMap.get(othkey1);
+		oths.addAll(pieceMap.get(othkey2));
+		oths.addAll(pieceMap.get(othkey3));
+		
+		clearPiece(othkey1,othkey2,othkey3); //组装功图后,清除原数据,避免轮询检测发现还有数据又进行功图的组装
+		
+		return new DiagramPO(channelSerial,diagramCode,disps,oths);
+		
+	}
+	
+	public void clearPiece(String...  piecekeys) {
+		for(String key : piecekeys) {
+			pieceMap.remove(key);
+		}
+	}
+	
+	
+
+	public int getGatherNum() {
+		return gatherNum;
+	}
+
+	public void setGatherNum(int gatherNum) {
+		this.gatherNum = gatherNum;
+	}
+
+	public String getChannelSerial() {
+		return channelSerial;
+	}
+
+	public void setChannelSerial(String channelSerial) {
+		this.channelSerial = channelSerial;
+	}
+
+	
+	public Map<String, List<Float>> getPieceMap() {
+		return pieceMap;
+	}
+
+	public void setPieceMap(Map<String, List<Float>> pieceMap) {
+		this.pieceMap = pieceMap;
+	}
+
+}

+ 1 - 0
src/main/java/com/hb/proj/gather/business/GatherMultiTask.java

@@ -80,6 +80,7 @@ public class GatherMultiTask implements Runnable {
 				
 				
 			//开始功图采集
 			//开始功图采集
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 			for(ZLOpdProtCMDEnum cmd : cmds) {
+				
 				byteBuf=alloc.directBuffer();
 				byteBuf=alloc.directBuffer();
 				byteBuf.writeBytes(cmd.getCmd());
 				byteBuf.writeBytes(cmd.getCmd());
 				try {
 				try {

+ 2 - 2
src/main/java/com/hb/proj/gather/business/GatherScheduler.java

@@ -42,9 +42,9 @@ public class GatherScheduler {
 	
 	
 	
 	
 	/**
 	/**
-	 * 多值3分钟执行一次采集,首次延时15s
+	 * 多值3分钟执行一次采集,首次延时30s
 	 */
 	 */
-	@Scheduled(fixedRate = 300 * 1000,initialDelay= 15000)  
+	@Scheduled(fixedRate = 300 * 1000,initialDelay= 30000)  
 	public void startMultiGather() {
 	public void startMultiGather() {
 		logger.info("多值定时采集启动...");
 		logger.info("多值定时采集启动...");
 		Iterator<Channel> iterator=ChannelGroupMgr.iterator();
 		Iterator<Channel> iterator=ChannelGroupMgr.iterator();

+ 1 - 0
src/main/java/com/hb/proj/gather/business/GatherSingleTask.java

@@ -38,6 +38,7 @@ public class GatherSingleTask implements Runnable {
 		
 		
 		synchronized(channel) {
 		synchronized(channel) {
 			for(ZLOpdProtCMDEnum cmd : cmds) {
 			for(ZLOpdProtCMDEnum cmd : cmds) {
+				
 				byteBuf=alloc.directBuffer();
 				byteBuf=alloc.directBuffer();
 				
 				
 				byteBuf.writeBytes(cmd.getCmd());
 				byteBuf.writeBytes(cmd.getCmd());

+ 1 - 1
src/main/java/com/hb/proj/gather/business/GatherTaskExecutor.java

@@ -24,7 +24,7 @@ public class GatherTaskExecutor {
 	}
 	}
 	
 	
 	public static void shutdown() {
 	public static void shutdown() {
-		logger.info("关闭定时采集执行器");
+		logger.info("关闭采集执行器");
 		executor.shutdownNow();
 		executor.shutdownNow();
 	}
 	}
 }
 }

+ 17 - 0
src/main/java/com/hb/proj/gather/business/SinglePieceBox.java

@@ -0,0 +1,17 @@
+package com.hb.proj.gather.business;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SinglePieceBox {
+
+	private int gatherNum;  //采集批次号
+	
+	private String channelSerial; //通道编号
+	
+	private Map<String,Float>  pieceMap=new HashMap<String,Float>(20);
+	
+	public void put(String pieceName,Float val) {
+		pieceMap.put(pieceName, val);
+	}
+}

+ 86 - 0
src/main/java/com/hb/proj/gather/model/DiagramPO.java

@@ -0,0 +1,86 @@
+package com.hb.proj.gather.model;
+
+import java.util.Date;
+import java.util.List;
+
+public class DiagramPO {
+	
+	public static final String DIAGRAM_LOAD_CODE="diagram_load";
+	
+	public static final String DIAGRAM_CURR_CODE="diagram_current";
+	
+	public static final String DIAGRAM_POWER_CODE="diagram_power";
+	
+	private String devSerial;  //设备序号
+
+	private String paramCode;
+	
+	private String wellParam;  //对应的井采集参数  wellId_paramCode
+	
+	private List<Float> disps;
+	
+	private List<Float> oths;
+	
+	private Date  gatherTime;
+	
+	public DiagramPO() {
+		
+	}
+	
+	public DiagramPO(String devSerial,String paramCode,List<Float> disps,List<Float> oths) {
+		this.gatherTime=new Date();
+		this.devSerial=devSerial;
+		this.paramCode=paramCode;
+		this.disps=disps;
+		this.oths=oths;
+		
+	}
+
+	public String getParamCode() {
+		return paramCode;
+	}
+
+	public void setParamCode(String paramCode) {
+		this.paramCode = paramCode;
+	}
+
+	public List<Float> getDisps() {
+		return disps;
+	}
+
+	public void setDisps(List<Float> disps) {
+		this.disps = disps;
+	}
+
+	public List<Float> getOths() {
+		return oths;
+	}
+
+	public void setOths(List<Float> oths) {
+		this.oths = oths;
+	}
+
+	public Date getGatherTime() {
+		return gatherTime;
+	}
+
+	public void setGatherTime(Date gatherTime) {
+		this.gatherTime = gatherTime;
+	}
+
+	public String getDevSerial() {
+		return devSerial;
+	}
+
+	public void setDevSerial(String devSerial) {
+		this.devSerial = devSerial;
+	}
+
+	public String getWellParam() {
+		return wellParam;
+	}
+
+	public void setWellParam(String wellParam) {
+		this.wellParam = wellParam;
+	}
+}

+ 60 - 0
src/main/java/com/hb/proj/gather/model/WellParamVO.java

@@ -0,0 +1,60 @@
+package com.hb.proj.gather.model;
+
+import lombok.Data;
+
+@Data
+public class WellParamVO {
+	
+	private String deviceCode;
+
+	private String paramId;
+	
+	private String wellId;
+	
+	private String paramName;
+	
+	private String paramCode;
+	
+	private String displayUnit;
+	
+	private String insertUnit;
+	
+	private String gatherUnit;
+	
+	private Double disInsScale;
+	
+	private Double gatInsScale;
+	
+	private String displayFormat;
+	
+	
+	
+	private Double calibrateA;
+	
+	private Double calibrateB;
+	
+	private Double calibrateC;
+	
+	
+	private Double calibrateA2;  //带2表示多值数据中的第二参数(固定为位移)
+	
+	private Double calibrateB2;
+	
+	private Double calibrateC2;
+	
+	private String displayUnit2;
+	
+	private String insertUnit2;
+	
+	private String gatherUnit2;
+	
+	private Double disInsScale2;
+	
+	private Double gatInsScale2;
+	
+	private String displayFormat2;
+	
+	
+
+	
+}

+ 28 - 14
src/main/java/com/hb/proj/gather/protocol/GatherRespParser.java

@@ -1,25 +1,37 @@
 package com.hb.proj.gather.protocol;
 package com.hb.proj.gather.protocol;
 
 
 import java.util.ArrayList;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 
 
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 
 
+import com.hb.proj.gather.business.DataAssembler;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBuf;
 
 
 public class GatherRespParser {
 public class GatherRespParser {
 
 
 	private final static  Logger logger = LoggerFactory.getLogger(GatherRespParser.class);
 	private final static  Logger logger = LoggerFactory.getLogger(GatherRespParser.class);
 	
 	
-	
-	public static void parse(ByteBuf byteBuf ,int startIndex,int dataLen,String cmd) {
+	/**
+	 * 数据解析入口
+	 * @param byteBuf  接收的消息
+	 * @param startIndex 解析开始索引
+	 * @param dataLen 数据长度字节数
+	 * @param cmd
+	 * @param serial  设备编号
+	 */
+	public static void parse(ByteBuf byteBuf ,int startIndex,int dataLen,String cmd,String serial) {
 		ZLOpdProtCMDEnum  cmdEum=ZLOpdProtCMDEnum.valueOf(cmd);
 		ZLOpdProtCMDEnum  cmdEum=ZLOpdProtCMDEnum.valueOf(cmd);
 		if(cmdEum.getItemBytCount()==4) {
 		if(cmdEum.getItemBytCount()==4) {
-			parseFloat(byteBuf,startIndex,dataLen);
+			 Map<String,Float> dataMap=parseFloat(byteBuf,startIndex,dataLen,cmdEum.getParamCodes());
 		}
 		}
-		else if(cmdEum.getItemBytCount()==2) {
-			parseShort(byteBuf,startIndex,dataLen);
+		else if(cmdEum.getItemBytCount()==2) { //默认为功图数据解析
+			 List<Float> datas=parseShort2Float(byteBuf,startIndex,dataLen);
+			 DataAssembler.putPieceData(serial,(cmdEum.getParamCodes())[0], datas);
 		}
 		}
 	}
 	}
 			
 			
@@ -28,18 +40,20 @@ public class GatherRespParser {
 	 * @param byteBuf
 	 * @param byteBuf
 	 * @param  startIndex 数据区开始索引
 	 * @param  startIndex 数据区开始索引
 	 * @param  dataLen  数据区长度
 	 * @param  dataLen  数据区长度
+	 * @param  paramCodes  数据项编码
 	 */
 	 */
-	public static List<Float> parseFloat(ByteBuf byteBuf ,int startIndex,int dataLen) {
+	public static Map<String,Float> parseFloat(ByteBuf byteBuf ,int startIndex,int dataLen,String[] paramCodes) {
 		byteBuf.readerIndex(startIndex);
 		byteBuf.readerIndex(startIndex);
-		List<Float> rtns=new ArrayList<Float>();
+		Map<String,Float> rtnData=new HashMap<String,Float>(paramCodes.length);
+		int i=0;
 		while(true) {
 		while(true) {
-			rtns.add(byteBuf.readFloat() ); //顺序读取,readIndex 自动后移
-			if(byteBuf.readerIndex()>=(startIndex+dataLen)) {
+			rtnData.put(paramCodes[i++],byteBuf.readFloat() );   //顺序读取,readIndex 自动后移
+			if(i>=paramCodes.length || byteBuf.readerIndex()>=(startIndex+dataLen)) {
 				break;
 				break;
 			}
 			}
 		}
 		}
-		logger.info("数据解析完:{}",rtns);
-		return rtns;
+		logger.info("数据解析完:{}",rtnData);
+		return rtnData;
 		
 		
 		
 		
 	}
 	}
@@ -52,11 +66,11 @@ public class GatherRespParser {
 	 * @param dataLen
 	 * @param dataLen
 	 * @return
 	 * @return
 	 */
 	 */
-	public static List<Short> parseShort(ByteBuf byteBuf ,int startIndex,int dataLen) {
+	public static List<Float> parseShort2Float(ByteBuf byteBuf ,int startIndex,int dataLen) {
 		byteBuf.readerIndex(startIndex);
 		byteBuf.readerIndex(startIndex);
-		List<Short> rtns=new ArrayList<Short>();
+		List<Float> rtns=new ArrayList<Float>();
 		while(true) {
 		while(true) {
-			rtns.add(byteBuf.readShort() ); //顺序读取,readIndex 自动后移
+			rtns.add(byteBuf.readShort()+0.0f); //顺序读取,readIndex 自动后移
 			if(byteBuf.readerIndex()>=(startIndex+dataLen)) {
 			if(byteBuf.readerIndex()>=(startIndex+dataLen)) {
 				break;
 				break;
 			}
 			}

+ 2 - 1
src/main/java/com/hb/proj/gather/protocol/ZLOpdProtHandler.java

@@ -104,7 +104,8 @@ public class ZLOpdProtHandler extends ChannelInboundHandlerAdapter {
 						ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(pcount==250);
 						ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_DIAGRAM_READY).set(pcount==250);
 					}
 					}
 					else{
 					else{
-						GatherRespParser.parse(byteBuf,headBtyCount,datalen,cmd);
+						String serial=ctx.channel().attr(ChannelGroupMgr.ATTR_KEY_SERIAL).get();
+						GatherRespParser.parse(byteBuf,headBtyCount,datalen,cmd,serial);
 					}
 					}
 					
 					
 					ctx.channel().notifyAll(); //已经收到回复消息,通知指令发送进程可以继续,同步块或者同步方法执行完后才释放锁
 					ctx.channel().notifyAll(); //已经收到回复消息,通知指令发送进程可以继续,同步块或者同步方法执行完后才释放锁

+ 36 - 0
src/main/java/com/hb/proj/gather/rep/GatherDataRepService.java

@@ -0,0 +1,36 @@
+package com.hb.proj.gather.rep;
+
+import java.util.List;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.hb.proj.gather.model.DiagramPO;
+import com.hb.xframework.dao.core.SpringJdbcDAO;
+
+/**
+ * 采集程序数据持久化
+ * @author cwen
+ *
+ */
+@Service
+public class GatherDataRepService {
+
+	@Autowired
+	private SpringJdbcDAO  dao;
+	
+	public void save(DiagramPO diagramPO) {
+		String sql="""
+				insert into tzl_gather_data_multi(well_param,gather_time,data_val1,data_val2) values(?,?,?,?)
+				""";
+		dao.exeUpdate(sql, diagramPO.getWellParam(),diagramPO.getGatherTime(),list2Str(diagramPO.getDisps()),list2Str(diagramPO.getOths()));
+	}
+	
+	private String list2Str(List<Float>  datas) {
+		StringBuilder strb=new StringBuilder(datas.size()*10);
+		for(Float d : datas) {
+			strb.append(","+String.valueOf(d));
+		}
+		return strb.substring(1);
+	}
+}

+ 47 - 0
src/main/java/com/hb/proj/gather/rep/WellParamService.java

@@ -0,0 +1,47 @@
+package com.hb.proj.gather.rep;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import com.hb.proj.gather.model.WellParamVO;
+import com.hb.xframework.dao.core.SpringJdbcDAO;
+
+/**
+ * 获取参数配置信息
+ * @author cwen
+ *
+ */
+@Service
+public class WellParamService {
+
+	@Autowired
+	private SpringJdbcDAO  dao;
+	
+	public Map<String,WellParamVO>  loadWellParams(){
+		String sql="""
+				select 
+				(select device_code from tzl_gather_device d where d.well_id=wp.well_id and d.del_if=false limit 1) device_code,
+				param_id,well_id,param_code,gat_ins_scale,gat_ins_scale2,calibrate_a,calibrate_b,calibrate_c, 
+				calibrate_a2,calibrate_b2,calibrate_c2
+				from tzl_well_param wp
+				where del_if=false
+				""";
+		List<WellParamVO>  wellParams=dao.queryForList(sql, WellParamVO.class);
+		if(wellParams==null||wellParams.size()==0) {
+			return null;
+		}
+		
+		Map<String,WellParamVO> mapping=new HashMap<String,WellParamVO>();
+		
+		for(WellParamVO wellParam : wellParams) {
+			mapping.put(wellParam.getDeviceCode()+"_"+wellParam.getParamCode(), wellParam);
+		}
+		
+		return mapping;
+		
+	}
+}

+ 5 - 0
src/main/java/com/hb/proj/gather/server/NettyGatherRunner.java

@@ -8,6 +8,8 @@ import org.springframework.boot.ApplicationRunner;
 import org.springframework.core.annotation.Order;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
+import com.hb.proj.gather.business.DataAssembler;
+
 @Component
 @Component
 @Order(1)
 @Order(1)
 public class NettyGatherRunner implements ApplicationRunner {
 public class NettyGatherRunner implements ApplicationRunner {
@@ -21,7 +23,10 @@ public class NettyGatherRunner implements ApplicationRunner {
 	public void run(ApplicationArguments args) throws Exception {
 	public void run(ApplicationArguments args) throws Exception {
 		
 		
 		new Thread(()-> {
 		new Thread(()-> {
+			
+			(new DataAssembler()).start();
 			nettyGatherServer.start(9610);
 			nettyGatherServer.start(9610);
+			
 		}).start();
 		}).start();
 		
 		
 		logger.info("开始启动采集程序");
 		logger.info("开始启动采集程序");

+ 3 - 0
src/main/java/com/hb/proj/gather/server/NettyGatherServer.java

@@ -4,6 +4,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
 
 
+import com.hb.proj.gather.business.DataAssembler;
 import com.hb.proj.gather.business.GatherTaskExecutor;
 import com.hb.proj.gather.business.GatherTaskExecutor;
 
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.bootstrap.ServerBootstrap;
@@ -70,5 +71,7 @@ public class NettyGatherServer {
         }
         }
         
         
         GatherTaskExecutor.shutdown();
         GatherTaskExecutor.shutdown();
+        
+        DataAssembler.stopRun();
     }
     }
 }
 }